Advertisement

Swift Combine — Scheduler(subscribe(on:)和receive(on:)的使用)

阅读量:

在Swift框架中的Combine组件中,默认情况下会配置一个核心组件:Scheduler。该组件负责协调任务运行流程,并确保资源得到合理分配。本文旨在深入探讨Scheduler的作用机制、常见实现类型及其在实际开发中的应用方法,并提供详细的使用指导以帮助开发者高效管理任务执行过程。

Scheduler 的定义

Scheduler 负责管理任务的调度与执行流程,在各个线程或队列中安排处理任务,并决定了任务的具体运行顺序与处理位置

你可以使用调度器以最快速度运行代码,并在未来的某个时刻自动执行它。不同类型的调度器可能会根据自身需求采用不同的时间维持方案。为了方便表示这些时间间隔,《SchedulerTimeType》被用作标准表示方法。由于其兼容《SchedulerTimeIntervalConvertible》,因此我们可以方便地使用如 .milliseconds(500) 这样的函数来表示这些时间段。每个调度器都可以接受参数来控制它们如何执行传递的操作。

常见的 Scheduler 类型

常见的 Scheduler 类型包括一下几种:

  • DispatchQueue属于Grand Central Dispatch(GCD)家族的一员,在指定调度队列上执行任务。这些队列包括串行(sequential)、并发(concurrent)、主线程(main thread)以及全局队列(global queue)。通常将后台任务分配到串行或全局队列中处理,并将与用户界面相关的任务分配到主线程队列上。
  • RunLoop负责管理事件循环的核心逻辑,在特定的RunLoop实例上运行任务。例如,默认情况下会创建一个称为默认事件循环(DefaultEventLoop)的实例来处理日常事务。
  • OperationQueue用于在指定的操作队列中执行UI相关的操作或其他类型的任务。与DispatchQueue功能相似的是它也支持多个子队列来分别处理不同的业务流程。
  • ImmediateScheduler是一种专门用于立即执行任务的调度机制,在开发调试阶段时尤为重要。

涉及RunLoop.main, DispatchQueue.main以及OperationQueue.main这些类的开发人员都与UI操作相关联,并且这些类之间的差别不大。

默认Scheduler 类型

如果未指定调度器,请系统默认配置一个;该调度器实例的类型将与发布方的数据类型相同。例如当主线程发送数据时,则接收操作也会在主线程执行。

复制代码
    class SchedulerViewModel: ObservableObject {
    
      let publisher = PassthroughSubject<String, Never>()
      private var cancellable = Set<AnyCancellable>()
    
      init() {
    setUpSubscription()
      }
    
      func setUpSubscription() {
    publisher
      .sink { value in
        print("\(value) is on main thread: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)
      }
    
      func sendMessage() {
    // on main thread
    publisher.send("Text 1")
    
    // on other thread
    DispatchQueue.global().async { [weak self] in
      self?.publisher.send("Text 2")
    }
      }
    }

上面的代码中,在sendMessage()方法内部直接执行了两次send方法:首先,在主线程处进行了操作;随后,在异步线处又进行了相应的操作。sink的闭包中输出结果为:

复制代码
    Text 1 is on main thread: true
    Text 2 is on main thread: false

Scheduler的使用

Combine 框架提供了两个基本的操作符来使用调度器:

  • 使用 subscribe\text{(on)} subscribe\text{(on options)} 在指定的 Scheduler 中注册订阅(启动运行)。
    • 使用 receive\text{(on)} receive\text{(on options)} 在指定的 Scheduler 中传递数据。

subscribe(on:)

subscribe(on:)

从订阅的创建到最后订阅者接收数据的过程可以被视为一个传输渠道。在这一过程中, 数据按照顺序从上端传递到下端。当执行 subscribe(on:) 操作时, 它会被配置为与当前订阅管道上游节点所使用的相同调度机制。然而存在一个潜在的问题是 subscribe(on:) 也会修改其后续节点的配置

在讨论这个问题时需要明确什么是管道的上游和下游部分,在其中有人给出过这样的解释

上游:

  • Real-world performance metrics (which involve receiving a subscription)
    Requests made by a subscriber account holder to the upstream publisher requesting additional content
    Cancellation notifications (these ascend through from the final subscriber)

上游大概指的就是在创建订阅,建立连接,请求数据和取消订阅的过程。

下游:

  • Value(s), which may be either experiencing a failure (error) or in good order.
  • Completions are either failures (errors) or in good order, which report that the publisher emitted its final value.

下游通常指的是从发布者发送数据到订阅者接收数据的流程。
当我们在系统中创建了发布者实例,并随后添加了一个操作节点,在这个过程中执行`sink|assign|node|event|topic|bus| subscribers| subscribers_list的操作部分也被视为下游的一部分。

subscribe(on:)大多情况下会影响管道的上游部分, 例如我们自定义了一个 Publisher, 命名为 MyPublisher.

复制代码
    extension Publishers {
      struct MyPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    
    func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Int == S.Input {
      debugPrint("receive: \(Thread.isMainThread)")
      subscriber.receive(subscription: Subscriptions.empty)
      _ = subscriber.receive(666)
    }
      }
    }

在MyPublisher类中定义了输出类型和失败类型后,在实现 receive(subscriber:) 方法时必须做两件事:一是生成订阅对象并传递给订阅者以建立连接;二是向订阅者发送数据666并打印相关信息。

复制代码
    Publishers.MyPublisher()
      .map { _ in
    print("Map: \(Thread.isMainThread)")
      }
      .sink { _ in
    print("Sink: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)

当调用上面代码的时候,打印输出:

复制代码
    "receive: true"
    Map: true
    Sink: true

因为没有调用subscribe(on:)设置,所有默认都是在主线程执行的。

下面加上subscribe(on:)方法,并设置为异步线程。

复制代码
    Publishers.MyPublisher()
      .map { _ in
    print("Map: \(Thread.isMainThread)")
      }
      .subscribe(on: DispatchQueue.global()) // 设置为异步线程。
      .sink { _ in
    print("Sink: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)

执行后打印结果为:

复制代码
    "receive: false"
    Map: false
    Sink: false

操作结果始终通过异步线程进行处理,在这种情况下,
.subscribe(on: DispatchQueue.global())
这一行代码的作用同样体现在将该方法用于
map
操作符之前的过程中,
同样的效果也体现在将该方法用于
map
操作符之前的过程。
与之前提到的一样,
.subscribe(on:)
不仅能够对管道中上游的具体执行流程产生影响,
同时也能够对下游环节的具体执行流程产生影响。

此外来说一点,在主线程中运行的管道部分 upstream component包括像 Just 和 Sequence 这样的 Publisher 类别;而在异步线程中运行的是 URLSession.DataTaskPublisher。

如果某个sink闭包需要处理相关的用户界面(UI)操作,则必须确保不会受到订阅事件的影响。为此,在实现时可以通过以下两种方式来解决:第一种是通过在sink方法内部主动切换至主线程;第二种是调用相应的接收事件的方法。

receive(on:)

通常情况下,在大多数场景中很少采用Publisher::subscribe(on:)的方法。如果您遇到较为复杂的业务逻辑需求,则更常采用Publisher::receive(on:)这一方法。该方法通过参数指定应使用的调度程序来传递订阅值。

更直接地说,在整个 Publishing 流程中,在 receive(on:) 方法之后进行的所有操作都将由该方法所决定的线程处理。

比如刚才的代码中,在subscribe(on:)方法后面加上了receive(on:)方法。

复制代码
    Publishers.MyPublisher()
      .map { _ in
    print("Map: \(Thread.isMainThread)")
      }
      .subscribe(on: DispatchQueue.global())
      .receive(on: DispatchQueue.main)
      .sink { _ in
    print("Sink: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)

执行打印结果:

复制代码
    "receive: false"
    Map: false
    Sink: true

能够观察到sink方法真正地被主线程调用时。在整个发布链中,在使用订阅事件( subscribe(on:))之后,在后续也应配合接收事件( receive(on:))

我们了解URLSession.DataTaskPublisher是在异步执行的,在返回过程中,通过一系列转型操作完成。最后一步是走到sink方法里。因此可以在sink之前调用receive(on:)方法,并传递给主线程处理。

下面在看一个只有receive(on:)方法的示例。

复制代码
    Publishers.MyPublisher()
      .map { _ in
    print("Map: \(Thread.isMainThread)")
      }
      .receive(on: DispatchQueue.global())
      .sink { _ in
    print("Sink: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)

在map后加上了receive(on:)方法,并传入异步线程参数。打印结果如下:

复制代码
    "receive: true"
    Map: true
    Sink: false

receive(on:)函数之前处于主线程执行状态,在receive(on:)函数之后进入异步线程处理流程。

如果加一个receive(on:)方法不过瘾,再看看加两个的。

复制代码
    Publishers.MyPublisher()
      .receive(on: DispatchQueue.global()) // 1 异步线程
      .map { _ in
    print("Map: \(Thread.isMainThread)")
      }
      .receive(on: DispatchQueue.main) // 2 主线程
      .sink { _ in
    print("Sink: \(Thread.isMainThread)")
      }
      .store(in: &cancellable)

先看打印结果:

复制代码
    "receive: true"
    Map: false
    Sink: true

管道入口必定位于主线程上,并随后被调用了一次 .receive(on: DispatchQueue.global())。随后进行的是 map 操作,并非同步线程。接着又被调用了一次 .receive(on: DispatchQueue.main),回到主线程后。因此,在主线程序上实现了 sink 操作

在开发过程中使用receive(on:)方法的频率会多余subscribe(on:)方法。

写在最后

SchedulerCombine 中发挥着关键作用,在处理任务调度与执行方面具有显著价值。通过选择不同的 Scheduler 类型,则能够将任务分配到特定的线程或队列中进行处理,并根据需求灵活配置资源分配策略。使用 Scheduler 有助于优化代码结构并提升运行效率,在确保代码可读性的同时也能显著改善系统性能表现。

最后分享希望能为有需要的朋友提供帮助——那些需要的人。大家如果觉得这篇内容对您有帮助,请尽量点赞并关注我们的账号哦!我们也会一直努力下去,并在未来带来更多精彩的内容。

全部评论 (0)

还没有任何评论哟~