使用Python Pandas處理億級數據

jopen 8年前發布 | 16K 次閱讀 Python Python開發

在數據分析領域,最熱門的莫過于Python和R語言,此前有一篇文章《 別老扯什么Hadoop了,你的數據根本不夠大 》指出:只有在超過5TB數據量的規模下,Hadoop才是一個合理的技術選擇。這次拿到近億條日志數據,千萬級數據已經是關系型數據庫的查詢分析瓶頸,之前使用過Hadoop對大量文本進行分類,這次決定采用Python來處理數據:

  • 硬件環境
      • CPU:3.5 GHz Intel Core i7
      • 內存:32 GB HDDR 3 1600 MHz
      • 硬盤:3 TB Fusion Drive
  • 數據分析工具
      • Python:2.7.6
      • Pandas:0.15.0
      • IPython notebook:2.0.0

源數據如下表所示:

Table Size Desc
ServiceLogs 98,706,832 rows x 14 columns 8.77 GB 交易日志數據,每個交易會話可以有多條交易
ServiceCodes 286 rows × 8 columns 20 KB 交易分類的字典表

數據讀取

啟動IPython notebook,加載pylab環境:

ipython notebook --pylab=inline
ipythonnotebook --pylab=inline

Pandas提供了IO工具可以將大文件分塊讀取,測試了一下性能,完整加載9800萬條數據也只需要263秒左右,還是相當不錯了。

import pandas as pd
reader = pd.read_csv('data/servicelogs', iterator=True)
try:
    df = reader.get_chunk(100000000)
except StopIteration:
    print "Iteration is stopped."
importpandasas pd
reader = pd.read_csv('data/servicelogs', iterator=True)
try:
    df = reader.get_chunk(100000000)
exceptStopIteration:
    print "Iteration is stopped."
1百萬條 1千萬條 1億條
ServiceLogs 1 s 17 s 263 s

使用不同分塊大小來讀取再調用 pandas.concat 連接DataFrame,chunkSize設置在1000萬條左右速度優化比較明顯。

loop = True
chunkSize = 100000
chunks = []
while loop:
    try:
        chunk = reader.get_chunk(chunkSize)
        chunks.append(chunk)
    except StopIteration:
        loop = False
        print "Iteration is stopped."
df = pd.concat(chunks, ignore_index=True)
loop = True
chunkSize = 100000
chunks = []
while loop:
    try:
        chunk = reader.get_chunk(chunkSize)
        chunks.append(chunk)
    exceptStopIteration:
        loop = False
        print "Iteration is stopped."
df = pd.concat(chunks, ignore_index=True)

下面是統計數據,Read Time是數據讀取時間,Total Time是讀取和Pandas進行concat操作的時間,根據數據總量來看,對5~50個DataFrame對象進行合并,性能表現比較好。

Chunk Size Read Time (s) Total Time (s) Performance
100,000 224.418173 261.358521
200,000 232.076794 256.674154
1,000,000 213.128481 234.934142 √ √
2,000,000 208.410618 230.006299 √ √ √
5,000,000 209.460829 230.939319 √ √ √
10,000,000 207.082081 228.135672 √ √ √ √
20,000,000 209.628596 230.775713 √ √ √
50,000,000 222.910643 242.405967
100,000,000 263.574246 263.574246

如果使用Spark提供的Python Shell,同樣編寫Pandas加載數據,時間會短25秒左右,看來Spark對Python的內存使用都有優化。

數據清洗

Pandas提供了 DataFrame.describe 方法查看數據摘要,包括數據查看(默認共輸出首尾60行數據)和行列統計。由于源數據通常包含一些空值甚至空列,會影響數據分析的時間和效率,在預覽了數據摘要后,需要對這些無效數據進行處理。

首先調用 DataFrame.isnull() 方法查看數據表中哪些為空值,與它相反的方法是 DataFrame.notnull() ,Pandas會將表中所有數據進行null計算,以True/False作為結果進行填充,如下圖所示:

Pandas的非空計算速度很快,9800萬數據也只需要28.7秒。得到初步信息之后,可以對表中空列進行移除操作。嘗試了按列名依次計算獲取非空列,和 DataFrame.dropna() 兩種方式,時間分別為367.0秒和345.3秒,但檢查時發現 dropna() 之后所有的行都沒有了,查了Pandas手冊,原來不加參數的情況下, dropna() 會移除所有包含空值的行。如果只想移除全部為空值的列,需要加上 axis 和 how 兩個參數:

df.dropna(axis=1, how='all')
df.dropna(axis=1, how='all')

共移除了14列中的6列,時間也只消耗了85.9秒。

接下來是處理剩余行中的空值,經過測試,在 DataFrame.replace() 中使用空字符串,要比默認的空值NaN節省一些空間;但對整個CSV文件來說,空列只是多存了一個“,”,所以移除的9800萬 x 6列也只省下了200M的空間。進一步的數據清洗還是在移除無用數據和合并上。

對數據列的丟棄,除無效值和需求規定之外,一些表自身的冗余列也需要在這個環節清理,比如說表中的流水號是某兩個字段拼接、類型描述等,通過對這些數據的丟棄,新的數據文件大小為4.73GB,足足減少了4.04G!

數據處理

使用 DataFrame.dtypes 可以查看每列的數據類型,Pandas默認可以讀出int和float64,其它的都處理為object,需要轉換格式的一般為日期時間。 DataFrame.astype() 方法可對整個DataFrame或某一列進行數據格式轉換,支持Python和NumPy的數據類型。

df['Name'] = df['Name'].astype(np.datetime64)
df['Name'] = df['Name'].astype(np.datetime64)

對數據聚合,我測試了 DataFrame.groupby 和 DataFrame.pivot_table 以及 pandas.merge ,groupby 9800萬行 x 3列的時間為99秒,連接表為26秒,生成透視表的速度更快,僅需5秒。

df.groupby(['NO','TIME','SVID']).count() # 分組
fullData = pd.merge(df, trancodeData)[['NO','SVID','TIME','CLASS','TYPE']] # 連接
actions = fullData.pivot_table('SVID', columns='TYPE', aggfunc='count') # 透視表
df.groupby(['NO','TIME','SVID']).count() # 分組
fullData = pd.merge(df, trancodeData)[['NO','SVID','TIME','CLASS','TYPE']] # 連接
actions = fullData.pivot_table('SVID', columns='TYPE', aggfunc='count') # 透視表

根據透視表生成的交易/查詢比例餅圖:

將日志時間加入透視表并輸出每天的交易/查詢比例圖:

total_actions = fullData.pivot_table('SVID', index='TIME', columns='TYPE', aggfunc='count')
total_actions.plot(subplots=False, figsize=(18,6), kind='area')
total_actions = fullData.pivot_table('SVID', index='TIME', columns='TYPE', aggfunc='count')
total_actions.plot(subplots=False, figsize=(18,6), kind='area')

除此之外,Pandas提供的DataFrame查詢統計功能速度表現也非常優秀,7秒以內就可以查詢生成所有類型為交易的數據子表:

tranData = fullData[fullData['Type'] == 'Transaction']
tranData = fullData[fullData['Type'] == 'Transaction']

該子表的大小為 [10250666 rows x 5 columns]。在此已經完成了數據處理的一些基本場景。實驗結果足以說明,在非“>5TB”數據的情況下,Python的表現已經能讓擅長使用統計分析語言的數據分析師游刃有余。

來自: http://python.jobbole.com/84118/

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!