希望

希望

希望

パイプラインに応答してコマンドを繰り返し実行したいと思います。

firehose | expensive-command

しかし、私は多くの行を受け取り、コマンドは非常にリソース集約的です。最大1回だけ実行するようにコマンドへの入力をフィルタリングしたいと思います。X第二:

firehose | interval 1 second | expensive-command

このintervalコマンドは単純なフィルタにしてはいけません。むしろ、クールダウン期間中に到着したすべてのアイテムをブロックするのではなく、クールダウン期間の終わりに最新の受信行を送信する必要があります。

どうすればいいですか?


試みる

epoch () { date +%s --date="$*" }

interval () {
    INTERVAL="$*"
    LAST_RUN_AT=0
    WHEN_TO_RUN=0
    while read LINE; do
        if (( $(epoch now) >= $WHEN_TO_RUN )) then
            echo $LINE
            WHEN_TO_RUN="$(epoch now + $INTERVAL)"
        fi
    done
}

alias firehose='(print "1\n2\n3" ; sleep 2 ; print "4\n")'
alias expensive-command='cat'

firehose | interval 1 second | expensive-command

これはほとんど機能しますが、出荷ラインを後で遅らせることができないという問題があります。すぐに出荷するかキャンセルするかを決定できます。

何が起こったのか:

1
4

スロットルはそれを受け取り、それを渡して1冷却し続けます。冷却期間1中に到着するので3完全に廃棄されます。クールタイムが4到達する前に終了して配信されました。

何があったのか

私がしたいこと:

1
3
4

受信後、1スロットルは1秒間冷却する必要があります。その後、2まだ冷却しているので、後で受け取って保管する必要があります。その後、それを受け取り、後で送信された内容を3置き換えます。2その後、スロットルは冷却を停止し、その時点ですぐに送信する必要があります3。最後に、4ターンのクールダウンが完了すると到着するので、すぐに送信されます。

私は何が起こりたいですか?

zshにいる場合閉鎖、休止状態のサブシェルを起動し、ついに$INTERVALこれechoを受け取りますLINEが、残念ながらzshにはクロージャはありません。

答え1

問題は読み取りには時間制限が必要です。。何も送信しないとfirehoseループ無期限のブロックそして、これを行うと、最後に受信した行の送信に失敗します。Bashには、タイムアウト読み取りを表す-tパラメーターがあります。zshにreadこれがあれば使用できます。

アルゴリズムは、常に行を読み取り、1秒(または他の)間隔の終わりに期限切れになるように常に再計算される(ますます短くなる)タイムアウトを設定することです。この間隔に達すると、1 つ以上の行を読み取ると、最後の行が送信されます。それ以外の場合は何も送信されず、次の行間隔を読み始めます。

受信した最初のラインまたはインターバル時間より長い後、受信した最初のラインに「即時配信」を実装できます。間隔が1秒で、最後のラインが出力されてfirehoseから1.5秒間何もないと、ラインが通過する可能性があり、マシンはその時点で新しい1秒間隔を開始するようにリセットできます。

TXR Lispのこの概念実証実装は私にとってうまく機能し、基本的なアルゴリズムを検証します。

(defvarl %interval% 1000000) ;; us

(defun epoch-usec ()
  (tree-bind (sec . usec) (time-usec)
    (+ (* 1000000 sec) usec)))

(let ((now (epoch-usec))
      (*stdin* (open-fileno (fileno *stdin*) "rl")) ;; line buffered
      remaining-time next-time line done)
  (while (not done)
    (set next-time (+ now %interval%))
    (set remaining-time (- next-time now))
    (while (poll (list (cons *stdin* poll-in))
                 (trunc remaining-time 1000))
      ;; got a line or EOF poll: no timeout
      (iflet ((nline (get-line)))
        (set line nline)              ;; got line
        (progn (flip done) (return))) ;; EOF poll
      (set now (epoch-usec))
      (when (minusp (set remaining-time (- next-time now)))
        (return)))
    ;; timeout, past deadline or exit: flush line, if any:
    (when line
      (put-line line)
      (set line nil))))

pollタイムアウト読み取りが使用されており、pollストリームバッファが表示されないため、バッファリングされていないストリームを設定してください。ストリームに読み込まれていないバッファリングされたデータがあるときに入力をポーリングしたくないというアイデアです。これはnitpickです。テストでは、*stdin*この動作とバッファリングされたネイティブストリームの使用との間の動作の質的な違いは実際には見られませんでした。ストリームにバッファリングされたデータがあるがファイル記述子にデータがない場合にポーリング時間を浪費する場合、間隔より長く待たずに新しいデータが早く到着すると、待ち時間は間隔より短くなります。

我々は成功がpoll全行を読むことができることを意味すると仮定する。pollもちろん、これを保証することはできませんが、正しく機能するテキストストリーム入力ソースは、入力バイトがウェイクアップに使用可能な場合、そのpollバイトの後に過度の遅延なしに完全な行が続くことを保証する必要があります。

残り時間の計算には、カレンダーの時間とpoll時間の調整に敏感ではない可能性がある相対的な待機のみが使用されます。したがって、一般的な注意事項が適用されます。時計が突然後ろに戻ったら、こんな!

これらのテストケースは、顕著な遅延なしに行われます。

$ echo foo | txr throttle.txr
foo
$ (echo foo; echo bar) | txr throttle.tl 
bar
$ (echo foo; echo bar; echo xyzzy) | txr throttle.tl 
xyzzy

それから:

$ (echo foo; sleep 2; echo bar; sleep 2; echo xyzzy) | txr throttle.tl 
foo
bar
xyzzy

find / | txr throttle.tlなどをテストしてみました。

答え2

最初の変形(動作しない、2番目の変形を参照)

ループの実行が停止するため、readコマンドを使用してこれらの操作を実行できないようです。readwhile

この例を見てください(printf "1\n2\n3\n" ; sleep 5; printf "4\n") | while read -r line; do echo hello; done

while内部ループはread次のように実行されます。

  • 1回繰り返し - 読む1
  • 2回繰り返し - 読み取り2;
  • 3回繰り返し - 読み取り3;
  • 4回繰り返し - 5秒待ってから読みます4

このループ内では、「1秒ごとに実行」などのスケジュールされたタスクを実行することはできません。なぜなら、定期的に停止して入力を待つからです。たとえば、1分以上待つと、スケジュールされたタスクが停止します。

function interval () {
    amount_of_seconds=$1
    print_time=0
    buffer=''
    while read -r line; do
        current_time=$(date +%s)

        if (( current_time > print_time )); then
            echo -e "${buffer}${line}"
            buffer=''
            print_time=$((current_time + amount_of_seconds))
        else
            buffer="$line\n"
        fi
    done
    echo -en "$buffer"
}

テスト:

$ alias firehose='(printf "1\n2\n3\n" ; sleep 2 ; printf "4\n"; sleep 2 ; printf "5\n6\n7\n" ; sleep 2; printf "8\n")'
$ firehose | interval 1 | cat
1
3
4
5
7
8
$ 

2番目の変形

firehose出力をファイルにリダイレクトする:(下に表示されないfirehose >> buffer_file.txt理由の説明)>>>

expensive-command毎秒ファイルの最後の行を読み、ファイルをフラッシュします。

while true; do
    tail -n 1 buffer_file.txt | expensive-command
    # clear file
    echo -n '' > buffer_file.txt
    # and sleep 1 second
    sleep 1      
done

その結果、私たちは次のようになります:

  1. 2つのコマンドが同時に実行されます(firehoseバックグラウンドで):

    firehose >> buffer_file.txt & ./script_with_expensive_command_inside.sh

    APPEND演算子 - WRITE>>の後に必要であり、必要ありません。それ以外の場合は、ファイルはクリーンアップされずに拡大し続けます。firehose>以下は、この動作の説明です。
  2. 不要な行はすべて削除され、最後の行のみが渡されます。expensive command
  3. expensive command最後の行は、読み取らずにファイルを消去する前に保存されます。

答え3

私がやった!

ここに私のintervalスクリプトがあります(また羽ハブから):

#!/usr/bin/env zsh
# Lets a line pass only once every $1 seconds.  If multiple lines arrive during
# the cooldown interval, only the latest is passed on when the cooldown ends.

INTERVAL="$1"

CHILD_PID=
BUFFER=$(mktemp)
CAN_PRINT_IMMEDIATELY=1
CAN_START_SUBPROCESS=1

# Reset state when child process returns
child-return () {
    CAN_START_SUBPROCESS=1
    CAN_PRINT_IMMEDIATELY=1
}
trap child-return CHLD

# Clean up when quitting
cleanup () {
    kill -TERM "$CHILD_PID" &> /dev/null
    rm "$BUFFER"
    exit
}
trap cleanup TERM INT QUIT

while read LINE; do
    # If we're just starting, just print immediately
    if [[ -n $CAN_PRINT_IMMEDIATELY ]]; then
        echo $LINE
        CAN_PRINT_IMMEDIATELY=
    else
        # Otherwise, store the line for later
        echo "$LINE" > $BUFFER
        # And spawn a subprocess to handle it one interval later, unless one is
        # already running.  With the SIGCHLD trap, the state variables will
        # reset when it exits.
        if [[ -n $CAN_START_SUBPROCESS ]]; then
            CAN_START_SUBPROCESS=
            (
                sleep $INTERVAL
                tail -n1 $BUFFER
            ) &
            CHILD_PID=$!
        fi
    fi
done

# Once we exhaust stdin, wait for the last child process to finish, if any.
if [[ -n $CHILD_PID ]]; then
    wait $CHILD_PID &> /dev/null
    cleanup
fi

read私はプログラムが時々行を非同期的に印刷しなければならないので(メッセージが受信されなかった場合、時々終了してから長い時間が経過しても)、stdinループラインが常に印刷を担当しているわけではないことを観察しました。したがって、子プロセスです。

tee >(sed)これは次のように機能し、時間を観察するために入力も別々に保持されます。

質問に提供されたサンプル入力に対してスクリプトが機能することを示すGIF録音

これは前の図と一致します。

問題の予想結果

答え4

これは非常に簡単な方法で必要なものを実行する必要があります:)

firehose | awk '{print $1; system("sleep 1")}' | expensive-command

欠点は、すべてを殺すのが少し難しいことです(killall awk動作しますが非常にエレガントです)。ただし、少なくとも単純で特別なスクリプトやその他の項目は必要ありません。

関連情報