RxSwift之深入解析场景特征序列的使用和底层实现

举报
Serendipity·y 发表于 2022/02/16 22:19:53 2022/02/16
【摘要】 一、引序 任何序列都可以用 Observable 描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收: Observable<Any>.create { (...

一、引序

  • 任何序列都可以用 Observable 描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收:
	Observable<Any>.create { (observer) -> Disposable in
	    observer.onNext("信号1")
	    return Disposables.create()
	}.subscribe(onNext: { (val) in
	    print("信号接收区:\(val)")
	}).disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • Observable 是通用序列的描述符,调用 .onNext,.onError,onCompleted 来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此 RxSwift 还提供了一组特征序列,是 Observable 序列的变种,它能够更准确的描述序列,即 Single、Completable、Maybe、Driver、Signal、ControlEvent,binder 等。

二、Single

① Single 的使用

  • Single 单元素序列,信号只发送一次,响应信号或错误信号。
  • Single 是 Observable 的另外一个版本,不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
  • Single 发出一个元素,或一个 error 事件,不会共享附加作用。
	let singleOB = Single<Any>.create { (single) -> Disposable in
        print("singleOB 是否共享")
        single(.success("good"))
        single(.error(NSError.init(domain: "com.ydw.cn",
        code: 10086, userInfo: nil)))
        return Disposables.create()
    }

    singleOB.subscribe { (reslut) in
        print("订阅:\(reslut)")
        }.disposed(by: disposeBag)
        
	// 输出结果
	singleOB 是否共享
	订阅:success("good")

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 说明:
    • sinngle(.success(data)) -> onSuccess 发送响应元素到成功观察者;
    • sinngle(.error(error)) -> error 发送错误元素到错误观察者。
  • 响应元素和错误元素分开处理,此时可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。

② Single 的源码分析

  • Single 定义:定位到 Single.swift 文件,可以看到 Single 是 PrimitiveSequence 结构体类型的别名,SingleEvent 是事件枚举,有 success 和 error 两个成员变量,如下所示:
	// Sequence containing exactly 1 element
	public enum SingleTrait { }
	// Represents a push style sequence containing 1 element.
	public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
	
	public enum SingleEvent<Element> {
	    // One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)
	    case success(Element)
	    
	    // Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
	    case error(Swift.Error)
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • Single 的 create 创建序列(1⃣️),如下所示:
	extension PrimitiveSequenceType where TraitType == SingleTrait {
	    public typealias SingleObserver = (SingleEvent<ElementType>) -> Void
	    // 此处省略部分代码
	    ......
	    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
	        let source = Observable<ElementType>.create { observer in
	            return subscribe { event in
	                switch event {
	                case .success(let element):
	                    observer.on(.next(element))
	                    observer.on(.completed)
	                case .error(let error):
	                    observer.on(.error(error))
	                }
	            }
	        }       
	        return PrimitiveSequence(raw: source)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 可以看到参数是一个带 Disposable 类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个 SingleObserver 闭包,以上写法不太好理解,可以换一种写法:
    • 内部实现一个闭包 block,用来接收外界传入的 SingleEvent 信号,接着做进一步的信号发送;
    • 调用外部实现的闭包方法,将内部实现的闭包 block 发送出去,起连接作用;
    • 创建 PrimitiveSequence 对象并保存 Observable 序列对象 source,返回 PrimitiveSequence 对象;
	public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
	    let source = Observable<ElementType>.create { observer in
	        // 内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送
	        let block = { (event:SingleEvent<ElementType>) -> Void in
	            switch event {
	            case .success(let element):
	                observer.on(.next(element))
	                observer.on(.completed)
	            case .error(let error):
	                observer.on(.error(error))
	            }
	        }
	        // 调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用
	        let disposable = subscribe(block) // 返回值Disposable对象 
	        return disposable
	    }
	    return PrimitiveSequence(raw: source) // 创建PrimitiveSequence对象并保存Observable序列对象
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 在 create 方法内部实际上实现了一个 Observable 序列,由此可见 Single 序列是对 Observable 序列的封装,Disposable 对象通过闭包交由业务层创建,Single 序列在实现上,方式方法与 Observable 保持一致。Observable 序列的信号传递流程,请参考我的博客:RxSwift之深入解析核心逻辑Observable的底层原理
  • 订阅序列,也是在同 PrimitiveSequenceType 扩展中定义(2⃣️),如下:
	public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
	    #if DEBUG
	         let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []
	    #else
	        let callStack = [String]()
	    #endif
	
	    return self.primitiveSequence.subscribe { event in
	        switch event {
	        case .success(let element):
	            onSuccess?(element)
	        case .error(let error):
	            if let onError = onError {
	                onError(error)
	            } else {
	                Hooks.defaultErrorHandler(callStack, error)
	            }
	        }
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 方法中先调用了 self.primitiveSequence 方法,返回了 self,方法是在遵循 PrimitiveSequenceType 协议的 PrimitiveSequence 的扩展中,为了保证协议的一致性,如下所示:
	extension PrimitiveSequence: PrimitiveSequenceType {
	    // Additional constraints
	    public typealias TraitType = Trait
	    // Sequence element type
	    public typealias ElementType = Element
	
	    // Converts `self` to primitive sequence.
	    //
	    // - returns: Observable sequence that represents `self`.
	    public var primitiveSequence: PrimitiveSequence<TraitType, ElementType> {
	        return self
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 接着调用另一个 subscribe 方法(3⃣️),self.primitiveSequence -> asObservable() -> subscribe,此处截断了 completed 信号的向上传递,因此 Single 序列只能收到响应信号和错误信号,如下:
	public func subscribe(_ observer: @escaping (SingleEvent<ElementType>) -> Void) -> Disposable {
	    var stopped = false
	    return self.primitiveSequence.asObservable().subscribe { event in
	        if stopped { return }
	        stopped = true
	        
	        switch event {
	        case .next(let element):
	            observer(.success(element))
	        case .error(let error):
	            observer(.error(error))
	        case .completed:
	            rxFatalErrorInDebug("Singles can't emit a completion event")
	        }
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • subscribe 方法也调用了 self.primitiveSequence 方法,接着调用 asObservable() 方法,查看代码发现此处是为了获取 source 对象,即 Observable 可观察序列。再查看 subscribe 的方法(4⃣️):
	public func subscribe(_ on: @escaping (Event<E>) -> Void)
	    -> Disposable {
	        let observer = AnonymousObserver { e in
	            on(e)
	        }
	        return self.asObservable().subscribe(observer)
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层:4️⃣ -> 3️⃣ -> 2️⃣ -> 1️⃣ ->业务层;
  • 当前 self 指向的是1️⃣处创建并保存的 Observable 类型的 source 对象,因此该处 subscribe 所调用的即是 Produce 类中的 subscribe 方法,在方法内部创建了 sink 对象,来触发创建序列时实现的闭包,即代码1️⃣处所 create 后的闭包;
  • 此时就到了业务层,通过 create 内部实现的闭包 single 向内部发送消息,再有 observer 调用 on 来向观察者发送信号;
  • 信号发送不做赘述,最终会到达4️⃣处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包。
  • 序列的产生,订阅,发送,接收还是由 Observable 来实现的,Single 只是对 Observable 做了封装,去除了 onCompleted 的消息监听及消息发送。

三、Completable

  • Completable 是 Observable 的另外一个版本,不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件:
    • 发出零个元素;
    • 发出一个 completed 元素,或一个 error 事件;
    • 不会共享附加作用;
  • Completable 只能产生 completed 事件和 error 事件,没有序列元素值产生。适用于只关心任务是否完成,而不需要在意任务返回值的情况,它和 Observable 有点相似。
	let completableOB = Completable.create { (completable) -> Disposable in
        print("completableOB 是否共享")
        completable(.completed)
        completable(.error(NSError.init(domain: "com.ydw.cn",
        code: 10086, userInfo: nil)))
        return Disposables.create()
    }

    completableOB.subscribe { (reslut) in
        print("订阅:\(reslut)")
        }.disposed(by: disposeBag)

	// 输出结果
	completableOB 是否共享
	订阅:completed

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 在 Competable.swift 下,在 PrimitiveSequenceType 扩展中实现了序列的创建和订阅,即信号转发。Competable 的定义如下:
	// Sequence containing 0 elements
	public enum CompletableTrait { }
	// Represents a push style sequence containing 0 elements.
	public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>
	
	public enum CompletableEvent {
	    // Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
	    case error(Swift.Error)
	    
	    // Sequence completed successfully.
	    case completed
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 同样,Completable 类也是 PrimitiveSequence 的别名,并声明一个枚举包含,error 和 completed 成员变量,限定了事件产生类型,都是对 Observable 序列的封装,和 Single 一致,只是在订阅阶段对 .next 事件做了拦截。

四、Maybe

  • Maybe 是 Observable 的另外一个版本,它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个completed 事件,要么产生一个 error 事件。
    • 发出一个元素或者一个 completed 事件或者一个 error 事件;
    • 不会共享附加作用。
  • 如果可能需要发出一个元素,又可能不需要发出时,就可以使用 Maybe。
	let maybeOB = Maybe<Any>.create { (maybe) -> Disposable in
        print("maybe 是否共享")
        maybe(.success("A"))
        maybe(.completed)
        maybe(.error(NSError.init(domain: "com.ydw.cn",
        code: 10086, userInfo: nil)))
        return Disposables.create()
    }

    maybeOB.subscribe { (reslut) in
        print("订阅:\(reslut)")
        }.disposed(by: disposeBag)

	// 输出结果
	maybe 是否共享
	订阅:success("A")

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

五、Driver

① Driver 使用

  • Driver 是一个精心准备的特征序列,它主要是对需要在 UI 上做出响应的序列进行了封装,简化 UI 层的代码。不过如果遇到的序列具有以下特征,也可以使用它:
    • Driver 序列不会产生 error 事件;
    • 在主线程中监听(一定要在 MainScheduler 中),会向新订阅者发送上次发送过的元素,简化 UI 层的代码;
    • 共享序列。
	let result = self.textFiled.rx.text.orEmpty
        .asDriver() // 普通序列转化
        .throttle(.milliseconds(500))
        .flatMap {
            self.dealwithData(inputText: $0)
                .asDriver(onErrorJustReturn: "检测到错误")
    }

    // 绑定到label上面
    result.map { "长度:\(($0 as! String).count)" }
        .drive(self.label.rx.text)
        .disposed(by: disposeBag)
    // 绑定到button上面
    result.map { ($0 as! String) }
        .drive(self.btn.rx.title())
        .disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 说明:
    • asDriver() 将序列转换为 Driver 序列;
    • map 重新组合并生成新的序列;
    • driver 将元素在主线程中绑定到 label 和 button 上。
  • 相比非 driver 下的代码实现,Driver 序列省去了线程的设置,share 数据共享设置。

② Driver 的源码分析

  • 查看 asDriver() 方法,是 ControlProperty 的扩展方法,返回了一个Driver类:
extension ControlProperty {
    // Converts `ControlProperty` to `Driver` trait.
    //
    // `ControlProperty` already can't fail, so no special case needs to be handled.
    public func asDriver() -> Driver<E> {
        return self.asDriver { _ -> Driver<E> in
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • Driver 是 SharedSequence 的别名,用来描述不同类型的序列,最后又调用了 asDriver 方法,而该方法在 ObservableConvertibleType 的扩展中,一直追踪会发现很多类都是继承自 ObservableConvertibleType 下:
	extension ObservableConvertibleType {
	    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
	        let source = self
	            .asObservable()
	            .observeOn(DriverSharingStrategy.scheduler)
	            .catchError { error in
	                onErrorRecover(error).asObservable()
	            }
	        return Driver(source)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 如上,代码也设置了 observerOn 方法,来指定线程,继续深入能够发现 DriverSharingStrategy.scheduler 内部指定的就是主线程,印证了上面所说的 Driver 的执行是在主线程的,最后初始化一个 Driver 对象返回,看一下初始化过程,及对 SharedSequence 类的初始化,代码如下:
	public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
	    public typealias E = Element
	    public typealias SharingStrategy = S
	
	    let _source: Observable<E>
	
	    init(_ source: Observable<E>) {
	        self._source = S.share(source)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 此处调用了 share 并传入了可观察序列,感觉好像在哪见过,此处猜想它是用来共享序列的,使用 lldb:po S.self 查找 share 所在位置:RxCocoa.DriverSharingStrategy,cmd + 点击进入,代码如下:
	public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>
	
	public struct DriverSharingStrategy: SharingStrategyProtocol {
	    public static var scheduler: SchedulerType { return SharingScheduler.make() }
	    public static func share<E>(_ source: Observable<E>) -> Observable<E> {
	        return source.share(replay: 1, scope: .whileConnected)
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 在此处传入的序列用来掉用 share,使得当前序列作为共享序列,即 driver 序列为共享序列。

六、Signal

  • Signal 和 Driver 相似,唯一的区别是,Driver 会对新观察者回放(重新发送)上一个元素,而 Signal 不会对新观察者回放上一个元素。
    • 不会产生 error 事件;
    • 一定在 MainScheduler 监听(主线程监听);
    • 会共享附加作用。
	let result = self.textFiled.rx.text.orEmpty
	    .asSignal(onErrorJustReturn: "没有值") // 普通序列转化为signle
	    .throttle(.milliseconds(500))
	    .flatMap {
	        self.dealwithData(inputText: $0)
	            .asSignal(onErrorJustReturn: "检测到错误")
	}
	
	// 绑定到label上面
	result.map { "长度:\(($0 as! String).count)" }
	    .emit(to: self.label.rx.text)
	    .disposed(by: disposeBag)
	// 绑定到button上面
	result.map { ($0 as! String) }
	    .emit(to: self.btn.rx.title())
	    .disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 说明:
    • catchErrorJustReturn(onErrorJustReturn) 保证了无错误信号;
    • observeOn(DriverSharingStrategy.scheduler) 保证了调度环境在主线程;
    • source.share(scope: .whileConnected) 序列状态共享。

七、ControlEvent

  • ControlEvent: 专门用于描述 UI 控件所产生的事件,它具有以下特征:
    • 不会产生 error 事件;
    • 一定在 MainScheduler 订阅(主线程订阅);
    • 一定在 MainScheduler 监听(主线程监听);
    • 会共享附加作用。
	let controlEventOB = button.rx.controlEvent(.touchUpInside)
	
	controlEventOB.subscribe { (reslut) in
	        print("订阅:\(reslut) \n \(Thread.current)")
	    }.disposed(by: disposeBag)
	
	controlEventOB.subscribe { (reslut) in
	        print("订阅:\(reslut) \n \(Thread.current)")
	    }.disposed(by: self.disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 监听点击事件:
	let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
	event.bind(onNext: {
	    print("controllerEvent")
	}).disposed(by: disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 监听点击事件并绑定数据到其他 UI:
	let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
	event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)

  
 
  • 1
  • 2

八、binder

  • 不会处理错误事件;
  • 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler);
  • 一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。
	let observer : AnyObserver<Bool> = AnyObserver { (event) in
	    print("observer当前线程:\(Thread.current)")
	    switch event{
	    case .next(let isHidden) :
	        print("label的状态")
	        self.label.isHidden = isHidden
	    case .error(let error) :
	        print("\(error)")
	    case .completed :
	        print("完成")
	    }
	}
	
	let binder = Binder<Bool>(self.label) { (lab, isHidden) in
	    print("Binder当前线程:\(Thread.current)")
	    lab.isHidden = isHidden
	}
	
	let observable = Observable<Bool>.create { (ob) -> Disposable in
	    ob.onNext(true)
	    ob.onError(NSError.init(domain: "com.ydw.cn", code: 10086, userInfo: nil))
	    ob.onCompleted()
	    return Disposables.create()
	    }.observeOn(ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global()))
	
	// observable.bind(to: observer).disposed(by: self.disposeBag)
	observable.bind(to: binder).disposed(by: self.disposeBag)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • RxSwift 里面也封装了很多的 binder,开发使用很直接:
	extension Reactive where Base: UIButton {
	    
	    // Reactive wrapper for `setTitle(_:for:)`
	    public func title(for controlState: UIControl.State = []) -> Binder<String?> {
	        return Binder(self.base) { button, title -> Void in
	            button.setTitle(title, for: controlState)
	        }
	    }
	
	    // Reactive wrapper for `setImage(_:for:)`
	    public func image(for controlState: UIControl.State = []) -> Binder<UIImage?> {
	        return Binder(self.base) { button, image -> Void in
	            button.setImage(image, for: controlState)
	        }
	    }
	
	    // Reactive wrapper for `setBackgroundImage(_:for:)`
	    public func backgroundImage(for controlState: UIControl.State = []) -> Binder<UIImage?> {
	        return Binder(self.base) { button, image -> Void in
	            button.setBackgroundImage(image, for: controlState)
	        }
	    }
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

文章来源: blog.csdn.net,作者:Serendipity·y,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/Forever_wj/article/details/119610302

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。