Efficiently Processing GB-Scale Data with Python

I recently needed to process vehicle trajectory data. The source is
http://kolntrace.project.citi-lab.fr/ , and the resulting CSV file is 18.9 GB, with roughly 350 million records.
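
To get a feel for the raw trace before writing any processing code, the file can be streamed through once: print a few sample lines and count the records without loading anything into memory. This is just a quick sketch; the path is the same one used in the code below.

def peek_and_count(path, preview=5):
    count = 0
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            if count < preview:
                print(line.rstrip())
            count += 1
    return count


if __name__ == '__main__':
    print('records:', peek_and_count('../../koln.tr/koln.tr'))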

The task is to read the source file, process each record, and save the results to another file. I tried four different approaches.

Approach 1

def init_csv(filename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(filename, 'a+', encoding="utf-8") as f:
        f.write(csv_title + '\n')


def write_to_csv(filename, vehicleID, time, x_coordinates, y_coordinates, speed):
    with open(filename, 'a+', encoding="utf-8") as f:
        f.write(','.join([str(vehicleID), str(time), str(x_coordinates), str(y_coordinates), str(speed)]) + '\n')


def process_line(line, i, csvfilename):
    if i <= 50:
        print(line.rstrip())  # preview the first few lines while debugging
    info = line.split()
    time = info[0]
    id = info[1]
    x_coordinates = info[2]
    y_coordinates = info[3]
    speed = info[4]
    if id.isdigit():  # keep only records whose vehicle ID is numeric
        write_to_csv(csvfilename, id, time, x_coordinates, y_coordinates, speed)


def read_file_to_csv(filename, csvfilename):
    init_csv(csvfilename)
    with open(filename, 'r', encoding='utf-8') as f:
        i = 1
        for line in f:
            process_line(line, i, csvfilename)
            i = i + 1


def main():
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = 'koln.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)


if __name__ == '__main__':
    main()

The first approach is the one I started with. It uses many small helper functions, so the logic is clear, but it is extremely inefficient, mainly because the output file is reopened and closed for every single record. Processing the full 18.9 GB was estimated to take about 12 days.
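
A rough micro-benchmark sketch of that cost (the file names here are throwaway placeholders): writing the same number of rows with one open() per record versus a single open() makes the difference obvious.

import os
import time

N = 100000
row = 'some,row,of,data\n'

start = time.time()
for _ in range(N):
    with open('reopen.tmp', 'a+', encoding='utf-8') as f:  # reopen per record
        f.write(row)
print('open per record:', time.time() - start)

start = time.time()
with open('once.tmp', 'a+', encoding='utf-8') as f:        # open once
    for _ in range(N):
        f.write(row)
print('open once:      ', time.time() - start)

os.remove('reopen.tmp')
os.remove('once.tmp')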

Approach 2

def read_file_to_csv(filename, csvfilename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(csvfilename, 'a+', encoding="utf-8") as csvf:
        csvf.write(csv_title + '\n')
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                info = line.split()
                time = info[0]
                id = info[1]
                x_coordinates = info[2]
                y_coordinates = info[3]
                speed = info[4]
                if float(speed) != 0 and id.isdigit():
                    linedata = ','.join([id, time, x_coordinates, y_coordinates, speed])
                    if linedata.count(',') == 4:
                        csvf.write(linedata + '\n')


if __name__ == '__main__':
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = '../../koln.tr/readfiledata.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)

The second approach is a refinement of the first: the helper functions are inlined and both files are opened just once, which removes most of the file I/O and function-call overhead. It is still a single process, though, so it is not that fast either; processing the full 18.9 GB takes about 3 hours.
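
For comparison, here is a sketch of the same single-pass pattern using Python's csv module, which keeps both files open for the whole run and handles the comma-joining. Paths, column names and filtering rules follow the code above; lines that do not have exactly five fields are simply skipped.

import csv

def read_file_to_csv(filename, csvfilename):
    with open(filename, 'r', encoding='utf-8') as src, \
         open(csvfilename, 'w', newline='', encoding='utf-8') as dst:
        writer = csv.writer(dst)
        writer.writerow(['vehicleID', 'time', 'x_coordinates', 'y_coordinates', 'speed'])
        for line in src:
            info = line.split()
            if len(info) != 5:
                continue
            time, vid, x, y, speed = info
            if vid.isdigit() and float(speed) != 0:
                writer.writerow([vid, time, x, y, speed])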

Approach 3

import multiprocessing as mp
import os

ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
CSV_FILE_NAME = '../../koln.tr/data.csv'

# shared output file handle; worker processes inherit it when the pool forks
csvf = open(CSV_FILE_NAME, 'a+', encoding="utf-8")

def is_number(n):
    try:
        num = float(n)
        # reject "nan": NaN is the only value that is not equal to itself
        is_number = num == num   # or use math.isnan(num)
    except ValueError:
        is_number = False
    return is_number


def not_comma_in(n):
    # True when the value contains no comma, so it is safe as a raw CSV field
    return ',' not in str(n)


def process_wrapper(chunkStart, chunkSize):
    # each worker re-opens the source file and jumps straight to its own chunk
    with open(ORIGIN_FILE_NAME, 'r', encoding='utf-8') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            info = line.split()
            time = info[0]
            id = info[1]
            x_coordinates = info[2]
            y_coordinates = info[3]
            speed = info[4]
            if is_number(id) and is_number(time) and is_number(x_coordinates) and is_number(y_coordinates) and is_number(speed)\
                    and not_comma_in(id) and not_comma_in(time) and not_comma_in(x_coordinates) and not_comma_in(y_coordinates) and not_comma_in(speed):
                linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) + ',' + str(y_coordinates) + ',' + str(speed)
                datas = linedata.split(',')
                if 5 == len(datas):
                    csvf.writelines(linedata)
                    csvf.writelines('\n')


def chunkify(fname, size=6138*10240):
    # split the file into roughly 60 MB chunks, extending each chunk to the
    # next newline so that no line is cut in half
    fileEnd = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd >= fileEnd:
                break


def main():
    # init objects
    pool = mp.Pool(processes=60)
    jobs = []

    # write the CSV header and flush it before any worker starts appending
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    csvf.writelines(csv_title)
    csvf.writelines('\n')
    csvf.flush()

    # create jobs
    for chunkStart, chunkSize in chunkify(ORIGIN_FILE_NAME):
        jobs.append(pool.apply_async(process_wrapper, (chunkStart, chunkSize)))

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()
    pool.join()
    csvf.close()


if __name__ == '__main__':
    main()

The third approach uses multiple processes, so it needs a fairly powerful machine, and it makes the processing very fast: the full 18.9 GB takes less than 10 minutes. It also introduces a problem, though. The chunks are processed concurrently and every worker appends to the same output file without any synchronization, so many records in the result come out interleaved or garbled.
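
One way to keep this speed while avoiding the corrupted rows is to let only the parent process touch the output file. The sketch below reuses chunkify, ORIGIN_FILE_NAME and CSV_FILE_NAME from the code above and simplifies the field checks; each worker just returns its cleaned rows and the parent writes them in order.

import multiprocessing as mp

def process_chunk(chunk_args):
    chunkStart, chunkSize = chunk_args
    rows = []
    with open(ORIGIN_FILE_NAME, 'r', encoding='utf-8') as f:
        f.seek(chunkStart)
        for line in f.read(chunkSize).splitlines():
            info = line.split()
            if len(info) == 5:
                time, vid, x, y, speed = info
                rows.append(','.join([vid, time, x, y, speed]))
    return rows


def main():
    with mp.Pool(processes=20) as pool, \
            open(CSV_FILE_NAME, 'w', encoding='utf-8') as csvf:
        csvf.write('vehicleID,time,x_coordinates,y_coordinates,speed\n')
        # imap yields results in submission order, so the output stays ordered
        for rows in pool.imap(process_chunk, chunkify(ORIGIN_FILE_NAME)):
            if rows:
                csvf.write('\n'.join(rows) + '\n')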

Approach 4

import pandas as pd
import multiprocessing as mp

CSV_FILE_NAME = 'withspeed.csv'
TRACE_FILE = 'trace'


def process_wrapper(chunk, chunk_num):
    # each chunk gets its own output file, so workers never share a file handle
    trace_num = 1
    with open(TRACE_FILE + '_' + str(chunk_num) + '.csv', 'a+', encoding='utf-8') as f:
        f.write('traceID,time,x,y')
        f.write('\n')
        vehicleID = chunk['vehicleID'].drop_duplicates()
        print(len(vehicleID))
        for id in vehicleID:
            trace = chunk[chunk['vehicleID'] == id].sort_values(by=['time'])
            if len(trace) >= 60:
                x = trace['x_coordinates']
                y = trace['y_coordinates']
                time = trace['time']
                for i in range(len(trace)):
                    if i + 1 < len(trace): # is not over bound
                        if time.values[i+1] == time.values[i] + 1:
                            f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            f.write('\n')
                        elif time.values[i+1] - time.values[i] >= 30:
                            trace_num += 1
                            # f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            # f.write('\n')
                        else:
                            pass
                trace_num += 1

def main():
    # init objects
    pool = mp.Pool(processes=20)
    jobs = []

    chunk_size = 5 ** 10  # rows per chunk (roughly 9.8 million)

    chunk_num = 0
    # pandas splits strictly by row count, so no record is ever cut in half
    for chunk in pd.read_csv(CSV_FILE_NAME, error_bad_lines=False, chunksize=chunk_size):
        jobs.append(pool.apply_async(process_wrapper, (chunk, chunk_num)))
        chunk_num += 1

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Approach four is the one I finally adopted. pandas reads the CSV file in chunks by row count, so the file is split losslessly, and each chunk's result is written to its own output file. This does require the chunks to be reasonably independent: a vehicle whose records straddle a chunk boundary will have its trajectory split across output files. The final solution keeps the data correct while the processing is still fast.
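
A small sketch to confirm the row-count chunking (same withspeed.csv as above, with an arbitrary chunk size): every chunk is a complete DataFrame of whole rows, and the per-chunk totals add up to the full record count.

import pandas as pd

total_rows = 0
for i, chunk in enumerate(pd.read_csv('withspeed.csv', chunksize=1000000)):
    total_rows += len(chunk)
    print('chunk %d: %d rows, %d distinct vehicles'
          % (i, len(chunk), chunk['vehicleID'].nunique()))
print('total rows:', total_rows)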
