scala的Future和Promise
前言
在Scala中使用Future和Promise时,不得不比较异步编程与多线程编程的区别。相同之处:两种方法都能有效地避免调用线程导致阻塞,并提升软件的响应速度。另一方面,在实现细节上两者的差异主要体现在任务执行机制的不同上。
- 线程不是一个计算机硬件的功能,而是操作系统提供的一种逻辑功能,线程本质上是进程中一段并发运行的代码,所以线程需要操作系统投入CPU资源来运行和调度。
- 多线程在单个线程中的处理程序依然是顺序执行,符合普通人的思维习惯,所以编程简单。但是多线程的缺点也同样明显,线程的使用(滥用)会给系统带来上下文切换的额外负担。并且线程间的共享变量可能造成死锁的出现。
- 异步操作无须额外的线程负担,并且使用
回调的方式进行结果处理,在设计良好的情况下,处理函数可以不必使用共享变量(即使无法完全不用,最起码可以减少 共享变量的数量),减少了死锁的可能。当然异步操作也并非完美无暇。编写异步操作的复杂程度较高,程序主要使用回调方式进行处理,与普通人的思维方式有些 初入,而且难以调试。
Future
Future 是一种缓存位置,在程序运行期间用于临时存放数据。
个人理解:我们可以将 Future 视为一个用于监控特定内存位置的变化的工具。
当异步操作完成并返回数据时,Future 就能够检测到这一变化。
Future 对象和Future 计算
Future实例作为异步操作结果的封装形式存在。
Future运算既是创建Future对象的异步计算方式也是真正意义上的异步操作核心部分。
Future的基本使用
使用轮询的方式获得计算结果
加载上下文,上下文jue’d
创建Future计算,并生成Future对象
为了持续地在每个周期内检测并确认Future状态的变化情况,在实现过程中会不断调用其isCompleted方法来获取相应的信息。
// step 1 加载上下文
import scala.concurrent.ExecutionContext.Implicits.global
// step 2 创建Future计算
val num = Future.apply({
1 + 1
})
// step 3 轮询
while (!num.isCompleted) {}
println(num.value)
AI写代码
使用回调函数获取计算结果
- 加载上下文
- 创建Future计算
- 使用回调函数
// step 1 加载上下文
import scala.concurrent.ExecutionContext.Implicits.global
// step 2 创建Future计算
val num = Future.apply({
1 + 1
})
// step 3 使用回调函数处理计算结果
// Success 和 Failures 是Try的两个子类
num.onComplete {
case Success(value) => {
println(value)
}
case Failure(exception) => {
exception.printStackTrace()
}
}
// 上面的方法相当于下面两个方法的组合
// num.onFailure()
// num.onSuccess()
Thread.sleep(1000) // 主线程并不会等Future执行结束才结束
AI写代码
注意:
除了通过类似于onSuccess的方法来实现功能外,在某些情况下也可以采用 foreach 这种方式来完成任务。然而,在这种情况下会导致无法应对异常错误。如果需要处理失败的情况,则可以选择 faild.forEach。
num.failed.foreach(print)
AI写代码
回调函数是可以添加多个的,按添加顺序依次执行
num.onComplete {
case Success(value) => {
println(value)
}
case Failure(exception) => {
exception.printStackTrace()
}
}
num.onComplete {
case Success(value) => {
println(value)
}
case Failure(exception) => {
exception.printStackTrace()
}
}
num.foreach(value => println("回调 1"))
num.foreach(value => println("回调 2"))
num.foreach(value => println("回调 3"))
AI写代码
回调函数的组合
如果我们希望依次为Future注册多个回调函数,并确保每个后续 callback 函数能够接收前一个 callback 返回的结果,则可以采用类似于集合 map 方法的方式实现这一功能。值得注意的是,在正常情况下, map 方法将所有 callback 组合成一个序列并进行处理;而在出现错误时,该过程将完整地将错误信息传递给上层机制处理。
import scala.concurrent.ExecutionContext.Implicits.global
val num = Future.apply({
1 / 1
})
val newNum = num.map(value => value * 10)
newNum.foreach(println)
newNum.failed.foreach(println)
AI写代码
回调函数和函数组合的选择
如果我们的函数有副租用,我们应该选择回调函数,否则,选择函数组合。
Future组合字
map
map操作将一个Future映射成另一个Future
flatMap
faltMap操作也是将Future映射成另一个Future
map和flatMap使用的区别
在特定情况下(如仅涉及简单的变量替换或与常见的数据结构如列表相似的应用场景),无需深入探讨。
举例而言:假设我们有两个 Futures 生成 int 类型的值(即 Future[int])。我们的目标是将这两个 int 值相加以生成一个新的 Future 对象。为此,我们可以采用两种不同的方法:一是利用 map 方法直接对结果进行处理;二是运用 flatMap 算子来完成这一操作。
import scala.concurrent.ExecutionContext.Implicits.global
val num1 = Future(1)
val num2 = Future(100)
val ans1 = num1.map(value1 => {
value1 + num2.value.get.get // 使用map这里要把第二个Future里解出来
})
ans1.foreach(println)
ans1.failed.foreach(println)
val ans2 = num1.flatMap(value1 => {
num2.map(value2 => value1 + value2) //这里不用解包
})
ans2.foreach(println)
ans2.failed.foreach(println)
Thread.sleep(3000)
AI写代码
输出:
101
101
AI写代码
在这一实例中(就如前面所述),输出结果一致(结果相同)。其核心在于:也就是通过调用map函数来获取num2中的数据内容。然而需要注意的是:如果我们尝试拆分num2中的数据项(即从num2中提取数据项),却无法直接获得原始的num2数据项(因无法恢复完整的键值对信息),这样操作后的情况如下:
java.util.NoSuchElementException: None.get
101
AI写代码
那我们是否可以用回调函数或者foreach应用呢? 不行。因为这两个功能项都会返回空值


因此,在这种情况下,默认情况下无法完成此操作除非借助flatMap的帮助。
总结而言,flatMap具备将异步操作转化为同步流程的能力,并通过逐步处理每一个Future任务来实现整体目标;相比之下,map功能则不具备这一特性。
for 推导在Future里的应用。
for 推导时间是就是上面的flatMap的简化形式。
一维for推导
一维for推导就是简单的遍历
val res = for (
a <- 0 to 10
if a % 2 == 0
) yield a
println(res)
AI写代码
输出:
Vector(0, 2, 4, 6, 8, 10)
AI写代码
高维for推导
高维for推导,实际上是求笛卡尔积
val res = for (
a <- 0 to 10 if a % 3 == 0;
b <- 0 to 10 if b % 3 == 1;
c <- 0 to 10 if c % 3 == 2
) yield (a, b, c)
println(res)
AI写代码
输出
Vector((0,1,2), (0,1,5), (0,1,8), (0,4,2), (0,4,5), (0,4,8), (0,7,2), (0,7,5), (0,7,8), (0,10,2), (0,10,5), (0,10,8), (3,1,2), (3,1,5), (3,1,8), (3,4,2), (3,4,5), (3,4,8), (3,7,2), (3,7,5), (3,7,8), (3,10,2), (3,10,5), (3,10,8), (6,1,2), (6,1,5), (6,1,8), (6,4,2), (6,4,5), (6,4,8), (6,7,2), (6,7,5), (6,7,8), (6,10,2), (6,10,5), (6,10,8), (9,1,2), (9,1,5), (9,1,8), (9,4,2), (9,4,5), (9,4,8), (9,7,2), (9,7,5), (9,7,8), (9,10,2), (9,10,5), (9,10,8))
AI写代码
for推导和flatMap的关系
如果仅仅涉及两个维度或者一个维度时,使用flatMap不会带来任何问题,但当我们面对需要将多个维度的数据通过flatMap进行整合时,情况就会变得复杂.
for 推导在Future中的应用
import scala.concurrent.ExecutionContext.Implicits.global
val func1 = Future({
Thread.sleep(1000)
1
})
val func2 = Future({
Thread.sleep(1000)
2
})
val func3 = Future({
Thread.sleep(3000)
3
})
val res = for (
a <- func1;
b <- func2;
c <- func3
) yield (a, b, c) // 这里生成的是Future[(int,int,int)] 对象
res.foreach(println)
Thread.sleep(4000)
AI写代码
这样就很轻易的完成了多个Future的组合和同步
Promise
Promise的基本使用
- 说明或阐述 Promise 对象的概念。
- 为 Promise 对象注册一个回 callback 函数(既可处理成功事件也可处理错误情况)。
- 将 Promise 对象设置为指定的结果(无论是成功状态还是错误状态)。
import scala.concurrent.ExecutionContext.Implicits.global
val promise = Promise[String]
promise.future.foreach(str => print("我的祖国是: " + str))
promise.future.failed.foreach(e => println(e))
/*
赋值方法一:
使用success和failure。complete方式相当于success和failure的抽象
这种赋值方法有个问题就是我们的promise只允许赋值一次,重复赋值会被抛出异常
*/
promise.success("china")
promise.failure(new Exception("not china"))
promise.complete(Success[String]("china"))
promise.complete(Failure[String](new Exception("not china")))
/*
赋值方法二:
使用try方法赋值,这种赋值和方法一类似,但是这种方法赋值不会抛出异常
*/
promise.trySuccess("china")
promise.tryFailure(new Exception("not china"))
promise.tryComplete(Success("china"))
promise.tryComplete(Failure(new Exception("not china")))
Thread.sleep(3000)
AI写代码
注意:方法一赋值会抛出异常,方法二不会
Promise 的使用模式
模式一
首先初始化一个 Promise, 然后在后续计算中完成该 Promise 的赋值过程. 最后返回该 Promise 对象所对应的 future 对象.
import scala.concurrent.ExecutionContext.Implicits.global
def getFuture[T](b: => T): Future[T] = {
val promised = Promise[T]
global.execute(new Runnable {
override def run(): Unit = {
try {
promised.success(b)
} catch {
case NonFatal(e) => promised.failure(e)
}
}
})
promised.future
}
AI写代码
模式二 基于回调函数的API转换
示例:自定义timeout
import scala.concurrent.ExecutionContext.Implicits.global
val timer = new Timer(true)
def timeout(t: Int) = {
val promisedUnit = Promise[Unit]
timer.schedule(new TimerTask {
override def run(): Unit = {
promisedUnit.success()
timer.cancel()
}
}, t)
promisedUnit.future
}
println("延时之前")
timeout(1000).foreach(_ => println("延时一秒"))
println("延时之后")
Thread.sleep(2000)
AI写代码
扩展Future的方法
import scala.concurrent.ExecutionContext.Implicits.global
/** * 接收一个future,生成一个FutureOps,FutureOps具有or方法
* 当future调用or方法的时候就会寻找该隐式类生成一个FutureOps
* * @param self
* @tparam T
*/
implicit class FutureOps[T](val self: Future[T]) {
def or(that: Future[T]): Future[T] = {
val promisedT = Promise[T]
// 下面两个赋值,只能成功一个,哪个Future先完善,哪个成功
self.onComplete(x => promisedT tryComplete (x))
that.onComplete(y => promisedT tryComplete (y))
promisedT.future
}
}
val future1 = Future {
Thread.sleep(1000)
1
}
val future2 = Future(2)
future1.or(future2).foreach(println)
Thread.sleep(2000)
AI写代码
和Future通信,结束Future
type Cancellable[T] = (Promise[Unit], Future[T])
/** * 这个方法接收首一个方法参数b,b接收一个future,返回一个T
* 我们在主线程里完善Future就可以在返回T的代码块里感知到
* 然后通过判断我们的Future是否完善来抛出异常来结束我们的异步程序
* * @param b
* @tparam T
* @return
*/
def cancellable[T](b: Future[Unit] => T): Cancellable[T] = {
import scala.concurrent.ExecutionContext.Implicits.global
val cancel = Promise[Unit]
val f = Future {
val r = b(cancel.future)
// 在这里向cancel对象执行赋值操作,如果我们的cancel已经被赋值
// 如果我们正常结束,这里就不会被赋值
// 如果从外部调用cancel结束,那这里就会抛出异常
if (!cancel.tryFailure(new Exception)) {
throw new CancellationException
}
r
}
(cancel, f)
}
AI写代码
使用await进行同步
基本用法
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future {
Thread.sleep(1000)
1
}
// 使用await.ready,返回完善返回值的Future对象
Await.ready(future,Duration.apply(3,TimeUnit.SECONDS)).foreach(println)
// 使用await.result,返回Future里面的完善值
println(Await.result(future, Duration(3, TimeUnit.SECONDS)))
AI写代码
使用blocking语句,增加异步回调线程提高速度
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
//当前时间,单位是纳秒
val startTime = System.nanoTime()
// 如果异步程序数大于默认线程数,异步程序会分批次执行
val futures = for (- <- 0 until 10) yield Future {
Thread.sleep(1000)
}
// 如果异步程序里,使用block结构,会创建多余的线程执行程序,防止阻塞
// val futures = for (- <- 0 until 10) yield Future {
// blocking {
// Thread.sleep(1000)
// }
// }
for (f <- futures) Await.ready(f, Duration.Inf)
val endTime = System.nanoTime()
println((endTime - startTime) / 1000000)
AI写代码
