首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Swift并发性的去跳和节流任务

使用Swift并发性的去跳和节流任务
EN

Code Review用户
提问于 2021-11-02 15:22:31
回答 1查看 2.2K关注 0票数 4

有许多使用Grand Central Dispatch创建的减缩器和节流实现,甚至还有一个内置于Combine中。我想使用Swift 5.5+中的新Swift Concurrency特性创建一个。

下面是我与来自他人的帮助合并的内容:

代码语言:javascript
复制
actor Limiter {
    enum Policy {
        case throttle
        case debounce
    }

    private let policy: Policy
    private let duration: TimeInterval
    private var task: Task?

    init(policy: Policy, duration: TimeInterval) {
        self.policy = policy
        self.duration = duration
    }

    nonisolated func callAsFunction(task: @escaping () async -> Void) {
        Task {
            switch policy {
            case .throttle:
                await throttle(task: task)
            case .debounce:
                await debounce(task: task)
            }
        }
    }

    private func throttle(task: @escaping () async -> Void) {
        guard self.task?.isCancelled ?? true else { return }

        Task {
            await task()
        }

        self.task = Task {
            try? await sleep()
            self.task?.cancel()
            self.task = nil
        }
    }

    private func debounce(task: @escaping () async -> Void) {
        self.task?.cancel()

        self.task = Task {
            do {
                try await sleep()
                guard !Task.isCancelled else { return }
                await task()
            } catch {
                return
            }
        }
    }

    private func sleep() async throws {
        try await Task.sleep(nanoseconds: UInt64(duration * 1_000_000_000))
    }
}

我创建了相应的测试,但是testThrottlertestDebouncer随机失败,这意味着在某个地方存在一些竞赛条件,或者我在测试中的假设是不正确的:

代码语言:javascript
复制
final class LimiterTests: XCTestCase {
    func testThrottler() async throws {
        // Given
        let promise = expectation(description: "Ensure first task fired")
        let throttler = Limiter(policy: .throttle, duration: 1)
        var value = ""

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2

        func sendToServer(_ input: String) {
            throttler {
                value += input

                // Then
                switch fulfillmentCount {
                case 0:
                    XCTAssertEqual(value, "h")
                case 1:
                    XCTAssertEqual(value, "hwor")
                default:
                    XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        sendToServer("h")
        sendToServer("e")
        sendToServer("l")
        sendToServer("l")
        sendToServer("o")

        await sleep(2)

        sendToServer("wor")
        sendToServer("ld")

        wait(for: [promise], timeout: 10)
    }

    func testDebouncer() async throws {
        // Given
        let promise = expectation(description: "Ensure last task fired")
        let limiter = Limiter(policy: .debounce, duration: 1)
        var value = ""

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2

        func sendToServer(_ input: String) {
            limiter {
                value += input

                // Then
                switch fulfillmentCount {
                case 0:
                    XCTAssertEqual(value, "o")
                case 1:
                    XCTAssertEqual(value, "old")
                default:
                    XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        sendToServer("h")
        sendToServer("e")
        sendToServer("l")
        sendToServer("l")
        sendToServer("o")

        await sleep(2)

        sendToServer("wor")
        sendToServer("ld")

        wait(for: [promise], timeout: 10)
    }

    func testThrottler2() async throws {
        // Given
        let promise = expectation(description: "Ensure throttle before duration")
        let throttler = Limiter(policy: .throttle, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertLessThan(.now, end)
            promise.fulfill()
        }

        // When
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)

        await sleep(2)
        end = .now + 1

        throttler(task: test)
        throttler(task: test)
        throttler(task: test)

        await sleep(2)

        wait(for: [promise], timeout: 10)
    }

    func testDebouncer2() async throws {
        // Given
        let promise = expectation(description: "Ensure debounce after duration")
        let debouncer = Limiter(policy: .debounce, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertGreaterThan(.now, end)
            promise.fulfill()
        }

        // When
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)

        await sleep(2)
        end = .now + 1

        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)

        await sleep(2)

        wait(for: [promise], timeout: 10)
    }

    private func sleep(_ duration: TimeInterval) async {
        await Task.sleep(UInt64(duration * 1_000_000_000))
    }
}

我希望能在Limiter实现中看到我遗漏的任何东西,或者可能有更好的方法来解决Swift Concurrency的问题。

EN

回答 1

Code Review用户

回答已采纳

发布于 2022-01-18 04:19:46

问题是使用nonisolated函数来启动独立于参与者的属性的异步更新。(我很惊讶编译器竟然允许这样做。)这不仅是误导,而且演员也有特色的重入性,你介绍各种意想不到的种族。

在本答复的后半部分,下面我就我将改变你的执行情况提出我的建议。但是,现在,正确的解决方案是使用苹果debouncethrottleSwift异步算法库。

例如:

代码语言:javascript
复制
import AsyncAlgorithms

final class AsyncAlgorithmsTests: XCTestCase {

    // a stream of individual keystrokes with a pause after the first five characters

    func keystrokes() -> AsyncStream {
        AsyncStream { continuation in
            Task {
                continuation.yield("h")
                continuation.yield("e")
                continuation.yield("l")
                continuation.yield("l")
                continuation.yield("o")
                try await Task.sleep(seconds: 2)
                continuation.yield(",")
                continuation.yield(" ")
                continuation.yield("w")
                continuation.yield("o")
                continuation.yield("r")
                continuation.yield("l")
                continuation.yield("d")
                continuation.finish()
            }
        }
    }

    // A stream of the individual keystrokes aggregated together as strings (as we
    // want to search the whole string, not for individual characters)
    //
    // e.g.
    //  h
    //  he
    //  hel
    //  hell
    //  hello
    //  ...
    //
    // As the `keystrokes` sequence has a pause after the fifth character, this will
    // also pause after “hello” and before “hello,”. We can use that pause to test
    // debouncing and throttling

    func strings() -> AsyncStream {
        AsyncStream { continuation in
            Task {
                var string = ""
                for await keystroke in keystrokes() {
                    string += keystroke
                    continuation.yield(string)
                }
                continuation.finish()
            }
        }
    }

    func testDebounce() async throws {
        let debouncedSequence = strings().debounce(for: .seconds(1))

        // usually you'd just loop through the sequence with something like
        //
        // for await string in debouncedSequence {
        //     sendToServer(string)
        // }

        // but I'm just going to directly await the yielded values and test the resulting array

        let result: [String] = await debouncedSequence.reduce(into: []) { $0.append($1) }
        XCTAssertEqual(result, ["hello", "hello, world"])
    }

    func testThrottle() async throws {
        let throttledSequence = strings().throttle(for: .seconds(1))

        let result: [String] = await throttledSequence.reduce(into: []) { $0.append($1) }
        XCTAssertEqual(result, ["h", "hello,"])
    }
}

// MARK: - Task.sleep(seconds:)

extension Task where Success == Never, Failure == Never {

    /// Suspends the current task for at least the given duration
    /// in seconds.
    ///
    /// If the task is canceled before the time ends,
    /// this function throws `CancellationError`.
    ///
    /// This function doesn't block the underlying thread.
    public static func sleep(seconds duration: TimeInterval) async throws {
        try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
    }
}

// MARK: - TimeInterval

extension TimeInterval {
    static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}

考虑到您是在征求“代码评审”,如果您真的想编写自己的“退出”和“节流”,并且出于某种原因不想使用异步算法,那么下面我的回答是关于您的实现的一些观察:

您可以向Limiter添加一个独立于参与者的函数:

代码语言:javascript
复制
func submit(task: @escaping () async -> Void) {
    switch policy {
    case .throttle: throttle(task: task)
    case .debounce: debounce(task: task)
    }
}

注意,我并没有像看起来那样(至少在Xcode 13.2.1中)使用callAsFunction作为一个独立于参与者的函数,因为这会导致编译器中的分段错误。

无论如何,您可以修改测试以使用submit参与者隔离函数,例如:

代码语言:javascript
复制
// test throttling as user enters “hello, world” into a text field

func testThrottler() async throws {
    // Given
    let promise = expectation(description: "Ensure first task fired")
    let throttler = Limiter(policy: .throttle, duration: 1)

    var fulfillmentCount = 0
    promise.expectedFulfillmentCount = 2
    var value = ""

    func accumulateAndSendToServer(_ input: String) async {
        value += input

        await throttler.submit { [value] in
            // Then
            switch fulfillmentCount {
            case 0:  XCTAssertEqual(value, "h")
            case 1:  XCTAssertEqual(value, "hello,")
            default: XCTFail()
            }

            promise.fulfill()
            fulfillmentCount += 1
        }
    }

    // When
    await accumulateAndSendToServer("h")
    await accumulateAndSendToServer("e")
    await accumulateAndSendToServer("l")
    await accumulateAndSendToServer("l")
    await accumulateAndSendToServer("o")

    try await Task.sleep(seconds: 2)

    await accumulateAndSendToServer(",")
    await accumulateAndSendToServer(" ")
    await accumulateAndSendToServer("w")
    await accumulateAndSendToServer("o")
    await accumulateAndSendToServer("r")
    await accumulateAndSendToServer("l")
    await accumulateAndSendToServer("d")

    wait(for: [promise], timeout: 10)
}

作为旁白:

  1. debounce中,对isCancelled的测试是多余的。如果任务被取消,Task.sleep将引发错误。
  2. 按照惯例,苹果使用operation作为闭包参数的名称,这大概是为了避免与Task实例混淆。
  3. 我会将Task改为Task?。然后,您可以将debounce简化为: func卸载(操作:@ defer ()异步-> Void) { task?.cancel()任务=任务{task?.cancel{ task = nil }尝试等待Task.sleep(秒:持续时间)等待操作()}}
  4. 当对用户输入的网络请求进行节流时,通常需要节流网络请求,而不是用户输入的累积。所以我已经把value += input从节流器/看门器里拉出来了。我还使用了一个[value]捕获列表,以确保我们避免了用户输入积累和网络请求之间的竞争条件。

这是我对Limiter的演绎:

代码语言:javascript
复制
actor Limiter {
    private let policy: Policy
    private let duration: TimeInterval
    private var task: Task?

    init(policy: Policy, duration: TimeInterval) {
        self.policy = policy
        self.duration = duration
    }

    func submit(operation: @escaping () async -> Void) {
        switch policy {
        case .throttle: throttle(operation: operation)
        case .debounce: debounce(operation: operation)
        }
    }
}

// MARK: - Limiter.Policy

extension Limiter {
    enum Policy {
        case throttle
        case debounce
    }
}

// MARK: - Private utility methods

private extension Limiter {
    func throttle(operation: @escaping () async -> Void) {
        guard task == nil else { return }

        task = Task {
            defer { task = nil }
            try await Task.sleep(seconds: duration)
        }

        Task {
            await operation()
        }
    }

    func debounce(operation: @escaping () async -> Void) {
        task?.cancel()

        task = Task {
            defer { task = nil }
            try await Task.sleep(seconds: duration)
            await operation()
        }
    }
}

它使用这些扩展。

代码语言:javascript
复制
// MARK: - Task.sleep(seconds:)

extension Task where Success == Never, Failure == Never {

    /// Suspends the current task for at least the given duration
    /// in seconds.
    ///
    /// If the task is canceled before the time ends,
    /// this function throws `CancellationError`.
    ///
    /// This function doesn't block the underlying thread.
    public static func sleep(seconds duration: TimeInterval) async throws {
        try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
    }
}

// MARK: - TimeInterval

extension TimeInterval {
    static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}

以及下列测试:

代码语言:javascript
复制
final class LimiterTests: XCTestCase {

    // test throttling as user enters “hello, world” into a text field

    func testThrottler() async throws {
        // Given
        let promise = expectation(description: "Ensure first task fired")
        let throttler = Limiter(policy: .throttle, duration: 1)

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2
        var value = ""

        func accumulateAndSendToServer(_ input: String) async {
            value += input

            await throttler.submit { [value] in
                // Then
                switch fulfillmentCount {
                case 0:  XCTAssertEqual(value, "h")
                case 1:  XCTAssertEqual(value, "hello,")
                default: XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        await accumulateAndSendToServer("h")
        await accumulateAndSendToServer("e")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("o")

        try await Task.sleep(seconds: 2)

        await accumulateAndSendToServer(",")
        await accumulateAndSendToServer(" ")
        await accumulateAndSendToServer("w")
        await accumulateAndSendToServer("o")
        await accumulateAndSendToServer("r")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("d")

        wait(for: [promise], timeout: 10)
    }

    // test debouncing as user enters “hello, world” into a text field

    func testDebouncer() async throws {
        // Given
        let promise = expectation(description: "Ensure last task fired")
        let debouncer = Limiter(policy: .debounce, duration: 1)
        var value = ""

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2

        func accumulateAndSendToServer(_ input: String) async {
            value += input

            await debouncer.submit { [value] in
                // Then
                switch fulfillmentCount {
                case 0:  XCTAssertEqual(value, "hello")
                case 1:  XCTAssertEqual(value, "hello, world")
                default: XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        await accumulateAndSendToServer("h")
        await accumulateAndSendToServer("e")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("o")

        try await Task.sleep(seconds: 2)

        await accumulateAndSendToServer(",")
        await accumulateAndSendToServer(" ")
        await accumulateAndSendToServer("w")
        await accumulateAndSendToServer("o")
        await accumulateAndSendToServer("r")
        await accumulateAndSendToServer("l")
        await accumulateAndSendToServer("d")

        wait(for: [promise], timeout: 10)
    }

    func testThrottler2() async throws {
        // Given
        let promise = expectation(description: "Ensure throttle before duration")
        let throttler = Limiter(policy: .throttle, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertLessThanOrEqual(.now, end)
            promise.fulfill()
        }

        // When
        await throttler.submit(operation: test)
        await throttler.submit(operation: test)
        await throttler.submit(operation: test)
        await throttler.submit(operation: test)
        await throttler.submit(operation: test)

        try await Task.sleep(seconds: 2)
        end = .now + 1

        await throttler.submit(operation: test)
        await throttler.submit(operation: test)
        await throttler.submit(operation: test)

        try await Task.sleep(seconds: 2)

        wait(for: [promise], timeout: 10)
    }

    func testDebouncer2() async throws {
        // Given
        let promise = expectation(description: "Ensure debounce after duration")
        let debouncer = Limiter(policy: .debounce, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertGreaterThanOrEqual(.now, end)
            promise.fulfill()
        }

        // When
        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)

        try await Task.sleep(seconds: 2)
        end = .now + 1

        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)
        await debouncer.submit(operation: test)

        try await Task.sleep(seconds: 2)

        wait(for: [promise], timeout: 10)
    }
}
票数 7
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/269642

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档