サイズが不均一な2つの整列された名前付きパイプをマージする方法は?

サイズが不均一な2つの整列された名前付きパイプをマージする方法は?

私の質問は「同じフィールドのソート値に基づいて2つのソートされたファイルをマージします。「しかし、それを名前付きパイプに拡張します。

ソートされた整数を含む2つのテキストファイルがあり、それらをマージしたいとします。これを使用して、sort -nm file1.txt file2.txt > merged.txtワンタイム非ブロックマージを実行できます。

さて、これらのファイルが実際に私が作成してPythonで埋める名前付きパイプ(FIFO)であるとしましょう。あるパイプに交互に書き込んでから次のパイプに書き込む限り、これをうまく実行できます。このコードは、2つのソートされた整数のリストを生成し、子プロセスsortによって読み取られた名前付きパイプに書き込むために使用されます。これにより、結合結果が単一のファイルとして出力されます。

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

まとめられた結果を得ました。しかし、今リストをにi % 2変更i % 3。元のバージョンでは、fifo1、fifo2、fifo1、fifo2などで印刷されました.変更されたバージョンでは、2つのパイプのうちの1つに2倍の行数が印刷されます。

実行すると、i % 3次の結果が表示されます。

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

常に同じ場所に停止します。 straceを使用すると、次のことがわかります。

writePythonプロセスはコールポイント4で中断されます。write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

ただし、プロセスは sortコールポイント 3 で中断されます。readread(3,

lsof -n -pプロセスの出力を見ると、sort値が到着するのを待っていますfifo1が、writeプロセスは値が記録されるのを待っていることがわかりますfifo2

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

したがって、何らかの理由でsortプロセス*が受信を停止し、fifo2プロセスが中断されました。

fifo2今すぐ発行して別のリスナーを配置すると、cat fifo2プロセスが再開され、何千回も繰り返されます。

バッファリングされたパイプを使用すると、私が理解できないことが起こっており、sortあるストリームから次のストリームに読み込む方法がどのように変わるかを考えます。私にとって奇妙なことは、決定的であり、まったく同じ場所で失敗し、リストのバランスが合わない場合にのみ失敗することです。

この問題を解決する方法はありますか?

答え1

明らかに、2つの名前付きパイプに異なる量のデータを書き込むと、プログラムはデッドロックに陥ります。 1 fifo2(バッファがいっぱい)の場合はプログラムがブロックされ、writefifo1(バッファが空)のプロセスはブロックsortされます。read

sortあなたはそれを実現する方法を知りません。ファイルを大きな塊に読み込んだ後、メモリからデータを処理してより効率的に処理できます。データを読み取る関数を使用すると、バッファリングもsort自動的に発生する可能性があります。stdio.h

名前付き(および名前なし)パイプはデータバッファを使用します。
バッファがいっぱいになると、読み出しプロセスが一部のデータを読み取るか終了するまで書き込みプロセスがブロックされます。
バッファが空の場合、書き込みプロセスがデータの書き込みまたは終了まで読み込みプロセスがブロックされます。

各サイクルで、fifo1 に 1 行、fifo2 に 2 行を書き込むと、fifo2 のバッファーはいっぱいになりますが、fifo1 のバッファーは半分だけ埋められます。

sortプログラムがfifoに書き込むデータの量と読みたいデータの量に応じて、これは明らかにsortfifo1から何かを読みたいのですが、プログラムに空のバッファがあり、プログラムが書き込みを望む状況につながる可能性があります。完全バッファーがある fifo2。

sortパイプバッファのサイズは固定され、プログラムも固定サイズを持ち、固定バッファサイズを使用してデータを読み書きできるため、結果は決定的です。

GNUソースコードを見ることができますsort
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

最初は、すべてのファイルのループの関数を使用して、すべての入力ファイルの入力バッファを埋めようとしますfillbuf

後で場合によっては、fillbuf入力ファイルを再呼び出しします。

関数にfillbufコメントがあります

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

明らかにsort入力ファイルを選択し、一定量のデータが必要です。読み込みがブロックされると、入力ファイルは切り替えられません。

readこの実装は、ジョブがしばらくすると一部のデータまたはEOFを返すため、永久にブロックされず、通常のファイルでうまく機能します。


2つのプロセス/スレッド間をブロックできるものが複数ある場合、デッドロックを避けることは常に困難です。あなたの場合は、1つのパイプのみを使用してください。常にfifo1に書き込むデータがあり(fifo2がブロックしている場合)、逆の場合は非ブロック操作を使用すると役に立ちます。

2つの別々のスレッド/プロセスを使用してパイプに書き込む場合は、2つのパイプを使用することができますが、スレッド/プロセスが互いに独立して動作している場合にのみ可能です。パイプライン1に書き込まれているスレッドAが何らかの方法でスレッドB(パイプライン2に書き込むときにのみブロックされます)を待っている場合、これは役に立ちません。

関連情報