首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将请求流批处理为promises,按时间间隔分组

将请求流批处理为promises,按时间间隔分组
EN

Stack Overflow用户
提问于 2018-03-23 20:40:09
回答 2查看 177关注 0票数 0

我有一个api端点,它接收来自各种来源的大量请求。

对于收到的每个请求,我创建了一个调用内部api的promise。

我想按来源对这些promises进行批处理,其中每个批处理最多包含10秒的请求。

如何做到这一点?

EN

回答 2

Stack Overflow用户

发布于 2018-03-23 21:45:50

如果您有来自多个源的多个请求,您可以一直将它们放在一个Map对象中,其中的键是接收到的源和值,请求收集在一个数组中。例如,让myMap类似于;

代码语言:javascript
运行
复制
{source1: [req1,req2,req3],
 source2: [req1,req2],
 .
 .
 sourceN: [req1,req2,...,reqm]}

您可以设置一个伪递归setTimeout循环来调用您的内部API。

代码语言:javascript
运行
复制
var apiInterval = 10000;

function runner(){
  setTimeout(mv => { Promise.all(mv.map(reqs => Promise.all(reqs.map(req => apiCall(req)))))
                            .then(pss => pss.map(ps => ps.map(p => p.then(r => doSomethingWithEachApiCallResult(r)))));
                     clearMapValues(); // to be filled in the next 10 seconds
                     runner();
                   }, apiInterval, myMap.values.slice());
}

请将上面的代码作为一个伪代码,只是为了给你一个概念。例如,Map.values返回一个迭代器对象,在对其使用.map().slice()之前,您可能需要将其转换为类似于[...myMap.values()]的数组。

这比setInterval循环方式稍好一些,因为您可以根据工作负载等动态更改间隔值。

票数 1
EN

Stack Overflow用户

发布于 2018-03-24 09:01:26

我提出了以下解决方案。

  • 它使用映射来存储字符串键和值数组。
  • 它对每个映射键使用setTimeout,以将该映射键的值刷新到回调。

代码

代码语言:javascript
运行
复制
/**
 * A stream of requests come from various sources, can be transposed into a batch indexed 
 * by the source of the request.
 * 
 * The size of each batch is defined by a time interval. I.e. any request received within the
 * time interval is stored in a batch.
 */
export class BatchStream<K, V> {

    cache: Map<K, V[]>
    flushRate: number
    onBatch: (k: K, v: V[]) => Promise<void>
    debug: boolean

    constructor(onBatch: (k: K, v: V[]) => Promise<void>, flushRate = 5000, debug = false) {
        this.cache = new Map<K, V[]>()
        this.onBatch = onBatch
        this.debug = debug
        this.flushRate = flushRate
        this.flush = this.flush.bind(this)
    }

    push(k: K, v: V) {
        if (this.cache.has(k)) {
            let batch = this.cache.get(k)
            batch.push(v)
            this.cache.set(k, batch)
        } else {
            this.cache.set(k, [v])
            setTimeout(this.flush, this.flushRate, k)
        }
    }

    flush(k: K) {
        this.debug && console.log("Flush", k)
        let batch = this.cache.get(k)
        this.cache.delete(k)
        this.onBatch(k, batch)
        this.debug && console.log("Size", this.cache.size)
    }
}

测试

代码语言:javascript
运行
复制
it("BatchStream", (done) => {

        let sources = []
        let iterations = 10
        let jobs = []
        let jobsDone = 0
        let debug = true

        // Prepare sources
        for (let i = 97; i < 123; i++) {
            sources.push(String.fromCharCode(i))
        }

        // Prepare a stream of test data
        for (let k of sources) {
            for (let i = 0; i < iterations; i++) {
                jobs.push({ k, v: k + i.toString() })
            }
        }

        shuffle(jobs)

        // Batch handler
        let onBatch = (k: string, v: string[]) => {
            return new Promise<void>((resolve, reject) => {
                jobsDone += v.length
                debug && console.log(" --> " + k, v.length, v.join(","), jobsDone, sources.length * iterations)

                if (jobsDone == sources.length * iterations) {
                    done()
                }

                resolve()
            })
        }

        let batchStream = new BatchStream<string, string>(onBatch, 5000, debug)

        // Stream test data into batcher
        let delay = 0
        for (let j of jobs) {
            delay += 100
            setTimeout(() => {
                batchStream.push(j.k, j.v)
            }, delay)
        }
    })
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49449900

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档