CSV処理中の各プロセスを1つの実行ブロックに保持

CSV処理中の各プロセスを1つの実行ブロックに保持

私はcsvファイルを処理する方法についていくつか質問しましたが、スループットが高いことがわかりました。

毎月私のGoogleドライブディレクトリに複数のTXTファイルが受信されます。この情報を処理してマージしてPostgresライブラリにロードする必要があります。

受信したファイルのレイアウトと構造は次のとおりです。

#A1401099999999              022021I                                   
00999999999   000000000099999999+000000000000000000-000000000000000000-   
00999999999   000000000099999999-000000000000000000-000000000000000000-  
00999999999   000000000099999999-000000000000000000-000000000000000000-
@036

AWKを使用してファイルヘッダーを3つの列に分割します。

$ cat tst.awk
BEGIN { OFS="," }
NR==1 {
    pfx = substr($0,8,7) OFS substr($0,30,6) OFS substr($0,36,1)
    next
}
{
    gsub(/[+-]/,"& ")
    $1 = pfx OFS $1
    print
}

結果

9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
@036

次の手順では、すべてのファイルの統合を次のパターンに変更します。

cat -T *.txt > final.csv 


9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
@036
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-    
@042

ベースプレートを取り外した。

sed -i '/@/d' final.csv

9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,I,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000- 

フィールド 2 の形式を日付として指定しました。 「MM/YYYY」です。今「yyyy-mm-01」です。 ddのデフォルトとして01を使用します。

awk -F, '{OFS=",";a=substr($2,1,2);b=substr($2,3,4);$2=b"-"a"-01";print $0}' final.csv

結果:

10013534,2021-01-01,I,0090922002,000000000009102629+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091000002,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091100005,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091110002,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0099999995,000000008017897139+,000000000000000000-,000000000000000000-,

列5で関数を使用するには、値が負の場合に位置を「-」から列の先頭に変更する必要があります。

awk -F "," '{sign=substr($5,length($5),1);$5=substr($5,0,length($5)-1); if(sign =="-"){$5="-"$5}; print}' ./mycsv

    10013534,2021-01-01,I,0090922002,000000000009102629,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091000002,-000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091100005,-000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091110002,000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0099999995,-000000008017897139,000000000000000000-,000000000000000000-,

このすべての処理が終わったら、データをPostgresにインポートしたいと思います。

import pandas, csv

from io import StringIO
from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
   dbapi_conn = conn.connection
   with dbapi_conn.cursor() as cur:
       s_buf = StringIO()
       writer = csv.writer(s_buf)
       writer.writerows(data_iter)
       s_buf.seek(0)
       columns = ', '.join('"{}"'.format(k) for k in keys)
       if table.schema:
           table_name = '{}.{}'.format(table.schema, table.name, columns)
       else:
           table_name = table.name
       sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
       cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://xxxxx:xxx@xxxx:xxx/xxxx')

df = pandas.read_csv("final.csv")
df.to_sql('xxxxxxx', engine, schema='xxxxxxx', method=psql_insert_copy)

関連情報