





二、使用 Future

发布于 2023-07-17 23:38:22 字数 28006 浏览 0 评论 0 收藏 0

2.1 基础变换

  1. Scala 中,你可以对 Future 的结果指定变换,然后得到一个新的 Future,从而表示这两个异步计算的组合:原始的异步计算和异步变换。

  2. 最基础的变换是 map ,可以直接将下一个计算 map 到当前的 Future ,而不是阻塞等待结果。map 之后得到一个新的 Future,表示原始的异步计算经过传给 map 的函数异步变换之后的结果。

    val fut = Future{ Thread.sleep(10000); 21 + 21} // 异步计算 val fut2 = fut.map( x => x +1) // 异步变换

    当异步计算和异步变换完成之后,fut2.value 返回 Some(Success(43))

    注意:这里创建 fut, fut2 的线程、fut的异步计算线程、fut2 的异步变换线程可能分别在三个不同的线程中执行。

  3. 还可以使用 for 表达式来对 Future 进行变换。考虑两个 Future

    val fut1 = Future{ Thread.sleep(10000); 21 + 21} val fut2 = Future{ Thread.sleep(10000); 23 + 23}

    可以通过 for 表达式来构造一个新的 Future

    for { x <- fut1 y <- fut2 } yield x + y

    一旦这三个 Future 完成,你将会得到最终结果为 Some(Success(88))

    注意:因为 for 表达式会串行化它们的变换,因此如果你没有在 for 之前创建好 Future,则它们并不会并行执行。如:

    //*************************** fut1, fut2 并行执行 *******************// val fut1 = Future{ Thread.sleep(10000); 21 + 21} val fut2 = Future{ Thread.sleep(10000); 23 + 23} val result = for { x <- fut1 y <- fut2 } yield x + y //*************************** fut1, fut2 串行执行 *******************// val result2 = for { x <- Future{ Thread.sleep(10000); 21 + 21} y <- Future{ Thread.sleep(10000); 23 + 23} } yield x + y

    事实上这里的 for 表达式会被重写为 fut1.flatMap(x => fut2.map(y => x + y))

  4. 可以通过Future 伴生对象的几个方法来创建Future

    • Future 伴生对象的 apply 方法,如 Future{ Thread.sleep(10000); 21 + 21}

    • Future 伴生对象的Future.failed, Future.successful, Future.fromTry 等工厂方法。这些工厂方法并不需要 ExecutionContext

      • successful 工厂方法将创建一个已经成功完成的 Future

        Future.successful{ 21 + 21}
      • failed 工厂方法将创建一个已经失败的 Future

        Future.failed( new Exception("xxx Exception"))
      • fromTry 工厂方法将从给定的 Try 创建一个已经完成的 Future

        import scala.util.{Success, Failure} Future.fromTry(Success {21 + 21}) Future.fromTry(Failure(new Exception("xxx Exception")))
    • 创建 Future 最一般化的方法是使用 Promise。给定一个 Promise,可以得到由这个 Promise 控制的 Future。当你完成 Promise 时,对应的 Future 也会完成。

      val pro = Promise[Int] val fut = pro.future

      可以用名为 success, failure, complete 的方法来完成 Promise。这些方法和 Future 中的方法很相似:

      val pro = Promise[Int] pro.success(42) pro.failure(new Exception("xxx Exception")) pro.complete(Failure(new Exception("xxx Exception")))

      还有一个 completeWith,这个方法使得该 Promisefuture 的完成状态和你传入的 future 保持同步:

      val fut1 = Future{ Thread.sleep(10000); 21 + 21} val pro = Promise[Int] pro.completeWith(fut1)
  5. Futurefilter 方法对Future 的结果进行校验,如果合法就原样保留;如果非法,则filter 返回的这个 Future 就以 NoSuchElementException 失败:

    val fut1 = Future{ Thread.sleep(10000); 21 + 21} val valid = fut1.filter( x => x > 0) // 合法的结果 valid.value // 返回 Some(Success(42)) ​ val invalid = fut1.filter( x => x <0) // 非法的结果 invalid.value // 返回 Some(Failure(java.util.NoSuchElementException))

    另外,Future 还提供了 withFilter 方法,因此可以用 for 表达式的过滤器来执行相同的操作:

    val fut1 = Future{ Thread.sleep(10000); 21 + 21} val valid = for (x <- fut1 if x >0) yield x // 合法的结果 valid.value // 返回 Some(Success(42)) ​ val invalid = for (x <- fut1 if x <0) yield x // 非法的结果 invalid.value // 返回 Some(Failure(java.util.NoSuchElementException))
  6. Futurecollect 方法允许你在一次操作中同时完成校验和变换。

    • 如果传给 collect 方法的偏函数对 Future 结果有定义,则 collect 返回的 Future 就会以经过该函数变换后的值成功完成。

      val fut1 = Future{ Thread.sleep(10000); 21 + 21} val valid = fut1 collect {case x if x >0 => x + 46} valid.value // 返回 Some(Success(88))
    • 如果传给 collect 方法的偏函数对 Future 结果没有定义,则 collect 返回的 Future 就以 NoSuchElementException 失败。

      val fut1 = Future{ Thread.sleep(10000); 21 + 21} val invalid = fut1 collect {case x if x < 0 => x + 46} valid.value // 返回 Some(Failure(java.util.NoSuchElementException))

2.2 处理失败

  1. ScalaFuture 提供了处理失败的 Future 的方式,包括:failed, fallbackTo, recover, recoverWith

  2. Futurefailed 方法将任何类型的失败的 future 变换成一个成功的 Future[Throwable] ,并带上引发失败的异常。

    val fut1 = Future{ Thread.sleep(10000); 21/0} // 类型为 Future[Int] fut1.value // 返回 Some(Failure(java.lang.ArithmeticException)) ​ val fut2 = fut1.failed // 类型为 Future[Throwable] fut2.value // 返回 Some(Success(java.lang.ArithmeticException))

    如果调用 failedFuture 最终成功了,则failed 返回的这个 Future 将以 NoSuchElementException 失败。因此 failed 方法只有在你预期某个 future 一定会失败的情况下才适用。

    val fut1 = Future{ Thread.sleep(10000); 21/1} // 类型为 Future[Int] fut1.value // 返回 Some(Success(21)) ​ val fut2 = fut1.failed fut2.value // 返回 Some(Failure(java.util.NoSuchElementException))
  3. FuturefallbackTo 方法允许你提供一个额外可选的 Future,这个新的 Future 将用于在你调用 fallbackTo 的那个 Future 失败的情况。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21/1} val fut3 = fut1.fallbackTo(fut2) fut3.value // 返回 Some(Success(21))
    • 如果 fut1 成功,则 fut1.fallbackTo(fut2) 返回fut1
    • 如果 fut1 失败,而 fut2 成功,则 fut1.fallbackTo(fut2) 返回 fut2
    • 如果 fut1 失败,且 fut2 失败,则 fut1.fallbackTo(fut2) 返回 fut1 。此时完全不考虑 fut2 的失败。
    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); assert(21 <0)} val fut3 = fut1.fallbackTo(fut2) fut3.value // 返回 Some(Failure(java.lang.ArithmeticException))
  4. recover 方法让你把失败的 future 变换成成功的 future ,同时将成功的 future 结果原样透传。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = fut1 recover { case ex => -1 } fut2.value // 返回 Some(Success(-1))

    如果原始的Future 没有失败,则 recover 返回的这个 Future 就会以相同的值完成:

    val fut1 = Future{ Thread.sleep(10000); 21/1} val fut2 = fut1 recover { case ex => -1 } fut2.value // 返回 Some(Success(21))

    如果传给 recover 的偏函数没有对引发原始 Future 最终失败的那个异常有定义,则原始的失败会被透传:

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = fut1 recover { case ex : IllegalArgumentException => -1 } fut2.value // 返回 Some(Failure(java.lang.ArithmeticException))
  5. recoverWith 方法和 recover 很像,不过它并不是像 recover 一样恢复成某个值,而是恢复成一个 Future

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = fut1 recoverWith { case ex => Future{ Thread.sleep(10000); 21+21 } // 这里恢复成一个 Future,而不是一个值 } fut2.value // 返回 Some(Success(42))

    recover 一样,如果原始的Future 没有失败,则 recoverWith 返回的这个 Future 就会以相同的值完成:

    val fut1 = Future{ Thread.sleep(10000); 21/1} val fut2 = fut1 recoverWith { case ex => Future{ Thread.sleep(10000); 21+21 } } fut2.value // 返回 Some(Success(21))

    recover 一样,如果传给 recoverWih 的偏函数没有对引发原始 Future 最终失败的那个异常有定义,则原始的失败会被透传:

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = fut1 recoverWith { case ex : IllegalArgumentException => Future{ Thread.sleep(10000); 21+21 } } fut2.value // 返回 Some(Failure(java.lang.ArithmeticException))

2.3 transform

  1. Futuretransform 方法接收两个函数来对 Future 进行变换:一个用于处理成功、另一个用于处理失败。

    val factor = 1 val fut1 = Future{ Thread.sleep(10000); 21/factor} val fut2 = fut1.transform( x => x * -1, // 如果 fut1 成功,则调用这一行。此时 fut2 返回 Some(Success(-21)) ex => new Exception("exception because:",ex) // 如果 fut1 失败(例如 factor = 0时),则调用这一行,此时 fut2 返回 Some(Failure(java.lang.Exception)) )

    如果 fut1 成功了,则调用第一个函数;如果 fut1 失败了,则调用第二个函数。

    注意:transform 方法并不能将成功的 Future 改变成失败的 Future,也不能将失败的 Future 改变成成功的 Future

  2. 为了将成功的 Future 改变成失败的 Future,或者将失败的 Future 改变成成功的 FutureScala 2.12 引入了一个重载的 transform 形式,接收一个从 TryTry 的函数:

    val factor = 1 val fut1 = Future{ Thread.sleep(10000); 21/factor} val fut2 = fut1.transform{ // 在 Scala 2.12 之后支持 case Success(x) => Success(x.abs + 1) case Failure(_) => Success(0) // 将 Failure 变换为 Success }

2.4 组合

  1. Future 和它的伴生对象提供了用于组合多个 Future 的方法。

  2. zip 方法将两个成功的 Future 变换成这两个值的元组的 Future

    val fut1 = Future{ Thread.sleep(10000); 21/1} val fut2 = Future{ Thread.sleep(10000); 21+21} val fut3 = fit1.zip(fut2) // 一个 Future[(Int,Int)] 对象 fut3.value // 返回 Some(Success(21,42))

    如果任何一个 Future 失败了,zip 返回的这个 Future 也会以相同的异常失败。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21+21} val fut3 = fit1.zip(fut2) // 一个 Future[(Int,Int)] 对象 fut3.value // 返回 Some(Failure(java.lang.ArithmeticException))

    如果两个 Future 都失败了,则zip 返回的是第一个 Future (也就是调用方)的那个异常。

  3. Future 伴生对象提供了一个 fold 方法,用于累积一个 TranserableOnce 集合中所有 Future 的结果,并交出一个 Future 的结果。

    如果集合中所有 Future 都成功了,则累积的 Future 成功完成;如果集合中有任何一个 Future 失败了,则累积的 Future 失败,并且返回集合中首个失败的 Future 的失败。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21+21} val fut3 = Future.fold(List(fut1,fut2))(0){ (acc, num) => acc + num }
  4. Future 伴生对象提供的 reduce 方法和 fold 方法都是一样的折叠操作,但是 reduce 不需要提供初始值,而是用第一个 Future 结果作为初始值。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21+21} val fut3 = Future.reduce(List(fut1,fut2)){ (acc, num) => acc + num }

    如果传入的集合为空,则没有第一个 Future,此时 reduce 将以 NoSuchElementException 作为失败。

  5. Future.sequence 方法将一个 TransversableOnece[Future] 集合变换成 Future[TranversableOnece]Future

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21+21} val list = List(fut1, fut2) // List[Future[Int]] val fut3 = Future.sequence(list) // Future[List[Int]]
  6. Future.traverse 方法会将任何元素类型的 TranversableOnce 集合变成一个由 Future 组成的 TranversableOnce,并将它 sequence 成一个由值组成的 TraversableOnceFuture

    val fut1 = Future.traverse(List(1,2,3)){i => Future(i)} fut1.value // Some(Success(1,2,3))

2.5 执行副作用

  1. 有可能你希望在执行完某个 Future 完成之后执行一个副作用(而不是返回一个新的 Future),为此 Future 提供了好几种方法。

  2. 最基本的方法是 foreach,如果 Future 成功则它会执行一个副作用。

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21/1} fut1.foreach(x => println(x)) // fut1 执行失败,所以不会执行 println fut2.foreach(x => println(x)) // 当 fut2 执行成功的时候执行 println

    由于不带 yieldfor 会被编译器重写为对 foreach 的调用,因此也可以通过 for 表达式来达成同样的效果:

    val fut1 = Future{ Thread.sleep(10000); 21/0} val fut2 = Future{ Thread.sleep(10000); 21/1} for (x <- fut1) println(x) for (y <- fut2) println(y)
  3. Future 还提供了方法来 “注册” 函数。onComplete 方法在Future 最终成功或失败时都会被执行。被注册的函数会被传入一个 Try 对象(如果 Future 成功了,那么就算一个包含结果的 Success;否则是一个包含失败原因的 Failure)。

    import scala.util.{Success, Failure} val fut1 = Future{ Thread.sleep(10000); 21/1} fut1 onComplete { case Success(x) => println(x) case Failure(ex) => println(ex) }
  4. 可以多次调用 Future.onComplete 来注册多个函数,但是 onComplete 不能保证这些函数之间的执行顺序。如果你希望强制回调函数的顺序,则可以考虑使用 Future.andThen 方法。

    Future.andThen 方法会返回一个新的 Future,这个新的 Future 是对原始的 Future 调用回调函数:

    import scala.util.{Success, Failure} val fut1 = Future{ Thread.sleep(10000); 21/1} val fut2 = fut1 andThen { case Success(x) => println(x) case Failure(ex) => println(ex) }

    如果传入 adThen 的回调函数在执行时抛出异常,则这个异常是不会被传导到后续的 adThen,也不会体现在结果的 Future 中。

2.6 Scala 2.12 中的新方法

  1. scala 2.12 中添加的 Future.flatten 方法将一个Future[Future[Int]] 变换成一个 Future[Int] 。如:

    val fut = Future{ Future{Thread.sleep(10000); 21/1} } // Future[Future[Int]] val fut1 = fut.flatten // Future[Int]
  2. scala 2.12 中添加的 zipWith 方法将两个 Future zip 到一起,然后在对结果的元组执行 map

    val fut1 = Future{ Future{Thread.sleep(10000); 21/1} } val fut2 = Future{ Future{Thread.sleep(10000); "hello world"} } val fut3 = fut1 zip fut2 // Future[(Int,String)] val fut4 = fut3 map{ // Future[String] case (num, str) => s"$num and $str" } fut4.value // 返回 Some(Success(21 and hello world))

    zipWith 允许你一步完成相同的操作:

    val fut1 = Future{ Future{Thread.sleep(10000); 21/1} } val fut2 = Future{ Future{Thread.sleep(10000); "hello world"} } val fut4 = fut1.zipWith(fut2){ case (num, str) => s"$num and $str" } fut4.value // 返回 Some(Success(21 and hello world))
  3. scala 2.12 中还添加了 transformWith 方法,可以用一个从 TryFuture 的函数对 Future 进行变换。

    val fut1 = Future{ Future{Thread.sleep(10000); 21/1} } val fut2 = fut1.transformWith{ case Success(res) => Future{ throw new Exception(res.toString)} case Failure(ex) => Future(0) }

    transformWith 方法和 Future.transform 类似,但是 transformWith 方法允许你交出 Future(相比较之下,transform 方法需要你交出 Try

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。



需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。