Coro::MP - erlang-style multi-processing/message-passing framework
   use Coro::MP;
   # exports everything that AnyEvent::MP exports as well.

   # new stuff compared to AnyEvent::MP:

   # creating/using ports from threads
   my $port = port_async {
      # thread context, $SELF is set to $port

      # returning will "kil" the $port with an empty reason
   };

   # attach to an existing port
   spawn $NODE, "::initfunc";
   sub ::initfunc {
      rcv_async $SELF, sub {
         ...
      };
   }

   # simple "tag" receives:
   my ($pid) = get "pid", 30
      or die "no pid message received after 30s";

   # conditional receive
   my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
   my @next_msg = get_cond { 1 } 30; # 30s timeout

   # run thread in port context
   peval_async $port, {
      die "kill the port\n";
   };

   # synchronous "cal"
   my @retval = syncal 30, $port, tag => $data;
This module (-family) implements a simple message passing framework.
Despite its simplicity, you can securely message other processes running on the same or other hosts, and you can supervise entities remotely.
This module depends heavily on AnyEvent::MP; in fact, many functions exported by this module are identical to AnyEvent::MP functions. This module family is simply the Coro API to AnyEvent::MP.
Care has been taken to stay compatible with AnyEvent::MP, even if sometimes this required a less natural API (spawn should indeed spawn a thread, not just call an initfunc for example).
For an introduction to AnyEvent::MP, see the AnyEvent::MP::Intro manual page.
These variables and functions work exactly as in AnyEvent::MP; in fact, they are exactly the same functions, and are used in much the same way.
rcv works exactly as AnyEvent::MP::rcv, and is in fact compatible with Coro::MP ports. However, the canonical way to receive messages with Coro::MP is to use get or get_cond.
port is exactly the same as AnyEvent::MP::port and creates new ports. You can attach a thread to such a port by calling rcv_async, or you can create and attach in one operation using port_async.
psub works exactly as AnyEvent::MP::psub: you can use it to run callbacks within a port context (good for monitoring), but you cannot get messages unless the callback executes within the thread attached to the port.
Since creating a thread with port context requires somewhat annoying syntax, there is a peval_async function that handles that for you. Note that within such a thread, you still cannot get messages.
spawn is identical to AnyEvent::MP::spawn. This means that it doesn't spawn a new thread as one would expect, but simply calls an init function. The init function, however, can attach a new thread easily:
   sub initfun {
      my (@args) = @_;

      rcv_async $SELF, sub {
         # thread-code
      };
   }
cal is identical to AnyEvent::MP::cal. The easiest way to make a synchronous call is to use Coro's rouse functionality:
   # send 1, 2, 3 to $port and wait up to 30s for reply
   cal $port, 1, 2, 3, rouse_cb, 30;
   my @reply = rouse_wait;
You can also use syncal if you want, and are ok with learning yet another function with a weird name:
my @reply = syncal 30, $port, 1, 2, 3;
port_async creates a new local port and returns its ID. A new thread is created and attached to the port (see rcv_async, below, for details).
rcv_async creates a thread and attaches it to a port. The thread is set to execute $threadcb and is put into the ready queue. The thread will receive all messages not filtered away by tagged receive callbacks (as set by AnyEvent::MP::rcv); it simply replaces the default callback of an AnyEvent::MP port.
The special variable $SELF will be set to $port during thread execution.
When $threadcb returns or the thread is canceled, the return/cancel values become the kil reason.
It is not allowed to call rcv_async more than once on a given port.
get finds, dequeues and returns the next message with the specified $tag. If no matching message is currently queued, it waits up to $timeout seconds (or forever if no $timeout has been specified or it is undef) for one to arrive.
Returns the message with the initial tag removed. In case of a timeout, the empty list. The function must be called in list context.
Note that empty messages (messages consisting of nothing but the tag) cannot be distinguished from a timeout when using get.
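The matching rule described above can be sketched in plain Perl. This is only a minimal model, not the Coro::MP implementation: model_get and the @queue array are hypothetical names made up for this illustration, and the sketch never waits, so it only covers what happens with a zero timeout. It also shows why a message consisting of nothing but its tag looks the same as a timeout.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for a port's message queue.
my @queue = (
    [ log => "first" ],
    [ "ping" ],            # "empty" message: a tag without payload
    [ log => "second" ],
);

# Model of a tagged receive that never waits: remove the first queued
# message whose tag equals $tag and return it with the tag stripped.
# Returns the empty list when nothing matches (the "timeout" case).
sub model_get {
    my ($tag) = @_;

    for my $i (0 .. $#queue) {
        next unless $queue[$i][0] eq $tag;

        my ($msg) = splice @queue, $i, 1;
        my (undef, @payload) = @$msg;
        return @payload;
    }

    return;
}

my @first = model_get("log");   # ("first")
my @ping  = model_get("ping");  # () although a message was dequeued
my @none  = model_get("nope");  # () because nothing matched
```

In this model @ping and @none are both empty, even though only the first call removed a message from the queue.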
Example: send a "log" message to $SELF and then get and print it.
   snd $SELF, log => "text";
   my ($text) = get "log";
   print "log message: $text\n";
Example: receive p1 and p2 messages, regardless of the order they arrive in on the port.
   my @p1 = get "p1";
   my @p2 = get "p2";
Example: assume a message with tag now is already in the queue and fetch it. If no message was there, do not wait, but die.
   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";
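The "or die" idiom used in this example relies on two general Perl rules: "or" binds more loosely than "=", and a list assignment evaluated in boolean context yields the number of elements on its right-hand side. The following plain-Perl sketch illustrates this with fetch, a hypothetical stand-in for a receive call that may time out; it is not part of Coro::MP.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a receive call that may time out:
# returns a two-element message, or the empty list.
sub fetch {
    my ($have_msg) = @_;
    return $have_msg ? ("payload", 0) : ();
}

# The list assignment is true whenever at least one element was
# returned, even though the last element here is a false value.
my $hit  = (my @a = fetch(1)) ? "message" : "timeout";
my $miss = (my @b = fetch(0)) ? "message" : "timeout";

# Parsed as (my @msg = fetch(0)) or die ...; the empty list makes
# the assignment false, so die is reached.
my $died = eval { my @msg = fetch(0) or die "no message\n"; 1 } ? 0 : 1;
```

This is also why the result has to be assigned to a list: assigning to a plain scalar would test the value itself instead of the element count.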
Like get, get_cond looks for a matching message. Unlike get, "matching" is not defined by a tag alone, but by a predicate, a piece of code that is executed on each candidate message in turn, with @_ set to the message contents.
The predicate code is supposed to return the empty list if the message didn't match. If it returns anything else, then the message is removed from the queue and returned to the caller.
In addition, if the predicate returns a code reference, then it is immediately invoked on the removed message.
If a $timeout is specified and is not undef, then, after this many seconds have passed without a matching message arriving, the empty list will be returned.
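The predicate rules can be modelled in a few lines of plain Perl. This is a sketch under stated assumptions rather than the actual implementation: model_get_cond and @queue are hypothetical names, the sketch never waits (it behaves like a zero timeout), and it evaluates the predicate in scalar context, treating any false result as "no match". The text above only specifies the empty-list case, so that last point is an assumption.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for a port's message queue.
my @queue = (
    [ group2_x => 1 ],
    [ group1_a => 2 ],
    [ reverse  => "abc" ],
);

# Model of a predicate-based receive that never waits. The predicate
# sees the message contents in @_.
sub model_get_cond {
    my ($pred) = @_;

    for my $i (0 .. $#queue) {
        my @msg = @{ $queue[$i] };

        # assumption: scalar context, any false value means "no match"
        my $res = $pred->(@msg) or next;

        splice @queue, $i, 1;                   # matched: dequeue
        $res->(@msg) if ref $res eq "CODE";     # code ref: invoke on message
        return @msg;
    }

    return;    # nothing matched (the "timeout" case)
}

my @g1 = model_get_cond(sub { $_[0] =~ /^group1_/ });

my $reversed;
my @rev = model_get_cond(sub {
    $_[0] eq "reverse"
        and return sub { $reversed = scalar reverse $_[1] };
    return;
});

my @none = model_get_cond(sub { $_[0] eq "missing" });
```

After these calls, @g1 holds the group1_a message, $reversed is "cba", @none is empty, and only the group2_x message is left in the queue.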
Example: fetch the next message, wait as long as necessary.
my @msg = get_cond { 1 };
Example: fetch the next message whose tag starts with group1_.
my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
Example: check whether a message with tag child_exit and a second element of $pid is in the queue already.
   if (
      my (undef, $pid, $status) =
         get_cond {
            $_[0] eq "child_exit" && $_[1] == $pid
         } 0
   ) {
      warn "child $pid did exit with status $status\n";
   }
Example: implement a server that reacts to log, exit and reverse messages, and exits after 30 seconds of idling.
   my $reverser = port_async {
      while() {
         get_cond {
            $_[0] eq "exit"
               and return sub {
                  last; # yes, this is valid
               };
            $_[0] eq "log"
               and return sub {
                  print "log: $_[1]\n";
               };
            $_[0] eq "reverse"
               and return sub {
                  my (undef, $text, @reply) = @_;
                  snd @reply, scalar reverse $text;
               };

            die "unexpected message $_[0] received";
         } 30
            or last;
      }
   };
Sometimes you want to run a thread within a port context, for error handling.
peval_async creates a new, ready thread (using Coro::async), sets $SELF to the current value of $SELF while it is executing, and calls the given BLOCK.
This is very similar to psub. Note that while the BLOCK executes in $SELF port context, you cannot call get, as $SELF can only be attached to one thread.
Example: execute some Coro::AIO code concurrently in another thread, but make sure any errors kil the originating port.
   port_async {
      ...
      peval_async {
         # $SELF set, but cannot call get etc. here
         my $fh = aio_open ...
            or die "open: $!";

         aio_close $fh;
      };
   };
syncal is the synchronous form of cal, a simple form of RPC: it sends a message to the given $port with the given contents (@msg), but adds a reply port to the message.
The reply port is created temporarily just for the purpose of receiving the reply, and will be kiled when no longer needed.
Then it will wait until a reply message arrives, which will be returned to the caller.
If the $timeout is defined, then after this many seconds, when no message has arrived, the port will be kiled and an empty list will be returned.
If the $timeout is undef, then the local port will monitor the remote port instead, so it eventually gets cleaned-up.
Example: call the string reverse example from get_cond.
my $reversed = syncal 1, $reverser, reverse => "Rotator";
AnyEvent::MP::Intro - a gentle introduction.
AnyEvent::MP - like Coro::MP, but event-based.
AnyEvent.
Marc Lehmann <schmorp@schmorp.de> http://home.schmorp.de/