IPC::Pipeline - Create a shell-like pipeline of many running commands
    use IPC::Pipeline;

    my @pids = pipeline(my $first_child_in, my $last_child_out, my $err,
        [qw(filter1 args)],
        sub { filter2(); return 0 },
        [qw(filter3 args)],
        ...
        [qw(commandN args)]
    );

    ... do stuff ...

    my %statuses = map {
        waitpid($_, 0);
        $_ => ($? >> 8);
    } @pids;
IPC::Pipeline exports a single function pipeline().
pipeline()
Similar in calling convention to IPC::Open3, pipeline() spawns N children, connecting the first child to the $first_child_in handle, the final child to $last_child_out, and each child to a shared standard error handle, $err. Each subsequent filter specified causes a new process to be fork()ed. Each process is linked to the previous one with a file descriptor pair created by pipe(), using dup2() to attach each process's standard input to the previous process's standard output.
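The chaining described above can be sketched without IPC::Pipeline at all. The following is a rough, hypothetical two-stage chain (echo piped into tr) built from the same primitives: pipe(), fork(), dup2() from the core POSIX module, and exec(). A second pipe carries the final stage's output back to the parent.

```perl
use strict;
use warnings;
use POSIX qw(dup2);

pipe(my $r1, my $w1) or die "pipe: $!";   # stage 1 -> stage 2
pipe(my $r2, my $w2) or die "pipe: $!";   # stage 2 -> parent

my $pid1 = fork() // die "fork: $!";
if ($pid1 == 0) {
    dup2(fileno($w1), 1) // die "dup2: $!";   # stdout onto pipe 1
    close($_) for ($r1, $w1, $r2, $w2);
    exec(qw(echo hello)) or die "Cannot exec(): $!";
}

my $pid2 = fork() // die "fork: $!";
if ($pid2 == 0) {
    dup2(fileno($r1), 0) // die "dup2: $!";   # stdin from pipe 1
    dup2(fileno($w2), 1) // die "dup2: $!";   # stdout onto pipe 2
    close($_) for ($r1, $w1, $r2, $w2);
    exec(qw(tr a-z A-Z)) or die "Cannot exec(): $!";
}

close($_) for ($r1, $w1, $w2);
my $result = readline($r2);               # "HELLO\n" after both stages
close($r2);

waitpid($_, 0) for ($pid1, $pid2);
```

pipeline() performs exactly this bookkeeping for an arbitrary number of stages, which is why the caller only ever sees the outermost three handles.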
IPC::Pipeline does not work on MSWin32, but it does work on Cygwin.
IPC::Pipeline accepts external commands in the form of ARRAY references containing the command name and its arguments, as well as CODE references that are executed in their own processes; each runs as an independent stage of the pipeline.
When a filter is passed in the form of an ARRAY containing an external system command, each such item is executed in its own subprocess in the following manner.
    exec(@$filter) or die("Cannot exec(): $!");
When a filter is passed in the form of a CODE ref, each such item is executed in its own subprocess in the following way.
    exit $filter->();
If fileglobs or numeric file descriptors are passed in any of the three positional parameters, then they will be duplicated onto the file handles allocated as a result of the process pipelining. Otherwise, simple scalar assignment will be performed.
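The distinction can be sketched with a plain pipe() pair standing in for the handles pipeline() would allocate; the handle names here are illustrative only. An already-open handle is duplicated onto the new descriptor, dup2()-style, whereas an undefined scalar would simply receive the new handle by assignment.

```perl
use strict;
use warnings;

pipe(my $r, my $w) or die "pipe: $!";

# An existing, open handle: duplicated onto, not reassigned.
open(my $existing, '>', '/dev/null') or die "open: $!";
open($existing, '>&', $w) or die "dup: $!";   # $existing now aliases $w

syswrite($existing, "hello\n");
close($existing);
close($w);

my $line = readline($r);   # the write arrives through the duplicate
close($r);
```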
Like IPC::Open3, pipeline() returns immediately after spawning the process chain, though differing slightly in that the IDs of the processes are returned in order of specification in a list when called in list context. When called in scalar context, an ARRAY reference of the process IDs will be returned.
Also like IPC::Open3, one may use select() to multiplex reading and writing to each of the handles returned by pipeline(), preferably with non-buffered sysread() and syswrite() calls. Using this to handle reading standard output and error from the children is ideal, as blocking and buffering considerations are alleviated.
If any child process dies prematurely, or any of the piped file handles are closed for any reason, the calling process inherits the kernel behavior of receiving a SIGPIPE, which requires the installation of a signal handler for appropriate recovery.
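A minimal recovery handler might look like the following sketch; the broken pipe is simulated locally by closing the read end, rather than by a dying child. With a handler installed, the failed write returns undef with errno set to EPIPE instead of killing the process.

```perl
use strict;
use warnings;

# Record the breakage so the main loop can clean up and reap children.
my $pipe_broken = 0;
$SIG{PIPE} = sub { $pipe_broken = 1 };

pipe(my $r, my $w) or die "pipe: $!";
close($r);                            # no reader left: next write breaks

my $wrote = syswrite($w, "data\n");   # fails with EPIPE; handler runs
close($w);
```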
Unlike IPC::Open3, IPC::Pipeline will NOT redirect child process stderr to stdout if no file handle for stderr is specified. As of version 0.6, the caller will always need to handle standard error, to prevent any children from blocking; it would make little sense to pass one process' standard error as an input to the next process.
The following example implements a quick and dirty, but relatively sane tar and gzip solution. For proper error handling from any of the children, use select() to multiplex the output and error streams.
    use IPC::Pipeline;

    my @paths = qw(/some /random /locations);

    open(my $err, '>', '/dev/null');

    my @pids = pipeline(my ($in, $out), $err,
        [qw(tar pcf -), @paths],
        ['gzip']
    );

    open(my $fh, '>', 'file.tar.gz');

    close $in;

    while (my $len = sysread($out, my $buf, 512)) {
        syswrite($fh, $buf, $len);
    }

    close $fh;
    close $out;

    #
    # We may need to wait for the children to die in some extraordinary
    # circumstances.
    #
    foreach my $pid (@pids) {
        waitpid($pid, 1);
    }
The following solution implements a true I/O stream filter as provided by any Unix-style shell.
    use IPC::Pipeline;

    open(my $err, '>', '/dev/null');

    my @pids = pipeline(my ($in, $out), $err,
        [qw(tr A-Ma-mN-Zn-z N-Zn-zA-Ma-m)],
        [qw(cut -d), ':', qw(-f 2)]
    );

    my @records = qw(
        foo:bar:baz
        eins:zwei:drei
        cats:dogs:rabbits
    );

    foreach my $record (@records) {
        print $in $record . "\n";
    }

    close $in;

    while (my $len = sysread($out, my $buf, 512)) {
        syswrite(STDOUT, $buf, $len);
    }

    close $out;

    foreach my $pid (@pids) {
        waitpid($pid, 1);
    }
The following solution demonstrates the ability of IPC::Pipeline to execute CODE references in the midst of a pipeline.
    use IPC::Pipeline;

    open(my $err, '>', '/dev/null');

    my @pids = pipeline(my ($in, $out), $err,
        sub { print 'cats'; return 0 },
        [qw(tr acst lbhe)]
    );

    close $in;

    while (my $line = readline($out)) {
        chomp $line;
        print "Got '$line'\n";
    }

    close $out;
It should be mentioned that mst's IO::Pipeline has very little in common with IPC::Pipeline.
Written by Xan Tronix <xan@cpan.org>
Copyright (c) 2014, cPanel, Inc. All rights reserved. http://cpanel.net/
This is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See the LICENSE file for further details.
To install IPC::Pipeline, copy and paste the appropriate command into your terminal.
cpanm
    cpanm IPC::Pipeline
CPAN shell
    perl -MCPAN -e shell
    install IPC::Pipeline
For more information on module installation, please visit the detailed CPAN module installation guide.