
NAME

IO::Lambda::Message - message passing queue

DESCRIPTION

The module implements a generic message passing protocol and two generic classes that provide the server and the client functionality. The server code is implemented in a simple, blocking fashion and is only capable of simple operations. The client API is written in lambda style, so that the completion of a message can be awaited asynchronously.

SYNOPSIS

    use IO::Lambda qw(:lambda);
    use IO::Lambda::Message qw(message);

    lambda {
       my $messenger = IO::Lambda::Message-> new( \*READER, \*WRITER);
       context $messenger-> new_message('hello world');
    tail {
       print "response1: @_\n";
       context $messenger, 'same thing';
    message {
       print "response2: @_\n";
       undef $messenger;
    }}}

Message protocol

The message passing protocol is synchronous: every message is expected to be replied to. Each message is preceded by a simple header consisting of the message length as an 8-digit hexadecimal number, followed by one byte with value 0x0A (newline). The message itself is followed by another 0x0A byte.
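The framing described above can be sketched in plain Perl. This is only an illustration, not the module's own code: the helper names encode_frame and decode_frame are hypothetical, and the sketch assumes that the length counts bytes and is written in lowercase hexadecimal (the decoder accepts either case).

```perl
use strict;
use warnings;

# Frame a payload: 8 hex digits of length, 0x0A, payload, 0x0A.
# Assumption: the payload is a byte string, so length() counts bytes.
sub encode_frame {
    my ($payload) = @_;
    return sprintf("%08x\x0a%s\x0a", length($payload), $payload);
}

# Take one frame off the front of the buffer referenced by $buf_ref.
# Returns the payload, or undef if the buffer does not hold a whole
# frame yet. Dies if the header or the trailing 0x0A is malformed.
sub decode_frame {
    my ($buf_ref) = @_;
    return if length($$buf_ref) < 9;                 # header incomplete
    $$buf_ref =~ /\A([0-9a-fA-F]{8})\x0a/
        or die "malformed frame header\n";
    my $len = hex $1;
    return if length($$buf_ref) < 9 + $len + 1;      # body incomplete
    my $payload = substr($$buf_ref, 9, $len);
    substr($$buf_ref, 9 + $len, 1) eq "\x0a"
        or die "missing frame trailer\n";
    substr($$buf_ref, 0, 9 + $len + 1) = '';         # consume the frame
    return $payload;
}
```

With these helpers, encode_frame('hello world') yields "0000000b\nhello world\n", since the payload is 11 bytes long. The decoder works on a buffer reference so that it can be called again as more data arrives.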

IO::Lambda::Message

The class implements a generic message passing queue that lets the caller add asynchronous messages to the queue and wait until they are responded to.

new $class, $reader, $writer, %options

Constructs a new object of the IO::Lambda::Message class and attaches it to the $reader and $writer file handles (which can be the same object). Accepted options:

reader :: ($fh, $buf, $cond, $deadline) -> ioresult

Custom reader, sysreader by default.

writer :: ($fh, $buf, $length, $offset, $deadline) -> ioresult

Custom writer, syswriter by default.

buf :: string

If the $reader handle was used (or will need to be used) for buffered I/O, its buffer can be passed along to the object.

async :: boolean

If set, the object will listen for incoming messages from the server; otherwise it will only send outgoing messages. The default is 0, and the method incoming, which must handle each incoming message, dies unless overridden. This functionality is designed for derived classes, not for the caller.

new_message($message, $deadline = undef) :: () -> ($response, $error)

Registers a message that must be delivered no later than $deadline, and returns a lambda that will be ready when the message is responded to. The lambda returns the response or the error.

A timeout is also treated as a protocol error, so on timeout all pending messages are purged as well. Use deadlines with care.

message ($message, $deadline = undef) :: () -> ($response, $error)

Predicate version of new_message.

cancel_queue(@reason)

Cancels all pending messages and stores @reason in the associated lambdas.

error

Returns the last protocol handling error. If set, no new messages are allowed to be registered, and listening will also fail.

is_listening

If set, the object is listening for asynchronous events from the server.

is_pushing

If set, the object is sending messages to the server.

IO::Lambda::Message::Simple

The class implements a simple generic protocol dispatcher that executes methods of its own class and returns the results to the client. The methods have to be defined in a derived class.
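The dispatch idea can be illustrated with a small stand-in written in plain Perl. It is a sketch only and does not use or reproduce IO::Lambda::Message::Simple: the packages Sketch::Dispatcher and Sketch::Calculator, the dispatch method, and its return convention are all hypothetical. How the real class encodes method names and arguments on the wire is not described here, so the sketch skips that part entirely.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the dispatcher idea; NOT the module's class.
package Sketch::Dispatcher;

sub new { my ($class) = @_; return bless {}, $class }

# Look up $method on the object's own class and call it with @args.
# Returns (1, @results) on success, or (0, $error) on failure.
sub dispatch {
    my ($self, $method, @args) = @_;
    my $code = $self->can($method)
        or return (0, "no such method: $method");
    my @results = eval { $self->$code(@args) };
    return (0, "$@") if $@;
    return (1, @results);
}

# A derived class only has to define the methods clients may call.
package Sketch::Calculator;
our @ISA = ('Sketch::Dispatcher');

sub add  { my ($self, $x, $y) = @_; return $x + $y }
sub fail { die "deliberate failure\n" }

package main;

my $server = Sketch::Calculator->new;
my ($ok, @reply) = $server->dispatch(add => 2, 3);
print $ok ? "result: @reply\n" : "error: @reply\n";   # prints "result: 5"
```

Because the lookup uses can, every method visible on the object, inherited ones included, is callable by name. A real dispatcher may want to restrict which names are accepted.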

new $reader, $writer

Creates a new object that will communicate with clients using given handles, in a blocking fashion.

run

Starts the message loop.

quit

Signals the loop to stop.

SEE ALSO

IO::Lambda::DBI.

AUTHOR

Dmitry Karasik, <dmitry@karasik.eu.org>.