有许多使用Grand Central Dispatch创建的减缩器和节流实现,甚至还有一个内置于Combine中。我想使用Swift 5.5+中的新Swift Concurrency特性创建一个。
下面是我与来自他人的帮助合并的内容:
actor Limiter {
enum Policy {
case throttle
case debounce
}
private let policy: Policy
private let duration: TimeInterval
private var task: Task?
init(policy: Policy, duration: TimeInterval) {
self.policy = policy
self.duration = duration
}
nonisolated func callAsFunction(task: @escaping () async -> Void) {
Task {
switch policy {
case .throttle:
await throttle(task: task)
case .debounce:
await debounce(task: task)
}
}
}
private func throttle(task: @escaping () async -> Void) {
guard self.task?.isCancelled ?? true else { return }
Task {
await task()
}
self.task = Task {
try? await sleep()
self.task?.cancel()
self.task = nil
}
}
private func debounce(task: @escaping () async -> Void) {
self.task?.cancel()
self.task = Task {
do {
try await sleep()
guard !Task.isCancelled else { return }
await task()
} catch {
return
}
}
}
private func sleep() async throws {
try await Task.sleep(nanoseconds: UInt64(duration * 1_000_000_000))
}
}我创建了相应的测试,但是testThrottler和testDebouncer随机失败,这意味着在某个地方存在一些竞赛条件,或者我在测试中的假设是不正确的:
final class LimiterTests: XCTestCase {
func testThrottler() async throws {
// Given
let promise = expectation(description: "Ensure first task fired")
let throttler = Limiter(policy: .throttle, duration: 1)
var value = ""
var fulfillmentCount = 0
promise.expectedFulfillmentCount = 2
func sendToServer(_ input: String) {
throttler {
value += input
// Then
switch fulfillmentCount {
case 0:
XCTAssertEqual(value, "h")
case 1:
XCTAssertEqual(value, "hwor")
default:
XCTFail()
}
promise.fulfill()
fulfillmentCount += 1
}
}
// When
sendToServer("h")
sendToServer("e")
sendToServer("l")
sendToServer("l")
sendToServer("o")
await sleep(2)
sendToServer("wor")
sendToServer("ld")
wait(for: [promise], timeout: 10)
}
func testDebouncer() async throws {
// Given
let promise = expectation(description: "Ensure last task fired")
let limiter = Limiter(policy: .debounce, duration: 1)
var value = ""
var fulfillmentCount = 0
promise.expectedFulfillmentCount = 2
func sendToServer(_ input: String) {
limiter {
value += input
// Then
switch fulfillmentCount {
case 0:
XCTAssertEqual(value, "o")
case 1:
XCTAssertEqual(value, "old")
default:
XCTFail()
}
promise.fulfill()
fulfillmentCount += 1
}
}
// When
sendToServer("h")
sendToServer("e")
sendToServer("l")
sendToServer("l")
sendToServer("o")
await sleep(2)
sendToServer("wor")
sendToServer("ld")
wait(for: [promise], timeout: 10)
}
func testThrottler2() async throws {
// Given
let promise = expectation(description: "Ensure throttle before duration")
let throttler = Limiter(policy: .throttle, duration: 1)
var end = Date.now + 1
promise.expectedFulfillmentCount = 2
func test() {
// Then
XCTAssertLessThan(.now, end)
promise.fulfill()
}
// When
throttler(task: test)
throttler(task: test)
throttler(task: test)
throttler(task: test)
throttler(task: test)
await sleep(2)
end = .now + 1
throttler(task: test)
throttler(task: test)
throttler(task: test)
await sleep(2)
wait(for: [promise], timeout: 10)
}
func testDebouncer2() async throws {
// Given
let promise = expectation(description: "Ensure debounce after duration")
let debouncer = Limiter(policy: .debounce, duration: 1)
var end = Date.now + 1
promise.expectedFulfillmentCount = 2
func test() {
// Then
XCTAssertGreaterThan(.now, end)
promise.fulfill()
}
// When
debouncer(task: test)
debouncer(task: test)
debouncer(task: test)
debouncer(task: test)
debouncer(task: test)
await sleep(2)
end = .now + 1
debouncer(task: test)
debouncer(task: test)
debouncer(task: test)
await sleep(2)
wait(for: [promise], timeout: 10)
}
private func sleep(_ duration: TimeInterval) async {
await Task.sleep(UInt64(duration * 1_000_000_000))
}
}我希望能在Limiter实现中看到我遗漏的任何东西,或者可能有更好的方法来解决Swift Concurrency的问题。
发布于 2022-01-18 04:19:46
问题是使用nonisolated函数来启动独立于参与者的属性的异步更新。(我很惊讶编译器竟然允许这样做。)这不仅是误导,而且演员也有特色的重入性,你介绍各种意想不到的种族。
在本答复的后半部分,下面我就我将改变你的执行情况提出我的建议。但是,现在,正确的解决方案是使用苹果debounce和throttle的Swift异步算法库。
例如:
import AsyncAlgorithms
final class AsyncAlgorithmsTests: XCTestCase {
// a stream of individual keystrokes with a pause after the first five characters
func keystrokes() -> AsyncStream {
AsyncStream { continuation in
Task {
continuation.yield("h")
continuation.yield("e")
continuation.yield("l")
continuation.yield("l")
continuation.yield("o")
try await Task.sleep(seconds: 2)
continuation.yield(",")
continuation.yield(" ")
continuation.yield("w")
continuation.yield("o")
continuation.yield("r")
continuation.yield("l")
continuation.yield("d")
continuation.finish()
}
}
}
// A stream of the individual keystrokes aggregated together as strings (as we
// want to search the whole string, not for individual characters)
//
// e.g.
// h
// he
// hel
// hell
// hello
// ...
//
// As the `keystrokes` sequence has a pause after the fifth character, this will
// also pause after “hello” and before “hello,”. We can use that pause to test
// debouncing and throttling
func strings() -> AsyncStream {
AsyncStream { continuation in
Task {
var string = ""
for await keystroke in keystrokes() {
string += keystroke
continuation.yield(string)
}
continuation.finish()
}
}
}
func testDebounce() async throws {
let debouncedSequence = strings().debounce(for: .seconds(1))
// usually you'd just loop through the sequence with something like
//
// for await string in debouncedSequence {
// sendToServer(string)
// }
// but I'm just going to directly await the yielded values and test the resulting array
let result: [String] = await debouncedSequence.reduce(into: []) { $0.append($1) }
XCTAssertEqual(result, ["hello", "hello, world"])
}
func testThrottle() async throws {
let throttledSequence = strings().throttle(for: .seconds(1))
let result: [String] = await throttledSequence.reduce(into: []) { $0.append($1) }
XCTAssertEqual(result, ["h", "hello,"])
}
}
// MARK: - Task.sleep(seconds:)
extension Task where Success == Never, Failure == Never {
/// Suspends the current task for at least the given duration
/// in seconds.
///
/// If the task is canceled before the time ends,
/// this function throws `CancellationError`.
///
/// This function doesn't block the underlying thread.
public static func sleep(seconds duration: TimeInterval) async throws {
try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
}
}
// MARK: - TimeInterval
extension TimeInterval {
static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}考虑到您是在征求“代码评审”,如果您真的想编写自己的“退出”和“节流”,并且出于某种原因不想使用异步算法,那么下面我的回答是关于您的实现的一些观察:
您可以向Limiter添加一个独立于参与者的函数:
func submit(task: @escaping () async -> Void) {
switch policy {
case .throttle: throttle(task: task)
case .debounce: debounce(task: task)
}
}注意,我并没有像看起来那样(至少在Xcode 13.2.1中)使用callAsFunction作为一个独立于参与者的函数,因为这会导致编译器中的分段错误。
无论如何,您可以修改测试以使用submit参与者隔离函数,例如:
// test throttling as user enters “hello, world” into a text field
func testThrottler() async throws {
// Given
let promise = expectation(description: "Ensure first task fired")
let throttler = Limiter(policy: .throttle, duration: 1)
var fulfillmentCount = 0
promise.expectedFulfillmentCount = 2
var value = ""
func accumulateAndSendToServer(_ input: String) async {
value += input
await throttler.submit { [value] in
// Then
switch fulfillmentCount {
case 0: XCTAssertEqual(value, "h")
case 1: XCTAssertEqual(value, "hello,")
default: XCTFail()
}
promise.fulfill()
fulfillmentCount += 1
}
}
// When
await accumulateAndSendToServer("h")
await accumulateAndSendToServer("e")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("o")
try await Task.sleep(seconds: 2)
await accumulateAndSendToServer(",")
await accumulateAndSendToServer(" ")
await accumulateAndSendToServer("w")
await accumulateAndSendToServer("o")
await accumulateAndSendToServer("r")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("d")
wait(for: [promise], timeout: 10)
}作为旁白:
debounce中,对isCancelled的测试是多余的。如果任务被取消,Task.sleep将引发错误。operation作为闭包参数的名称,这大概是为了避免与Task实例混淆。Task改为Task?。然后,您可以将debounce简化为: func卸载(操作:@ defer ()异步-> Void) { task?.cancel()任务=任务{task?.cancel{ task = nil }尝试等待Task.sleep(秒:持续时间)等待操作()}}value += input从节流器/看门器里拉出来了。我还使用了一个[value]捕获列表,以确保我们避免了用户输入积累和网络请求之间的竞争条件。这是我对Limiter的演绎:
actor Limiter {
private let policy: Policy
private let duration: TimeInterval
private var task: Task?
init(policy: Policy, duration: TimeInterval) {
self.policy = policy
self.duration = duration
}
func submit(operation: @escaping () async -> Void) {
switch policy {
case .throttle: throttle(operation: operation)
case .debounce: debounce(operation: operation)
}
}
}
// MARK: - Limiter.Policy
extension Limiter {
enum Policy {
case throttle
case debounce
}
}
// MARK: - Private utility methods
private extension Limiter {
func throttle(operation: @escaping () async -> Void) {
guard task == nil else { return }
task = Task {
defer { task = nil }
try await Task.sleep(seconds: duration)
}
Task {
await operation()
}
}
func debounce(operation: @escaping () async -> Void) {
task?.cancel()
task = Task {
defer { task = nil }
try await Task.sleep(seconds: duration)
await operation()
}
}
}它使用这些扩展。
// MARK: - Task.sleep(seconds:)
extension Task where Success == Never, Failure == Never {
/// Suspends the current task for at least the given duration
/// in seconds.
///
/// If the task is canceled before the time ends,
/// this function throws `CancellationError`.
///
/// This function doesn't block the underlying thread.
public static func sleep(seconds duration: TimeInterval) async throws {
try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
}
}
// MARK: - TimeInterval
extension TimeInterval {
static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}以及下列测试:
final class LimiterTests: XCTestCase {
// test throttling as user enters “hello, world” into a text field
func testThrottler() async throws {
// Given
let promise = expectation(description: "Ensure first task fired")
let throttler = Limiter(policy: .throttle, duration: 1)
var fulfillmentCount = 0
promise.expectedFulfillmentCount = 2
var value = ""
func accumulateAndSendToServer(_ input: String) async {
value += input
await throttler.submit { [value] in
// Then
switch fulfillmentCount {
case 0: XCTAssertEqual(value, "h")
case 1: XCTAssertEqual(value, "hello,")
default: XCTFail()
}
promise.fulfill()
fulfillmentCount += 1
}
}
// When
await accumulateAndSendToServer("h")
await accumulateAndSendToServer("e")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("o")
try await Task.sleep(seconds: 2)
await accumulateAndSendToServer(",")
await accumulateAndSendToServer(" ")
await accumulateAndSendToServer("w")
await accumulateAndSendToServer("o")
await accumulateAndSendToServer("r")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("d")
wait(for: [promise], timeout: 10)
}
// test debouncing as user enters “hello, world” into a text field
func testDebouncer() async throws {
// Given
let promise = expectation(description: "Ensure last task fired")
let debouncer = Limiter(policy: .debounce, duration: 1)
var value = ""
var fulfillmentCount = 0
promise.expectedFulfillmentCount = 2
func accumulateAndSendToServer(_ input: String) async {
value += input
await debouncer.submit { [value] in
// Then
switch fulfillmentCount {
case 0: XCTAssertEqual(value, "hello")
case 1: XCTAssertEqual(value, "hello, world")
default: XCTFail()
}
promise.fulfill()
fulfillmentCount += 1
}
}
// When
await accumulateAndSendToServer("h")
await accumulateAndSendToServer("e")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("o")
try await Task.sleep(seconds: 2)
await accumulateAndSendToServer(",")
await accumulateAndSendToServer(" ")
await accumulateAndSendToServer("w")
await accumulateAndSendToServer("o")
await accumulateAndSendToServer("r")
await accumulateAndSendToServer("l")
await accumulateAndSendToServer("d")
wait(for: [promise], timeout: 10)
}
func testThrottler2() async throws {
// Given
let promise = expectation(description: "Ensure throttle before duration")
let throttler = Limiter(policy: .throttle, duration: 1)
var end = Date.now + 1
promise.expectedFulfillmentCount = 2
func test() {
// Then
XCTAssertLessThanOrEqual(.now, end)
promise.fulfill()
}
// When
await throttler.submit(operation: test)
await throttler.submit(operation: test)
await throttler.submit(operation: test)
await throttler.submit(operation: test)
await throttler.submit(operation: test)
try await Task.sleep(seconds: 2)
end = .now + 1
await throttler.submit(operation: test)
await throttler.submit(operation: test)
await throttler.submit(operation: test)
try await Task.sleep(seconds: 2)
wait(for: [promise], timeout: 10)
}
func testDebouncer2() async throws {
// Given
let promise = expectation(description: "Ensure debounce after duration")
let debouncer = Limiter(policy: .debounce, duration: 1)
var end = Date.now + 1
promise.expectedFulfillmentCount = 2
func test() {
// Then
XCTAssertGreaterThanOrEqual(.now, end)
promise.fulfill()
}
// When
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
try await Task.sleep(seconds: 2)
end = .now + 1
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
await debouncer.submit(operation: test)
try await Task.sleep(seconds: 2)
wait(for: [promise], timeout: 10)
}
}https://codereview.stackexchange.com/questions/269642
复制相似问题