私の質問は「同じフィールドのソート値に基づいて2つのソートされたファイルをマージします。「しかし、それを名前付きパイプに拡張します。
ソートされた整数を含む2つのテキストファイルがあり、それらをマージしたいとします。これを使用して、sort -nm file1.txt file2.txt > merged.txt
ワンタイム非ブロックマージを実行できます。
さて、これらのファイルが実際に私が作成してPythonで埋める名前付きパイプ(FIFO)であるとしましょう。あるパイプに交互に書き込んでから次のパイプに書き込む限り、これをうまく実行できます。このコードは、2つのソートされた整数のリストを生成し、子プロセスsort
によって読み取られた名前付きパイプに書き込むために使用されます。これにより、結合結果が単一のファイルとして出力されます。
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
まとめられた結果を得ました。しかし、今リストをにi % 2
変更i % 3
。元のバージョンでは、fifo1、fifo2、fifo1、fifo2などで印刷されました.変更されたバージョンでは、2つのパイプのうちの1つに2倍の行数が印刷されます。
実行すると、i % 3
次の結果が表示されます。
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
常に同じ場所に停止します。 straceを使用すると、次のことがわかります。
write
Pythonプロセスはコールポイント4で中断されます。write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
ただし、プロセスは
sort
コールポイント 3 で中断されます。read
read(3,
lsof -n -p
プロセスの出力を見ると、sort
値が到着するのを待っていますfifo1
が、write
プロセスは値が記録されるのを待っていることがわかりますfifo2
。
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
したがって、何らかの理由でsort
プロセス*が受信を停止し、fifo2
プロセスが中断されました。
fifo2
今すぐ発行して別のリスナーを配置すると、cat fifo2
プロセスが再開され、何千回も繰り返されます。
バッファリングされたパイプを使用すると、私が理解できないことが起こっており、sort
あるストリームから次のストリームに読み込む方法がどのように変わるかを考えます。私にとって奇妙なことは、決定的であり、まったく同じ場所で失敗し、リストのバランスが合わない場合にのみ失敗することです。
この問題を解決する方法はありますか?
答え1
明らかに、2つの名前付きパイプに異なる量のデータを書き込むと、プログラムはデッドロックに陥ります。 1 fifo2(バッファがいっぱい)の場合はプログラムがブロックされ、write
fifo1(バッファが空)のプロセスはブロックsort
されます。read
sort
あなたはそれを実現する方法を知りません。ファイルを大きな塊に読み込んだ後、メモリからデータを処理してより効率的に処理できます。データを読み取る関数を使用すると、バッファリングもsort
自動的に発生する可能性があります。stdio.h
名前付き(および名前なし)パイプはデータバッファを使用します。
バッファがいっぱいになると、読み出しプロセスが一部のデータを読み取るか終了するまで書き込みプロセスがブロックされます。
バッファが空の場合、書き込みプロセスがデータの書き込みまたは終了まで読み込みプロセスがブロックされます。
各サイクルで、fifo1 に 1 行、fifo2 に 2 行を書き込むと、fifo2 のバッファーはいっぱいになりますが、fifo1 のバッファーは半分だけ埋められます。
sort
プログラムがfifoに書き込むデータの量と読みたいデータの量に応じて、これは明らかにsort
fifo1から何かを読みたいのですが、プログラムに空のバッファがあり、プログラムが書き込みを望む状況につながる可能性があります。完全バッファーがある fifo2。
sort
パイプバッファのサイズは固定され、プログラムも固定サイズを持ち、固定バッファサイズを使用してデータを読み書きできるため、結果は決定的です。
GNUソースコードを見ることができますsort
:
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
最初は、すべてのファイルのループの関数を使用して、すべての入力ファイルの入力バッファを埋めようとしますfillbuf
。
後で場合によっては、fillbuf
入力ファイルを再呼び出しします。
関数にfillbuf
コメントがあります
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
明らかにsort
入力ファイルを選択し、一定量のデータが必要です。読み込みがブロックされると、入力ファイルは切り替えられません。
read
この実装は、ジョブがしばらくすると一部のデータまたはEOFを返すため、永久にブロックされず、通常のファイルでうまく機能します。
2つのプロセス/スレッド間をブロックできるものが複数ある場合、デッドロックを避けることは常に困難です。あなたの場合は、1つのパイプのみを使用してください。常にfifo1に書き込むデータがあり(fifo2がブロックしている場合)、逆の場合は非ブロック操作を使用すると役に立ちます。
2つの別々のスレッド/プロセスを使用してパイプに書き込む場合は、2つのパイプを使用することができますが、スレッド/プロセスが互いに独立して動作している場合にのみ可能です。パイプライン1に書き込まれているスレッドAが何らかの方法でスレッドB(パイプライン2に書き込むときにのみブロックされます)を待っている場合、これは役に立ちません。