Pipe – Use ‘tee’ and ‘cat’ to Concatenate Results Multiple Times


If I call some command, for instance echo, I can feed its output to several other commands with tee. Example:

echo "Hello world!" | tee >(command1) >(command2) >(command3)

With cat I can collect the results of several commands. Example:

cat <(command1) <(command2) <(command3)

I would like to be able to do both things at the same time, so that I can use tee to call those commands on the output of something else (for instance the echo I've written) and then collect all their results on a single output with cat.

It's important to keep the results in order: the output lines of command1, command2 and command3 should not be interleaved, but should appear in the order of the commands (as happens with cat).

There may be better options than cat and tee but those are the ones I know so far.

I want to avoid using temporary files because the size of the input and output may be large.

How could I do this?

PS: another problem is that this happens in a loop, which makes handling temporary files harder. This is the code I currently have; it works for small test cases, but it gets into infinite loops when reading from and writing to auxfile, in a way I don't understand.

somefunction()
{
  if [ $1 -eq 1 ]
  then
    echo "Hello world!"
  else
    somefunction $(( $1 - 1 )) > auxfile
    cat <(command1 < auxfile) \
        <(command2 < auxfile) \
        <(command3 < auxfile)
  fi
}

Reads and writes on auxfile seem to overlap, causing everything to explode.

Best Answer

You could use a combination of GNU stdbuf and pee from moreutils:

echo "Hello world!" | stdbuf -o 1M pee cmd1 cmd2 cmd3 > output

pee popen(3)s those 3 shell command lines, then freads the input and fwrites it to all three, with the output buffered up to 1M.

The idea is to have a buffer at least as big as the input. This way even though the three commands are started at the same time, they will only see input coming in when pee pcloses the three commands sequentially.

Upon each pclose, pee flushes the buffer to the command and waits for its termination. That guarantees that as long as those cmdx commands don't start outputting anything before they've received any input (and don't fork a process that may continue outputting after their parent has returned), the output of the three commands won't be interleaved.

In effect, that's a bit like using a temp file in memory, with the drawback that the 3 commands are started concurrently.

To avoid starting the commands concurrently, you could write pee as a shell function:

pee() (
  input=$(cat; echo .)
  for i do
    printf %s "${input%.}" | eval "$i"
  done
)
echo "Hello world!" | pee cmd1 cmd2 cmd3 > out

But beware that shells other than zsh would fail for binary input with NUL characters.

That avoids using temporary files, but that means the whole input is stored in memory.

In any case, you'll have to store the input somewhere, in memory or a temp file.
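
The sequential function above can be exercised with ordinary filters to check that the outputs come out in command order. This is only a minimal sketch, assuming a POSIX shell: the name pee_seq is hypothetical (chosen here to avoid shadowing the moreutils pee binary) and the three filters are arbitrary. The trailing "." matters because command substitution strips trailing newlines; appending a sentinel and removing it afterwards preserves them.

```shell
# Sequential dispatcher: read all of stdin once, then replay it to each command in turn.
# pee_seq is a hypothetical name used only for this illustration.
pee_seq() (
  # "echo ." adds a sentinel so trailing newlines of the input survive $(...)
  input=$(cat; echo .)
  for cmd do
    # Strip the sentinel again and feed the saved input to the next command
    printf %s "${input%.}" | eval "$cmd"
  done
)

# Three arbitrary filters; their outputs appear one after another, never interleaved
printf 'one\ntwo\n' | pee_seq 'tr a-z A-Z' 'cat' 'sed s/o/0/'
```

Each command only starts once the previous one has finished, so ordering is guaranteed at the cost of holding the whole input in a shell variable.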

Actually, it's quite an interesting question, as it shows us the limits of the Unix idea of having several simple tools cooperate on a single task.

Here, we'd like to have several tools cooperate on the task:

  • a source command (here echo)
  • a dispatcher command (tee)
  • some filter commands (cmd1, cmd2, cmd3)
  • and an aggregation command (cat).

It would be nice if they could all run together at the same time and do their hard work on the data that they're meant to process as soon as it's available.

In the case of one filter command, it's easy:

src | tee | cmd1 | cat

All commands are run concurrently, cmd1 starts to munch data from src as soon as it's available.

Now, with three filter commands, we can still do the same: start them concurrently and connect them with pipes:

               ┏━━━┓▁▁▁▁▁▁▁▁▁▁┏━━━━┓▁▁▁▁▁▁▁▁▁▁┏━━━┓
               ┃   ┃░░░░2░░░░░┃cmd1┃░░░░░5░░░░┃   ┃
               ┃   ┃▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃
┏━━━┓▁▁▁▁▁▁▁▁▁▁┃   ┃▁▁▁▁▁▁▁▁▁▁┏━━━━┓▁▁▁▁▁▁▁▁▁▁┃   ┃▁▁▁▁▁▁▁▁▁┏━━━┓
┃src┃░░░░1░░░░░┃tee┃░░░░3░░░░░┃cmd2┃░░░░░6░░░░┃cat┃░░░░░░░░░┃out┃
┗━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃▔▔▔▔▔▔▔▔▔┗━━━┛
               ┃   ┃▁▁▁▁▁▁▁▁▁▁┏━━━━┓▁▁▁▁▁▁▁▁▁▁┃   ┃
               ┃   ┃░░░░4░░░░░┃cmd3┃░░░░░7░░░░┃   ┃
               ┗━━━┛▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┗━━━┛

Which we can do relatively easily with named pipes:

pee() (
  mkfifo tee-cmd1 tee-cmd2 tee-cmd3 cmd1-cat cmd2-cat cmd3-cat
  { tee tee-cmd1 tee-cmd2 tee-cmd3 > /dev/null <&3 3<&- & } 3<&0
  eval "$1 < tee-cmd1 1<> cmd1-cat &"
  eval "$2 < tee-cmd2 1<> cmd2-cat &"
  eval "$3 < tee-cmd3 1<> cmd3-cat &"
  exec cat cmd1-cat cmd2-cat cmd3-cat
)
echo abc | pee 'tr a A' 'tr b B' 'tr c C'

(In the code above, the } 3<&0 construct works around the fact that & redirects stdin from /dev/null, and <> is used so that opening the pipes does not block until the other end (cat) has opened them as well.)
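
The stdin workaround from that note can be checked in isolation. This is a small sketch, assuming a POSIX sh run non-interactively (job control off); the variable names plain and kept are made up for the illustration.

```shell
# In a non-interactive shell, a command started with & gets /dev/null as stdin,
# so this background cat sees no input at all.
plain=$(echo data | sh -c 'cat & wait')

# Saving stdin on fd 3 outside the braces and restoring it inside
# lets the background cat read the real input.
kept=$(echo data | sh -c '{ cat <&3 3<&- & } 3<&0; wait')

printf 'plain=[%s] kept=[%s]\n' "$plain" "$kept"
```

With the shells this was reasoned about (bash and dash), the first capture is expected to be empty and the second to contain "data".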

Or to avoid named pipes, a bit more painfully with zsh coproc:

pee() (
  n=0 ci= co= is=() os=()
  for cmd do
    eval "coproc $cmd $ci $co"

    exec {i}<&p {o}>&p
    is+=($i) os+=($o)
    eval i$n=$i o$n=$o
    ci+=" {i$n}<&-" co+=" {o$n}>&-"
    ((n++))
  done
  coproc :
  read -p
  eval tee /dev/fd/$^os $ci "> /dev/null &" exec cat /dev/fd/$^is $co
)
echo abc | pee 'tr a A' 'tr b B' 'tr c C'

Now, the question is: once all the programs are started and connected, will the data flow?

We've got two constraints:

  • tee feeds all its outputs at the same rate, so it can only dispatch data at the rate of its slowest output pipe.
  • cat will only start reading from the second pipe (pipe 6 in the drawing above) when all data has been read from the first (5).

What that means is that data will not flow in pipe 6 until cmd1 has finished. And, like in the case of the tr b B above, that may mean that data will not flow in pipe 3 either, which means it will not flow in any of pipes 2, 3 or 4 since tee feeds at the slowest rate of all 3.
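
The second constraint can be observed with two FIFOs standing in for pipes 5 and 6. This is only an illustrative sketch, assuming a POSIX shell with mkfifo and mktemp available; demo_cat_order and the FIFO names p5 and p6 are hypothetical.

```shell
# Show that cat drains its first input completely before touching the second.
demo_cat_order() (
  dir=$(mktemp -d) || exit 1
  mkfifo "$dir/p5" "$dir/p6" || exit 1

  # The writer on the first FIFO is slow.
  { sleep 1; echo "from first pipe"; } > "$dir/p5" &

  # The writer on the second FIFO is ready at once, but its open blocks
  # until cat opens that FIFO, which only happens after EOF on the first.
  echo "from second pipe" > "$dir/p6" &

  cat "$dir/p5" "$dir/p6"
  wait
  rm -rf "$dir"
)

demo_cat_order
```

The line from the second FIFO comes out last even though its writer was ready first, which is the ordering guarantee the question asks for and also the source of the stall described above.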

In practice those pipes have a non-zero capacity, so some data will manage to get through, and on my system at least, I can get it to work up to:

yes abc | head -c $((2 * 65536 + 8192)) | pee 'tr a A' 'tr b B' 'tr c C' | uniq -c

Beyond that, with

yes abc | head -c $((2 * 65536 + 8192 + 1)) | pee 'tr a A' 'tr b B' 'tr c C' | uniq -c

We've got a deadlock, where we're in this situation:

               ┏━━━┓▁▁▁▁2▁▁▁▁▁┏━━━━┓▁▁▁▁▁5▁▁▁▁┏━━━┓
               ┃   ┃░░░░░░░░░░┃cmd1┃░░░░░░░░░░┃   ┃
               ┃   ┃▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃
┏━━━┓▁▁▁▁1▁▁▁▁▁┃   ┃▁▁▁▁3▁▁▁▁▁┏━━━━┓▁▁▁▁▁6▁▁▁▁┃   ┃▁▁▁▁▁▁▁▁▁┏━━━┓
┃src┃██████████┃tee┃██████████┃cmd2┃██████████┃cat┃░░░░░░░░░┃out┃
┗━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┃   ┃▔▔▔▔▔▔▔▔▔┗━━━┛
               ┃   ┃▁▁▁▁4▁▁▁▁▁┏━━━━┓▁▁▁▁▁7▁▁▁▁┃   ┃
               ┃   ┃██████████┃cmd3┃██████████┃   ┃
               ┗━━━┛▔▔▔▔▔▔▔▔▔▔┗━━━━┛▔▔▔▔▔▔▔▔▔▔┗━━━┛

We've filled pipes 3 and 6 (64 KiB each). tee has read that extra byte and fed it to cmd1, but

  • it's now blocked writing on pipe 3 as it's waiting for cmd2 to empty it
  • cmd2 can't empty it because it's blocked writing on pipe 6, waiting for cat to empty it
  • cat can't empty it because it's waiting until there's no more input on pipe 5.
  • cmd1 can't tell cat there's no more input because it is waiting itself for more input from tee.
  • and tee can't tell cmd1 there's no more input because it's blocked... and so on.

We've got a dependency loop and thus a deadlock.

Now, what's the solution? Bigger pipes 3 and 4 (big enough to contain all of src's output) would do it. We could do that for instance by inserting pv -qB 1G between tee and cmd2/3 where pv could store up to 1G of data waiting for cmd2 and cmd3 to read them. That would mean two things though:

  1. that's using potentially a lot of memory, and moreover, duplicating it
  2. that's failing to have all 3 commands cooperate because cmd2 would in reality only start to process data when cmd1 has finished.

A solution to the second problem would be to make pipes 6 and 7 bigger as well. Assuming that cmd2 and cmd3 produce as much output as they consume, that would not consume more memory.

The only way to avoid duplicating the data (in the first problem) would be to implement the retention of data in the dispatcher itself, that is implement a variation on tee that can feed data at the rate of the fastest output (holding data to feed the slower ones at their own pace). Not really trivial.

So, in the end, the best we can reasonably get without programming is probably something like (Zsh syntax):

max_hold=1G
pee() (
  n=0 ci= co= is=() os=()
  for cmd do
    if ((n)); then
      eval "coproc pv -qB $max_hold $ci $co | $cmd $ci $co | pv -qB $max_hold $ci $co"
    else
      eval "coproc $cmd $ci $co"
    fi

    exec {i}<&p {o}>&p
    is+=($i) os+=($o)
    eval i$n=$i o$n=$o
    ci+=" {i$n}<&-" co+=" {o$n}>&-"
    ((n++))
  done
  coproc :
  read -p
  eval tee /dev/fd/$^os $ci "> /dev/null &" exec cat /dev/fd/$^is $co
)
yes abc | head -n 1000000 | pee 'tr a A' 'tr b B' 'tr c C' | uniq -c