我试图读取一个大文件一行在一次。我在Quora上找到了一个关于这个主题的问题,但我错过了一些联系,把整个事情联系在一起。

 var Lazy=require("lazy");
 new Lazy(process.stdin)
     .lines
     .forEach(
          function(line) { 
              console.log(line.toString()); 
          }
 );
 process.stdin.resume();

我想要弄清楚的是如何一次从文件中读取一行,而不是像本例中那样读取STDIN。

我试着:

 fs.open('./VeryBigFile.csv', 'r', '0666', Process);

 function Process(err, fd) {
    if (err) throw err;
    // DO lazy read 
 }

但这并不奏效。我知道在必要时我可以使用PHP之类的东西,但我想弄清楚这个问题。

我不认为其他答案会起作用,因为文件比我运行它的服务器的内存大得多。


当前回答

带载波模块:

var carrier = require('carrier');

process.stdin.resume();
carrier.carry(process.stdin, function(line) {
    console.log('got one line: ' + line);
});

其他回答

function createLineReader(fileName){
    var EM = require("events").EventEmitter
    var ev = new EM()
    var stream = require("fs").createReadStream(fileName)
    var remainder = null;
    stream.on("data",function(data){
        if(remainder != null){//append newly received data chunk
            var tmp = new Buffer(remainder.length+data.length)
            remainder.copy(tmp)
            data.copy(tmp,remainder.length)
            data = tmp;
        }
        var start = 0;
        for(var i=0; i<data.length; i++){
            if(data[i] == 10){ //\n new line
                var line = data.slice(start,i)
                ev.emit("line", line)
                start = i+1;
            }
        }
        if(start<data.length){
            remainder = data.slice(start);
        }else{
            remainder = null;
        }
    })

    stream.on("end",function(){
        if(null!=remainder) ev.emit("line",remainder)
    })

    return ev
}


//---------main---------------
fileName = process.argv[2]

lineReader = createLineReader(fileName)
lineReader.on("line",function(line){
    console.log(line.toString())
    //console.log("++++++++++++++++++++")
})

我把日常行处理的整个逻辑包装成一个npm模块:line-kit https://www.npmjs.com/package/line-kit

/ /实例 Var计数= 0 需要(“line-kit”)(需要(fs) .createReadStream (/ etc /问题), (line) => {count++;}, () => {console.log(' seen ${count} lines ')})

当我试图处理这些行并将它们写入另一个流时,我最终使用Lazy逐行读取大量内存泄漏,这是由于节点工作中的drain/pause/resume方式(参见:http://elegantcode.com/2011/04/06/taking-baby-steps-with-node-js-pumping-data-between-streams/(我喜欢这个家伙顺便说一句))。我还没有仔细研究Lazy,无法确切地理解其中的原因,但是我无法暂停读流以允许在Lazy退出的情况下进行排泄。

我写了代码来处理大量的csv文件到xml文档,你可以在这里看到代码:https://github.com/j03m/node-csv2xml

如果你用Lazy line运行之前的版本,它就会泄露。最新的版本完全没有泄露,你可以把它作为一个阅读器/处理器的基础。虽然我有一些定制的东西在里面。

编辑:我想我还应该指出,我用Lazy编写的代码工作得很好,直到我发现自己编写了足够大的xml片段,因为必要而耗尽/暂停/恢复。对于较小的块,这是可以的。

2019年更新

Nodejs官方文档中已经发布了一个很棒的例子。在这里

这需要在您的机器上安装最新的Nodejs。> 11.4

const fs = require('fs');
const readline = require('readline');

async function processLineByLine() {
  const fileStream = fs.createReadStream('input.txt');

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });
  // Note: we use the crlfDelay option to recognize all instances of CR LF
  // ('\r\n') in input.txt as a single line break.

  for await (const line of rl) {
    // Each line in input.txt will be successively available here as `line`.
    console.log(`Line from file: ${line}`);
  }
}

processLineByLine();

我使用下面的代码读取行后,验证它不是一个目录,它不包括在文件列表不需要检查。

(function () {
  var fs = require('fs');
  var glob = require('glob-fs')();
  var path = require('path');
  var result = 0;
  var exclude = ['LICENSE',
    path.join('e2e', 'util', 'db-ca', 'someother-file'),
    path.join('src', 'favicon.ico')];
  var files = [];
  files = glob.readdirSync('**');

  var allFiles = [];

  var patternString = [
    'trade',
    'order',
    'market',
    'securities'
  ];

  files.map((file) => {
    try {
      if (!fs.lstatSync(file).isDirectory() && exclude.indexOf(file) === -1) {
        fs.readFileSync(file).toString().split(/\r?\n/).forEach(function(line){
          patternString.map((pattern) => {
            if (line.indexOf(pattern) !== -1) {
              console.log(file + ' contain `' + pattern + '` in in line "' + line +'";');
              result = 1;
            }
          });
        });
      }
    } catch (e) {
      console.log('Error:', e.stack);
    }
  });
  process.exit(result);

})();