
NAME

UniEvent::Streamer::Output - base class for custom consumers for streamer

SYNOPSIS

    package MyOutput;
    use parent 'UniEvent::Streamer::Output';
    use 5.012;
    
    sub new {
        my ($class, ...) = @_;
        my $self = $class->SUPER::new();
        ...
        return $self;
    }
    
    sub start            { ... }
    sub stop             { ... }
    sub write            { ... }
    sub write_queue_size { return length(shift->{buffer}) }

    ...
    
    my $output = MyOutput->new(...);
    my $streamer = UE::Streamer->new($input, $output);
    ...

DESCRIPTION

This is the base class for custom output streams, meant to be used with UniEvent::Streamer. It is assumed that you will send the data given to you by UniEvent::Streamer to some external destination.

Streamer uses the output object as follows: it calls start() once and then calls write() repeatedly as data arrives from the input. You are expected to write that data somewhere and call handle_write() once for each write(). If an error occurs, you must call handle_write() with the error. You must implement some kind of dynamically sized buffer (unless the lower-level objects you use to write data already provide one) and return the total size of all writes not yet completed from write_queue_size(), which Streamer calls periodically. Streamer calls stop() once when the process finishes, whether successfully or with an error.
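The call sequence described above can be illustrated with a minimal sketch. Everything in it apart from the method names start(), write(), write_queue_size(), stop() and handle_write() is an assumption made for illustration: StandInOutputBase is an invented stand-in for UniEvent::Streamer::Output so that the snippet runs without UniEvent installed, MemoryOutput and its complete_one() helper are hypothetical, and the asynchronous write is only simulated by a queue.

```perl
use strict;
use warnings;

{
    # Invented stand-in for UniEvent::Streamer::Output (ASSUMPTION).
    # It only records handle_write() calls; the real base class notifies Streamer.
    package StandInOutputBase;
    sub new          { my ($class) = @_; return bless { acks => [] }, $class }
    sub handle_write { my ($self, $err) = @_; push @{ $self->{acks} }, $err; return }
}

{
    package MemoryOutput;                  # hypothetical example class
    our @ISA = ('StandInOutputBase');      # real code: use parent 'UniEvent::Streamer::Output';

    sub new {
        my ($class) = @_;
        my $self = $class->SUPER::new();   # proxy to the parent and reuse its object
        $self->{pending} = [];             # chunks accepted but not yet completed
        $self->{sink}    = '';             # hypothetical destination
        return $self;
    }

    sub start { return undef }             # undef means "no error"

    sub write {
        my ($self, $data) = @_;
        push @{ $self->{pending} }, $data; # begin the (simulated) asynchronous write
        return undef;
    }

    # Total size of all writes that have not completed yet.
    sub write_queue_size {
        my ($self) = @_;
        my $size = 0;
        $size += length($_) for @{ $self->{pending} };
        return $size;
    }

    # Hypothetical helper: the destination has accepted the oldest chunk.
    sub complete_one {
        my ($self) = @_;
        my $chunk = shift @{ $self->{pending} };
        return unless defined $chunk;
        $self->{sink} .= $chunk;
        $self->handle_write();             # one handle_write() per completed write()
        return;
    }

    sub stop { my ($self) = @_; $self->{pending} = []; return }
}

package main;

my $out = MemoryOutput->new;
$out->start;
$out->write('hello');
$out->write(', world');
print $out->write_queue_size, "\n";        # 5 + 7 bytes still pending: prints 12
$out->complete_one;
print $out->write_queue_size, "\n";        # one chunk completed: prints 7
$out->complete_one;
$out->stop;
```

In a real subclass the completion would be triggered by whatever asynchronous mechanism performs the write, not by an explicit complete_one() call.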

METHODS

new()

Constructs a new output stream.

If you override this method, you MUST proxy to the parent and use its return value as $self.

start($loop)

This method is invoked by Streamer upon start. It is executed once per object lifetime.

This method should return undef if no error occurred, or an XS::ErrorCode object otherwise.

NOTE: This is a callback, do not call this method yourself!

stop()

This method is invoked when Streamer finishes. It is executed once per object lifetime. No further I/O is expected.

In this method you are expected to release all resources.

NOTE: This is a callback, do not call this method yourself!

write($data)

This method is invoked when Streamer receives a chunk of data from the input.

You are expected to start the writing process for that chunk. When the process finishes, you must call handle_write(). Streamer may call write() many times without waiting for you to call handle_write() for previous requests. Regardless of how the calls to write() were made, you must call handle_write() once for each write(), when the corresponding write request finishes.

This method should return undef if no error occurred, or an XS::ErrorCode object otherwise. If an error occurs later, during the asynchronous writing process, call handle_write() with the error. Regardless of how you report the error (from this method or via handle_write()), the process stops immediately afterwards and stop() is called. You should not call handle_write() after that.

NOTE: This is a callback, do not call this method yourself!

write_queue_size()

This method must return the size of the data buffered in memory in your output object or in lower-level objects.

If this size exceeds the maximum buffer size configured in Streamer, Streamer temporarily suspends the input stream, giving you time to complete some of the pending writes.

You must implement such a buffer yourself. The reason this buffer is not implemented in Streamer itself is that many objects already have an output buffer. For example, UniEvent::Stream handles have an output buffer and a write_queue_size() method. In that case, implementing another buffer in Streamer would lead to less efficient execution.

NOTE: This is a callback, do not call this method yourself!
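When the lower-level object already buffers writes, write_queue_size() can simply delegate to it. The following is a minimal sketch under stated assumptions: StandInHandle is an invented stand-in for a UniEvent::Stream handle so that the snippet runs without UniEvent (only its write_queue_size() method corresponds to the handle method mentioned above; enqueue() is hypothetical), and the handle is kept in a hypothetical $self->{handle} field.

```perl
use strict;
use warnings;

{
    # Invented stand-in for a UniEvent::Stream handle (ASSUMPTION).
    package StandInHandle;
    sub new              { my ($class) = @_; return bless { queued => 0 }, $class }
    sub enqueue          { my ($self, $data) = @_; $self->{queued} += length($data); return }
    sub write_queue_size { my ($self) = @_; return $self->{queued} }
}

{
    package DelegatingOutput;   # real code: use parent 'UniEvent::Streamer::Output';

    sub new {
        my ($class, $handle) = @_;
        # Real code must obtain $self from $class->SUPER::new() instead of bless.
        return bless { handle => $handle }, $class;
    }

    sub write {
        my ($self, $data) = @_;
        $self->{handle}->enqueue($data);   # the handle buffers the data itself
        return undef;                      # undef means "no error"
    }

    # No buffer of our own: report what the lower-level handle has queued.
    sub write_queue_size {
        my ($self) = @_;
        return $self->{handle}->write_queue_size;
    }
}

package main;

my $handle = StandInHandle->new;
my $output = DelegatingOutput->new($handle);
$output->write('abcdef');
print $output->write_queue_size, "\n";     # six bytes queued in the handle: prints 6
```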

handle_write([$error])

You must call this method when one of the pending write requests finishes. If an error occurred while sending the data, pass it as the $error argument.

$error must be an XS::ErrorCode object. You can use UniEvent::SystemError::* constants to create errors.

    $self->handle_write(UniEvent::SystemError::timed_out);
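The rule "one handle_write() per write(), and none after an error" can be sketched as follows. This is an illustration under assumptions, not the library's implementation: StandInOutputBase is an invented stand-in for UniEvent::Streamer::Output that merely records handle_write() arguments, on_write_done() is a hypothetical completion hook, and a plain string stands in for the XS::ErrorCode object that real code must pass.

```perl
use strict;
use warnings;

{
    # Invented stand-in for UniEvent::Streamer::Output (ASSUMPTION).
    package StandInOutputBase;
    sub new          { my ($class) = @_; return bless { acks => [] }, $class }
    sub handle_write { my ($self, $err) = @_; push @{ $self->{acks} }, $err; return }
}

{
    package FailingOutput;                 # hypothetical example class
    our @ISA = ('StandInOutputBase');      # real code: use parent 'UniEvent::Streamer::Output';

    sub new {
        my ($class, $error) = @_;
        my $self = $class->SUPER::new();
        $self->{error}  = $error;          # placeholder for an XS::ErrorCode object
        $self->{failed} = 0;
        return $self;
    }

    sub write { return undef }             # the write is accepted; completion comes later

    # Hypothetical hook: called when a pending write finishes ($ok true) or fails.
    sub on_write_done {
        my ($self, $ok) = @_;
        return if $self->{failed};         # no handle_write() after an error was reported
        if ($ok) {
            $self->handle_write();
        }
        else {
            $self->{failed} = 1;
            $self->handle_write($self->{error});
        }
        return;
    }
}

package main;

my $out = FailingOutput->new('timed out (placeholder)');
$out->write($_) for qw(a b c);
$out->on_write_done(1);                    # first write succeeded
$out->on_write_done(0);                    # second write failed: error is reported
$out->on_write_done(1);                    # ignored, the process has already failed
print scalar(@{ $out->{acks} }), "\n";     # two handle_write() calls were made: prints 2
```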