
これをある程度実行するPythonスクリプトがあります。
current_tasks = TaskManager()
MAXPROCS = 8
while len(outstanding_tasks) > 0:
if len(current_tasks.running) < MAXPROCS:
current_tasks.addTask(outstanding_tasks.next())
else:
current_tasks.wait_for_one_finish()
Outstanding_tasks.next() はデフォルトで次のようになります。
p = subprocess.Popen([task], stdout=OUTFILE, stderr=subprocess.PIPE)
そしてcurrent_tasks.wait_for_one_finish()
:
waiting = True
while waiting:
for t in tasks:
ret = t.poll()
if ret not None:
handle_stderr(t)
waiting = False
break
とても簡単です。一度に 8 つのジョブを実行するまで、要求に応じてジョブを作成し、一度に 1 つずつ完了するまでブロックし、さらにジョブを作成します。
問題はこれである:
stderr=subprocess.PIPE
各サブプロセスは stderr をパイプに書き込みます。競合が発生してパイプに大きなログメッセージまたはその他のエントリを書き込もうとし、メッセージがパイプバッファのサイズを超えると、write()はブロックされます。プロセスが完了しないので、私の制御プロセスは戻り値を確認せずにpoll()
stderrから読み込みます。
明らかに、この問題を解決する方法はいくつかあります。
- 私のサブプロセスから一時ファイルにstderrをリダイレクトする
- 実行中のすべてのジョブのstderrファイル記述子から読み込み、それをメモリにバッファリングするPythonスレッドを作成します。
- 私の小さな一時的なイベントループにはselect()または他のものがあります。
しかし、これはすべて私のアプリケーションコードで処理する必要があることです。私が知りたいことは:パイプの動作を取得する方法はありますか?しかし、子プロセスが常にstderrに成功したwrite()を実行してから、私が見ることなく終了できるように、大きくて弾力性のあるバッファを使用する方法はありますか?終わるまで?
答え1
短い答えは: いいえです。
子プロセスパイプを介して送信された大規模データを処理するために必要な回避策を強調しました。 「素晴らしい、大きな弾力性のあるバッファ」パイプラインは存在しません。これは…サブプロセス管理 Python ドキュメントデッドロックの潜在的な原因として追加されたソリューションを呼び出してproc.communicate()
stderrから読み取ることができます。あなたの場合、問題はcommunicate()
すべてのプロセスを同時に呼び出すことはできず、すべてのデータが読み込まれるまでメソッドがブロックされることです。
私はおそらくループの代わりにselect()
すべてのプロセスの呼び出しを使用します。すべてのプロセスが何かを実行するまでブロックすることができ、プロセスが終了するとstderrパイプを閉じるので一石二鳥です(データがstderrに書き込まれる時期を知り、プロセスが終了する時期がわかります)。stderr
proc.poll()
select()