Advertisement

1、响应式编程基础概念

阅读量:

为了实现更高的资源利用率,使用异步非阻塞交互模型

后压力 是一种处理模块间工作负载的机制,在系统中通过将压力信号传递给上游模块以防止下个模块被过度负担。

该模式(Observer Pattern)描述了一种单向依赖关系,在相关对象的状态发生变化时,这些相关对象能够及时地被通知并自动进行更新操作。

在这里插入图片描述
复制代码
    public interface Observer {
    void update(String event);
    }
    
    class Subject {
     
    private final List<Observer> observers = new ArrayList<>();
      
    private void notifyObservers(String event) {
        observers.forEach(observer -> observer.update(event));
    }
      
    public void addObserver(Observer observer) {
        observers.add(observer);
    }
      
    public void scanSystemIn() {
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String line = scanner.nextLine();
            notifyObservers(line);
        }
    }
    }
    
    public class ObserverDemo {
    public static void main(String[] args) {
        System.out.println("Enter Text : ");
        Subject subject = new Subject();
        
        subject.addObserver(event -> {
            System.out.println("Received response: " + event);
        });
    
        subject.scanSystemIn();
    }
    }

响应式流规范基础知识

响应式流规范包括了四个核心接口:PublisherSubscriberSubscriptionProcessor。由于该规范的发展完全独立于任何组织因此以单独的JAR文件形式获取所有接口位于 org.reactivestreams 包中。

在这里插入图片描述

A Publisher is capable of sending data to its Subscriber through the method called onNext, while also being able to indicate errors or completion via separate methods (onError or onComplete). Any occurrence of an error or completion will terminate the sequence.
Summarizing this behavior, we can describe it as follows: The next event in the sequence is determined by calling onNext, which may occur at a specific index. Additionally, if an error occurs during this process, calling onError will trigger an error response. Similarly, if the process completes normally, calling onComplete will handle that scenario.

Publisher

发布者

复制代码
    /** * Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber.
     * * Publisher can serve multiple Subscriber subscribed dynamically at various points in time.
     * */
    public interface Publisher<T> {
    
    /** * Request Publisher to start streaming data.
     * * This is a "factory method" and can be called multiple times, each time starting a new Subscription.
     * * Each Subscription will work for only a single Subscriber.
     * * A Subscriber should only subscribe once to a single Publisher.
     * * If the Publisher rejects the subscription attempt or otherwise fails it will signal the error via Subscriber#onError.
     * * @param s the Subscriber that will consume signals from this Publisher
     */
    public void subscribe(Subscriber<? super T> s);
    }

Subscriber

订阅者

复制代码
    /** * Will receive call to  #onSubscribe(Subscription)} once after passing an instance of Subscriber to Publisher#subscribe(Subscriber)
     * * No further notifications will be received until Subscription#request(long) is called.
     * <p>
     * After signaling demand:
     * *     One or more invocations of #onNext(Object) up to the maximum number defined by Subscription#request(long)
     *     Single invocation of #onError(Throwable) or #onComplete() which signals a terminal state after which no further events will be sent.
     * * Demand can be signaled via Subscription#request(long) whenever the Subscriber instance is capable of handling more.
     * * @param <T> the type of element signaled.
     */
    public interface Subscriber<T> {
    /** * Invoked after calling Publisher#subscribe(Subscriber)
     * * No data will start flowing until Subscription#request(long) is invoked.
     * * It is the responsibility of this Subscriber instance to call Subscription#request(long) whenever more data is wanted.
     * * The Publisher will send notifications only in response to Subscription#request(long).
     * * @param s Subscription that allows requesting data via Subscription#request(long)
     */
    public void onSubscribe(Subscription s);
    
    /** * Data notification sent by the Publisher in response to requests to Subscription#request(long)
     * * @param t the element signaled
     */
    public void onNext(T t);
    
    /** * Failed terminal state.
     * * No further events will be sent even if Subscription#request(long) is invoked again.
     * * @param t the throwable signaled
     */
    public void onError(Throwable t);
    
    /** * Successful terminal state.
     * * No further events will be sent even if Subscription#request(long) is invoked again.
     */
    public void onComplete();
    }

Subscription

订阅

为控制元素提供基础

  • cancel() 方法实现了对流的订阅取消甚至彻底实现了发布完全取消。
  • 该方法增强了Publisher与Subscriber之间的互动能力,并以指定数量的形式向Publisher传递所需的数据量信息。
复制代码
    /** * A Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
     * * It can only be used once by a single Subscriber
     * * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
     * */
    public interface Subscription {
    /** * No events will be sent by a Publisher until demand is signaled via this method.
     * *  It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
     *  it may be treated by the Publisher as "effectively unbounded".
     * * Whatever has been requested can be sent by the Publisher so only signal demand for what can be safely handled.
     * * A Publisher can send less than is requested if the stream ends but then must emit either Subscriber#onError(Throwable) or Subscriber#onComplete()
     * * @param n the strictly positive number of elements to requests to the upstream Publisher
     */
    public void request(long n);
    
    /** * Request the Publisher to stop sending data and clean up resources.
     * * Data may still be sent to meet previously signalled demand after calling cancel.
     */
    public void cancel();
    }

Processor

复制代码
    /** * A Processor represents a processing stage—which is both a Subscriber
     * and a Publisher and obeys the contracts of both.
     * * @param <T> the type of element signaled to the {@link Subscriber}
     * @param <R> the type of element signaled by the {@link Publisher}
     */
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

全部评论 (0)

还没有任何评论哟~