Scala provides some great list-processing APIs, and it's easy to parallelize Scala's list processing with a Java executor service. The following demonstration shows a couple of easy ways to parallelize list processing. The second variation, by2Groupwise, processes chunks of a work list in parallel and reports progress to the user between chunks. Both by2Groupwise and by2Parallel follow a fork-join pattern in which one pass over the work list submits callable tasks to an executor, and a subsequent pass waits for the results.
import java.util.Date
import java.util.concurrent.{Callable, Executors, ExecutorService, Future}
import java.util.logging.{Level, Logger}

import scala.language.implicitConversions

/**
 * Demonstrates three ways of applying a slow function (`by2`) to a list of
 * integers: sequentially, fully in parallel on a fixed thread pool, and in
 * pool-sized chunks with progress reporting between chunks.
 */
object Demo {
  val log = Logger.getLogger( getClass.getName )

  /**
   * Adapts a zero-argument Scala function to a `java.util.concurrent.Callable`.
   * Kept implicit for backward compatibility; the code in this object calls it
   * explicitly so that overload resolution on `ExecutorService.submit` never
   * depends on an implicit view.
   */
  implicit def callable[A]( func: () => A ): Callable[A] = new Callable[A] {
    override def call(): A = func()
  }

  /** Number of worker threads, and also the chunk size used by `by2Groupwise`. */
  val poolSize = 4
  val exec: ExecutorService = Executors.newFixedThreadPool( poolSize )

  /** Last percentage logged by `reportProgress`; -1 until the first report. */
  var lastReport = -1

  /**
   * Logs the integer percentage `soFar * 100 / total` at INFO level, but only
   * when it differs from the previously logged percentage.
   *
   * @param soFar number of work units completed so far
   * @param total total number of work units; nothing is logged when it is
   *              not positive (this avoids a division by zero)
   */
  def reportProgress( soFar: Int, total: Int ): Unit = {
    if ( total > 0 ) {
      val progress = (soFar * 100) / total
      if ( progress != lastReport ) {
        log.log( Level.INFO, "Progress: {0}%", Array[Object]( progress.toString ) )
        lastReport = progress
      }
    }
  }

  /**
   * Evaluates `work` once and measures how long the evaluation took.
   *
   * @param work by-name expression to evaluate
   * @return a pair of (elapsed milliseconds, result of `work`)
   */
  def timer[A]( work: => A ): (Long, A) = {
    // nanoTime is monotonic, so the elapsed time cannot be negative even if
    // the wall clock is adjusted while `work` runs.
    val start = System.nanoTime()
    val result = work
    ( (System.nanoTime() - start) / 1000000L, result )
  }

  /** Simulated slow task: sleeps for one second, then returns `arg * 2`. */
  def by2( arg: Int ): Int = {
    Thread.sleep( 1000 )
    arg * 2
  }

  /** Submits one `by2(arg)` task to the shared pool as an explicit Callable. */
  private def submitBy2( arg: Int ): Future[Int] =
    exec.submit( callable( () => by2( arg ) ) )

  /**
   * Applies `by2` to every element on the calling thread, reporting progress
   * before each element and once more after the last one.
   *
   * @param work values to process
   * @return the doubled values, in the same order as `work`
   */
  def by2Sequential( work: Seq[Int] ): Seq[Int] = {
    // Materialize first so that a lazy input is fully processed before the
    // final progress report below.
    val items = work.toVector
    val results = items.zipWithIndex.map { case (arg, index) =>
      reportProgress( index, items.size )
      by2( arg )
    }
    if ( items.nonEmpty ) reportProgress( items.size, items.size )
    results
  }

  /**
   * Applies `by2` to every element using the shared thread pool: one pass
   * submits a task per element, a second pass waits for each result.
   *
   * @param work values to process
   * @return the doubled values, in the same order as `work`
   */
  def by2Parallel( work: Seq[Int] ): Seq[Int] = {
    // toVector forces a strict collection: with a lazy Seq the submit pass
    // would otherwise interleave with the get pass and serialize the tasks.
    val futures = work.toVector.map( submitBy2 )
    futures.map( _.get )
  }

  /**
   * Applies `by2` to `work` in chunks of `poolSize` elements. Each chunk is
   * submitted to the pool and awaited before the next chunk starts; progress
   * is reported before each chunk and once more after the last one.
   *
   * @param work values to process
   * @return the doubled values, in the same order as `work`
   */
  def by2Groupwise( work: Seq[Int] ): Seq[Int] = {
    val groups = work.grouped( poolSize ).toList
    log.log( Level.FINE, "Frickjack size: {0}", Array[Object]( groups.size.toString ) )
    val results = groups.zipWithIndex.flatMap { case (group, index) =>
      log.log( Level.INFO, "Scan index: {0}", Array[Object]( index.toString ) )
      reportProgress( index, groups.size )
      // Fork the whole chunk first, then join it.
      val futures = group.toVector.map( submitBy2 )
      futures.map( _.get )
    }
    if ( groups.nonEmpty ) reportProgress( groups.size, groups.size )
    results
  }

  /** Times the three variants over the values 1 to 9 and logs each duration. */
  def main( args: Array[String] ): Unit = {
    val work = 1 until 10
    try {
      log.log( Level.INFO, "Seq {0}", Array[Object]( timer( by2Sequential( work ) )._1.toString ) )
      log.log( Level.INFO, "Parallel {0}", Array[Object]( timer( by2Parallel( work ) )._1.toString ) )
      log.log( Level.INFO, "Groupwise {0}", Array[Object]( timer( by2Groupwise( work ) )._1.toString ) )
    } finally {
      // Always stop the pool so the JVM can exit even if a task failed.
      exec.shutdown()
    }
  }
}
No comments:
Post a Comment