our
@EXPORT_OK
=
qw/
last_value_from first_value_from is_observable
/
;
our
%EXPORT_TAGS
= (
all
=> \
@EXPORT_OK
);
our
$VERSION
=
"v6.29.3"
;
sub
_promise_class {
my
$fn
= (
caller
(1))[3];
my
$rx_class
=
$fn
;
$rx_class
=~ s/\:\:[^\:]+\z//;
no
strict
'refs'
;
my
$promise_class
= ${
"${rx_class}::promise_class"
};
return
wantarray
? (
$promise_class
,
$rx_class
) :
$promise_class
;
}
sub
last_value_from {
my
(
$observable
) =
@_
;
my
(
$promise_class
,
$rx_class
) = _promise_class;
$promise_class
or croak
"Promise class not set, set it with: ${rx_class}->set_promise_class(\$promise_class)"
;
my
(
$promise
,
$resolve
,
$reject
) =
do
{
if
(
$promise_class
eq
'Future'
) {
my
$future
= Future->new;
(
$future
,
sub
{
$future
->done(
@_
) },
sub
{
$future
->fail(
@_
) } );
}
else
{
my
(
$res
,
$rej
);
my
$p
=
$promise_class
->new(
sub
{
(
$res
,
$rej
) =
@_
;
});
(
$p
,
$res
,
$rej
);
}
};
my
(
$got_value
,
$last_value
);
$observable
->subscribe({
next
=>
sub
{
$last_value
=
$_
[0];
$got_value
= 1;
},
error
=>
sub
{
$reject
->(
$_
[0]);
},
complete
=>
sub
{
if
(
$got_value
) {
$resolve
->(
$last_value
);
}
else
{
$reject
->(
'no elements in sequence'
);
}
},
});
return
$promise
;
}
sub
first_value_from {
my
(
$observable
) =
@_
;
return
last_value_from(
$observable
->
pipe
(RxPerl::Operators::Pipeable::op_first())
);
}
sub
is_observable {
my
(
$thing
) =
@_
;
return
!!(blessed(
$thing
) &&
$thing
->isa(
'RxPerl::Observable'
));
}
1;