我试图读取一个大的csv文件(aprox。6 GB)在熊猫和我得到一个内存错误:

MemoryError                               Traceback (most recent call last)
<ipython-input-58-67a72687871b> in <module>()
----> 1 data=pd.read_csv('aphro.csv',sep=';')

...

MemoryError: 

有什么帮助吗?


该错误表明机器没有足够的内存来读取整个 CSV一次转换成一个数据帧。假设您不需要整个数据集 内存,避免这个问题的一种方法是处理CSV在 Chunks(通过指定chunksize参数):

chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

chunksize参数指定每个块的行数。 (当然,最后一个块可能包含少于块大小的行。)


熊猫>= 1.2

Read_csv with chunksize返回一个上下文管理器,像这样使用:

chunksize = 10 ** 6
with pd.read_csv(filename, chunksize=chunksize) as reader:
    for chunk in reader:
        process(chunk)

参见 GH38225

我是这样说的:

chunks=pd.read_table('aphro.csv',chunksize=1000000,sep=';',\
       names=['lat','long','rf','date','slno'],index_col='slno',\
       header=None,parse_dates=['date'])

df=pd.DataFrame()
%time df=pd.concat(chunk.groupby(['lat','long',chunk['date'].map(lambda x: x.year)])['rf'].agg(['sum']) for chunk in chunks)

你可以尝试sframe,它和pandas有相同的语法,但是允许你操作比你的RAM大的文件。

函数read_csv和read_table几乎是一样的。但在程序中使用read_table函数时,必须分配分隔符“,”。

def get_from_action_data(fname, chunk_size=100000):
    reader = pd.read_csv(fname, header=0, iterator=True)
    chunks = []
    loop = True
    while loop:
        try:
            chunk = reader.get_chunk(chunk_size)[["user_id", "type"]]
            chunks.append(chunk)
        except StopIteration:
            loop = False
            print("Iteration is stopped")

    df_ac = pd.concat(chunks, ignore_index=True)

如果您使用pandas将大文件读入块,然后逐行yield,这是我所做的

import pandas as pd

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
   for chunk in pd.read_csv(filename,delimiter=',', iterator=True, chunksize=chunk_size, parse_dates=[1] ): 
        yield (chunk)

def _generator( filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
    for row in chunk:
        yield row

if __name__ == "__main__":
filename = r'file.csv'
        generator = generator(filename=filename)
        while True:
           print(next(generator))

分块不应该总是解决这个问题的第一步。

Is the file large due to repeated non-numeric data or unwanted columns? If so, you can sometimes see massive memory savings by reading in columns as categories and selecting required columns via pd.read_csv usecols parameter. Does your workflow require slicing, manipulating, exporting? If so, you can use dask.dataframe to slice, perform your calculations and export iteratively. Chunking is performed silently by dask, which also supports a subset of pandas API. If all else fails, read line by line via chunks. Chunk via pandas or via csv library as a last resort.

对于大数据,我建议你使用"dask"库,例如:

# Dataframes implement the Pandas API
import dask.dataframe as dd
df = dd.read_csv('s3://.../2018-*-*.csv')

你可以在这里阅读更多的文档。

另一个很好的选择是使用modin,因为所有的功能都与pandas相同,但它利用了分布式数据框架库,如dask。

在我的项目中,另一个高级库是数据表。

# Datatable python library
import datatable as dt
df = dt.fread("s3://.../2018-*-*.csv")

除了上面的答案,对于那些想要处理CSV然后导出到CSV、parquet或SQL的人来说,d6tstack是另一个不错的选择。您可以加载多个文件,它处理数据模式更改(添加/删除列)。核心支持已经被剔除。

def apply(dfg):
    # do stuff
    return dfg

c = d6tstack.combine_csv.CombinerCSV([bigfile.csv], apply_after_read=apply, sep=',', chunksize=1e6)

# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply, chunksize=1e6)

# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename') # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # slow but flexible

您可以将数据读入为块,并将每个块保存为pickle。

import pandas as pd 
import pickle

in_path = "" #Path where the large file is
out_path = "" #Path to save the pickle files to
chunk_size = 400000 #size of chunks relies on your available memory
separator = "~"

reader = pd.read_csv(in_path,sep=separator,chunksize=chunk_size, 
                    low_memory=False)    


for i, chunk in enumerate(reader):
    out_file = out_path + "/data_{}.pkl".format(i+1)
    with open(out_file, "wb") as f:
        pickle.dump(chunk,f,pickle.HIGHEST_PROTOCOL)

在下一步中,读入pickle并将每个pickle附加到所需的数据框架中。

import glob
pickle_path = "" #Same Path as out_path i.e. where the pickle files are

data_p_files=[]
for name in glob.glob(pickle_path + "/data_*.pkl"):
   data_p_files.append(name)


df = pd.DataFrame([])
for i in range(len(data_p_files)):
    df = df.append(pd.read_pickle(data_p_files[i]),ignore_index=True)

解决方案1:

使用大数据的熊猫

解决方案2:

TextFileReader = pd.read_csv(path, chunksize=1000)  # the number of rows per chunk

dfList = []
for df in TextFileReader:
    dfList.append(df)

df = pd.concat(dfList,sort=False)

如果有人还在寻找这样的东西,我发现这个名为modin的新库可以提供帮助。它使用分布式计算来帮助读取。这里有一篇不错的文章将其功能与熊猫进行了比较。它本质上使用与熊猫相同的功能。

import modin.pandas as pd
pd.read_csv(CSV_FILE_NAME)

下面是一个例子:

chunkTemp = []
queryTemp = []
query = pd.DataFrame()

for chunk in pd.read_csv(file, header=0, chunksize=<your_chunksize>, iterator=True, low_memory=False):

    #REPLACING BLANK SPACES AT COLUMNS' NAMES FOR SQL OPTIMIZATION
    chunk = chunk.rename(columns = {c: c.replace(' ', '') for c in chunk.columns})

    #YOU CAN EITHER: 
    #1)BUFFER THE CHUNKS IN ORDER TO LOAD YOUR WHOLE DATASET 
    chunkTemp.append(chunk)

    #2)DO YOUR PROCESSING OVER A CHUNK AND STORE THE RESULT OF IT
    query = chunk[chunk[<column_name>].str.startswith(<some_pattern>)]   
    #BUFFERING PROCESSED DATA
    queryTemp.append(query)

#!  NEVER DO pd.concat OR pd.DataFrame() INSIDE A LOOP
print("Database: CONCATENATING CHUNKS INTO A SINGLE DATAFRAME")
chunk = pd.concat(chunkTemp)
print("Database: LOADED")

#CONCATENATING PROCESSED DATA
query = pd.concat(queryTemp)
print(query)

我想在已经提供的大多数潜在解决方案的基础上,给出一个更全面的答案。我还想指出另一个可能有助于阅读的方法。

选项1:dtypes

"dtypes"是一个非常强大的参数,您可以使用它来减少读取方法的内存压力。看看这个和这个答案。默认情况下,Pandas尝试推断数据的dtype。

根据数据结构,每存储一个数据,就进行一次内存分配。在基本层面上,请参考以下值(下表说明了C编程语言的值):

The maximum value of UNSIGNED CHAR = 255                                    
The minimum value of SHORT INT = -32768                                     
The maximum value of SHORT INT = 32767                                      
The minimum value of INT = -2147483648                                      
The maximum value of INT = 2147483647                                       
The minimum value of CHAR = -128                                            
The maximum value of CHAR = 127                                             
The minimum value of LONG = -9223372036854775808                            
The maximum value of LONG = 9223372036854775807

请参阅本页查看NumPy和C类型之间的匹配。

假设你有一个整数数组。你可以在理论上和实际中分配,比如说一个16位整数类型的数组,但是你会分配比实际需要更多的内存来存储这个数组。为了避免这种情况,您可以在read_csv上设置dtype选项。你不想把数组项存储为长整数,实际上你可以用8位整数(np.int8或np.uint8)来适合它们。

观察下面的dtype映射。

来源:https://pbpython.com/pandas_dtypes.html

你可以将dtype参数作为参数传递给pandas方法,就像读取{column: type}一样。

import numpy as np
import pandas as pd

df_dtype = {
        "column_1": int,
        "column_2": str,
        "column_3": np.int16,
        "column_4": np.uint8,
        ...
        "column_n": np.float32
}

df = pd.read_csv('path/to/file', dtype=df_dtype)

选项2:按块读取

以块形式读取数据允许您访问内存中的一部分数据,并且可以对数据应用预处理并保存处理过的数据而不是原始数据。如果将这个选项与第一个选项dtypes结合使用,效果会好得多。

我想指出熊猫烹饪书中关于这一过程的部分,你可以在这里找到。注意这两个部分;

逐块读取csv文件 逐块读取csv中的特定行

选项3:Dask

Dask是一个框架,在Dask的网站上定义为:

Dask为分析提供了高级的并行性,为您喜爱的工具提供了大规模的性能

它的诞生是为了覆盖熊猫无法到达的必要部位。Dask是一个功能强大的框架,通过以分布式方式处理数据,允许您访问更多的数据。

你可以使用dask来预处理你的数据作为一个整体,dask负责分块部分,所以不像pandas,你可以只定义你的处理步骤,让dask来做工作。Dask在被compute和/或persist显式地推入之前不会应用计算(请参阅这里的答案了解区别)。

其他辅助(想法)

ETL流程为数据设计。只保留原始数据中需要的内容。 首先,使用Dask或PySpark等框架将ETL应用于整个数据,并导出处理后的数据。 然后看看处理过的数据是否能整体地放入内存中。 考虑增加内存。 考虑在云平台上使用这些数据。

在使用chunksize选项之前,如果你想确定你想要在@unutbu提到的分块for循环中写入的进程函数,你可以简单地使用nrows选项。

small_df = pd.read_csv(filename, nrows=100)

一旦确定流程块准备好了,就可以将其放入整个数据帧的分块for循环中。

如果你有一个csv文件,有数百万个数据条目,你想要加载完整的数据集,你应该使用dask_cudf,

import dask_cudf as dc

df = dc.read_csv("large_data.csv")