Bash Pipe – Why Data Loss Occurs in This Construction

bashclinuxpipeprocess-substitution

I am trying to combine a few programs like so (please ignore any extra includes, this is heavy work-in-progress):

pv -q -l -L 1  < input.csv | ./repeat <(nc "host" 1234)

Where the source of the repeat program looks as follows:

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <string>

inline std::string readline(int fd, const size_t len, const char delim = '\n')
{
    std::string result;
    char c = 0;
    for(size_t i=0; i < len; i++)
    {
        const int read_result = read(fd, &c, sizeof(c));
        if(read_result != sizeof(c))
            break;
        else
        {
            result += c;
            if(c == delim)
                break;
        }
    }
    return result;
}

int main(int argc, char ** argv)
{
    constexpr int max_events = 10;

    const int fd_stdin = fileno(stdin);
    if (fd_stdin < 0)
    {
        std::cerr << "#Failed to setup standard input" << std::endl;
        return -1;
    }


    /* General poll setup */
    int epoll_fd = epoll_create1(0);
    if(epoll_fd == -1) perror("epoll_create1: ");
    {
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = fd_stdin;
        const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_stdin, &event);
        if(result == -1) std::cerr << "epoll_ctl add for fd " << fd_stdin << " failed: " << strerror(errno) << std::endl;
    }

    if (argc > 1)
    {
        for (int i = 1; i < argc; i++)
        {
            const char * filename = argv[i];
            const int fd = open(filename, O_RDONLY);
            if (fd < 0)
                std::cerr << "#Error opening file " << filename << ": error #" << errno << ": " << strerror(errno) << std::endl;
            else
            {
                struct epoll_event event;
                event.events = EPOLLIN;
                event.data.fd = fd;
                const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
                if(result == -1) std::cerr << "epoll_ctl add for fd " << fd << "(" << filename << ") failed: " << strerror(errno) << std::endl;
                else std::cerr << "Added fd " << fd << " (" << filename << ") to epoll!" << std::endl;
            }
        }
    }

    struct epoll_event events[max_events];
    while(int event_count = epoll_wait(epoll_fd, events, max_events, -1))
    {
        for (int i = 0; i < event_count; i++)
        {
            const std::string line = readline(events[i].data.fd, 512);                      
            if(line.length() > 0)
                std::cout << line << std::endl;
        }
    }
    return 0;
}

I noticed this:

  • When I just use the pipe to ./repeat, everything works as intended.
  • When I just use the process substitution, everything works as intended.
  • When I encapsulate pv using process substitution, everything works as intended.
  • However, when I use the specific construction, I appear to lose data (individual characters) from stdin!

I have tried the following:

  • I have tried to disable buffering on the pipe between pv and ./repeat using stdbuf -i0 -o0 -e0 on all processes, but that doesn't seem to work.
  • I have swapped epoll for poll, doesn't work.
  • When I look at the stream between pv and ./repeat with tee stream.csv, this looks correct.
  • I used strace to see what was going on, and I see lots of single-byte reads (as expected) and they also show that data is going missing.

I wonder what is going on? Or what I can do to investigate further?

Best Answer

Because the nc command inside <(...) will also read from stdin.

Simpler example:

$ nc -l 9999 >/tmp/foo &
[1] 5659

$ echo text | cat <(nc -N localhost 9999) -
[1]+  Done                    nc -l 9999 > /tmp/foo

Where did the text go? Through the netcat.

$ cat /tmp/foo
text

Your program and nc compete for the same stdin, and nc gets some of it.

Related Question