RxSwift之深入解析核心逻辑Observable的底层原理

举报
Serendipity·y 发表于 2022/02/16 23:52:43 2022/02/16
【摘要】 一、前言 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示: // 创建序列 let ob = Observable<Any>.create { (observ...

一、前言

  • 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示:
	// 创建序列
	let ob = Observable<Any>.create { (observer) -> Disposable in
		// 发送信号
		observer.onNext("发送信号")
		observer.onError(NSError.init(domain: "EpisodeError", code: 10086, userInfo: nil))
		return Disposables.create()
	}
	// 订阅信号
	let _ = ob.subscribe(onNext: { (text) in
		print("订阅到:\(text)")
	}, onError: { (error) in
		print("error:\(error)")
	}, onCompleted: {
		print("完成")
	}) {
		print("销毁")
	}.disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 代码分析:
    • 通过 Observable 的 create 创建序列,在 create 闭包内调用 onNext 方法实现信号发送;
    • 调用 subscribe 方法订阅序列,并实现 subscribe 的参数闭包 onNext,在闭包内监听信号;
    • 最后通过 disposed 对序列打包等待销毁。
  • 运行程序,结果如下:
	订阅到:发送信号
	完成
	销毁

  
 
  • 1
  • 2
  • 3
  • 那么,发送的信号是如何被订阅到的呢?按正常逻辑,订阅后才能收到信息,我们可以猜测,在成为订阅者并布置好监听后,订阅者向序列发送了一条消息,通知可观察序列可以发送信号。大致流程如下所示:

在这里插入图片描述

二、核心逻辑分析

① 创建序列

  • 创建序列的源码如下所示:
	extension ObservableType {
	    // MARK: create
	    /**
	     Creates an observable sequence from a specified subscribe method implementation.
	     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
	     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
	     - returns: The observable sequence with the specified implementation for the `subscribe` method.
	     */
	    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
	        return AnonymousObservable(subscribe)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 该方法是对 ObservableType 协议的扩展,最外层实现的闭包 subscribe 则作为参数传入 AnonymousObservable,并返回 AnonymousObservable 对象。可观察序列的创建是利用协议拓展功能的 create 方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 。
  • 继续执行追踪到 AnonymousObservable 类,如下:
	final private class AnonymousObservable<Element>: Producer<Element> {
	    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
	
	    let _subscribeHandler: SubscribeHandler
	
	    init(_ subscribeHandler: @escaping SubscribeHandler) {
	        self._subscribeHandler = subscribeHandler
	    }
	
	    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
	        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
	        let subscription = sink.run(self)
	        return (sink: sink, subscription: subscription)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • AnonymousObservable 的继承链如下:
	AnonymousObservable -> Product -> ObservableType -> ObservableConvertible

  
 
  • 1
  • 至此为止,序列的创建完成:
    • create 方法的时候创建了一个内部对象 AnonymousObservable;
    • AnonymousObservable 保存了外界的闭包;
    • AnonymousObservable 继承了 Producer 具有非常重要的方法 subscribe。
  • AnonymousObservable 的继承关系如下所示:

在这里插入图片描述

② 订阅序列

  • 根据方法名找到 subscribe 的实现(订阅方法 subscribe 和上面所说的 subscribe 不是同一个方法),该方法是对 ObservableType 的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver 类型的闭包 observer,源码如下:
	extension ObservableType {
	    public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
	           // 此处省略不影响探索的代码
	           ......
	            let observer = AnonymousObserver<E> { event in                
	                switch event {
	                case .next(let value):
	                    onNext?(value)
	                case .error(let error):
	                    if let onError = onError {
	                        onError(error)
	                    }
	                    else {
	                        Hooks.defaultErrorHandler(callStack, error)
	                    }
	                    disposable.dispose()
	                case .completed:
	                    onCompleted?()
	                    disposable.dispose()
	                }
	            }
	            return Disposables.create(
	                self.asObservable().subscribe(observer),
	                disposable
	            )
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 说明:
    • E 是 Swift 的关联类型,仔细观察可观察序列的继承链源码,应该不难得出:E 就是序列类型,这里就是 String:
	public class Observable<Element> : ObservableType {
	    /// Type of elements in sequence.
	    public typealias E = Element

  
 
  • 1
  • 2
  • 3
    • observer 内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,event 是 observer 的参数,不难看出,observer 闭包也是在其他地方调用,传入带有信号值的 event 参数。
    • observer 的继承链关系如下:

在这里插入图片描述

  • observer 被当做参数传入到 subscribe 中,而 observer 的调用必然是在 subscribe 中实现:
	self.asObservable().subscribe(observer)

  
 
  • 1
  • self.asObservable() 该方法返回本身,保证协议的一致性,方法如下:
	public class Observable<Element> : ObservableType {
	    // 省去代码若干
	    ......
	    public func asObservable() -> Observable<E> {
	        return self
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • self.asObservable().subscribe(observer) 其实本质就是 self.subscribe(observer),继续断点执行找到 subscribe 方法,通过可观察序列的继承关系,可以非常快速的定位 Producer 订阅代码,正是上面所提到的 Producer 中的方法,方法如下:
	override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
	    // 省去代码若干
	    ......
	    return CurrentThreadScheduler.instance.schedule(()) { _ in
	            let disposer = SinkDisposer()
	            let sinkAndSubscription = self.run(observer, cancel: disposer)
	            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
	
	            return disposer
	        }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 接着,observer 观察者被传入到 run 中,上面说到该观察者一定会被调用,继续深入:
	let sinkAndSubscription = self.run(observer, cancel: disposer)

  
 
  • 1
  • 继续断点执行,发现 self.run 的调用,调用的是 AnonymousObservable 中的 run 方法,代码如下:
	final private class AnonymousObservable<Element>: Producer<Element> {
	    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
	
	    let _subscribeHandler: SubscribeHandler
	
	    init(_ subscribeHandler: @escaping SubscribeHandler) {
	        self._subscribeHandler = subscribeHandler
	    }
	
	    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
	        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
	        let subscription = sink.run(self)
	        return (sink: sink, subscription: subscription)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 此处就是创建序列时的 AnonymousObservable 类,在 run 方法类创建了 sink 对象,在初始化时传入了上面所说的观察者,记住 sink 保存了观察者 observer 闭包,并且调用了 sink.run(self) 方法,传入的是创建时产生的可观察序列 observable 闭包对象,深入 run:
	final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
	    typealias E = O.E
	    typealias Parent = AnonymousObservable<E>
	    // 省去代码若干
	    // 此处向父类Sink初始化了observer对象
	    override init(observer: O, cancel: Cancelable) {
	        super.init(observer: observer, cancel: cancel)
	    }
	    func run(_ parent: Parent) -> Disposable {
	        return parent._subscribeHandler(AnyObserver(self))
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 此处 parent 由 let subscription = sink.run(self) 传入,self 即为创建序列 create 方法返回的 observable 对象,而 _subscribeHandler 是创建序列所保存的闭包,此时闭包就被调用了,被调用闭包如下:
    let obs = Observable<Any>.create { (observer) -> Disposable in
    // 发送消息
    observer.onNext("我是一条消息")
    return Disposables.create()
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 发送信号的闭包被调用,接下来就是信号发送。订阅的内部流程如下:

在这里插入图片描述

③ 发送信号

  • 继续探索,代码已经执行到业务层,在信号发送闭包中通常调用一下三种方法,用来发送信号,如下:
    • observer.onNext(“发送消息”) 信号发送;
    • observer.onCompleted() 序列完成,完成后序列将被释放;
    • observer.onError(error) 序列出错中断,序列不可继续使用,被释放。
  • 以上三个方法为 ObserverType 的拓展方法(E 表示一个泛型信号量,可表示任意类型的信号):
	extension ObserverType {
	    
	    /// Convenience method equivalent to `on(.next(element: E))`
	    ///
	    /// - parameter element: Next element to send to observer(s)
	    public func onNext(_ element: E) {
	        self.on(.next(element))
	    }
	    
	    /// Convenience method equivalent to `on(.completed)`
	    public func onCompleted() {
	        self.on(.completed)
	    }
	    
	    /// Convenience method equivalent to `on(.error(Swift.Error))`
	    /// - parameter error: Swift.Error to send to observer(s)
	    public func onError(_ error: Swift.Error) {
	        self.on(.error(error))
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • .next(element) 是一个带泛型参数的枚举,管理了三种类型事件的消息传递,如下:
	public enum Event<Element> {
	    /// Next element is produced.
	    case next(Element)
	
	    /// Sequence terminated with an error.
	    case error(Swift.Error)
	
	    /// Sequence completed successfully.
	    case completed
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • on 是 AnonymousObservableSink 中的方法,代码如下:
	final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
	    typealias E = O.E
	    typealias Parent = AnonymousObservable<E>
	    // 代码省略若干行
	    func on(_ event: Event<E>) {
	        #if DEBUG
	            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
	            defer { self._synchronizationTracker.unregister() }
	        #endif
	        switch event {
	        case .next:
	            if load(&self._isStopped) == 1 {
	                return
	            }
	            self.forwardOn(event)
	        case .error, .completed:
	            if fetchOr(&self._isStopped, 1) == 0 {
	                self.forwardOn(event)
	                self.dispose()
	            }
	        }
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 内部根据 Event 枚举不同成员变量做不同的信号发送,信号发送调用了 forwardOn 方法。方法实现如下:
	class Sink<O : ObserverType> : Disposable {
	    init(observer: O, cancel: Cancelable) {
	        self._observer = observer
	        self._cancel = cancel
	    }
	
	    final func forwardOn(_ event: Event<O.E>) {
	        if isFlagSet(&self._disposed, 1) {
	            return
	        }
	        self._observer.on(event)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 代码有些长只保留了核心部分,Sink 即 AnonymousObservableSink 的父类,_observer 即是订阅中在内部产生的 AnonymousObserver 对象,而该对象调用了 on 方法并传递了信号,on 方法所在位置如下:AnonymousObserver -> ObserverBase -> on()
	class ObserverBase<ElementType> : Disposable, ObserverType {
	    typealias E = ElementType
	    func on(_ event: Event<E>) {
	        switch event {
	        case .next:
	            if load(&self._isStopped) == 0 {
	                self.onCore(event)
	            }
	        case .error, .completed:
	            if fetchOr(&self._isStopped, 1) == 0 {
	                self.onCore(event)
	            }
	        }
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 在方法内部又掉用了 self.onCore(event),此时该方法在 AnonymousObserver 中实现,代码如下:
	final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
	    typealias Element = ElementType
	    typealias EventHandler = (Event<Element>) -> Void
	    private let _eventHandler : EventHandler
	    init(_ eventHandler: @escaping EventHandler) {
	        self._eventHandler = eventHandler
	    }
	
	    override func onCore(_ event: Event<Element>) {
	        return self._eventHandler(event)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 说明:
    • 此处通过 _eventHandler 来发送信号,_eventHandler 是从哪来的呢?逆推 onCore 调用者是 observer,而 observer 是订阅时在内部创建的,被一层层传入到此;
    • 而在 observer 初始化时即被保存为 _eventHandler,_eventHandler 调用即调用了订阅时创建的 observer 闭包,进而信号又通过闭包内的闭包传出到业务层。
	// 订阅序列
	obs.subscribe(onNext: { (val) in
	    print("onNext:\(val)")
	}).disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 发送信息的内部流程如下:

在这里插入图片描述

  • 响应式编程的创建、订阅、发送、接收等流程就已完成,整个流程会觉得很复杂,但它统一了所有事件的创建与监听,统一思想快速开发,今后的开发流程就是:
	创建序列 -> 订阅序列 -> 发送序列 -> 响应序列

  
 
  • 1
  • sink 在 Rx 中充当管理者,管理序列,观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。

三、总结

  • 分析思维导图:

在这里插入图片描述

  • 核心逻辑流程图:

在这里插入图片描述

文章来源: blog.csdn.net,作者:Serendipity·y,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/Forever_wj/article/details/119515339

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。