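I have an API endpoint that receives a large number of requests from various sources.
For each request received, I create a promise that calls an internal API.
I want to batch these promises by source, where each batch contains at most 10 seconds' worth of requests.
How can I do this?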
Posted on 2018-03-23 21:45:50
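If you receive many requests from several sources, you can keep collecting them in a Map
object whose keys are the sources and whose values are arrays of the requests received from each source. For example, let myMap
look something like this: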
{source1: [req1,req2,req3],
source2: [req1,req2],
.
.
sourceN: [req1,req2,...,reqm]}
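The grouping described above could be sketched as follows. This is only an illustration: the IncomingRequest shape and the names pending and enqueue are assumptions, not part of the original answer.

```typescript
// Hypothetical request shape; the original post does not define one.
interface IncomingRequest {
  source: string;
  payload: string;
}

// source -> requests received during the current window
const pending = new Map<string, IncomingRequest[]>();

// Append a request to the array for its source, creating the array on first use.
function enqueue(req: IncomingRequest): void {
  const batch = pending.get(req.source);
  if (batch) {
    batch.push(req);
  } else {
    pending.set(req.source, [req]);
  }
}

enqueue({ source: "source1", payload: "req1" });
enqueue({ source: "source2", payload: "req1" });
enqueue({ source: "source1", payload: "req2" });
```

After these three calls, pending holds two keys, and the array stored under "source1" contains two requests.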
You can then set up a pseudo-recursive setTimeout
loop to call your internal API.
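var apiInterval = 10000; // batch window in milliseconds
function runner(){
  setTimeout(() => {
    // Snapshot the batches when the timer fires, not when it is scheduled.
    var batches = [...myMap.values()];
    myMap.clear(); // to be filled again during the next 10 seconds
    Promise.all(batches.map(reqs => Promise.all(reqs.map(req => apiCall(req)))))
           .then(results => results.forEach(rs => rs.forEach(r => doSomethingWithEachApiCallResult(r))))
           .catch(err => console.error(err)); // a single failed call rejects the whole round
    runner(); // schedule the next window
  }, apiInterval);
}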
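Treat the code above as a sketch that only conveys the idea. Note, for example, that Map.prototype.values()
returns an iterator object, so it has to be converted to an array, e.g. with [...myMap.values()],
before .map() or .slice() can be used on it.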
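To make the iterator point concrete, here is a minimal sketch with made-up sample data. It uses Array.from, which produces the same array as the spread form [...myMap.values()].

```typescript
// Sample data for illustration only.
const myMap = new Map<string, string[]>([
  ["source1", ["req1", "req2", "req3"]],
  ["source2", ["req1", "req2"]],
]);

// values() returns an iterator, not an array, so copy it into an array first.
const batches: string[][] = Array.from(myMap.values());

// Array methods are now available; a Map iterates in insertion order.
const sizes: number[] = batches.map(reqs => reqs.length);
console.log(sizes); // [ 3, 2 ]
```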
This is slightly better than a setInterval
loop because you can change the interval dynamically, for example according to the workload.
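One way to picture the dynamic interval is a loop in which every run chooses the delay for the next one. The thresholds below, and the names nextInterval and startLoop, are hypothetical and only meant as a sketch.

```typescript
// Hypothetical policy: flush sooner when many requests are waiting.
function nextInterval(pendingCount: number): number {
  const base = 10000; // the 10-second window from the question
  const min = 1000;   // assumed lower bound
  if (pendingCount > 1000) return min;
  if (pendingCount > 100) return base / 2;
  return base;
}

// Self-rescheduling loop: each run picks the delay for the next one.
// Returns a function that stops the loop.
function startLoop(countPending: () => number, flush: () => void): () => void {
  let timer: ReturnType<typeof setTimeout>;
  const tick = (): void => {
    flush();
    timer = setTimeout(tick, nextInterval(countPending()));
  };
  timer = setTimeout(tick, nextInterval(countPending()));
  return () => clearTimeout(timer);
}
```

With setInterval the period is fixed when the timer is created, so the same effect would require clearing and recreating the interval every time the load changes.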
Posted on 2018-03-24 09:01:26
I came up with the following solution.
Code
/**
 * A stream of requests coming from various sources is regrouped into batches indexed
 * by the source of the request.
 *
 * The size of each batch is defined by a time interval, i.e. every request received within
 * the time interval is stored in the same batch.
 */
export class BatchStream<K, V> {
    cache: Map<K, V[]>
    flushRate: number
    onBatch: (k: K, v: V[]) => Promise<void>
    debug: boolean

    constructor(onBatch: (k: K, v: V[]) => Promise<void>, flushRate = 5000, debug = false) {
        this.cache = new Map<K, V[]>()
        this.onBatch = onBatch
        this.debug = debug
        this.flushRate = flushRate
        this.flush = this.flush.bind(this)
    }

    push(k: K, v: V) {
        const batch = this.cache.get(k)
        if (batch) {
            // A batch for this source is already open and its flush timer is running.
            batch.push(v)
        } else {
            // First request from this source: open a batch and schedule its flush.
            this.cache.set(k, [v])
            setTimeout(this.flush, this.flushRate, k)
        }
    }

    flush(k: K) {
        this.debug && console.log("Flush", k)
        const batch = this.cache.get(k)
        if (!batch) return
        this.cache.delete(k)
        // Report a rejected handler instead of leaving the rejection unhandled.
        this.onBatch(k, batch).catch((err) => console.error("Batch failed", k, err))
        this.debug && console.log("Size", this.cache.size)
    }
}
Test
// Note: the last job is pushed after about 26 s and its batch flushes up to 5 s later,
// so the test runner's timeout has to be raised accordingly.
it("BatchStream", (done) => {
    let sources: string[] = []
    let iterations = 10
    let jobs: { k: string, v: string }[] = []
    let jobsDone = 0
    let debug = true

    // Prepare sources: the letters "a" to "z"
    for (let i = 97; i < 123; i++) {
        sources.push(String.fromCharCode(i))
    }

    // Prepare a stream of test data
    for (let k of sources) {
        for (let i = 0; i < iterations; i++) {
            jobs.push({ k, v: k + i.toString() })
        }
    }
    // shuffle is assumed to be an in-place array shuffle helper defined elsewhere
    shuffle(jobs)

    // Batch handler
    let onBatch = (k: string, v: string[]) => {
        return new Promise<void>((resolve, reject) => {
            jobsDone += v.length
            debug && console.log(" --> " + k, v.length, v.join(","), jobsDone, sources.length * iterations)
            if (jobsDone === sources.length * iterations) {
                done()
            }
            resolve()
        })
    }

    let batchStream = new BatchStream<string, string>(onBatch, 5000, debug)

    // Stream test data into batcher, one job every 100 ms
    let delay = 0
    for (let j of jobs) {
        delay += 100
        setTimeout(() => {
            batchStream.push(j.k, j.v)
        }, delay)
    }
})
https://stackoverflow.com/questions/49449900