次のような偽のシリアルデバイスを作成しています。
socat -d -d pty,raw,echo=0 pty,raw,echo=0
毎回新しい番号を持つデバイスが作成されます。/dev/pty/6
彼らに電話しましょう/dev/pty/7
私のテストコンピュータで利用できないハードウェアデバイスをシミュレートするために、/dev/pty/6
毎秒20行の速度で書きました。
これと似ています(実際のプロジェクトではPythonで書いていますが、これは同じ問題を示しています)。
while true
do
date > /dev/pts/6
sleep 0.05
done
数秒間実行すると、cat
/dev/pty/7
私が書いてから行から始めるのではなく、中断した部分から始めることがわかりました。
だから、timeout 5 cat -n /dev/pty/7
途中で休む時間なしで続けてみると、常に100行ほど出てきます。ただし、「生産者」スクリプトを5分間実行してから実行すると、数千timeout 5 cat -n /dev/pty/7
行が表示されます。最後に読んだ後に書かれたすべての行のようです。
私がシミュレートしたい実際のハードウェアはこのように動作しません。
timeout 5 cat -n /dev/ttyUSB0
それはいつも私に約100行を与えます。誰も読まない行を捨てます。
ハードウェアデバイスのこの側面をどのようによくエミュレートできますか?プロデューサスクリプトは、誰かが読み書きすることを確認する必要がありますか?それともsocatコマンドにオプションを提供する必要がありますか?
Pythonでこれを行うと、作成中のバッファがいっぱいになり、プロデューサプログラムがスペースが作成されるのを待って停止したようですが、この場合はこれを証明できません。
答え1
この方法を使用する際の問題は、socat
ptyに大きなバッファがあることです。 Pythonを使用しているので、直接模擬を試すことができます。例えば、
#!/usr/bin/python3
# https://unix.stackexchange.com/a/735081/119298
# some termios code from:
# https://github.com/pyserial/pyserial/blob/master/serial/serialposix.py
import os, time, struct, fcntl, termios
class pseudotty:
TIOCM_zero_str = struct.pack('I', 0)
def __init__(self, name, mode):
master, slave = os.openpty()
sname = os.readlink(f"/proc/self/fd/{slave}") # "/dev/pts/6"
os.unlink(name)
os.symlink(sname, name)
self.mfd = master
self.sfd = slave
self.file = os.fdopen(master, mode)
def waiting(self):
result = fcntl.ioctl(self.sfd, termios.FIONREAD, self.TIOCM_zero_str)
return struct.unpack('I', result)[0]
datain = pseudotty("input", "r")
dataout = pseudotty("output", "w")
while True:
d = datain.file.readline()
wait = dataout.waiting()
if wait<200:
dataout.file.write(d)
input
これにより、現在の作業ディレクトリと各スレーブへのシンボリックリンクを持つ2つのptyが作成され、開きます。したがって、およびをoutput
使用してループをブートストラップできます。date >input
cat
cat output
プログラムは永遠に繰り返されます。入力 pty から 1 行を読み取り、出力 pty で現在待機している文字数をテストし、文字が 200 文字未満の場合はその行を出力に書き込みます。 200文字を超えると入力は削除されます。
これにより、2つのpty間にバッファリングされるデータ量をある程度制御できます。もちろん、200の制限を15に置き換えることができますreadline()
。read(1)
これにより、限られたfifoを使用して多くのシリアルデバイスをエミュレートできます。