STD multiplexing in Perl6, part 2

In the previous post, "Managing stdout from multiple processes: STD multiplexing in Perl6, kinda", we created a function, procs2stdout, that takes any number of @processes, sets them up to have their output captured, and prints that output to the screen with a prefix showing the file name being worked on.

You may have been wondering why we needed the call to .lines[1], especially since we combine the lines back into a single string. So let's take run('perl6', '-e', "'say abc\n123\n'")[2] as a theoretical example and examine some possible outcomes from a single process:

# $out = "abc\n123\n"
# $out = "\n"

$stdout.tap: -> $out { 
    print "$prefix # $out";
}

# <prefix> # abc
# 123
# <prefix> #

Oh...

# $out = "ab"
# $out = "c\n123\n"
# $out = "\n"

$stdout.tap: -> $out {
    print "$prefix # $out";
}

# <prefix> # ab<prefix> # c
# 123
# <prefix> #

OHHHH...

So we use .lines to add the prefix to every printed line rather than to every chunk of data the tap[3] receives. Because .lines strips the trailing \n, we have to add the newline back ourselves.

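# $out = "ab"
# $out = "c\n123\n"
# $out = "\n"

$stdout.tap: -> $out {
    print "$prefix # $_\n" for $out.lines;
}

# <prefix> # ab
# <prefix> # c
# <prefix> # 123
# <prefix> #
#
# Every printed line now carries the prefix. A line cut between two
# chunks ("ab" + "c") is still printed in pieces, and the lone "\n"
# chunk prints an empty prefixed line.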
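
Calling .lines inside the tap only ever sees one chunk at a time, so it cannot rejoin a line that arrived split across two chunks. Here is a minimal sketch of one way around that, assuming your Rakudo provides Supply.lines, which buffers partial lines between chunks. The Supplier only simulates a process's stdout so the chunk boundaries are under our control, and $prefix and @printed are illustrative names, not part of procs2stdout:

```raku
# Simulate a process's stdout with a Supplier so we decide where the
# chunk boundaries fall (Proc::Async.stdout is a Supply as well).
my $supplier = Supplier.new;
my $prefix   = 'demo';    # hypothetical prefix, for illustration only
my @printed;

# Supply.lines holds on to a partial line until its newline arrives,
# so the chunks "ab" and "c\n12" yield the single line "abc".
$supplier.Supply.lines.tap: -> $line {
    @printed.push: "$prefix # $line";
}

$supplier.emit($_) for "ab", "c\n12", "3\n";
$supplier.done;           # would flush a trailing partial line, if any

.say for @printed;
```

With a real Proc::Async, the same idea would be to tap $proc.stdout.lines instead of $proc.stdout.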

Now this is somewhat usable. We will tack on a .substr(0,$width)[4] to each printed line to avoid ugly line wrapping, at the cost of truncating the displayed data. Here's a complete example:

# explained at http://ugexe.com/multiplexing-stdout-from-multiple-processes/
sub procs2stdout(*@processes) is export {
    return unless @processes;
    my @basenames = @processes>>.id>>.IO>>.basename;
    my $longest-basename = @basenames.max(*.chars);
    for @processes -> $proc {
        for $proc.stdout, $proc.stderr -> $stdio {
            $stdio.tap: -> $out { 
                for $out.lines.grep(*.so) -> $line {
                    state $to-print ~= sprintf(
                        "%-{$longest-basename.chars + 1}s# %s\n",
                        $proc.id.IO.basename, 
                        $line.substr(0,40)
                    );
                    LAST { print $to-print if $to-print }
                }
            }
        }
    }
}

# 1) Create 3 processes, all of which will run the `dmesg` command.
# 2) We add the attribute `has $.id` to the process object, because 
# we cannot reliably assume which argument of the process is the 
# file name we want to display (if there is one at all).
# 3) Save the not-yet-started processes to our array
my @processes = gather for ^3 -> $id {
    my $proc = Proc::Async.new('dmesg');
    $proc does role :: { has $.id = "test-$id {'x' x $id}" }
    take $proc;
}

# Tap the output before starting the processes: if they were already
# running, output could be emitted before our taps are in place.
procs2stdout(@processes);

# procs2stdout has taken care of all the preparation for us
# at this point, so we can start any number of processes,
# promise to finish them, and wait for that promise to
# be broken or kept.
await Promise.allof(@processes>>.start);

Which gives us something like:

test-1 x  # [12134858.269587] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135116.726661] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135118.732735] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135119.724064] [UFW BLOCK] IN=eth0 OU
test-2 xx # 8:ac:5a:19:41:08:00 SRC=173.254.203.151
test-2 xx # [12133263.534066] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133280.103741] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133283.099396] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133289.110199] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133647.522099] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133941.728820] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134053.727483] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134056.740897] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134062.747135] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134073.385976] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134858.269587] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135116.726661] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135118.732735] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135119.724064] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135184.300904] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135296.946054] [UFW BLOCK] IN=eth0 OU
test-0    # 6 WINDOW=16384 RES=0x00 SYN URGP=0
test-0    # [12141050.117685] [UFW BLOCK] IN=eth0 OU
test-0    # [12141449.293908] [UFW BLOCK] IN=eth0 OU
test-0    # [12141669.208238] [UFW BLOCK] IN=eth0 OU
test-0    # [12141680.604782] [UFW BLOCK] IN=eth0 OU
test-0    # [12141683.604594] [UFW BLOCK] IN=eth0 OU
test-0    # [12141689.604816] [UFW BLOCK] IN=eth0 OU
test-2 xx # .98.13.238 DST=23.239.16.90 LEN=60 TOS=0
test-2 xx # [12135370.217267] [UFW BLOCK] IN=eth0 OU
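
The aligned prefix column and the truncated lines in output like this come from two small pieces: a sprintf format whose pad width is computed from the longest label, and a substr on each line. Here is a small sketch of just that part in isolation. The labels, the width of 20, and the name format-line are made up for illustration:

```raku
# Hypothetical labels and width, standing in for the process ids and
# the maximum line width used in the full example.
my @labels = 'test-0', 'test-1 x', 'test-2 xx';
my $width  = 20;

# Pad every label to the longest one (plus one space) so the '#'
# column lines up no matter which process printed the line.
my $longest = @labels.max(*.chars);
my $format  = "%-{$longest.chars + 1}s# %s\n";

sub format-line(Str $label, Str $line) {
    # substr on a string shorter than $width just returns the whole string.
    sprintf $format, $label, $line.substr(0, $width);
}

print format-line($_, '[12141050.117685] [UFW BLOCK] IN=eth0 OUT=') for @labels;
```

Everything past the first $width characters of a line is dropped, which is the "cost of displayed data" trade-off.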

There you have it. From here you may wish to add proper word wrapping, dynamically change the max width of a non-wrapped line when the terminal is resized, or add a status/information bar (to show, for example, the current number of failed tests across all processes). This is where it starts to get a little uglier: Zef::SystemInfo[5] gets the terminal's column width by running a regex on the output of a system command. Zef::App[6] catches Signal::SIGWINCH[7] so we can update the column at which each row is cut off (this doesn't work on the JVM, so it sits behind a wrapper that skips it there).
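
To make the "regex on a system command" idea concrete, here is a rough sketch. It is not Zef's actual code: the helper name columns-from-stty, the choice of `stty size` (which prints "<rows> <columns>"), and the default of 80 are all assumptions for illustration.

```raku
# Hypothetical helper (not Zef's actual code): extract the column count
# from the output of `stty size`, which prints "<rows> <columns>".
sub columns-from-stty(Str $output, Int :$default = 80) {
    # Fall back to $default when the output does not look like "24 80".
    $output ~~ /^ \s* \d+ \s+ (\d+) / ?? +$0 !! $default;
}

# Sample strings stand in for real command output here.
say columns-from-stty("24 132\n");
say columns-from-stty("stty: not a tty");

# On a real terminal this could be wired up roughly like so, refreshing
# the width whenever the terminal is resized (where the backend supports it):
#   my $width = columns-from-stty(run('stty', 'size', :out).out.slurp(:close));
#   signal(SIGWINCH).tap: {
#       $width = columns-from-stty(run('stty', 'size', :out).out.slurp(:close));
#   };
```

Keeping the parsing in its own function means it can be exercised with plain strings, without needing an actual terminal.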

But wait! What if another thread outside of this wants to print something? This was the initial problem faced when combining Zef::CLI::STDMux[8] with Zef::CLI::StatusBar (which is spawned in a start block and prints on a timer). The general idea was just to override $*OUT and $*ERR so that the print that actually writes to the terminal is called inside a lock, but that's probably bad and beyond the scope of this particular post.

Read Part 1: Managing stdout from multiple processes: STD multiplexing in Perl6, kinda

  1. http://doc.perl6.org/type/Str#routine_lines

  2. http://doc.perl6.org/routine/run

  3. http://doc.perl6.org/type/Tap

  4. http://doc.perl6.org/routine/substr

  5. Zef::Utils::SystemInfo

  6. Zef::App

  7. http://doc.perl6.org/routine/signal

  8. Zef::CLI::STDMux