Scala并发之Future、ExecutionContext

举报
我爱次火锅锅 发表于 2020/12/01 09:18:49 2020/12/01
【摘要】 当我们需要同时执行多个任务、并且不会因为其中一个任务的执行时间漫长而陷入阻塞,就需要用到并发,也即多线程。Java中的并发实现主要在java.util.concurrent包中,Future这个概念其实Java里也有。Scala是在Java的基础上实现的函数式编程语言,所以Scala也可以调用Java API。今天会介绍Scala并发中应用比较多的两个概念——Future、ExecutionCon

当我们需要同时执行多个任务、并且不会因为其中一个任务的执行时间漫长而陷入阻塞,就需要用到并发,也即多线程。Java中的并发实现主要在java.util.concurrent包中,Future这个概念其实Java里也有。Scala是在Java的基础上实现的函数式编程语言,所以Scala也可以调用Java API。今天会介绍Scala并发中应用比较多的两个概念——FutureExecutionContext

Future概念

Future表示未来,某线程执行一项操作,这个操作有延迟的话,Future会提供一系列方法来处理这个线程过程,可取消,可操作完成后执行其他操作等等。其实就相当于Java中的一个thread,只不过Future提供了回调函数,可以对线程中执行操作返回的数据进行处理,这样可以避免阻塞操作。

Future拥有两种状态:1.未完成:线程操作还未结束;2.已完成:操作操作完成,并且有返回值或者有异常。 当一个Future完成的时候,它就变成了一个不可变对象,永远不会被重写。

创建Future

val sumFuture = Future[Int] {
  var sum = 0
  for(i <- Range(1,100000)) sum = sum + i
  sum
}

如上图所示,使用Future这个object提供的apply方法,就可以创建一个Future。这里需要注意的是使用Futureapply时,需要一个隐式的ExecutionContext(下面再作介绍)

Future回调函数

Future提供了onCompleteonFailureonSuccess三种回调。

calFuture.onComplete {
  case Success(result) => println(result)
  case Failure(e) => println("error: " + e.getMessage)
}

successFuture.onSuccess {
  case num => println(num)
}

errorFuture.onFailure {
  case e => println(e.getMessage)
}

ExecutionContext

ExecutionContextJava中的线程池类似,都是用来管理线程的。Java中可以利用类Executors来定义线程池ExecutorService

newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewScheduledThreadPool分别可以定义不同的线程池。而ExecutionContext提供了方法fromExecutorService,该方法的传参就是Java中的ExecutorService

def fromExecutorService(e : java.util.concurrent.ExecutorService) : scala.concurrent.ExecutionContextExecutorService = { /* compiled code */ }

应用:FutureExecutionContextlock

并发时如果多个线程之间需要共享变量,则需要用到lock。并且用的时候为了避免死锁,需要及时释放lock

private var lock= new ReentrantLock()
implicit lazy val ec: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(conf.getDSHealthCheckThreadPoolSize))

if
(lock.tryLock()) {   logInfo(s"lock is available")   try {     val future = Future[(Boolean, String)] {         /* compiled code */ }     future.onSuccess {       case (value1, value2) =>        /* compiled code */ }     Await.result(future, Duration.Inf)   } finally {     lock.unlock()   } }


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。