我在MongoDB集合中有1000个信息文件。我正在编写一个查询来获取1000条记录,并且在一个循环中,我调用一个函数将该文件下载到本地系统。因此,下载所有1000个文件是一个连续的过程。
我希望在下载过程中有一些并行性。在循环中,我想一次下载10个文件,这意味着我想调用download函数10次,在完成10个文件下载后,我想下载下10个文件(这意味着我需要调用download函数10次)。
我怎样才能实现这种并行性,或者有没有更好的方法呢?
我看到了Kue
npm,但如何实现呢?顺便说一下,我是从FTP下载的,所以我使用basic-ftp
npm进行ftp操作。
发布于 2019-12-15 00:57:08
async库在这方面非常强大,一旦您理解了基础知识,它就会变得非常简单。
我建议你使用eachLimit,这样你的应用程序就不必担心分批下载10个文件,它只需要同时下载10个文件即可。
var files = ['a.txt', 'b.txt']
var concurrency = 10;
async.eachLimit(files, concurrency, downloadFile, onFinish);
function downloadFile(file, callback){
// run your download code here
// when file has downloaded, call callback(null)
// if there is an error, call callback('error code')
}
function onFinish(err, results){
if(err) {
// do something with the error
}
// reaching this point means the files have all downloaded
}
异步库将并行运行downloadFile
,向每个实例发送files
列表中的一个条目,然后当列表中的每个项目都完成时,它将调用onFinish
。
发布于 2019-12-15 02:02:37
在没有看到您的实现的情况下,我只能提供一个通用的答案。
假设您的download函数接收到一个fileId,并返回一个promise,该promise在所述文件完成下载后解析。对于此POC,我将通过承诺在200到500毫秒后解析为文件名来模拟这一点。
function download(fileindex) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{
resolve(`file_${fileindex}`);
},200+300*Math.random());
});
}
您有1000个文件,并希望在100次迭代中下载它们,每个迭代10个文件。
让我们来封装一些东西。我将声明一个函数,它接收起始ID和大小,并返回[N...N+size]
ID
function* range(bucket, size=10) {
let start = bucket*size,
end=start+size;
for (let i = start; i < end; i++) {
yield i;
}
}
您应该创建100个“bucket”,每个“bucket”包含对10个文件的引用。
let buckets = [...range(0,100)].map(bucket=>{
return [...range(bucket,10)];
});
A这一点,buckets
的内容是:
[
[file0 ... file9]
...
[file 990 ... file 999]
]
然后,使用for..of
(支持异步)遍历存储桶
在每次迭代中,使用Promise.all
将10个对download
的调用入队
async function proceed() {
for await(let bucket of buckets) { // for...of
await Promise.all(bucket.reduce((accum,fileindex)=>{
accum.push(download(fileindex));
return accum;
},[]));
}
}
让我们来看一个运行的例子(只有10个buckets,我们都很忙:D )
function download(fileindex) {
return new Promise((resolve, reject) => {
let file = `file_${fileindex}`;
setTimeout(() => {
resolve(file);
}, 200 + 300 * Math.random());
});
}
function* range(bucket, size = 10) {
let start = bucket * size,
end = start + size;
for (let i = start; i < end; i++) {
yield i;
}
}
let buckets = [...range(0, 10)].map(bucket => {
return [...range(bucket, 10)];
});
async function proceed() {
let bucketNumber = 0,
timeStart = performance.now();
for await (let bucket of buckets) {
let startingTime = Number((performance.now() - timeStart) / 1000).toFixed(1).substr(-5),
result = await Promise.all(bucket.reduce((accum, fileindex) => {
accum.push(download(fileindex));
return accum;
}, []));
console.log(
`${startingTime}s downloading bucket ${bucketNumber}`
);
await result;
let endingTime = Number((performance.now() - timeStart) / 1000).toFixed(1).substr(-5);
console.log(
`${endingTime}s bucket ${bucketNumber++} complete:`,
`[${result[0]} ... ${result.pop()}]`
);
}
}
document.querySelector('#proceed').addEventListener('click',proceed);
<button id="proceed" >Proceed</button>
https://stackoverflow.com/questions/59336903
复制相似问题