我想逐行读取一个大文件(>5GB),而不将其全部内容加载到内存中。我不能使用readlines(),因为它在内存中创建了一个非常大的列表。
当前回答
我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!
加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。
希望对大家有用:
def readJSON_line2(linesIn):
#Function for reading a chunk of json lines
'''
Note, this function is nonsensical. A user would never use the approach suggested
for reading in a JSON file,
its role is to evaluate the MT approach for full line by line processing to both
increase speed and reduce memory overhead
'''
import json
linesRtn = []
for lineIn in linesIn:
if lineIn.strip() != 0:
lineRtn = json.loads(lineIn)
else:
lineRtn = ""
linesRtn.append(lineRtn)
return linesRtn
# -------------------------------------------------------------------
if __name__ == "__main__":
import multiprocessing as mp
path1 = "C:\\user\\Documents\\"
file1 = "someBigJson.json"
nBuffer = 20*nCPUs # How many chunks are queued up (so cpus aren't waiting on processes spawning)
nChunk = 1000 # How many lines are in each chunk
#Both of the above will require balancing speed against memory overhead
iJob = 0 #Tracker for SMP jobs submitted into pool
iiJob = 0 #Tracker for SMP jobs extracted back out of pool
jobs = [] #SMP job holder
MTres3 = [] #Final result holder
chunk = []
iBuffer = 0 # Buffer line count
with open(path1+file1) as f:
for line in f:
#Send to the chunk
if len(chunk) < nChunk:
chunk.append(line)
else:
#Chunk full
#Don't forget to add the current line to chunk
chunk.append(line)
#Then add the chunk to the buffer (submit to SMP pool)
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#Clear the chunk for the next batch of entries
chunk = []
#Buffer is full, any more chunks submitted would cause undue memory overhead
#(Partially) empty the buffer
if iBuffer >= nBuffer:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iBuffer -=1
iiJob+=1
#Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
if chunk:
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#And gather up the last of the buffer, including the final chunk
while iiJob < iJob:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iiJob+=1
#Cleanup
del chunk, jobs, temp1
pool.close()
其他回答
老派方法:
fh = open(file_name, 'rt')
line = fh.readline()
while line:
# do stuff with line
line = fh.readline()
fh.close()
谢谢你!我最近已经转换到python 3,并对使用readlines(0)读取大文件感到沮丧。这就解决了问题。但是为了得到每一行,我必须做一些额外的步骤。每一行之前都有一个“b”,我猜这是二进制格式的。使用“decode(utf-8)”将其更改为ascii。
然后我必须在每行中间删除一个“=\n”。
然后我在新线处把线分开。
b_data=(fh.read(ele[1]))#endat This is one chunk of ascii data in binary format
a_data=((binascii.b2a_qp(b_data)).decode('utf-8')) #Data chunk in 'split' ascii format
data_chunk = (a_data.replace('=\n','').strip()) #Splitting characters removed
data_list = data_chunk.split('\n') #List containing lines in chunk
#print(data_list,'\n')
#time.sleep(1)
for j in range(len(data_list)): #iterate through data_list to get each item
i += 1
line_of_data = data_list[j]
print(line_of_data)
下面是Arohi代码中“打印数据”上方的代码。
请试试这个:
with open('filename','r',buffering=100000) as f:
for line in f:
print line
最好使用迭代器。 相关:fileinput -迭代多个输入流中的行。
从文档中可以看出:
import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
process(line)
这将避免将整个文件一次复制到内存中。
我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!
加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。
希望对大家有用:
def readJSON_line2(linesIn):
#Function for reading a chunk of json lines
'''
Note, this function is nonsensical. A user would never use the approach suggested
for reading in a JSON file,
its role is to evaluate the MT approach for full line by line processing to both
increase speed and reduce memory overhead
'''
import json
linesRtn = []
for lineIn in linesIn:
if lineIn.strip() != 0:
lineRtn = json.loads(lineIn)
else:
lineRtn = ""
linesRtn.append(lineRtn)
return linesRtn
# -------------------------------------------------------------------
if __name__ == "__main__":
import multiprocessing as mp
path1 = "C:\\user\\Documents\\"
file1 = "someBigJson.json"
nBuffer = 20*nCPUs # How many chunks are queued up (so cpus aren't waiting on processes spawning)
nChunk = 1000 # How many lines are in each chunk
#Both of the above will require balancing speed against memory overhead
iJob = 0 #Tracker for SMP jobs submitted into pool
iiJob = 0 #Tracker for SMP jobs extracted back out of pool
jobs = [] #SMP job holder
MTres3 = [] #Final result holder
chunk = []
iBuffer = 0 # Buffer line count
with open(path1+file1) as f:
for line in f:
#Send to the chunk
if len(chunk) < nChunk:
chunk.append(line)
else:
#Chunk full
#Don't forget to add the current line to chunk
chunk.append(line)
#Then add the chunk to the buffer (submit to SMP pool)
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#Clear the chunk for the next batch of entries
chunk = []
#Buffer is full, any more chunks submitted would cause undue memory overhead
#(Partially) empty the buffer
if iBuffer >= nBuffer:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iBuffer -=1
iiJob+=1
#Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
if chunk:
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#And gather up the last of the buffer, including the final chunk
while iiJob < iJob:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iiJob+=1
#Cleanup
del chunk, jobs, temp1
pool.close()
推荐文章
- PyLint消息:logging-format-interpolation
- 我如何从一个URL读取图像数据?
- 存储Python字典
- 如何使用cat读取文件的第一行?
- 如何创建新文件夹?
- python中链式调用父初始化器
- 熊猫只创建列名的空数据框架
- 如何在Python中实现Softmax函数
- Conda是否取代了对virtualenv的需求?
- 我如何将熊猫系列或索引转换为NumPy数组?
- Tight_layout()不会考虑图形suptitle
- “else if”的正确语法是什么?
- 熊猫分组,然后按组排序
- 如何确定Pandas列是否包含特定值
- 当DEBUG = False时,Django给出Bad Request (400)