在forEach循环中使用async/await有什么问题吗?我正在尝试循环浏览一系列文件,并等待每个文件的内容。

import fs from 'fs-promise'

async function printFiles () {
  const files = await getFilePaths() // Assume this works fine

  files.forEach(async (file) => {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
  })
}

printFiles()

这段代码确实有效,但这段代码会出错吗?我有人告诉我,你不应该在这样的高阶函数中使用async/await,所以我只想问问这是否有问题。


当前回答

这里是一个在forEach循环中使用异步的好例子。

编写自己的asyncForEach

async function asyncForEach(array, callback) {  
    for (let index = 0; index < array.length; index++) {
        await callback(array[index], index, array)
    }
}

你可以这样用

await asyncForEach(array, async function(item,index,array){
     //await here
   }
)

其他回答

对于TypeScript用户,使用工作类型的Promise.all(array.map(迭代器))包装器

使用Promise.all(array.map(迭代器))具有正确的类型,因为TypeScript的stdlib支持已经处理了泛型。然而,每次需要异步映射时复制粘贴Promise.all(array.map(迭代器))显然不是最佳的,Promise.all(array.ma(迭代))并不能很好地传达代码的意图,因此大多数开发人员都会将其包装成一个asyncMap()包装函数。然而,要做到这一点,需要使用泛型来确保使用const value=await asyncMap()设置的值具有正确的类型。

export const asyncMap = async <ArrayItemType, IteratorReturnType>(
  array: Array<ArrayItemType>,
  iterator: (
    value: ArrayItemType,
    index?: number
  ) => Promise<IteratorReturnType>
): Promise<Array<IteratorReturnType>> => {
  return Promise.all(array.map(iterator));
};

快速测试:

it(`runs 3 items in parallel and returns results`, async () => {
  const result = await asyncMap([1, 2, 3], async (item: number) => {
    await sleep(item * 100);
    return `Finished ${item}`;
  });
  expect(result.length).toEqual(3);
  // Each item takes 100, 200 and 300ms
  // So restricting this test to 300ms plus some leeway
}, 320);

sleep()只是:

const sleep = async (timeInMs: number): Promise<void> => {
  return new Promise((resolve) => setTimeout(resolve, timeInMs));
};

然而,上述两种解决方案都有效,Antonio的代码更少,这是它如何帮助我从数据库中解析数据,从几个不同的子引用中,然后将它们全部推到一个数组中,并在完成所有任务后以承诺的方式进行解析:

Promise.all(PacksList.map((pack)=>{
    return fireBaseRef.child(pack.folderPath).once('value',(snap)=>{
        snap.forEach( childSnap => {
            const file = childSnap.val()
            file.id = childSnap.key;
            allItems.push( file )
        })
    })
})).then(()=>store.dispatch( actions.allMockupItems(allItems)))

在一个文件中弹出几个方法,以串行化的顺序处理异步数据,并为代码提供更传统的风格,这是非常轻松的。例如:

module.exports = function () {
  var self = this;

  this.each = async (items, fn) => {
    if (items && items.length) {
      await Promise.all(
        items.map(async (item) => {
          await fn(item);
        }));
    }
  };

  this.reduce = async (items, fn, initialValue) => {
    await self.each(
      items, async (item) => {
        initialValue = await fn(initialValue, item);
      });
    return initialValue;
  };
};

现在,假设保存在'/myAsync.js'您可以在相邻文件中执行类似以下操作:

...
/* your server setup here */
...
var MyAsync = require('./myAsync');
var Cat = require('./models/Cat');
var Doje = require('./models/Doje');
var example = async () => {
  var myAsync = new MyAsync();
  var doje = await Doje.findOne({ name: 'Doje', noises: [] }).save();
  var cleanParams = [];

  // FOR EACH EXAMPLE
  await myAsync.each(['bork', 'concern', 'heck'], 
    async (elem) => {
      if (elem !== 'heck') {
        await doje.update({ $push: { 'noises': elem }});
      }
    });

  var cat = await Cat.findOne({ name: 'Nyan' });

  // REDUCE EXAMPLE
  var friendsOfNyanCat = await myAsync.reduce(cat.friends,
    async (catArray, friendId) => {
      var friend = await Friend.findById(friendId);
      if (friend.name !== 'Long cat') {
        catArray.push(friend.name);
      }
    }, []);
  // Assuming Long Cat was a friend of Nyan Cat...
  assert(friendsOfNyanCat.length === (cat.friends.length - 1));
}

OP的原始问题

在forEach循环中使用async/await有什么问题吗。。。

在@Bergi选择的答案中,它展示了如何串行和并行处理。然而,并行性还存在其他问题-

订单--@chharvey注意到-

例如,如果一个非常小的文件在一个非常大的文件之前完成了读取,那么它将首先被记录,即使小文件在文件数组中位于大文件之后。

可能一次打开太多文件--Bergi在另一个答案下的评论

同时打开数千个文件以同时读取它们也是不好的。人们总是要评估顺序、并行或混合方法是否更好。

因此,让我们来解决这些问题,展示实际的代码,简洁明了,不使用第三方库。易于剪切、粘贴和修改的东西。

并行读取(一次读取),串行打印(每个文件尽可能早)。

最简单的改进是像@Bergi的回答那样执行完全并行,但做了一个小改动,以便在保持顺序的同时尽快打印每个文件。

async function printFiles2() {
  const readProms = (await getFilePaths()).map((file) =>
    fs.readFile(file, "utf8")
  );
  await Promise.all([
    await Promise.all(readProms),                      // branch 1
    (async () => {                                     // branch 2
      for (const p of readProms) console.log(await p);
    })(),
  ]);
}

上面,两个单独的分支同时运行。

分支1:同时并行读取,分支2:连续读取以强制排序,但等待时间不超过必要

这很容易。

在并发限制下并行读取,串行打印(每个文件尽可能早)。

“并发限制”意味着同时读取的文件不超过N个。就像一家一次只允许这么多顾客进入的商店(至少在新冠疫情期间)。

首先引入了一个helper函数-

function bootablePromise(kickMe: () => Promise<any>) {
  let resolve: (value: unknown) => void = () => {};
  const promise = new Promise((res) => { resolve = res; });
  const boot = () => { resolve(kickMe()); };
  return { promise, boot };
}

函数bootablePromise(kickMe:()=>Promise<any>)需要函数kickMe作为启动任务的参数(在本例中为readFile),但不会立即启动。

bootablePromise返回几个财产

承诺类型承诺引导类型函数()=>void

承诺有两个阶段

承诺开始一项任务作为一个承诺,完成一项已经开始的任务。

当调用boot()时,promise从第一状态转换到第二状态。

bootablePromise用于printFiles--

async function printFiles4() {
  const files = await getFilePaths();
  const boots: (() => void)[] = [];
  const set: Set<Promise<{ pidx: number }>> = new Set<Promise<any>>();
  const bootableProms = files.map((file,pidx) => {
    const { promise, boot } = bootablePromise(() => fs.readFile(file, "utf8"));
    boots.push(boot);
    set.add(promise.then(() => ({ pidx })));
    return promise;
  });
  const concurLimit = 2;
  await Promise.all([
    (async () => {                                       // branch 1
      let idx = 0;
      boots.slice(0, concurLimit).forEach((b) => { b(); idx++; });
      while (idx<boots.length) {
        const { pidx } = await Promise.race([...set]);
        set.delete([...set][pidx]);
        boots[idx++]();
      }
    })(),
    (async () => {                                       // branch 2
      for (const p of bootableProms) console.log(await p);
    })(),
  ]);
}

和以前一样,有两个分支

分支1:用于运行和处理并发。分支2:用于打印

现在的区别是不允许并发运行超过concurrentLimit Promise。

重要的变量是

boots:要调用以强制其相应Promise转换的函数数组。它仅在分支1中使用。set:在随机访问容器中有Promise,这样一旦实现,就可以很容易地删除它们。此容器仅在分支1中使用。bootableProms:这些是与最初在集合中的Promise相同的Promise,但它是一个数组而不是集合,并且该数组从未更改。它仅在分支2中使用。

使用模拟fs.readFile运行,所需时间如下(文件名与时间(毫秒))。

const timeTable = {
  "1": 600,
  "2": 500,
  "3": 400,
  "4": 300,
  "5": 200,
  "6": 100,
};

可以看到这样的测试运行时间,显示并发正在运行--

[1]0--0.601
[2]0--0.502
[3]0.503--0.904
[4]0.608--0.908
[5]0.905--1.105
[6]0.905--1.005

可在typescript游乐场沙盒中执行

我使用Array.prototype.reduce代替Promise.all和Array.prototy.map(这不能保证Promise的解析顺序),从解析的Promise开始:

async function printFiles () {
  const files = await getFilePaths();

  await files.reduce(async (promise, file) => {
    // This line will wait for the last async function to finish.
    // The first iteration uses an already resolved Promise
    // so, it will immediately continue.
    await promise;
    const contents = await fs.readFile(file, 'utf8');
    console.log(contents);
  }, Promise.resolve());
}