Advertisement

Swift Combine — Publisher和Subscriber的交互流程(自定义Publisher、Subscriber、Subscription)

阅读量:

如前所述,在之前的综述中已经对PublisherSubscriber进行了简要介绍。鉴于这些概念属于基础性知识范畴,在阐述两者的交互流程之前,请补充一些未提及的相关内容。

Subscription

该订阅协议是一个协议的实现。该协议的实现方负责将订阅者与发布者连接起来。当该订阅协议处于内存环境中时,订阅者会持续接收数据值。它只包含一个方法:

复制代码
    public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
    }

当消费者在Publisher中接收到了subscription对象后,则会开始执行请求操作,在此过程中,demand参数指定了从发布者那里获取的具体数值数量

demand参数有几个可选的参数值:

  • none:被指定的子订户不会收到任何数据。
  • max(value): 该函数设置了一个最大数量的限制,在此范围内允许接收数据。
  • unlimited: 当参数设置为' unlimited'时,默认情况下允许接收无限制的数据量。

实例中包含了Subscriber引用对象以保持最新状态。
协议继承了Cancellable协议因此具有cancel方法。
在自定义Subscription时request和cancel方法必须实现。

Publisher和Subscriber的交互流程

在了解了Subscription协议之后,我们接下来探讨一下PublisherSubscriber之间的联系是如何建立的。

由 Publisher 调用 subscribe(:) 方法开启链接申请过程,并将参数传递给 Subscriber 实例对象。
随后,在第一次 subscribe(
:) 调用后,在 Publisher 内部触发接收订阅(subscriber)的方法,在此过程中创建一个连接两者的 Subscription 对象,并将该对象传递给 subscriber 以完成接收到订阅。
当 subscriber 执行 receive(subscription:) 方法时,则会利用接收到的对象(subscription)来发起请求操作并记录其计数。
在 subscription 对象处理 receive() 请求时,则会根据 subscriber 的计数器来决定如何响应。
当 subscriber 发送数据时,则会向 publisher 发送相应的响应信息。
随后会向 subscriber 发送完成状态或错误通知。

因为Subscription承担了一个连接的角色,在幕后发挥作用,并且上面提到的第5条、第6条在语义上等价于Publisher通过receive(_:)方法或receive(completion:)方法将数据或结束信息传递给Subscriber的行为。实际上而言, Subscription负责将数据传递给下游节点.

自定义Subscriber

首先看一下Subscriber协议的定义:

复制代码
    public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure : Error
    
    func receive(subscription: any Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
    }

协议中包含两种类型以及三种方法。为了实现自定义Subscriber的功能,请采用类进行定义;否则会导致编译错误出现;此外,struct属于值类型,Subscription不会持有最初的那个Subscriber实例。

复制代码
    // 自定义Subscriber
    class CustomSubscriber: Subscriber {
      // 确定输入类型,需要和Publisher的输出类型一致。
      typealias Input = Int
      // 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。
      typealias Failure = Never
    
      /** 交互流程中第3步
       *  接收subscription对象的方法。
       *  方法内subscription对象调用request方法,设置请求次数。
       */
      func receive(subscription: any Subscription) {
    debugPrint("CustomSubscriber subscription.request")
    subscription.request(.max(5))
      }
      
      /** 交互流程中第5步
       *  接收Publisher发送数据的方法。
       *  该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。
       */
      func receive(_ input: Int) -> Subscribers.Demand {
    print("New value \(input)")
    return .none
      }
    
      /** 交互流程中第6步
       *  接收Publisher发送结束的方法,或者正常结束,或者失败。
       */
      func receive(completion: Subscribers.Completion<Never>) {
    print("Completion: \(completion)")
      }
    }

自定义Publisher

在自定义Publisher前,再看一下Publisher协议的定义:

复制代码
    public protocol Publisher<Output, Failure> {
    
    associatedtype Output
    associatedtype Failure : Error
    
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
    }

自定义的Publisher需要继承这个协议,比如:

复制代码
    // 自定义Publisher
    class CustomPublisher: Publisher {
      // 确定输出类型,需要和Subscriber的输入类型一致。
      typealias Output = Int
      // 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。
      typealias Failure = Never
    
      /** 交互流程中第2步
       *  接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。
       *  方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。
       */
      func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
    // 创建Subscription对象
    let subscription = CustomSubscription(subscriber: subscriber)
    debugPrint("CustomPublisher subscriber.receive")
    // 将Subscription对象传给Subscriber
    subscriber.receive(subscription: subscription)
      }
    }

自定义Subscription

先看一下Subscription协议:

复制代码
    public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
    }

该协议中的明确要求是必须实现一种名为「request」的方法。由于该协议继承自「Cancellable」类别的协议结构,则还需再实现一种名为「cancel」的方法。

复制代码
    public protocol Cancellable {
    func cancel()
    }

最终改写版

下面是自定义Subscription

复制代码
    // 自定义Subscription
    class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {
      // 持有传入进来的Subscriber对象。
      private var subscriber: S
      private var counter = 0
      private var isCompleted = false
    
      // 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。
      init(subscriber: S) {
    self.subscriber = subscriber
      }
      
      /** 交互流程中第4步
       *  该方法传入请求数据的次数,并给Subscriber发送数据。
       */
      func request(_ demand: Subscribers.Demand) {
    debugPrint("CustomSubscription request")
    guard !isCompleted else { return }
    
    for _ in 0..<(demand.max ?? 10) {
      _ = subscriber.receive(counter) // 给Subscriber发送数据
      counter += 1
    }
    
    if counter >= 5 {
      subscriber.receive(completion: .finished) // 通知Subscriber结束。
      isCompleted = true
    }
      }
    
      // 该方法中执行一些取消订阅的操作。
      func cancel() {
    isCompleted = true
      }
    }

如何使用

对上述内容进行了定义,请您现在看看如何运用它吧。基于SwiftUI的界面,在相应的ViewModel中新增一个方法,并调用之前自定义好的组件。

首先定义一个ViewModel

复制代码
    class CustomCombineViewModel: ObservableObject {
    
      var subscription: AnyCancellable?
    
      func testMethod1() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 创建自定义的Subscriber
    let subscriber = CustomSubscriber()
    
    debugPrint("Begin subscribe")
    
    /** 交互流程中第1步,申请订阅。
     *  由Publisher对象调用subscribe方法,传入Subscriber对象开始。
     */
    publisher.subscribe(subscriber)
      }
    
      func testMethod2() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。
    subscription = publisher
      .sink { completion in
        print("sink completion: \(completion)")
      } receiveValue: { value in
        print("sink new value \(value)")
      }
      }
    }

在上述代码中,在testMethod1方法内部依次创建了Publisher和Subscriber两个对象,并通过Publisher实例调用subscribe接口以启动订阅流程这一操作即构成了订阅的启动入口

当执行testMethod1时候,输出打印:

复制代码
    "Begin subscribe"
    "CustomPublisher subscriber.receive"
    "CustomSubscriber subscription.request"
    "CustomSubscription request"
    New value 0
    New value 1
    New value 2
    New value 3
    New value 4
    Completion: finished

上面的结果也揭示了一个从开始订阅到发送数据结束的关键过程,在Subscriber类中通过设置subscriber.max(5)限制最大值时,在request方法被调用的过程中打印出了总共接收的数据数量为最多不超过五条

让我再看看第二个方法testMethod2()吧,请问这个方法中是否缺少了Publisher调用subscribe的方法呢?

该订阅系统包含两个内置的Subscriber实例,分别是Subscribers.Sink接口和Subscribers.Assign接口。当调用订阅相关的sunk或assign方法时即可启动订阅机制。

当执行testMethod2时候,输出打印:

复制代码
    "CustomPublisher subscriber.receive"
    "CustomSubscription request"
    sink new value 0
    sink new value 1
    sink new value 2
    sink new value 3
    sink new value 4
    sink new value 5
    sink new value 6
    sink new value 7
    sink new value 8
    sink new value 9
    sink completion: finished

因为sinks发起请求获取大量数据,在Subscription中存储了这些信息后被输出显示出来

Subscribers.Sink

Sink 创建的时候会立即调用 Subscription 对象的 request(.unlimited)

Publisher 有两个 sink 扩展方法:

  • sink(receiveCompletion:receiveValue:)
  • sink(receiveValue:)

Subscribers.Assign

The method will assign the received value to a class attribute or distribute it to another @Published publisher, which also specifies its demand as .unlimited.

Publisher 有两个 assign 扩展方法:

  • set<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on the object of type Root)
    • set(to published: inout Published<Self.Output>.Publisher)

写在最后

我们已经透彻地理解了Combine订阅交互流程,并由此是否对Combine框架有了更深入的认识呢? 在实际开发过程中,不推荐我们自行搭建Publisher、Subscriber和Subscription架构。由于任何逻辑错误都可能导致发布者与订阅者之间的全部连接中断,并可能引发不可预见的结果。

最后希望能为需要帮助的朋友提供一些支持;如果觉得有所帮助,请大家帮忙点赞并关注;笔者也会持续努力,并致力于写出更多优质的内容。

全部评论 (0)

还没有任何评论哟~