我想逐行读取一个大文件(>5GB),而不将其全部内容加载到内存中。我不能使用readlines(),因为它在内存中创建了一个非常大的列表。


当前回答

我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!

加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。

希望对大家有用:

def readJSON_line2(linesIn):
  #Function for reading a chunk of json lines
   '''
   Note, this function is nonsensical. A user would never use the approach suggested 
   for reading in a JSON file, 
   its role is to evaluate the MT approach for full line by line processing to both 
   increase speed and reduce memory overhead
   '''
   import json

   linesRtn = []
   for lineIn in linesIn:

       if lineIn.strip() != 0:
           lineRtn = json.loads(lineIn)
       else:
           lineRtn = ""
        
       linesRtn.append(lineRtn)

   return linesRtn




# -------------------------------------------------------------------
if __name__ == "__main__":
   import multiprocessing as mp

   path1 = "C:\\user\\Documents\\"
   file1 = "someBigJson.json"

   nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
   nChunk = 1000 # How many lines are in each chunk
   #Both of the above will require balancing speed against memory overhead

   iJob = 0  #Tracker for SMP jobs submitted into pool
   iiJob = 0  #Tracker for SMP jobs extracted back out of pool

   jobs = []  #SMP job holder
   MTres3 = []  #Final result holder
   chunk = []  
   iBuffer = 0 # Buffer line count
   with open(path1+file1) as f:
      for line in f:
            
          #Send to the chunk
          if len(chunk) < nChunk:
              chunk.append(line)
          else:
              #Chunk full
              #Don't forget to add the current line to chunk
              chunk.append(line)
                
              #Then add the chunk to the buffer (submit to SMP pool)                  
              jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
              iJob +=1
              iBuffer +=1
              #Clear the chunk for the next batch of entries
              chunk = []
                            
          #Buffer is full, any more chunks submitted would cause undue memory overhead
          #(Partially) empty the buffer
          if iBuffer >= nBuffer:
              temp1 = jobs[iiJob].get()
              for rtnLine1 in temp1:
                  MTres3.append(rtnLine1)
              iBuffer -=1
              iiJob+=1
            
      #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
      if chunk:
          jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
          iJob +=1
          iBuffer +=1

      #And gather up the last of the buffer, including the final chunk
      while iiJob < iJob:
          temp1 = jobs[iiJob].get()
          for rtnLine1 in temp1:
              MTres3.append(rtnLine1)
          iiJob+=1

   #Cleanup
   del chunk, jobs, temp1
   pool.close()

其他回答

老派方法:

fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()

谢谢你!我最近已经转换到python 3,并对使用readlines(0)读取大文件感到沮丧。这就解决了问题。但是为了得到每一行,我必须做一些额外的步骤。每一行之前都有一个“b”,我猜这是二进制格式的。使用“decode(utf-8)”将其更改为ascii。

然后我必须在每行中间删除一个“=\n”。

然后我在新线处把线分开。

b_data=(fh.read(ele[1]))#endat This is one chunk of ascii data in binary format
        a_data=((binascii.b2a_qp(b_data)).decode('utf-8')) #Data chunk in 'split' ascii format
        data_chunk = (a_data.replace('=\n','').strip()) #Splitting characters removed
        data_list = data_chunk.split('\n')  #List containing lines in chunk
        #print(data_list,'\n')
        #time.sleep(1)
        for j in range(len(data_list)): #iterate through data_list to get each item 
            i += 1
            line_of_data = data_list[j]
            print(line_of_data)

下面是Arohi代码中“打印数据”上方的代码。

请试试这个:

with open('filename','r',buffering=100000) as f:
    for line in f:
        print line

最好使用迭代器。 相关:fileinput -迭代多个输入流中的行。

从文档中可以看出:

import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
    process(line)

这将避免将整个文件一次复制到内存中。

我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!

加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。

希望对大家有用:

def readJSON_line2(linesIn):
  #Function for reading a chunk of json lines
   '''
   Note, this function is nonsensical. A user would never use the approach suggested 
   for reading in a JSON file, 
   its role is to evaluate the MT approach for full line by line processing to both 
   increase speed and reduce memory overhead
   '''
   import json

   linesRtn = []
   for lineIn in linesIn:

       if lineIn.strip() != 0:
           lineRtn = json.loads(lineIn)
       else:
           lineRtn = ""
        
       linesRtn.append(lineRtn)

   return linesRtn




# -------------------------------------------------------------------
if __name__ == "__main__":
   import multiprocessing as mp

   path1 = "C:\\user\\Documents\\"
   file1 = "someBigJson.json"

   nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
   nChunk = 1000 # How many lines are in each chunk
   #Both of the above will require balancing speed against memory overhead

   iJob = 0  #Tracker for SMP jobs submitted into pool
   iiJob = 0  #Tracker for SMP jobs extracted back out of pool

   jobs = []  #SMP job holder
   MTres3 = []  #Final result holder
   chunk = []  
   iBuffer = 0 # Buffer line count
   with open(path1+file1) as f:
      for line in f:
            
          #Send to the chunk
          if len(chunk) < nChunk:
              chunk.append(line)
          else:
              #Chunk full
              #Don't forget to add the current line to chunk
              chunk.append(line)
                
              #Then add the chunk to the buffer (submit to SMP pool)                  
              jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
              iJob +=1
              iBuffer +=1
              #Clear the chunk for the next batch of entries
              chunk = []
                            
          #Buffer is full, any more chunks submitted would cause undue memory overhead
          #(Partially) empty the buffer
          if iBuffer >= nBuffer:
              temp1 = jobs[iiJob].get()
              for rtnLine1 in temp1:
                  MTres3.append(rtnLine1)
              iBuffer -=1
              iiJob+=1
            
      #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
      if chunk:
          jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
          iJob +=1
          iBuffer +=1

      #And gather up the last of the buffer, including the final chunk
      while iiJob < iJob:
          temp1 = jobs[iiJob].get()
          for rtnLine1 in temp1:
              MTres3.append(rtnLine1)
          iiJob+=1

   #Cleanup
   del chunk, jobs, temp1
   pool.close()