public protocol Publisher {
// 1
associatedtype Output
// 2
associatedtype failure: Error
// 4
func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
extension Publisher {
// 3
public func subscribe<S>(_ subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}Publisher 产生数据的类型Publisher可能产生错误的类型,如果能够保证一定不出错,使用Never
subscribe(-:)来接受消息
subscribe(-:)需要调用receive(subscriber:)来将订阅者订阅到publisher。
public protocol Subscriber: CustomCombineIdentifierConvertible {
// 1
associatedtype Input
// 2
associatedtype Failure: Error
// 3
func receive(subscription: Subscription)
// 4
func receive(_ input: Self.Input) -> Subscribers.Demand
// 5
func receive(completion: Subscribers.Completion<Self.Failure>)
}Publisher不会出错,则是Never。
Publisher会调用receive(subscription:)来传递 subscription
Publisher调用receive(_:)来传值。
Publisher调用receive(completion:),在结束或者出错时候。
public protocol Subscription: Cancellable, CustomCombineIdentifierconvertible {
func request(_ demand: Subscribers.Demand)
}订阅者调用request(_:_)来告知愿意接受更多的值,从一个最大的值,到无限制。
subscription.request背压 backpressure mmanagementSubscriber.receive(_:_)可以在每次接收到值时候调整,表示后续要接受的数量
Subscriber.receive(_:_)调整的值是增量的,也就是说不能是负数(否则会返回fatalError)。
// 1
let publisher = (1...6).publisher
// 2
final class IntSubscriber: Subscriber {
// 3
typealias Input = Int
typealias Failuse = Never
// 4
func reveive(subscription: Subscription) {
subscription.request(.max(3))
}
// 5
func receive(_ input: Int) -> Subscribers.Demand {
print("Recevied value", input)
// return .unlimited
return .none // 等于 .max(0)
}
// 6
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}// 1
var subscriptions = Set<AnyCancellable>()
// 2
let subject = CurrentValueSubject<Int, Never>(0)
// 3
subject
.sink(receiveValue: {print($0)})
.store(in: &subscriptions) // 4 subscriptions中,通过inout参数引用而不是拷贝的方式来传递。subject.value的方式来获取值
subject.value = 3赋值,或者subject.send(1)方式来发送事件
subject.send(completion: .finished)来结束事件。
// 1
let subject = PassthroughSubject<Int, Never>
// 2
let publisher = subject.eraseToAnyPublisher()
// 3
publisher
.sink(receiveValue {print($0)})
.store(in: &subscriptions)
// 4
subject.send(0)anyPublisher<Int, Never>,后续的就不会知道具体的类型和细节send方法,