代码

Python高效处理GB级数据

最近需要处理车辆轨迹数据,数据源是
http://kolntrace.project.citi-lab.fr/ ,得到的CSV文件大小是18.9GB,大约有3亿5千万条记录。

现在的处理方式是读取源文件的数据,进行处理后将结果保存到另一个文件中。现在有四种不同的方式。

方式一

def init_csv(filename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(csv_title)
        f.writelines('\n')


def write_to_csv(filename, vehicleID, time, x_coordinates, y_coordinates, speed):
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(str(vehicleID) +',' + str(time) + ','+ str(x_coordinates) + ','+ str(y_coordinates) + ',' + str(speed))
        f.writelines('\n')


def process_line(line, i, csvfilename):
    if i <= 50:
        print(line + '\n')
        info = line.split()
        time = info[0]
        id = info[1]
        x_coordinates = info[2]
        y_coordinates = info[3]
        speed = info[4]
        if id.isdigit():
            pass
        else:
            write_to_csv(csvfilename, id, time, x_coordinates, y_coordinates, speed)


def read_file_to_csv(filename, csvfilename):
    init_csv(csvfilename)
    with open(filename, 'r', encoding='utf-8') as f:
        i = 1
        for line in f:
            process_line(line, i, csvfilename)
            i = i + 1


def main():
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = 'koln.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)


if __name__ == '__main__':
    main()

第一种方式是我一开始使用,使用了很多子函数,逻辑清晰,但是效率极低,如果要处理完18.9G数据,时间估算是大约需要12天。

方式二

def read_file_to_csv(filename, csvfilename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(csvfilename, 'a+', encoding="utf-8") as csvf:
        csvf.writelines(csv_title)
        csvf.writelines('\n')
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                info = line.split()
                time = info[0]
                id = info[1]
                x_coordinates = info[2]
                y_coordinates = info[3]
                speed = info[4]
                if float(speed) != 0:
                    if id.isdigit():
                        linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) + ',' + str(y_coordinates) + ',' + str(speed)
                        if linedata.count(',') == 4:
                            csvf.writelines(linedata)
                            csvf.writelines('\n')


if __name__ == '__main__':
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = '../../koln.tr/readfiledata.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)

第二种方式是将第一种方式进行改进,去掉了子函数,而且把文件的打开操作作为一步执行,大大减少了文件IO与函数调用的时间,但由于仍然是单进程在执行,所以时间也不是很短,处理完18.9G数据,大概需要3个小时。

方式三

import multiprocessing as mp
import os

ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
CSV_FILE_NAME = '../../koln.tr/data.csv'

global csvf
csvf = open(CSV_FILE_NAME, 'a+', encoding="utf-8")

def is_number(n):
    try:
        num = float(n)
        # 检查 "nan"
        is_number = num == num   # 或者使用 `math.isnan(num)`
    except ValueError:
        is_number = False
    return is_number


def not_comma_in(n):
    s = str(n)
    if s.find(',') == -1:
        return True
    else:
        return False


def process_wrapper(chunkStart, chunkSize):
    with open(ORIGIN_FILE_NAME) as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            info = line.split()
            time = info[0]
            id = info[1]
            x_coordinates = info[2]
            y_coordinates = info[3]
            speed = info[4]
            if is_number(id) and is_number(time) and is_number(x_coordinates) and is_number(y_coordinates) and is_number(speed)\
                    and not_comma_in(id) and not_comma_in(time) and not_comma_in(x_coordinates) and not_comma_in(y_coordinates) and not_comma_in(speed):
                linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) + ',' + str(y_coordinates) + ',' + str(speed)
                datas = linedata.split(',')
                if 5 == len(datas):
                    csvf.writelines(linedata)
                    csvf.writelines('\n')


def chunkify(fname, size=6138*10240):
    fileEnd = os.path.getsize(fname)
    with open(fname,'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size,1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break


def main():
    # init objects
    pool = mp.Pool(processes=60)
    jobs = []

    # create jobs
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    csvf.writelines(csv_title)
    csvf.writelines('\n')

    for chunkStart, chunkSize in chunkify(ORIGIN_FILE_NAME):
        jobs.append(pool.apply_async(process_wrapper, (chunkStart, chunkSize)))

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()

第三种方式使用了多进程,所以需要性能比较好的计算机才能运行,这也使得数据处理非常快速,处理完18.9G数据只需要不到10分钟。但是,这也导致了一个问题,由于它是将文件分块进行处理,导致了很多数据在分块的时候在每一块中变成了错误的数据。

方式四

import pandas as pd
import multiprocessing as mp

CSV_FILE_NAME = 'withspeed.csv'
TRACE_FILE = 'trace'


def process_wrapper(chunk, chunk_num):
    trace_num = 1
    with open(TRACE_FILE + '_' + str(chunk_num) + '.csv', 'a+', encoding='utf-8') as f:
        f.write('traceID,time,x,y')
        f.write('\n')
        vehicleID = chunk['vehicleID'].drop_duplicates()
        print(len(vehicleID))
        for id in vehicleID:
            trace = chunk[chunk['vehicleID'] == id].sort_values(by=['time'])
            if len(trace) >= 60:
                x = trace['x_coordinates']
                y = trace['y_coordinates']
                time = trace['time']
                for i in range(len(trace)):
                    if i + 1 < len(trace): # is not over bound
                        if time.values[i+1] == time.values[i] + 1:
                            f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            f.write('\n')
                        elif time.values[i+1] - time.values[i] >= 30:
                            trace_num += 1
                            # f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            # f.write('\n')
                        else:
                            pass
                trace_num += 1
        f.close()

def main():
    # init objects
    pool = mp.Pool(processes=20)
    jobs = []

    chunk_size = 5 ** 10

    chunk_num = 0
    for chunk in pd.read_csv(CSV_FILE_NAME, error_bad_lines=False, chunksize=chunk_size):
        jobs.append(pool.apply_async(process_wrapper, (chunk, chunk_num)))
        chunk_num += 1

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()

方式四是最终采用的方式,先使用panpads 读取CSV文件,就可以无损地对CSV文件进行分块,即通过行数进行分块,并将各个分块的结果保存到不同的文件中,这样必须是分块数据具有一定的独立性。最终的解决方法可以保证在数据正确性的前提下处理速度也是很快的。

安卓设备连接Cohda Wireless MK5 OBU并通信

在上一篇文章Cohda Wireless MK5 RSU&OBU 入门指南中,我们已经可以通过PC与OBU或RSU进行通信。

而现在我们需要使用安卓设备中丰富的传感器,我们必须寻求一种方式使得安卓设备可以通过有线连接与OBU进行通信,当然,这里的有线连接是特指通过USB数据线进行连接通信。幸运地是,官方有一篇相关文档,本文根据官方文档并进行实践,在此记录如下。

我们需要一条OTG线,Micro USB 转 USB A 的转接线,如果有同学不懂,可以看下图。

除了准备一条OTG线外,我们还需要对OBU以及Android设备进行一些简单设置。

通过Xshell连接OBU,然后输入命令  sudo fw_setenv usb_mode将OBU的USB模式设置为android。可通过命令  sudo fw_printenv查看OBU所有的环境变量,如下图所示,倒数第二行显示为“usb_mode=android”,说明OBU已经设置成功。

之后,就可以将安卓设备与OBU通过OTG线进行连接,并在安卓设备中系统设置里开启USB共享网络。

之后,回到连接到OBU的终端,输入以下命令,可以看到OBU中已经有了usb0这个网卡。如果你的也是一样,那么恭喜你,OBU已经可以和安卓设备进行通信了。

[email protected]:~# ifconfig usb0
usb0      Link encap:Ethernet  HWaddr 02:00:5a:0a:32:38
          inet addr:192.168.42.101  Bcast:192.168.42.255  Mask:255.255.255.0
          inet6 addr: fe80::5aff:fe0a:3238/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:6 errors:0 dropped:0 overruns:0 frame:0
          TX packets:6 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000
          RX bytes:544 (544.0 B)  TX bytes:732 (732.0 B)

经过了之前的步骤,安卓设备已经在物理层面连接起来,但是如果通过安卓设备来处理OBU接收到的BSM数据报文,则需要重新执行OBU中的Bsm-shell程序。其中,192.168.42.129为安卓设备的IP地址,这样,就大功告成了。

D_LEVEL=4 ./bsm-shell -m 1 -c 178 -b 172 -n 172 -i 1 –f/dev/null –u 192.168.42.129 -y 4040 -z 4040

高频率百度定位与高德定位

百度定位API提供的自动定位方法频率最高是 1 Hz。 可能对于大部分调用定位接口的APP是没有影响的,但是对于需要定位频率更高些的APP可能无法通过这种方式来实现。

mLocationClient = new LocationClient(getApplicationContext());
        //声明LocationClient类
        mLocationClient.registerLocationListener(myListener);
        //注册监听函数
        LocationClientOption option = new LocationClientOption();

        option.setLocationMode(LocationClientOption.LocationMode.Hight_Accuracy);
        //可选,设置定位模式,默认高精度
        //LocationMode.Hight_Accuracy:高精度;
        //LocationMode. Battery_Saving:低功耗;
        //LocationMode. Device_Sensors:仅使用设备;

        option.setCoorType("bd09ll");
        //可选,设置返回经纬度坐标类型,默认gcj02
        //gcj02:国测局坐标;
        //bd09ll:百度经纬度坐标;
        //bd09:百度墨卡托坐标;
        //海外地区定位,无需设置坐标类型,统一返回wgs84类型坐标

        option.setScanSpan(0);
        //可选,设置发起定位请求的间隔,int类型,单位ms
        //如果设置为0,则代表单次定位,即仅定位一次,默认为0
        //如果设置非0,需设置1000ms以上才有效

        option.setOpenGps(true);
        //可选,设置是否使用gps,默认false
        //使用高精度和仅用设备两种定位模式的,参数必须设置为true

        option.setLocationNotify(true);
        //可选,设置是否当GPS有效时按照1S/1次频率输出GPS结果,默认false

        option.setIgnoreKillProcess(false);
        //可选,定位SDK内部是一个service,并放到了独立进程。
        //设置是否在stop的时候杀死这个进程,默认(建议)不杀死,即setIgnoreKillProcess(true)

        option.SetIgnoreCacheException(true);
        //可选,设置是否收集Crash信息,默认收集,即参数为false

        option.setWifiCacheTimeOut(5*60*1000);
        //可选,7.2版本新增能力
        //如果设置了该接口,首次启动定位时,会先判断当前WiFi是否超出有效期,若超出有效期,会先重新扫描WiFi,然后定位

        option.setEnableSimulateGps(false);
        //可选,设置是否需要过滤GPS仿真结果,默认需要,即参数为false

        mLocationClient.setLocOption(option);
        //mLocationClient为第二步初始化过的LocationClient对象
        //需将配置好的LocationClientOption对象,通过setLocOption方法传递给LocationClient对象使用
        //更多LocationClientOption的配置,请参照类参考中LocationClientOption类的详细说明

所以考虑使用线程, 通过线程以较高频率(10Hz)来请求百度定位信息,实际上是可行的。

private class GpsThread extends Thread {
        private final static int FREQUENCY = 100;
        private boolean stop;
        public void stopMe(){
            this.stop = true;
        }
        @Override
        public void run() {
            super.run();
            stop = false;
            while (!stop){
                try {
                    mLocationClient.requestLocation();
                    sleep(FREQUENCY);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

之前的代码是使用百度定位Api进行的,现在迁移到高德定位,发现网上也没有相关的文章,现在把高德定位的解决方案也记录如下。

mLocationClient = new AMapLocationClient(getApplicationContext());
//设置定位回调监听
mLocationClient.setLocationListener(mLocationListener);
//初始化AMapLocationClientOption对象
mLocationOption = new AMapLocationClientOption();
mLocationOption.setLocationMode(AMapLocationClientOption.AMapLocationMode.Hight_Accuracy);
//给定位客户端对象设置定位参数
mLocationClient.setLocationOption(mLocationOption);
	
public class GaodeThread extends Thread{
	@Override
	public void run() {
		super.run();
		//现在是高德的100ms刷新一次
		while (true){
			try {
				mLocationClient.startLocation();
				sleep(100);
			} catch (InterruptedException e) {
					e.printStackTrace();
			}
		}
	}
}

勘误

以上方法由于错误的验证方式,现在发现是无效的,若对您造成困扰,十分抱歉。