Friday, 20 September 2013

Parallel collection manipulation in Scala

The Scala collections API comes packed with a very cool feature: it can run operations such as map over a collection in parallel. See this example:

I first create a list (I could use a range or something else too):

scala> List(1,2,3,4,5,6,7,8,9)
res0: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

... and then build the skeleton of my processing. What I want is to multiply each value by 1000 and then divide each result by 500:

scala> res0.map{ 
    i => i*1000
  }.map{
    i => i/500
  } 
res1: List[Int] = List(2, 4, 6, 8, 10, 12, 14, 16, 18)

Nothing fancy so far.

Entering par

In Scala, every collection can be automagically wrapped into a parallel counterpart that does its processing on a thread pool. I actually have no clue what the implementation is. Damn! I'll have to look it up. Anyway, insert 'par.' in your code and...

scala> res0.par.map{ 
    i => i*1000
  }.par.map{
    i => i/500
  }
res2: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 4, 6, 8, 10, 12, 14, 16, 18)


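... the list becomes a ParVector and keeps every item in its original position. (The second 'par' is redundant, by the way: the result of the first map is already a parallel collection.)
Let's try and see it in action (I added a random sleep to 'help' context switching):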

scala> import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit

scala> import java.util.Random
import java.util.Random

scala> new Random
res6: java.util.Random = java.util.Random@b9d964d

scala> res0.par.map { 
    i => TimeUnit.MILLISECONDS.sleep(res6.nextInt(1000));
    println(i);
    i*1000
  }.par.map{
    i => TimeUnit.MILLISECONDS.sleep(res6.nextInt(1000));
    println(i); 
    i/500
  } 
3
7
4
5
8
2
1
9
6
7000
3000
5000
4000
1000
6000
8000
9000
2000
res13: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 4, 6, 8, 10, 12, 14, 16, 18)

scala> 

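Ta-dah! The println output arrives out of order, so the elements are being processed in parallel, yet the result still comes back in list order.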
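
The session above shows both halves of the trick: the println calls arrive in scrambled order, yet the result keeps the order of the list. As a rough mental model only, the same behaviour can be sketched with plain scala.concurrent Futures: start one Future per element, then collect the results in input order. This is not how 'par' is actually implemented, and the names OrderedParallelSketch, parMapSketch and doubleAll are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderedParallelSketch {

  // Hypothetical helper, NOT the real 'par' machinery: run f on every element
  // as its own Future on the global thread pool, then wait for all of them.
  // Future.sequence keeps the results in input order, whichever finishes first.
  def parMapSketch[A, B](xs: List[A])(f: A => B): List[B] = {
    val futures: List[Future[B]] = xs.map(x => Future(f(x)))
    Await.result(Future.sequence(futures), 1.minute)
  }

  // Same arithmetic as the post: multiply by 1000, then divide by 500.
  // The sleep is shorter for later elements, so they tend to print earlier.
  def doubleAll(xs: List[Int]): List[Int] =
    parMapSketch(xs) { i =>
      Thread.sleep(10L * math.max(0, 10 - i))
      println(i)
      i * 1000 / 500
    }

  def main(args: Array[String]): Unit =
    println(doubleAll(List(1, 2, 3, 4, 5, 6, 7, 8, 9)))
}
```

Running main prints the elements in whatever order the threads get to them, and then the result list in its original order, just like the ParVector above.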

For more information, see the Parallel Collections overview in the Scala documentation.

PS: For the curious...

If I get rid of the first 'par', the first map runs sequentially (note the 1 to 9 printed in order), and its delays add up:


scala> res0.map { 
    i => TimeUnit.MILLISECONDS.sleep(res6.nextInt(1000));
    println(i);
    i*1000
  }.par.map{
    i => TimeUnit.MILLISECONDS.sleep(res6.nextInt(1000));
    println(i); 
    i/500
  } 
1
2
3
4
5
6
7
8
9
5000
2000
1000
6000
3000
4000
9000
7000
8000
res17: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 4, 6, 8, 10, 12, 14, 16, 18)
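
To put some numbers on "the delays add up", here is a small worked example. The delays are made-up fixed values standing in for the random sleeps (the REPL session never recorded the real ones), and the "ideal parallel" figure assumes one thread per element, which a real pool with fewer threads will not quite reach. DelayArithmetic and its members are illustrative names.

```scala
object DelayArithmetic {

  // Hypothetical fixed delays in milliseconds, one per element.
  val delays: List[Int] = List(300, 700, 100, 900, 500)

  // Sequential map: each element waits for the previous one to finish,
  // so the total time is the sum of all the delays.
  def sequentialMillis(ds: List[Int]): Int = ds.sum

  // Idealised parallel map (assumes a thread per element and a non-empty
  // list): everything sleeps at once, so the total is the longest delay.
  def idealParallelMillis(ds: List[Int]): Int = ds.max

  def main(args: Array[String]): Unit = {
    println(s"sequential: ${sequentialMillis(delays)} ms")        // 2500 ms
    println(s"ideal parallel: ${idealParallelMillis(delays)} ms") // 900 ms
  }
}
```

With these values the sequential map takes 2500 ms against 900 ms in the best parallel case, which is why dropping the first 'par' makes the first stage feel so much slower.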