Scala并发之Future、ExecutionContext
当我们需要同时执行多个任务、并且不会因为其中一个任务的执行时间漫长而陷入阻塞,就需要用到并发,也即多线程。Java中的并发实现主要在java.util.concurrent包中,Future这个概念其实Java里也有。Scala是在Java的基础上实现的函数式编程语言,所以Scala也可以调用Java API。今天会介绍Scala并发中应用比较多的两个概念——Future、ExecutionContext。
Future概念
Future表示未来,某线程执行一项操作,这个操作有延迟的话,Future会提供一系列方法来处理这个线程过程,可取消,可操作完成后执行其他操作等等。其实就相当于Java中的一个thread,只不过Future提供了回调函数,可以对线程中执行操作返回的数据进行处理,这样可以避免阻塞操作。
Future拥有两种状态:1.未完成:线程操作还未结束;2.已完成:操作操作完成,并且有返回值或者有异常。 当一个Future完成的时候,它就变成了一个不可变对象,永远不会被重写。
创建Future
val sumFuture = Future[Int] {
var sum = 0
for(i <- Range(1,100000)) sum = sum + i
sum
}
如上图所示,使用Future这个object提供的apply方法,就可以创建一个Future。这里需要注意的是使用Future的apply时,需要一个隐式的ExecutionContext(下面再作介绍)
Future回调函数
Future提供了onComplete,onFailure,onSuccess三种回调。
calFuture.onComplete {
case Success(result) => println(result)
case Failure(e) => println("error: " + e.getMessage)
}
successFuture.onSuccess {
case num => println(num)
}
errorFuture.onFailure {
case e => println(e.getMessage)
}
ExecutionContext
ExecutionContext与Java中的线程池类似,都是用来管理线程的。Java中可以利用类Executors来定义线程池ExecutorService:
newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool分别可以定义不同的线程池。而ExecutionContext提供了方法fromExecutorService,该方法的传参就是Java中的ExecutorService。
def fromExecutorService(e : java.util.concurrent.ExecutorService) : scala.concurrent.ExecutionContextExecutorService = { /* compiled code */ }
应用:Future、ExecutionContext、lock
并发时如果多个线程之间需要共享变量,则需要用到lock。并且用的时候为了避免死锁,需要及时释放lock
private var lock= new ReentrantLock()
implicit lazy val ec: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(conf.getDSHealthCheckThreadPoolSize))
if (lock.tryLock()) {
logInfo(s"lock is available")
try {
val future = Future[(Boolean, String)] {
/* compiled code */
}
future.onSuccess {
case (value1, value2) =>
/* compiled code */
}
Await.result(future, Duration.Inf)
} finally {
lock.unlock()
}
}
- 点赞
- 收藏
- 关注作者
评论(0)