Scala provides some great list-processing APIs, and
it's easy to parallelize Scala's list processing with a
Java `ExecutorService`.
The following demonstration shows a couple of easy ways to
parallelize list processing. The second parallel variation, by2Groupwise,
processes the work list in pool-sized chunks, running each chunk in parallel,
and reports progress to the user between chunks.
Both by2Groupwise and by2Parallel
follow a fork-join pattern where a pass over the work list
submits callable tasks to an executor, then a subsequent
pass waits for the results.
import java.util.Date
import java.util.concurrent.{Callable,Executors,ExecutorService}
import java.util.logging.{Level,Logger}
/**
 * Demonstrates three ways of mapping `by2` over a work list: sequentially on
 * the calling thread, fully in parallel on a fixed-size Java thread pool, and
 * "groupwise" in pool-sized chunks with progress reported between chunks.
 */
object Demo {
  val log: Logger = Logger.getLogger( getClass.getName )

  /** Adapts a Scala `Function0` to a `java.util.concurrent.Callable`. */
  implicit def callable[A]( func:() => A ): Callable[A] = new Callable[A] {
    override def call:A = func()
  }

  /** Number of worker threads; also the chunk size used by `by2Groupwise`. */
  val poolSize = 4
  val exec: ExecutorService = Executors.newFixedThreadPool( poolSize )

  /** Last percentage logged by `reportProgress`; -1 before the first report. */
  var lastReport = -1

  /**
   * Logs `soFar / total` as a whole percentage at INFO level, skipping the
   * message when the percentage is unchanged since the previous report.
   *
   * @param soFar number of units completed so far
   * @param total total number of units; when not positive nothing is logged
   *              (this avoids a division by zero on empty work lists)
   */
  def reportProgress( soFar:Int, total:Int ):Unit = {
    if ( total > 0 ) {
      // Long arithmetic so soFar * 100 cannot overflow an Int.
      val progress = ( ( soFar.toLong * 100 ) / total ).toInt
      if ( progress != lastReport ) {
        log.log( Level.INFO, "Progress: {0}%", Array[Object]( progress.toString ) )
        lastReport = progress
      }
    }
  }

  /**
   * Evaluates `work` exactly once.
   *
   * @return (elapsed milliseconds, result of `work`)
   */
  def timer[A]( work: => A ):(Long,A) = {
    // System.nanoTime is monotonic, so wall-clock adjustments cannot skew the
    // measured duration the way Date-based timing can.
    val start = System.nanoTime
    val result = work
    ( ( System.nanoTime - start ) / 1000000L, result )
  }

  /** Simulated slow unit of work: sleeps one second, then returns `arg * 2`. */
  def by2( arg:Int ):Int = {
    Thread.sleep( 1000 )
    arg * 2
  }

  /**
   * Fork-join helper: submits `by2` for every element of `args` to `exec`,
   * then waits for each result, preserving input order. A task failure
   * surfaces as the `ExecutionException` thrown by `Future.get`.
   */
  private def forkJoin( args:Seq[Int] ):List[Int] = {
    // Wrap with callable(...) explicitly so the Callable overload of submit is
    // chosen directly rather than through implicit-conversion resolution.
    // toList forces every submission before the first get: on a lazy Seq a
    // chained map(submit).map(get) would submit and wait one task at a time.
    val futures = args.map( (arg) => exec.submit( callable( () => by2(arg) ) ) ).toList
    futures.map( (future) => future.get )
  }

  /** Doubles each element one at a time on the calling thread, logging progress. */
  def by2Sequential( work:Seq[Int] ):Seq[Int] = {
    val total = work.size // hoisted: size is O(n) on a List
    // toList forces the work now, so the final 100% report really follows it.
    val results = work.zipWithIndex.map { case (arg, index) =>
      reportProgress( index, total )
      by2(arg)
    }.toList
    reportProgress( total, total )
    results
  }

  /** Submits every element to the pool at once, then collects results in order. */
  def by2Parallel( work:Seq[Int] ):Seq[Int] = forkJoin( work )

  /**
   * Processes `work` in chunks of `poolSize` elements. Each chunk runs in
   * parallel, and progress is logged between chunks. Results keep input order.
   */
  def by2Groupwise( work:Seq[Int] ):Seq[Int] = {
    val groups = work.grouped( poolSize ).toList
    val total = groups.size
    log.log( Level.FINE, "Frickjack size: {0}", Array[Object]( total.toString ) )
    val results = groups.zipWithIndex.flatMap { case (group, index) =>
      log.log( Level.INFO, "Scan index: {0}", index )
      reportProgress( index, total )
      forkJoin( group )
    }
    reportProgress( total, total )
    results
  }

  /** Times each variant over 1 until 10, logs the durations, then stops the pool. */
  def main( args:Array[String] ):Unit = {
    val work = 1 until 10
    try {
      log.log( Level.INFO, "Seq {0}", Array[Object]( timer( by2Sequential( work ) )._1.toString ) )
      log.log( Level.INFO, "Parallel {0}", Array[Object]( timer( by2Parallel( work ) )._1.toString ) )
      log.log( Level.INFO, "Groupwise {0}", Array[Object]( timer( by2Groupwise( work ) )._1.toString ) )
    } finally {
      // The default thread factory creates non-daemon threads; without this
      // shutdown the JVM would not exit if one of the runs above threw.
      exec.shutdown()
    }
  }
}