Sunday, September 12, 2010

Simple Parallelism with Scala

Scala provides some great list-processing APIs, and it's easy to parallelize Scala's list processing with a Java executor service. The following demonstration shows a couple of easy ways to parallelize list processing. The second variation, by2Groupwise, processes chunks of a work list in parallel and reports progress to the user between chunks. Both by2Groupwise and by2Parallel follow a fork-join pattern where a pass over the work list submits callable tasks to an executor, then a subsequent pass waits for the results.


import java.util.Date
import java.util.concurrent.{Callable,Executors,ExecutorService}
import java.util.logging.{Level,Logger}


/**
 * Demonstration of three ways to double every element of a work list:
 * sequentially, fully in parallel, and in parallel chunks of `poolSize`
 * with progress reporting between chunks.
 *
 * The parallel variants fork tasks onto a fixed-size thread pool and then
 * join on the returned futures.
 */
object Demo {
    val log:Logger = Logger.getLogger( getClass.getName )
    
    /**
     * Wraps a zero-argument function in a `java.util.concurrent.Callable`.
     *
     * It is kept implicit so existing callers can still pass a function
     * literal where a `Callable` is expected.  Code inside this object calls
     * it explicitly so that the `Callable` overload of `submit` is selected
     * without relying on implicit-view resolution.
     *
     * @param func the computation to run when `call` is invoked
     * @return a `Callable` whose `call` evaluates `func`
     */
    implicit def callable[A]( func:() => A ): Callable[A] = new Callable[A] {
         override def call():A = func()
    }
    
    
    /** Number of worker threads, and the chunk size used by `by2Groupwise`. */
    val poolSize:Int = 4
    val exec:ExecutorService = Executors.newFixedThreadPool( poolSize )
    
    /** Last percentage logged by `reportProgress`; -1 until the first report. */
    var lastReport:Int = -1
    
    /**
     * Logs the integer percentage `soFar / total` at INFO level, but only
     * when it differs from the previously logged percentage.
     *
     * A non-positive `total` has no meaningful percentage, so the call is
     * ignored instead of failing with a division by zero.
     *
     * @param soFar number of work units started so far
     * @param total total number of work units
     */
    def reportProgress( soFar:Int, total:Int ):Unit = {
        if ( total > 0 ) {
            // Widen to Long so soFar * 100 cannot overflow Int.
            val progress = ( soFar.toLong * 100L / total ).toInt
            if ( progress != lastReport ) {
                log.log( Level.INFO, "Progress: {0}%", Array[Object]( progress.toString ) )
                lastReport = progress
            }
        }
    }
    
    /**
     * Evaluates `work` once and measures how long it took.
     *
     * Uses the monotonic `System.nanoTime` clock so the measurement is not
     * disturbed by wall-clock adjustments.
     *
     * @param work by-name computation to time
     * @return pair of (elapsed milliseconds, result of `work`)
     */
    def timer[A]( work: => A ):(Long,A) = {
        val start = System.nanoTime
        val result = work
        ( ( System.nanoTime - start ) / 1000000L, result )
    }
    
    /** Simulates slow work: sleeps for one second, then returns `arg * 2`. */
    def by2( arg:Int ):Int = {
         Thread.sleep( 1000 )
         arg * 2     
    }
    
    /**
     * Doubles every element on the calling thread, one at a time.
     *
     * Progress is reported before each element is processed, so the last
     * report is below 100%.
     */
    def by2Sequential( work:Seq[Int] ):Seq[Int] = {
        // Hoisted: size may be linear-time for some Seq implementations.
        val total = work.size
        work.zipWithIndex.map {
            case (arg, index) =>
                reportProgress( index, total )
                by2( arg )
        }
    }
    
    /**
     * Submits one `by2` task per element to `exec`, then waits for every
     * result in order.
     *
     * The futures are forced into a `List` before joining so that all tasks
     * are queued up front even when `work` is a lazy sequence; otherwise each
     * submit could be interleaved with its `get`, serializing the work.
     */
    private def forkJoin( work:Seq[Int] ):Seq[Int] = {
        val futures = work.map( (arg) => exec.submit( callable[Int]( () => by2(arg) ) ) ).toList
        futures.map( (future) => future.get )
    }
    
    /** Doubles every element by submitting all of `work` to the pool at once. */
    def by2Parallel( work:Seq[Int] ):Seq[Int] = forkJoin( work )
    
    /**
     * Doubles every element in chunks of `poolSize`: each chunk is processed
     * in parallel, and progress is reported before each chunk starts.
     *
     * @param work the values to double
     * @return the doubled values, in the same order as `work`
     */
    def by2Groupwise( work:Seq[Int] ):Seq[Int] = {
        val groups = work.grouped( poolSize ).toList
        log.log( Level.FINE, "Frickjack size: {0}", Array[Object]( groups.size.toString ) )
        groups.zipWithIndex.flatMap {
            case (group, index) =>
                // Same Array[Object] parameter form as every other log call here.
                log.log( Level.INFO, "Scan index: {0}", Array[Object]( index.toString ) )
                reportProgress( index, groups.size )
                forkJoin( group )
        }
    }
    
    /**
     * Times the three variants over the values 1 to 9 and logs the elapsed
     * milliseconds for each.
     */
    def main( args:Array[String] ):Unit = {
        val work = 1 until 10
        try {
            log.log( Level.INFO, "Seq {0}", Array[Object]( timer( by2Sequential( work ) )._1.toString ) )
            log.log( Level.INFO, "Parallel {0}", Array[Object]( timer( by2Parallel( work ) )._1.toString ) )
            log.log( Level.INFO, "Groupwise {0}", Array[Object]( timer( by2Groupwise( work ) )._1.toString ) )
        } finally {
            // Always release the pool, even if a run throws, so its worker
            // threads do not keep the JVM from exiting.
            exec.shutdown()
        }
    }
}

No comments: