The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.

NAME

RxPerl::Guides::CreatingPipeableOperators - A guide to creating your own pipeable operators

CREATING YOUR OWN PIPEABLE OPERATORS

A pipeable operator is a function that returns a function that accepts a $source observable as input, and outputs another observable.

Check out this example:

sub op_map {
    my ($mapping_function) = @_;

    return sub {
        my ($source) = @_;

        return rx_observable->new(sub {
            my ($subscriber) = @_;

            my $own_observer = {
                next => sub {
                    $subscriber->next( $mapping_function->(@_) );
                },
                error => sub {
                    $subscriber->error(@_);
                },
                complete => sub {
                    $subscriber->complete();
                },
            };

            my $own_subscription = $source->subscribe($own_observer);

            return $own_subscription;
        });
    };
}

Here, we subscribe to the source with an observer that will call $subscriber->next on each item emitted. The return $own_subscription; line makes sure that if the resulting observable is unsubscribed from, so will the source observable be unsubscribed from.

A $subscriber object contains a subscription object. If you're sure that you want the inner subscription to end at the same time as the outer one, you can include %$subscriber inside $own_observer, so that they both share the same subscription object, as follows:

sub op_map {
    my ($mapping_function) = @_;

    return sub {
        my ($source) = @_;

        return rx_observable->new(sub {
            my ($subscriber) = @_;

            my $own_observer = {
                %$subscriber,
                next => sub {
                    $subscriber->next( $mapping_function->(@_) );
                },
                error => sub {
                    $subscriber->error(@_);
                },
                complete => sub {
                    $subscriber->complete();
                },
            };

            $source->subscribe($own_observer);

            return;
        });
    };
}

As you can see in the code above, there's no reason to return a subscription object from rx_observable's callback. That is because, due to the sharing of subscription object between the inner and outer subscribers, the two subscriptions (which are really one) will end at the same time, so there's no need to explicitly ask for the other to end.

Since $subscriber already has an error and a complete hash key which do the same thing as $subscriber->error and $subscriber->complete, one can omit the above two keys from $own_observer, resulting in smaller code as follows:

sub op_map {
    my ($mapping_function) = @_;

    return sub {
        my ($source) = @_;

        return rx_observable->new(sub {
            my ($subscriber) = @_;

            my $own_observer = {
                %$subscriber,
                next => sub {
                    $subscriber->next( $mapping_function->(@_) );
                },
            };

            $source->subscribe($own_observer);

            return;
        });
    };
}

Circumstances might be such that you may want to add a callback to be executed when the outer subscription terminates (by means of error/complete or unsubscribe) before the the $source->subscribe call is made. To achieve this, do:

$subscriber->subscription->add($callback);

...before $source->subscribe. Any nested structure of callbacks and subscription objects can be passed to the add method. Callbacks will be executed, and subscription objects will be unsubscribed from:

$subscriber->subscription->add($s1, $s2, \%inner_subscriptions, \@callbacks);

If you want to terminate the inner subscription as soon as the outer one is terminated, another way to achieve this is the following:

my $own_subscription = RxPerl::Subscription->new;
$subscriber->subscription->add($own_subscription);
my $own_observer = {
    new_subscription => $own_subscription,
    next => { ... },
    error => { ... },
    complete => { ... },
};
$source->subscribe($own_observer);
return;

This lets you determine the subscription object of the inner subscriber, and use it in any way you want.