私は次のようないくつかのプログラムを設定しようとしています(追加を含めないでください。進行中の作業が多すぎます)。
pv -q -l -L 1 < input.csv | ./repeat <(nc "host" 1234)
反復プログラムのソースコードは次のとおりです。
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <string>
inline std::string readline(int fd, const size_t len, const char delim = '\n')
{
std::string result;
char c = 0;
for(size_t i=0; i < len; i++)
{
const int read_result = read(fd, &c, sizeof(c));
if(read_result != sizeof(c))
break;
else
{
result += c;
if(c == delim)
break;
}
}
return result;
}
int main(int argc, char ** argv)
{
constexpr int max_events = 10;
const int fd_stdin = fileno(stdin);
if (fd_stdin < 0)
{
std::cerr << "#Failed to setup standard input" << std::endl;
return -1;
}
/* General poll setup */
int epoll_fd = epoll_create1(0);
if(epoll_fd == -1) perror("epoll_create1: ");
{
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd_stdin;
const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_stdin, &event);
if(result == -1) std::cerr << "epoll_ctl add for fd " << fd_stdin << " failed: " << strerror(errno) << std::endl;
}
if (argc > 1)
{
for (int i = 1; i < argc; i++)
{
const char * filename = argv[i];
const int fd = open(filename, O_RDONLY);
if (fd < 0)
std::cerr << "#Error opening file " << filename << ": error #" << errno << ": " << strerror(errno) << std::endl;
else
{
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
if(result == -1) std::cerr << "epoll_ctl add for fd " << fd << "(" << filename << ") failed: " << strerror(errno) << std::endl;
else std::cerr << "Added fd " << fd << " (" << filename << ") to epoll!" << std::endl;
}
}
}
struct epoll_event events[max_events];
while(int event_count = epoll_wait(epoll_fd, events, max_events, -1))
{
for (int i = 0; i < event_count; i++)
{
const std::string line = readline(events[i].data.fd, 512);
if(line.length() > 0)
std::cout << line << std::endl;
}
}
return 0;
}
私はこれを見つけました:
- パイプを使用すると、
./repeat
すべてが期待どおりに機能します。 - プロセス置換を使用すると、すべてが期待どおりに機能します。
- プロセス交換を使用してPVをカプセル化すると、すべてが期待どおりに機能します。
- しかし、特定の設定を使用すると、標準入力からデータ(単一文字)が失われるようです!
私は以下を試しました:
pv
すべてのプロセス間のパイプバッファリングを無効にし、すべてのプロセスで./repeat
使用しようとしましたが、うまくstdbuf -i0 -o0 -e0
いかないようです。- epollをpollに変更しましたが、まだ機能しません。
pv
./repeat
との間の流れを見ると、tee stream.csv
これが正しいようです。- 私は
strace
何が起こっているのかを一度見て(予想通り)多くのシングルバイトの読み取りを見て、データの損失も示しました。
何が起こったのか知りたいですか?それとも、さらなる調査のために私ができることはありますか?
答え1
nc
内部コマンド<(...)
もstdinから読み込まれるためです。
より簡単な例:
$ nc -l 9999 >/tmp/foo &
[1] 5659
$ echo text | cat <(nc -N localhost 9999) -
[1]+ Done nc -l 9999 > /tmp/foo
どこに行きましたかtext
? Netcat経由。
$ cat /tmp/foo
text
あなたのプログラムはnc
同じ標準入力と競合し、nc
それらのいくつかを取得します。
答え2
E/POLLIN で返される epoll() または poll() は、ユーザーにのみ通知します。一つ読む()可能ブロックしないでください。
あなたがしたように、改行まで多くのシングルバイトread()を行うことができるというわけではありません。
私は言った可能これは、E/POLLIN から返された epoll() を使用した後、read() がまだブロックされる可能性があるためです。
また、コードは過去のEOFを読み取ろうとし、すべてのread()エラーを完全に無視します。