Ryu::Source - base representation for a source of events


 my $src = Ryu::Source->new;
 my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
 $src->emit($_) for 1..5;


This is probably the module you'd want to start with, if you were going to be using any of this. There's a disclaimer in Ryu that may be relevant at this point.

Quick start

You'd normally want to start by creating a Ryu::Source instance:

 my $src = Ryu::Source->new;

If you're dealing with IO::Async code, use Ryu::Async to ensure that you get properly awaitable Future instances:

 $loop->add(my $ryu = Ryu::Async->new);
 my $src = $ryu->source;

Once you have a source, you'll need two things:

  • items to put into one end

  • processing to attach to the other end

For the first, call "emit":

 use Future::AsyncAwait;
 # 1s drifting periodic timer
 while(1) {
  await $loop->delay_future(after => 1);

For the second, this would be "each":

 $src->each(sub { print "Had timer tick\n" });

So far, not so useful - the power of this type of reactive programming is in the ability to chain and combine disparate event sources.

At this point, is worth a visit - this provides a clear visual demonstration of how to combine multiple event streams using the chaining methods. Most of the API here is modelled after similar principles.

First, the "map" method: this provides a way to transform each item into something else:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->each(sub { print "Count is now $_\n" })

Next, "filter" provides an equivalent to Perl's grep functionality:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->filter(sub { $_ % 2 })
     ->each(sub { print "Count is now at an odd number: $_\n" })

You can stack these:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->filter(sub { $_ % 2 })
     ->filter(sub { $_ % 5 })
     ->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })


 $src->map(do { my $count = 0; sub { ++$count } })
     ->map(sub { $_ % 3 ? 'fizz' : $_ })
     ->map(sub { $_ % 5 ? 'buzz' : $_ })
     ->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })



This is a coderef which should return a new Future-compatible instance.

Example overrides might include:

 $Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };


An encoder is a coderef which takes input and returns output.



Takes named parameters, such as:

  • label - the label used in descriptions

Note that this is rarely called directly, see "from", "empty" and "never" instead.


Creates a new source from things.

The precise details of what this method supports may be somewhat ill-defined at this point in time. It is expected that the interface and internals of this method will vary greatly in versions to come.

At the moment, the following inputs are supported:

  • arrayref - when called as ->from([1,2,3]) this will emit the values from the arrayref, deferring until the source is started

  • Future - given a Future instance, will emit the results when that Future is marked as done

  • file handle - if provided a filehandle, such as ->from(\*STDIN), this will read bytes and emit those until EOF


Creates an empty source, which finishes immediately.


An empty source that never finishes.

METHODS - Instance


Passes each item through an encoder.

The first parameter is the encoder to use, the remainder are used as options for the selected encoder.




Passes each item through a decoder.

The first parameter is the decoder to use, the remainder are used as options for the selected decoder.




Shortcut for ->each(sub { print }), except this will also save the initial state of $\ and use that for each call for consistency.


Shortcut for ->each(sub { print "$_\n" }).


Convert input bytes to a hexdump representation, for example:

 00000000 00 00 12 04 00 00 00 00 00 00 03 00 00 00 80 00 >................<
 00000010 04 00 01 00 00 00 05 00 ff ff ff 00 00 04 08 00 >................<
 00000020 00 00 00 00 7f ff 00 00                         >........<

One line is emitted for each 16 bytes.

Takes the following named parameters:

  • continuous - accumulates data for a continuous stream, and does not reset the offset counter. Note that this may cause the last output to be delayed until the source completes.


Throws something. I don't know what, maybe a chair.


Not yet implemented.

Requires timing support, see implementations such as Ryu::Async instead.


Chomps all items with the given delimiter.

Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation. Said delimiter follows the usual rules of $/, whatever they happen to be.




A bit like "map" in perlfunc.

Takes a single parameter - the coderef to execute for each item. This should return a scalar value which will be used as the next item.

Often useful in conjunction with a do block to provide a closure.


 $src->map(do {
   my $idx = 0;
   sub {
    [ @$_, ++$idx ]


Similar to "map", but will flatten out some items:

  • an arrayref will be expanded out to emit the individual elements

  • for a Ryu::Source, passes on any emitted elements

This also means you can "merge" items from a series of sources.

Note that this is not recursive - an arrayref of arrayrefs will be expanded out into the child arrayrefs, but no further.

Failure on any input source will cause this source to be marked as failed as well.


Splits the input on the given delimiter.

By default, will split into characters.

Note that each item will be processed separately - the buffer won't be retained across items, see "by_line" for that.


Splits input into fixed-size chunks.

Note that output is always guaranteed to be a full chunk - if there is partial input at the time the input stream finishes, those extra bytes will be discarded.


Splits input into arrayref batches of a given size.

Note that the last item emitted may have fewer elements (or none at all).

  ->map(sub { "Next 10 (or fewer) items: @$_" })


Emits one item for each line in the input. Similar to "split" with a \n parameter, except this will accumulate the buffer over successive items and only emit when a complete line has been extracted.


Applies a string prefix to each item.


Applies a string suffix to each item.


Convenience method for generating a string from a "sprintf"-style format string and a set of method names to call.

Note that any undef items will be mapped to an empty string.


 $src->sprintf_methods('%d has name %s', qw(id name))


Receives items, but ignores them entirely.

Emits nothing and eventually completes when the upstream Ryu::Source is done.

Might be useful for keeping a source alive.


Accumulate items while any downstream sources are paused.

Takes the following named parameters:


Resolves to a list consisting of all items emitted by this source.


Resolves to a single arrayref consisting of all items emitted by this source.


Concatenates all items into a single string.

Returns a Future which will resolve on completion.


Returns a Future::Queue instance which will "push" in Future::Queue items whenever the source emits them.

The queue will be marked as finished when this source is completed.

Parameters passed to this method will be given to the Future::Queue constructor:

 use Future::AsyncAwait qw(:experimental(suspend));
 my $queue = $src->as_queue(
  max_items => 100
 SUSPEND { print "Waiting for more items\n" }
 while(my @batch = await $queue->shift_atmost(10)) {
  print "Had batch of @{[ 0 + @batch ]} items\n";


Returns a Ryu::Buffer instance, which will "write" in Ryu::Buffer any emitted items from this source to the buffer as they arrive.

Intended for stream protocol handling - individual sized packets are perhaps better suited to the Ryu::Source per-item behaviour.

Supports the following named parameters:

  • low - low waterlevel for buffer, start accepting more bytes once the Ryu::Buffer has less content than this

  • high - high waterlevel for buffer, will pause the parent stream if this is reached

The backpressure (low/high) values default to undefined, meaning no backpressure is applied: the buffer will continue to fill indefinitely.


Takes the most recent item from one or more Ryu::Sources, and emits an arrayref containing the values in order.

An item is emitted for each update as soon as all sources have provided at least one value. For example, given 2 sources, if the first emits 1 then 2, then the second emits a, this would emit a single [2, 'a'] item.


Emits arrayrefs consisting of [ $item, $idx ].


Similar to "combine_latest", but will start emitting as soon as we have any values. The arrayref will contain undef for any sources which have not yet emitted any items.


Emits items as they are generated by the given sources.


 $numbers->merge($letters)->say # 1, 'a', 2, 'b', 3, 'c'...


Emits items as they are generated by the given sources.


 my $src = Ryu::Source->new;


Used for setting up multiple streams.

Accepts a variable number of coderefs, will call each one and gather Ryu::Source results.


Given a condition, will select one of the alternatives based on stringified result.


  sub { $_->name }, # our condition
  smith => sub { $_->id }, # if this matches the condition, the code will be called with $_ set to the current item
  jones => sub { $_->parent->id },
  sub { undef } # and this is our default case


Given a stream of Futures, will emit the results as each Future is marked ready.

If any Future in the stream fails, that will mark this source as failed, and all remaining Future instances will be cancelled. To avoid this behaviour and leave the Future instances active, use:


See "without_cancel" in Future for more details.

Takes the following named parameters:

This method is also available as "resolve".


A synonym for "ordered_futures".



Emits new distinct items, using string equality with an exception for undef (i.e. undef is treated differently from empty string or 0).

Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.


Removes contiguous duplicates, defined by string equality.


Emits items sorted by the given key. This is a stable sort function.

The algorithm is taken from List::UtilsBy.


Emits items numerically sorted by the given key. This is a stable sort function.

See "sort_by".


Emits items sorted by the given key. This is a stable sort function.

The algorithm is taken from List::UtilsBy.


Emits items numerically sorted by the given key. This is a stable sort function.

See "sort_by".


Expects a regular expression and emits hashrefs containing the named capture buffers.

The regular expression will be applied using the m//gc operator.


 # emits { component => '...' }, { component => '...' }


Skips the first N items.


Skips the last N items.


Skips the items that arrive before a given condition is reached.

  • Either a Future instance (we skip all items until it's marked as `done`), or a coderef, which we call for each item until it first returns true


Passes through items that arrive until a given condition is reached.

Expects a single parameter, which can be one of the following:

  • a Future instance - we will skip all items until it's marked as done

  • a coderef, which we call for each item until it first returns true

  • or a Ryu::Source, in which case we stop when that first emits a value


Takes a limited number of items.

Given a sequence of 1,2,3,4,5 and ->take(3), you'd get 1,2,3 and then the stream would finish.


Returns a source which provides the first item from the stream.


Applies the given code to each item, and emits a single item:

  • 0 if the code never returned true or no items were received

  • 1 if the code ever returned a true value


Similar to "some", except this requires the coderef to return true for all values in order to emit a 1 value.


Emits the count of items seen once the parent source completes.


Emits the numeric sum of items seen once the parent completes.


Emits the mean (average) numerical value of all seen items.


Emits the maximum numerical value of all seen items.


Emits the minimum numerical value of all seen items.


Emits a single hashref of statistics once the source completes.

This will contain the following keys:

  • count

  • sum

  • min

  • max

  • mean


Applies the given parameter to filter values.

The parameter can be a regex or coderef. You can also pass (key, value) pairs to filter hashrefs or objects based on regex or coderef values.


 $src->filter(name => qr/^[A-Z]/, id => sub { $_ % 2 })


Emits only the items which ->isa one of the given parameters. Will skip non-blessed items.


Emits the given item.




Block until this source finishes.


Returns a Future which will resolve to the next item emitted by this source.

If the source completes before an item is emitted, the Future will be cancelled.

Note that these are independent - they don't stack, so if you call ->next multiple times before an item is emitted, each of those would return the same value.

See Ryu::Buffer if you're dealing with protocols and want to extract sequences of bytes or characters.

To access the sequence as a discrete stream of Future instances, try "as_queue" which will provide a Future::Queue.


Mark this source as completed.

METHODS - Proxied

The following methods are proxied to our completion Future:

  • then

  • is_ready

  • is_done

  • failure

  • is_cancelled

  • else

METHODS - Internal


Run any pre-completion callbacks (recursively) before we go into an await cycle.

Used for compatibility with sync bridges when there's no real async event loop available.


Returns a new Ryu::Source chained from this one.


Like "each", but removes the source from the callback list once the parent completes.


Provides a "chained" source which has more control over what it emits than a standard "map" or "filter" implementation.

 $original->map_source(sub {
  my ($item, $src) = @_;
  $src->emit('' . reverse $item);



completed, describe, flow_control, is_paused, label, new_future, parent, pause, resume, unblocked


Tom Molesworth <>


Copyright Tom Molesworth 2011-2023. Licensed under the same terms as Perl itself.