본문 바로가기

Computer

RxSwift 구현에 대한 이해 기초 (1)

- RxSwift 6.6.0

기준으로 작성된 글입니다.

 

해당 글은 RxSwift 의 사용법을 다루는 글이 아니라 구현에 대한 개인적인 이해를 돕기위해 작성된 글입니다.

 

 

목차:

  • ObservableConvertibleType (Protocol)
  • Observable (Class)
  • ObservableType (Protocol)
  • Observer (Protocol)
  • AnyObserver (Struct)
  • Event (Enum)
  • EventConvertible (Protocol)
  • Disposable (Protocol)
  • Cancelable (Protocol)
  • DisposeBase (Class)
  • DisposeBag (Class)
  • AtomicInt (Class)

 

ObservableConvertibleType (Protocol)

/// Type that can be converted to observable sequence (`Observable<Element>`).
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype Element

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<Element>
}

Source 1. `ObservableConvertibleType`

 

  • 자신만의 Element 타입을 가지고 있고 `Observable` 로 변환될 수 있음을 표현하는 프로토콜이다.
  • 다른 프로토콜을 상속하지 않는 가장 순수하고 기본이 되는 프로토콜이다.

그렇다면 `Observable` 은 무엇일까?

 

Observable (Class)

/// A type-erased `ObservableType`. 
///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> { self }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
}

Source 2. `Observable`

 

  • `Observable` 클래스는 주석에도 적혀있다시피 모든 `Observable` 을 나타낼 수 있는 type-erased 클래스이다. 이 클래스를 상속하는 다른 모든 객체들을 이 클래스의 이름으로 사용할 수 있게 해준다.
  • 프로토콜이 아니므로 Generic 을 이용하여 Element 타입을 지정했다.
  • init() 과 deinit() 에는 개발 과정에서 Leak 을 찾는데 도움이 될 수 있도록 리소스를 증가/감소시키고 있지만 동작에는 연관이 없다.
  • 다른 여러 파생된 `Observable` 의 구현을 보면 알겠지만 직접 이 클래스의 인스턴스의 `subscribe()` 함수는 호출될 일이 없으므로 AbstractMethod 로 구현되어 있다.

다음은 이 클래스가 준수하고 있는 프로토콜인 `ObservableType` 을 알아보자.

 

ObservableType (Protocol)

/// Represents a push style sequence.
public protocol ObservableType: ObservableConvertibleType {
    /**
    Subscribes `observer` to receive events for this sequence.
    
    ### Grammar
    
    **Next\* (Error | Completed)?**
    
    * sequences can produce zero or more elements so zero or more `Next` events can be sent to `observer`
    * once an `Error` or `Completed` event is sent, the sequence terminates and can't produce any other elements
    
    It is possible that events are sent from different threads, but no two events can be sent concurrently to
    `observer`.
    
    ### Resource Management
    
    When sequence sends `Complete` or `Error` event all internal resources that compute sequence elements
    will be freed.
    
    To cancel production of sequence elements and free resources immediately, call `dispose` on returned
    subscription.
    
    - returns: Subscription for `observer` that can be used to cancel production of sequence elements and free resources.
    */
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

Source 3. `ObservableType`

 

  • `Observable` 임을 직접적으로 표현하는 프로토콜이다.
  • `Observable` 은 당연히 자기자신이 `Observable` 이므로 `ObservableConvertibleType` 을 상속한다.
  • 이 프로토콜에 `subscribe()` 함수가 존재함으로써 Element 타입이 같은 `ObserverType` 과 `ObservableType` 은 구독 관계가 될 수 있음을 나타낸다.

나중에 나오겠지만 이 프로토콜에 extension 이 굉장히 많이 붙음으로써 `subscribe` 기능 외에 할 수 있는 일?이 매우 많아짐.

 

extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<Element> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        Observable.create { o in self.subscribe(o) }
    }
}

Source 4. `ObservableType` 의 `ObservableConvertibleType` 프토토콜에 대한 기본 구현

 

또한 이 프로토콜은 `ObservableConvertibleType` 프로토콜에 대한 기본 구현을 가지고 있다.

지금은 그냥 Concrete Observable 객체를 '어떤 방법'으로 생성하여 반환한다고 알고 있자.

이 기본 구현으로 인해 프로토콜을 준수하는 모든 객체는 `Observable` 로의 변환에 대해 필수적으로 구현을 하지 않아도 된다.

 

그럼 다음으로 구독의 짝이 되는 `Observer` 를 직접적으로 표현하는 `ObserverType` 프로토콜을 보자.

 

ObserverType (Protocol)

/// Supports push-style iteration over an observable sequence.
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

Source 5. `ObserverType`

 

  • `ObservableType` 과 마찬가지로 자신만의 Element 타입을 가지고 있다.(정확히는 `ObservableConvertibleType` 에 정의되어 있지만) 주석에 따르면 관찰할 수 있는 element 의 타입이라고 함
  • `ObservableConvertibleType` 프로토콜과 마찬가지로 다른 프로토콜을 상속하지 않는 가장 순수하고 기본이 되는 프로토콜이다.
  • `Observer` 객체에게 Event 가 발생했다는 것을 알릴 수 있는 `on(_:)` 함수를 가지고 있다.(+편의 함수까지)
    관찰대상인 Sequence 에서 Event 가 발생했을 때 이 함수가 호출되어 관찰중인 `Observer` 에게 알림이 갈 것이라는 예측을 할 수 있다.

 

AnyObserver (Struct)

/// A type-erased `ObserverType`.
///
/// Forwards operations to an arbitrary underlying observer with the same `Element` type, hiding the specifics of the underlying observer type.
public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}

Source 6. `AnyObserver`

 

  • `Observable` 클래스와 마찬가지로 모든 `Observer` 를 나타낼 수 있는 type-erased 객체이며, `ObserverType` 프로토콜을 준수하는 가장 기본적인 `Observer` 이다. 놀랍게도 구조체로 구현되어 있다.(몰랐음)
  • 전달된 Event 에 따라서 어떤 일을 수행할 수 있도록 `EventHandler` 타입이 정의되어 있고, 이 타입의 handler 를 저장 할 수 있는 `observer` 프로퍼티를 가지고 있다.
  • `ObserverType` 프로토콜의 `on(_:)` 함수 구현에서는 매개 변수로 전달된 Event 를 다시 매개 변수로 `observer` 프로퍼티에 저장된 핸들러를 실행한다.
  • 2개 있는 생성자는 직접 정의된 EventHandler 를 매개 변수로 받는 생성자와, 다른 `Observer` 의 EventHandler 를 그대로 복사하는 생성자로 나눠져 있다.

따라서 이 객체는 이벤트에 대한 핸들러를 저장하며 생성된 후, 다른 객체가 `on(_:)` 함수를 호출하여(아마도 이벤트가 발생했을 때) 이 객체에게 알림을 주면 저장된 핸들러를 실행하는 책임을 가진다. (`Observable` 에서 Event 가 발생되면 구독하고 있던 `Observer` 의 `on(_:)` 함수가 호출될 것으로 예상)

 

Event (Enum)

/// Represents a sequence event.
///
/// Sequence grammar: 
/// **next\* (error | completed)**
@frozen public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

Source 7. `Event` 열거형의 정의

  • RxSwift 에서 사용되는 이벤트의 종류를 나타내는 열거형.
  • @frozen 으로 선언되어 있는걸로 보아 next, error, completed 로 표현되는 3가지 이벤트의 종류는 ReactiveX 에서 근본적인 의미를 가지고 있는것같음.
extension Event: CustomDebugStringConvertible {
    /// Description of event.
    public var debugDescription: String {
        switch self {
        case .next(let value):
            return "next(\(value))"
        case .error(let error):
            return "error(\(error))"
        case .completed:
            return "completed"
        }
    }
}

extension Event {
    /// Is `completed` or `error` event.
    public var isStopEvent: Bool {
        switch self {
        case .next: return false
        case .error, .completed: return true
        }
    }

    /// If `next` event, returns element value.
    public var element: Element? {
        if case .next(let value) = self {
            return value
        }
        return nil
    }

    /// If `error` event, returns error.
    public var error: Swift.Error? {
        if case .error(let error) = self {
            return error
        }
        return nil
    }

    /// If `completed` event, returns `true`.
    public var isCompleted: Bool {
        if case .completed = self {
            return true
        }
        return false
    }
}

extension Event {
    /// Maps sequence elements using transform. If error happens during the transform, `.error`
    /// will be returned as value.
    public func map<Result>(_ transform: (Element) throws -> Result) -> Event<Result> {
        do {
            switch self {
            case let .next(element):
                return .next(try transform(element))
            case let .error(error):
                return .error(error)
            case .completed:
                return .completed
            }
        }
        catch let e {
            return .error(e)
        }
    }
}

Source 8. `Event` 열거형의 Extensions

 

여러 computed property 가 정의되어 있고 event 를 변환할 수 있는 map 함수가 정의되어 있다.

 

EventConvertible (Protocol)

/// A type that can be converted to `Event<Element>`.
public protocol EventConvertible {
    /// Type of element in event
    associatedtype Element

    /// Event representation of this instance
    var event: Event<Element> { get }
}

extension Event: EventConvertible {
    /// Event representation of this instance
    public var event: Event<Element> { self }
}

Source 9. `EventConvertible` 프로토콜

 

RxSwift 에서 사용되는 Event 타입으로 변환될 수 있음을 표현하는 protocol 이 하나 정의되어있다.

 

 

Disposable (Protocol)

/// Represents a disposable resource.
public protocol Disposable {
    /// Dispose resource.
    func dispose()
}

Source 10. `Disposable`

 

단순히 어떤 객체, 자원 등이 폐기(Dispose)될 수 있음을 표현하는 프로토콜이다. 폐기를 수행해야 하는 `dispose()` 함수를 구현해야 한다.

 

Cancelable (Protocol)

/// Represents disposable resource with state tracking.
public protocol Cancelable : Disposable {
    /// Was resource disposed.
    var isDisposed: Bool { get }
}

Source 11. `Cancelable`

 

바로 위의 `Disposable` protocol 을 상속하여 폐기될 수 있음을 표현함과 동시에 폐기되었는지를 나타내는 상태까지 가질것을 요구하는 protocol 이다.

 

DisposeBase (Class)

/// Base class for all disposables.
public class DisposeBase {
    init() {
#if TRACE_RESOURCES
    _ = Resources.incrementTotal()
#endif
    }
    
    deinit {
#if TRACE_RESOURCES
    _ = Resources.decrementTotal()
#endif
    }
}

Source 12. `DisposeBase`

모든 Disposable 의 Base class, RxSwift 를 이해하는데는 넘어가도 좋을것 같다.

 

DisposeBag (Class)

/**
Thread safe bag that disposes added disposables on `deinit`.

This returns ARC (RAII) like resource management to `RxSwift`.

In case contained disposables need to be disposed, just put a different dispose bag
or create a new one in its place.

    self.existingDisposeBag = DisposeBag()

In case explicit disposal is necessary, there is also `CompositeDisposable`.
*/
public final class DisposeBag: DisposeBase {
    
    private var lock = SpinLock()
    
    // state
    private var disposables = [Disposable]()
    private var isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }

    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        self._insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        self.lock.performLocked {
            if self.isDisposed {
                return disposable
            }

            self.disposables.append(disposable)

            return nil
        }
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        let oldDisposables = self._dispose()

        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        self.lock.performLocked {
            let disposables = self.disposables
            
            self.disposables.removeAll(keepingCapacity: false)
            self.isDisposed = true
            
            return disposables
        }
    }
    
    deinit {
        self.dispose()
    }
}

Source 13. `DisposeBag`

  • SpinLock 타입의 lock 을 가지고 있다. 이 SpinLock 은 내부를 타고 들어가보면 `NSRecursiveLock` 으로 typealias 가 지정되어 있다.(정확하게 SpinLock 을 지원하는 Lock API 가 없기 때문인듯)
  • Bag(가방)답게 다른 Disposables 를 가지고 있고 이 가방 또한 isDisposed 상태를 가지고 있다.
  • 다른 Disposable 을 추가하는 `insert` 함수에서는 lock 을 걸고 추가를 수행하며, 가방이 이미 disposed 상태라면 파라미터로 전달된 Disposable 을 바로 dispose(폐기)한다.
  • `dispose` 함수에서는 가지고 있던 disposables 에 대하여 모두 dispose() 를 호출한다. 단 한번만 dispose() 가 호출될 수 있도록 lock 을 사용한다.
  • 가방이 deinit 될 때도 dispose() 를 호출한다.
extension DisposeBag {
    /// Convenience init allows a list of disposables to be gathered for disposal.
    public convenience init(disposing disposables: Disposable...) {
        self.init()
        self.disposables += disposables
    }
	...
}

Source 14. `DisposeBag` 의 extension

DisposeBag 에 대한 여러 `convenience init` 과 편의 methods 가 있다.

 

extension Disposable {
    /// Adds `self` to `bag`
    ///
    /// - parameter bag: `DisposeBag` to add `self` to.
    public func disposed(by bag: DisposeBag) {
        bag.insert(self)
    }
}

Source 15. `Disposable` 의 extension

`disposed(by:)` 함수를 추가함으로써 Disposable 이 바로 폐기되지 않고 저장될 수 있는 기능이 추가됨.

 


 

 

이제 위에서 소개한 코드들만 이용해서 가장 기본적인 RxSwift 예시를 구현해보....???

let disposeBag = DisposeBag()
let observable: Observable<Int> // 초기화는 어떻게...?
let observer = AnyObserver<Int> { event in
    switch event {
    case .next(let element):
        print(element)
    case .error(let error):
        print(error)
    case .completed:
        print("completed!")
    }
}

observable
    .subscribe(observer) // abstract method 인데...? `ObserverType.on(_:)` 언제 호출...? ㅇㅅㅇ
    .disposed(by: disposeBag)

Source 16. 기본적인 예시도 구현 안됨

 

Observable 을 생성할 때 항상 사용하는 `just()`, `of()` 도 없고 `subscribe()` 는 abstract method 로 구현되어 있었다.

그럼 과연 RxSwift 가 동작하려면 무엇이 더 필요할까? (다음편에 계속...)


 

+ RxSwift 의 핵심은 아니지만 이해가 필요한 것

AtomicInt (Class)

final class AtomicInt: NSLock {
    fileprivate var value: Int32
    public init(_ value: Int32 = 0) {
        self.value = value
    }
}

@discardableResult
@inline(__always)
func add(_ this: AtomicInt, _ value: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value += value
    this.unlock()
    return oldValue
}

@discardableResult
@inline(__always)
func sub(_ this: AtomicInt, _ value: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value -= value
    this.unlock()
    return oldValue
}

@discardableResult
@inline(__always)
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}

@inline(__always)
func load(_ this: AtomicInt) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.unlock()
    return oldValue
}

@discardableResult
@inline(__always)
func increment(_ this: AtomicInt) -> Int32 {
    add(this, 1)
}

@discardableResult
@inline(__always)
func decrement(_ this: AtomicInt) -> Int32 {
    sub(this, 1)
}

@inline(__always)
func isFlagSet(_ this: AtomicInt, _ mask: Int32) -> Bool {
    (load(this) & mask) != 0
}

Source 17. `AtomicInt`

 

  • 외부에서 직접 접근할 수 없는 Int32 타입의 값을 가지고 있다.
  • 값의 변경과 접근은 클래스 정의 아래 나열된 add, sub, fetchOr, load 등의 함수를 이용해야 한다.
  • 함수가 호출되면 항상 thread 를 lock() 하고 변경, 접근이 이루어지기 때문에 다중 스레드 환경에서 정확한 동작을 예상할 수 있다.
  • fetchOr 함수는 현재값을 가져오면서(fetch) `mask` 파라미터로 전달된 값과 or 연산을 한 결과를 세팅한다.
  • isFlagSet 함수는 `mask` 파라미터로 전달된 값을 flag 로 보고 & 연산을 하여 하나라도 겹치는 bit 가 있으면 true 를 반환한다.

다중 스레드 환경에서 어떠한 값을 세야할 때나 특정 값이 thread-safe 가 보장되어야 할 때 사용할 수 있는 클래스.

RxSwift 에서는 Debug 환경에서의 resourceCounting, 상태를 나타내는 값(isDisposed) 등에 사용됨.(이미 폐기되었는데 Event 가 전달되는 등의 일이 일어나면 안되므로)