NAME
RxPerl - an implementation of Reactive Extensions / rxjs for Perl
SYNOPSIS
# one of...:
> cpanm RxPerl::AnyEvent
> cpanm RxPerl::IOAsync
> cpanm RxPerl::Mojo
# ..and then (if installed RxPerl::Mojo, for example):
use
Mojo::IOLoop;
rx_interval(1.4)->
pipe
(
op_take(5),
)->subscribe(
sub
{
say
"next: "
,
$_
[0] });
Mojo::IOLoop->start;
NOTE
You probably want to install one of the three adapter modules for your project instead of this one: RxPerl::AnyEvent, RxPerl::IOAsync or RxPerl::Mojo.
Each of these three modules adapts RxPerl to one of three event interfaces available in Perl (AnyEvent, IO::Async and Mojo::IOLoop), so pick the one that corresponds to the event interface that your app uses.
The documentation in this POD applies to all three adapter modules as well.
DESCRIPTION
This module is an implementation of Reactive Extensions in Perl. It replicates the behavior of rxjs 6 which is the JavaScript implementation of ReactiveX.
Currently 99 of the more than 100 operators in rxjs are implemented in this module.
EXPORTABLE FUNCTIONS
The code samples in this section assume $observer
has been set to:
$observer
= {
next
=>
sub
{
say
"next: "
,
$_
[0]},
error
=>
sub
{
say
"error: "
,
$_
[0]},
complete
=>
sub
{
say
"complete"
},
};
OBSERVABLE CREATION OPERATORS
Creation operators create and return an observable. They are usually unicast, which means that when an "rx_interval" observable is subscribed to three seperate times there will be three different & distinct recurring intervals. Exceptions to this are with subjects and that any observable can be transformed into a multicasting one using the "op_share" pipeable operator (or by other similar operators).
The following list is the currently implemented creation operators with links to relevant rxjs documentation (which should apply to RxPerl too).
- rx_behavior_subject
-
https://rxjs.dev/api/index/class/BehaviorSubject
# 10, 20, 30, complete
my
$b_s
= rx_behavior_subject->new(10);
$b_s
->subscribe(
$observer
);
$b_s
->
next
(20);
$b_s
->
next
(30);
$b_s
->complete;
# 20, 30, complete
my
$b_s
= rx_behavior_subject->new(10);
$b_s
->
next
(20);
$b_s
->subscribe(
$observer
);
$b_s
->
next
(30);
$b_s
->complete;
- rx_combine_latest
-
https://rxjs.dev/api/index/function/combineLatest
# [0, 0], [0, 1], [1, 1], [1, 2], [1, 3], ...
rx_combine_latest([
rx_interval(1),
rx_interval(0.7),
])->subscribe(
$observer
);
- rx_concat
-
https://rxjs.dev/api/index/function/concat
# 10, 20, 30, 10, 20, 30, 40, complete
rx_concat(
rx_of(10, 20, 30),
rx_of(10, 20, 30, 40),
)->subscribe(
$observer
);
- rx_defer
-
https://rxjs.dev/api/index/function/defer
my
$special_var
;
my
$o
= rx_defer(
sub
{
return
$special_var
? rx_of(10, 20, 30) : rx_of(40, 50, 60)
});
# 10, 20, 30, complete
$special_var
= 1;
$o
->subscribe(
$observer
);
# 40, 50, 60, complete
$special_var
= 0;
$o
->subscribe(
$observer
);
- rx_EMPTY
-
https://rxjs.dev/api/index/const/EMPTY
# complete
rx_EMPTY->subscribe(
$observer
);
# 10, 20, 30, 40, 50, 60, complete
rx_concat(
rx_of(10, 20, 30),
rx_EMPTY,
rx_EMPTY,
rx_EMPTY,
rx_of(40, 50, 60),
)->subscribe(
$observer
);
- rx_fork_join
-
https://rxjs.dev/api/index/function/forkJoin
# [30, 3, 'c'], complete
rx_fork_join([
rx_of(10, 20, 30),
rx_of(1, 2, 3),
rx_of(
'a'
,
'b'
,
'c'
),
])->subscribe(
$observer
);
# {x => 30, y => 3, z => 'c'}, complete
rx_fork_join({
x
=> rx_of(10, 20, 30),
y
=> rx_of(1, 2, 3),
z
=> rx_of(
'a'
,
'b'
,
'c'
),
})->subscribe(
$observer
);
- rx_from
-
https://rxjs.dev/api/index/function/from
Currently, only arrayrefs, promises, Futures, observables and strings are allowed as argument to this function.
# 10, 20, 30, complete
rx_from([10, 20, 30])->subscribe(
$observer
);
- rx_from_event
-
https://rxjs.dev/api/index/function/fromEvent
Currently, only instances of the Mojo::EventEmitter class are allowed as the first argument to this function.
# 4 seconds after Mojolicious hypnotoad is gracefully reloaded, websocket
# connection will close
sub
websocket (
$c
) {
rx_from_event(
$ioloop
,
'finish'
)->
pipe
(
op_delay(4),
)->subscribe({
next
=>
sub
{
$c
->finish },
});
}
- rx_from_event_array
-
https://rxjs.dev/api/index/function/fromEvent
Similar to: "rx_from_event".
Observables may emit at most one value per event, however Mojo::EventEmitter's are able to emit more. So this function serves to pack all of them in an arrayref, and emit that as a single value instead.
- rx_generate
-
https://rxjs.dev/api/index/function/generate
# 2, 5, 10, 17, 26
rx_generate(
1,
# initializer
sub
(
$x
) {
$x
<= 5 },
# check, and can also use $_ here
sub
(
$x
) {
$x
+ 1 },
# iterate, and can also use $_ here
sub
(
$x
) {
$x
** 2 + 1 },
# result selector (optional), and can also use $_ here
)->subscribe(
$observer
);
- rx_iif
-
https://rxjs.dev/api/index/function/iif
my
$i
;
my
$o
= rx_iif(
sub
{
$i
> 5 },
rx_of(1, 2, 3),
rx_of(10, 20, 30),
);
$i
= 4;
# 10, 20, 30, complete
$o
->subscribe(
$observer
);
$i
= 6;
# 1, 2, 3, complete
$o
->subscribe(
$observer
);
- rx_interval
-
https://rxjs.dev/api/index/function/interval
Works like rxjs's "interval", except the parameter is in seconds instead of ms.
# 0, 1, 2, ... every 0.7 seconds
rx_interval(0.7)->subscribe(
$observer
);
- rx_merge
-
https://rxjs.dev/api/index/function/merge
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ...
rx_merge(
rx_interval(0.7),
rx_interval(1),
)->subscribe(
$observer
);
- rx_NEVER
-
https://rxjs.dev/api/index/const/NEVER
# 10, 20, 30 (and no complete)
rx_concat(
rx_of(10, 20, 30),
rx_NEVER,
rx_of(40, 50, 60),
)->subscribe(
$observer
);
- rx_observable
-
https://rxjs.dev/api/index/class/Observable
# 0.578, 0.234, 0.678, ... (every 1 second)
my
$o
= rx_observable->new(
sub
(
$subscriber
) {
# your code goes here
Mojo::IOLoop->recurring(1,
sub
{
$subscriber
->
next
(
rand
())});
});
Check the guide to creating your own observables.
- rx_of
-
https://rxjs.dev/api/index/function/of
# 10, 20, 30, complete
rx_of(10, 20, 30)->subscribe(
$observer
);
- rx_on_error_resume_next
-
https://rxjs.dev/api/index/function/onErrorResumeNext
# 1, 2, 3, 10, 20, 30, complete
rx_on_error_resume_next(
rx_of(1, 2, 3)->
pipe
( op_concat_with(rx_throw_error(
'foo'
)) ),
rx_throw_error(
'bar'
),
rx_of(10, 20, 30),
rx_throw_error(
'baz'
),
)->subscribe(
$observer
);
- rx_partition
-
https://rxjs.dev/api/index/function/partition
# 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, complete
my
$source
= rx_interval(1)->
pipe
( op_take(10) );
my
(
$o1
,
$o2
) = rx_partition(
$source
,
sub
(
$value
,
$index
) {
$value
% 2 == 1 },
);
rx_concat(
$o1
,
$o2
)->subscribe(
$observer
);
- rx_race
-
https://rxjs.dev/api/index/function/race
# 0, 10, 20, 30, ... (every 0.7 seconds)
rx_race(
rx_interval(1)->
pipe
( op_map(
sub
{
$_
[0] * 100}) ),
rx_interval(0.7)->
pipe
( op_map(
sub
{
$_
[0] * 10) ),
)->subscribe(
$observer
);
- rx_range
-
https://rxjs.dev/api/index/function/range
# 10, 11, 12, 13, 14, 15, 16, complete
rx_range(10, 7)->subscribe(
$observer
);
- rx_replay_subject
-
https://rxjs.dev/api/index/class/ReplaySubject
Works like rxjs's "replaySubject", except the
window_time
parameter is in seconds instead of ms.# 20, 30, 40, 50, complete
my
$rs
= rx_replay_subject(2);
$rs
->
next
(10);
$rs
->
next
(20);
$rs
->
next
(30);
$rs
->subscribe(
$observer
);
$rs
->
next
(40);
$rs
->
next
(50);
$rs
->complete;
# or...
my
$rs
= rx_replay_subject(2, 3);
# params: buffer_size, window_time
- rx_subject
-
https://rxjs.dev/api/index/class/Subject
# 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, complete
my
$subject
= rx_subject->new;
$subject
->subscribe(
$observer
);
# elsewhere...
$subject
->
next
(
$_
)
for
1 .. 10;
$subject
->complete;
- rx_throw_error
-
https://rxjs.dev/api/index/function/throwError
# 0, 1, 2, 3, error: foo
rx_concat(
rx_interval(1)->
pipe
( op_take(4) ),
rx_throw_error(
'foo'
),
)->subscribe(
$observer
);
- rx_timer
-
https://rxjs.dev/api/index/function/timer
Works like rxjs's "timer", except the parameter is in seconds instead of ms.
# (pause 10 seconds) 0, complete
rx_timer(10)->subscribe(
$observer
);
# (pause 10 seconds) 0, 1, 2, 3, ... (every 1 second)
rx_timer(10, 1)->subscribe(
$observer
);
- rx_zip
-
https://rxjs.dev/api/index/function/zip
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete
rx_zip(
rx_interval(0.7)->
pipe
( op_take(3) ),
rx_interval(1),
rx_interval(2),
)->subscribe(
$observer
);
PIPEABLE OPERATORS
Pipeable operators (also referred to as "operators") are passed as arguments to the "pipe" method of observables. Their function is to take an observable, transform it somehow, then (similar to piped shell commands) pass the result of the transformation to the next pipeable operator in the pipe, or return it to the user.
The following list is the currently implemented operators, with links to relevant rxjs documentation (which should apply to RxPerl too).
- op_audit
-
https://rxjs.dev/api/operators/audit
# 1, 3, 5, 7, 9, ...
rx_interval(0.7)->
pipe
(
op_audit(
sub
(
$val
) { rx_timer(1) }),
# can also use $_ here
)->subscribe(
$observer
);
- op_audit_time
-
https://rxjs.dev/api/operators/auditTime
Works like rxjs's "auditTime", except the parameter is in seconds instead of ms.
# 30, complete
rx_concat(
rx_of(10, 20, 30),
rx_timer(5)->
pipe
( op_ignore_elements ),
)->
pipe
(
op_audit_time(1),
)->subscribe(
$observer
);
- op_buffer
-
https://rxjs.dev/api/operators/buffer
# [0, 1, 2], [3, 4, 5], [6, 7, 8, 9], ...
rx_interval(0.3)->
pipe
(
op_buffer(rx_interval(1.001)),
)->subscribe(
$observer
);
- op_buffer_count
-
https://rxjs.dev/api/operators/bufferCount
# [10, 20, 30], [40, 50], complete
rx_of(10, 20, 30, 40, 50)->
pipe
(
op_buffer_count(3),
)->subscribe(
$observer
);
# [10, 20, 30], [20, 30, 40], [30, 40, 50], [40, 50], [50], complete
rx_of(10, 20, 30, 40, 50)->
pipe
(
op_buffer_count(3, 1),
)->subscribe(
$observer
);
- op_buffer_time
-
Works like rxjs's "bufferTime", except the parameter is in seconds instead of ms.
https://rxjs.dev/api/operators/bufferTime
# [0], [1], [2, 3], [4], [5, 6], [7]...
rx_interval(0.7)->
pipe
(
op_buffer_time(1),
)->subscribe(
$observer
);
- op_catch_error
-
https://rxjs.dev/api/operators/catchError
# foo, foo, foo, complete
rx_throw_error(
'foo'
)->
pipe
(
op_catch_error(
sub
(
$err
,
$caught
) { rx_of(
$err
,
$err
,
$err
) }),
# could use $_ instead of $err here
)->subscribe(
$observer
);
- op_combine_latest_with
-
https://rxjs.dev/api/operators/combineLatestWith
Similar to rx_combine_latest, but as a pipeable operator.
# [0, 0, -5], [0, 1, -5], [10, 1, -5], [10, 2, -5], [10, 3, -5], [20, 3, -5], ...
rx_interval(1)->
pipe
(
op_map(
sub
{
$_
* 10 }),
op_combine_latest_with(
rx_interval(0.7),
rx_of(-5),
),
op_take(10),
)->subscribe(
$observer
);
- op_concat_all
-
https://rxjs.dev/api/operators/concatAll
# 0, 1, 2, 0, 1, 2, 0, 1, complete
rx_interval(0.7)->
pipe
(
op_map(
sub
{ rx_interval(1)->
pipe
( op_take(3) ) }),
op_concat_all(),
op_take(10),
)->subscribe(
$observer
);
- op_concat_map
-
https://rxjs.dev/api/operators/concatMap
# 0, 1, 2, 0, 1, 2, 0, 1, 2, complete
rx_of(10, 20, 30)->
pipe
(
op_concat_map(
sub
(
$val
,
$idx
) {
rx_interval(1)->
pipe
(op_take(3)),
# can also use $_ here instead of $val
}),
)->subscribe(
$observer
);
- op_concat_with
-
https://rxjs.dev/api/operators/concatWith
# 0, 1, 2, 3, 4, 5, 6, 7, 8, complete
rx_of(0, 1, 2)->
pipe
(
op_concat_with(
rx_of(3, 4, 5),
rx_of(6, 7, 8),
),
)->subscribe(
$observer
);
- op_count
-
https://rxjs.dev/api/operators/count
# 3, complete
rx_of(0, 1, 2)->
pipe
(
op_count(),
)->subscribe(
$observer
);
# 3, complete
rx_of(0, 1, 2, 3, 4, 5, 6)->
pipe
(
op_count(
sub
{
$_
[0] % 2 == 1 }),
# can also use $_ here
)->subscribe(
$observer
);
# 4, complete
rx_of(1, 1, 1, 1, 1, 1, 1)->
pipe
(
op_count(
sub
(
$value
,
$idx
) {
$idx
% 2 == 0 }),
);
- op_debounce
-
https://rxjs.dev/api/operators/debounce
# 3, complete
rx_of(1, 2, 3)->
pipe
(
op_debounce(
sub
(
$val
) { rx_timer(0.5) }),
# can also use $_ here
)->subscribe(
$observer
);
- op_debounce_time
-
https://rxjs.dev/api/operators/debounceTime
Works like rxjs's "debounceTime", except the parameter is in seconds instead of ms.
# 3, complete
rx_of(1, 2, 3)->
pipe
(
op_debounce_time(0.5),
)->subscribe(
$observer
);
- op_default_if_empty
-
https://rxjs.dev/api/operators/defaultIfEmpty
# 42, complete
rx_timer(0.7)->
pipe
(
op_ignore_elements(),
op_default_if_empty(42),
)->subscribe(
$observer
);
# 0, 1, complete
rx_interval(0.7)->
pipe
(
op_take(2),
op_default_if_empty(42),
)->subscribe(
$observer
);
- op_delay
-
https://rxjs.dev/api/operators/delay
Works like rxjs 7's "delay", except the parameter is in seconds instead of ms.
# (pause 11 seconds) 0, 1, 2, 3, ...
rx_interval(1)->
pipe
(
op_delay(10)
)->subscribe(
$observer
);
Note: Just as in rxjs 7, the complete event will not be delayed, so don't do this:
rx_EMPTY->
pipe
( op_delay(2) )
Do this instead, to achieve the expected effect:
rx_timer(2)->
pipe
( op_ignore_elements() )
- op_delay_when
-
https://rxjs.dev/api/operators/delayWhen
# (pause 3 seconds) 3, (pause 1 second) 4, (pause one second) 5, complete
rx_of(3, 4, 5)->
pipe
(
op_delay_when(
sub
(
$val
,
$idx
) { rx_timer(
$val
) }),
# can also use $_ here
)->subscribe(
$observer
);
- op_distinct
-
https://rxjs.dev/api/operators/distinct
# 1, 2, 3, 4, complete
rx_of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)->
pipe
(
op_distinct(),
)->subscribe(
$observer
);
# { age => 4, name => 'Foo' }, { age => 7, name => 'Bar' }, complete
rx_of(
{
age
=> 4,
name
=>
'Foo'
},
{
age
=> 7,
name
=>
'Bar'
},
{
age
=> 5,
name
=>
'Foo'
},
)->
pipe
(
op_distinct(
sub
(
$val
) {
$val
->{name} }),
# can also use $_ here
)->subscribe(
$observer
);
- op_distinct_until_changed
-
https://rxjs.dev/api/operators/distinctUntilChanged
# 10, undef, 20, 30, [], [], complete
rx_of(10, 10,
undef
,
undef
, 20, 20, 20, 30, 30, [], [])->
pipe
(
op_distinct_until_changed(),
)->subscribe(
$observer
);
# {name => 'Peter', grade => 'A'}, {name => 'Mary', grade => 'B'}, complete
rx_of(
{
name
=>
'Peter'
,
grade
=>
'A'
},
{
name
=>
'Peter'
,
grade
=>
'B'
},
{
name
=>
'Mary'
,
grade
=>
'B'
},
{
name
=>
'Mary'
,
grade
=>
'A'
},
)->
pipe
(
op_distinct_until_changed(
sub
{
return
$_
[0]->{name} eq
$_
[1]->{name};
}),
)->subscribe(
$observer
);
- op_distinct_until_key_changed
-
https://rxjs.dev/api/operators/distinctUntilKeyChanged
# {name => 'Peter', grade => 'A'}, {name => 'Mary', grade => 'B'}, complete
rx_of(
{
name
=>
'Peter'
,
grade
=>
'A'
},
{
name
=>
'Peter'
,
grade
=>
'B'
},
{
name
=>
'Mary'
,
grade
=>
'B'
},
{
name
=>
'Mary'
,
grade
=>
'A'
},
)->
pipe
(
op_distinct_until_key_changed(
'name'
),
)->subscribe(
$observer
);
- op_element_at
-
https://rxjs.dev/api/operators/elementAt
# 2, complete
rx_interval(0.7)->
pipe
(
op_take(5),
op_element_at(2, 9),
)->subscribe(
$observer
);
- op_end_with
-
https://rxjs.dev/api/operators/endWith
# 0, 1, 2, 3, 100, 200, complete
rx_of(0, 1, 2, 3)->
pipe
(
op_end_with(100, 200),
)->subscribe(
$observer
);
- op_every
-
https://rxjs.dev/api/operators/every
Works like rxjs's "every", and emits true of false.
# "", complete
rx_of(5, 10, 15, 18, 20)->
pipe
(
op_every(
sub
(
$value
,
$idx
) {
$value
% 5 == 0 }),
# can also use $_ here
)->subscribe(
$observer
);
- op_exhaust_all
-
https://rxjs.dev/api/operators/exhaustAll
# 0, 1, 2, 3, 0, 1, 2, 3, complete
rx_interval(3)->
pipe
(
op_take(3),
op_map(
sub
{ rx_interval(1)->
pipe
( op_take(4) ) }),
op_exhaust_all(),
)->subscribe(
$observer
);
- op_exhaust_map
-
https://rxjs.dev/api/operators/exhaustMap
# 0, 1, 2, complete
rx_of(10, 20, 30)->
pipe
(
op_exhaust_map(
sub
(
$val
,
$idx
) {
rx_interval(1)->
pipe
( op_take(3) );
# can also use $_ here instead of $val
}),
)->subscribe(
$observer
);
- op_filter
-
https://rxjs.dev/api/operators/filter
# 0, 2, 4, 6, ... (every 1.4 seconds)
rx_interval(0.7)->
pipe
(
op_filter(
sub
{
$_
[0] % 2 == 0}),
# can also use $_ here
)->subscribe(
$observer
);
# 0, 2, 4, 6, ... (every 1.4 seconds)
rx_interval(0.7)->
pipe
(
op_filter(
sub
{
$_
% 2 == 0}),
)->subscribe(
$observer
);
# 10, 36, 50, complete
rx_of(10, 22, 36, 41, 50, 73)->
pipe
(
op_filter(
sub
(
$v
,
$idx
) {
$idx
% 2 == 0 }),
)->subscribe(
$observer
);
- op_finalize
-
https://rxjs.dev/api/operators/finalize
Note: Observe, in the second example below, that the order of execution of the finalize callbacks obeys the rxjs v7 order ('f1' first) rather than the rxjs v6 order ('f2' first).
# 1, 2, 3, complete, 'hi there'
rx_of(1, 2, 3)->
pipe
(
op_finalize(
sub
{
say
"hi there"
}),
)->subscribe(
$observer
);
# 0, f1, f2
my
$s
;
$s
= rx_interval(1)->
pipe
(
op_finalize(
sub
{
say
"f1"
}),
op_finalize(
sub
{
say
"f2"
}),
)->subscribe(
sub
{
say
$_
[0];
$s
->unsubscribe;
});
- op_find
-
https://rxjs.dev/api/operators/find
# 7, complete
rx_interval(0.7)->
pipe
(
op_find(
sub
(
$val
,
$idx
) {
$val
== 7 }),
# can also use $_ here
)->subscribe(
$observer
);
# undef, complete
rx_interval(0.7)->
pipe
(
op_take(5),
op_find(
sub
{
$_
== 7 }),
)->subscribe(
$observer
);
- op_find_index
-
https://rxjs.dev/api/operators/findIndex
# 7, complete
rx_interval(0.7)->
pipe
(
op_map(
sub
{
$_
* 2 }),
op_find_index(
sub
(
$val
,
$idx
) {
$val
== 14 }),
# can also use $_ here
)->subscribe(
$observer
);
# -1, complete
rx_interval(0.7)->
pipe
(
op_take(5),
op_find_index(
sub
{
$_
== 7 }),
)->subscribe(
$observer
);
- op_first
-
https://rxjs.dev/api/operators/first
# (pause 7 seconds) 6, complete
rx_interval(1)->
pipe
(
op_first(
sub
(
$val
,
$idx
) {
$val
> 5 }),
# can also use $_ here
)->subscribe(
$observer
);
# 0, complete
rx_interval(0.7)->
pipe
(
op_first(),
)->subscribe(
$observer
);
- op_ignore_elements
-
https://rxjs.dev/api/operators/ignoreElements
# (pause 3 seconds) complete
rx_interval(1)->
pipe
(
op_take(3),
op_ignore_elements(),
)->subscribe(
$observer
);
# (pause 3 seconds) error: foo
rx_concat(
rx_interval(1)->
pipe
(op_take(3)),
rx_throw_error(
'foo'
),
)->
pipe
(
op_ignore_elements(),
)->subscribe(
$observer
);
- op_is_empty
-
https://rxjs.dev/api/operators/isEmpty
Works like rxjs's "isEmpty", and emits true or false.
# (pause 1 second) "", complete
rx_interval(1)->
pipe
(
op_is_empty(),
)->subscribe(
$observer
);
# (pause 2 seconds) 1, complete
rx_timer(2)->
pipe
(
op_ignore_elements(),
op_is_empty(),
)->subscribe(
$observer
);
- op_last
-
https://rxjs.dev/api/operators/last
# 6, complete
rx_of(5, 6, 7)->
pipe
(
op_last(
sub
(
$val
,
$idx
) {
$val
% 2 == 0 }),
# can also use $_ here
)->subscribe(
$observer
);
# 9, complete
rx_EMPTY->
pipe
(
op_last(
undef
, 9),
# predicate, default
)->subscribe(
$observer
);
# error: no last value found
rx_EMPTY->
pipe
( op_last )->subscribe(
$observer
);
- op_map
-
https://rxjs.dev/api/operators/map
You can use
$_
instead of$_[0]
inside this operator's callback.# 10, 11, 12, 13, ...
rx_interval(1)->
pipe
(
op_map(
sub
{
$_
[0] + 10}),
)->subscribe(
$observer
);
# 10, 11, 12, 13, ...
rx_interval(1)->
pipe
(
op_map(
sub
{
$_
+ 10}),
)->subscribe(
$observer
);
# 10-0, 20-1, 30-2, complete
rx_of(10, 20, 30)->
pipe
(
op_map(
sub
(
$v
,
$idx
) {
"$v-$idx"
}),
)->subscribe(
$observer
);
- op_map_to
-
https://rxjs.dev/api/operators/mapTo
# 123, 123, 123, ... (every 1 second)
rx_interval(1)->
pipe
(
op_map_to(123),
)->subscribe(
$observer
);
- op_max
-
https://rxjs.dev/api/operators/max
# 20, complete
rx_of(10, 20, 15)->
pipe
(
op_max(),
)->subscribe(
$observer
);
# { a => 20 }, complete
rx_of(
{
a
=> 10 },
{
a
=> 20 },
{
a
=> 15 },
)->
pipe
(
op_max(
sub
(
$x
,
$y
) {
$x
->{a} <=>
$y
->{a} }),
)->subscribe(
$observer
);
- op_merge_all
-
https://rxjs.dev/api/operators/mergeAll
# 0, 1, 0, 2, 1, 3, 2, 0, 3, 1, 0, ...
rx_interval(1)->
pipe
(
op_map(
sub
{ rx_interval(0.7)->
pipe
(op_take(4)) }),
op_merge_all(2),
)->subscribe(
$observer
);
- op_merge_map
-
https://rxjs.dev/api/operators/mergeMap
# 11, 21, 31, 12, 22, 32, 13, 23, 33, complete
rx_of(10, 20, 30)->
pipe
(
op_merge_map(
sub
(
$x
,
$idx
) {
# can also use $_ here instead of $_[0]
return
rx_interval(1)->
pipe
(
op_map(
sub
(
$y
, @) {
return
$x
+
$y
+ 1;
}),
op_take(3),
);
}),
)->subscribe(
$observer
);
- op_merge_with
-
https://rxjs.dev/api/operators/mergeWith
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ...
rx_interval(0.7)->
pipe
(
rx_merge_with( rx_interval(1) ),
)->subscribe(
$observer
);
- op_min
-
https://rxjs.dev/api/operators/min
# 10, complete
rx_of(20, 10, 15)->
pipe
(
op_min(),
)->subscribe(
$observer
);
# { a => 10 }, complete
rx_of(
{
a
=> 20 },
{
a
=> 10 },
{
a
=> 15 },
)->
pipe
(
op_min(
sub
(
$x
,
$y
) {
$x
->{a} <=>
$y
->{a} }),
)->subscribe(
$observer
);
- op_multicast
- op_on_error_resume_next_with
-
https://rxjs.dev/api/index/function/onErrorResumeNextWith
# 1, 2, 3, 10, 20, 30, complete
rx_of(1, 2, 3)->
pipe
(
op_concat_with( rx_throw_error(
'foo'
) ),
op_on_error_resume_next_with(
rx_throw_error(
'bar'
),
rx_of(10, 20, 30),
rx_throw_error(
'baz'
),
),
)->subscribe(
$observer
);
- op_pairwise
-
https://rxjs.dev/api/operators/pairwise
# [0, 1], [1, 2], [2, 3], ...
rx_interval(1)->
pipe
(
op_pairwise(),
)->subscribe(
sub
{
print
Dumper(
$_
[0])});
- op_pluck
-
https://rxjs.dev/api/operators/pluck
# Mary, Paul, undef, undef, undef, complete
rx_of(
{
name
=> {
first
=>
'Mary'
}},
{
name
=> {
first
=>
'Paul'
}},
{
house
=> {
first
=>
'Chicago'
}},
15,
undef
,
)->
pipe
(
op_pluck(
'name'
,
'first'
),
)->subscribe(
$observer
);
- op_race_with
-
https://rxjs.dev/api/operators/raceWith
# 0, 1, 2, 3, 4, ... (every second)
rx_interval(3)->
pipe
(
op_race_with(
rx_interval(2),
rx_interval(1),
),
)->subscribe(
$observer
);
- op_reduce
-
https://rxjs.dev/api/operators/reduce
# (pause 6 seconds) 15, complete
rx_interval(1)->
pipe
(
op_take(6),
op_reduce(
sub
(
$acc
,
$value
,
$idx
) {
$acc
+
$value
}, 0),
)->subscribe(
$observer
);
- op_ref_count
- op_repeat
-
https://rxjs.dev/api/operators/repeat
# 10, 20, 30, 10, 20, 30, 10, 20, 30, complete
rx_of(10, 20, 30)->
pipe
(
op_repeat(3),
)->subscribe(
$observer
);
- op_retry
-
https://rxjs.dev/api/operators/retry
# 10, 20, 30, 10, 20, 30, 10, 20, 30, error: foo
rx_concat(
rx_of(10, 20, 30),
rx_throw_error(
'foo'
),
)->
pipe
(
op_retry(2),
)->subscribe(
$observer
);
- op_sample
-
https://rxjs.dev/api/operators/sample
# 0, 1, 3, 4, 6, 7, ...
rx_interval(0.7)->
pipe
(
op_sample(rx_interval(1)),
)->subscribe(
$observer
);
- op_sample_time
-
https://rxjs.dev/api/operators/sampleTime
Works like rxjs's "sampleTime", except the parameter is in seconds instead of ms.
# 0, 2, 3, 5, 6, 8, ...
rx_interval(1)->
pipe
(
op_sample_time(1.6),
)->subscribe(
$observer
);
- op_scan
-
https://rxjs.dev/api/operators/scan
# 0, 1, 3, 6, 10, ...
rx_interval(1)->
pipe
(
op_scan(
sub
{
my
(
$acc
,
$item
,
$idx
) =
@_
;
return
$acc
+
$item
;
}, 0),
)->subscribe(
$observer
);
-
https://rxjs.dev/api/operators/share
# t0, 0, 0, t1, 1, 1, t2, 2, 2, ...
my
$o
= rx_interval(1)->
pipe
(
op_tap(
sub
{
say
't'
.
$_
[0]}),
op_share(),
);
$o
->subscribe(
$observer1
);
$o
->subscribe(
$observer2
);
- op_single
-
https://rxjs.dev/api/operators/single
# error: Too many values match
rx_of(0, 1, 2, 3)->
pipe
(
op_single(
sub
(
$val
,
$idx
) {
$val
% 2 == 1 }),
# can also use $_ here
)->subscribe(
$observer
);
# error: No values match
rx_of(1, 3)->
pipe
(
op_single(
sub
{
$_
% 2 == 0 }),
)->subscribe(
$observer
);
# 42, complete
rx_of(42)->
pipe
(
op_single(),
)->subscribe(
$observer
);
- op_skip
-
https://rxjs.dev/api/operators/skip
# 40, 50, complete
rx_of(10, 20, 30, 40, 50)->
pipe
(
op_skip(3),
)->subscribe(
$observer
);
- op_skip_last
-
https://rxjs.dev/api/operators/skipLast
# (pause 3 seconds) 0, 1, 2, 3, 4, 5, 6, 7, complete
rx_interval(1)->
pipe
(
op_take(10),
op_skip_last(2),
)->subscribe(
$observer
);
- op_skip_until
-
https://rxjs.dev/api/operators/skipUntil
# (pause 4 seconds) 3, 4, 5, ...
rx_interval(1)->
pipe
(
op_skip_until( rx_timer(3.5) ),
)->subscribe(
$observer
);
- op_skip_while
-
https://rxjs.dev/api/operators/skipWhile
# 5, 3, 7, 1, complete
rx_of(1, 3, 5, 3, 7, 1)->
pipe
(
op_skip_while(
sub
(
$v
,
$idx
) {
$v
< 4 }),
# can also use $_ here
)->subscribe(
$observer
);
- op_start_with
-
https://rxjs.dev/api/operators/startWith
# 100, 200, 0, 1, 2, 3, complete
rx_of(0, 1, 2, 3)->
pipe
(
op_start_with(100, 200),
)->subscribe(
$observer
);
- op_switch_all
-
https://rxjs.dev/api/operators/switchAll
# 0, 0, 0, 1, 2, 3, 4, complete
rx_timer(0, 3)->
pipe
(
op_take(3),
op_map(
sub
{ rx_interval(2)->
pipe
(op_take(5)) }),
op_switch_all(),
)->subscribe(
$observer
);
- op_switch_map
-
https://rxjs.dev/api/operators/switchMap
# 1, 2, 3, 11, 12, 13, 21, 22, 23, 24, 25, 26, 27, ...
my
$o
= rx_interval(2.5)->
pipe
( op_take(3) );
$o
->
pipe
(
op_switch_map(
sub
(
$x
,
$idx
) {
# can also use $_ here instead of $_[0]
return
rx_interval(0.7)->
pipe
(
op_map(
sub
(
$y
,
$idx2
) {
$x
* 10 +
$y
+ 1 }),
);
}),
)->subscribe(
$observer
);
- op_take
-
https://rxjs.dev/api/operators/take
# 0, 1, 2, 3, 4, complete
rx_interval(1)->
pipe
(
op_take(5),
)->subscribe(
$observer
);
- op_take_last
-
https://rxjs.dev/api/operators/takeLast
# 3, 5, 6, complete
rx_of(1, 2, 3, 5, 6)->
pipe
(
op_take_last(3),
)->subscribe(
$observer
);
- op_take_until
-
https://rxjs.dev/api/operators/takeUntil
# 0, 1, 2, 3, 4, complete
rx_interval(1)->
pipe
(
op_take_until( rx_timer(5.5) ),
)->subscribe(
$observer
);
- op_take_while
-
https://rxjs.dev/api/operators/takeWhile
# 0, 1, 2, 3, 4, 5, complete
rx_interval(1)->
pipe
(
op_take_while(
sub
(
$val
,
$idx
) {
$val
<= 5 }),
# can also use $_ here
)->subscribe(
$observer
);
# 0, 1, 2, 3, 4, 5, 6, complete
rx_interval(1)->
pipe
(
op_take_while(
sub
{
$_
<= 5 }, 1),
)->subscribe(
$observer
);
- op_tap
-
https://rxjs.dev/api/operators/tap
# foo0, 0, foo1, 1, foo2, 2, ...
rx_interval(1)->
pipe
(
op_tap(
sub
{
say
"foo$_[0]"
}),
)->subscribe(
$observer
);
- op_throttle
-
https://rxjs.dev/api/operators/throttle
# 0, 2, 4, 6, 8, 10, ...
rx_interval(0.7)->
pipe
(
op_throttle(
sub
(
$val
) { rx_timer(1) }),
# can also use $_ here
)->subscribe(
$observer
);
- op_throttle_time
-
https://rxjs.dev/api/operators/throttleTime
Works like rxjs's "throttleTime", except the parameter is in seconds instead of ms.
At the moment, this function only accepts
duration
as parameter, not the configuration options that rxjs's throttleTime accepts.# 0, 3, 6, 9, 12, ...
rx_interval(1)->
pipe
(
op_throttle_time(2.1),
)->subscribe(
$observer
);
- op_throw_if_empty
-
https://rxjs.dev/api/operators/throwIfEmpty
# error: hello
rx_timer(1)->
pipe
(
op_ignore_elements(),
op_throw_if_empty(
sub
{
"hello"
}),
)->subscribe(
$observer
);
# 0, 1, 2, complete
rx_interval(0.7)->
pipe
(
op_take(3),
op_throw_if_empty(
sub
{
"hello"
}),
)->subscribe(
$observer
);
- op_time_interval
-
https://rxjs.dev/api/operators/timeInterval
# { value => 0, interval => 0.7 }, { vale => 1, interval => 0.7 }, complete
rx_interval(0.7)->
pipe
(
op_take(2),
op_time_interval(),
)->subscribe(
$observer
);
- op_timeout
-
https://rxjs.dev/api/operators/timeout
# 0, error: Timeout has occurred
rx_timer(0.5, 2)->
pipe
(
op_timeout(1),
)->subscribe(
$observer
);
- op_timestamp
-
https://rxjs.dev/api/operators/timestamp
# { value => 0, timestamp => 1675976745.17414 }, { value => 1, timestamp => 1675976746.17414 }, complete
rx_interval(1)->
pipe
(
op_take(2),
op_timestamp(),
)->subscribe(
$observer
);
- op_to_array
-
https://rxjs.dev/api/operators/toArray
# [0, 1, 2, 3, 4], complete
rx_interval(0.7)->
pipe
(
op_take(5),
op_to_array(),
)->subscribe(
$observer
);
- op_with_latest_from
-
https://rxjs.dev/api/operators/withLatestFrom
# [0, 0], [1, 1], [2, 3], [3, 4], [4, 6], ...
rx_interval(1)->
pipe
(
op_with_latest_from(rx_interval(0.7)),
)->subscribe(
$observer
);
- op_zip_with
-
https://rxjs.dev/api/operators/zipWith
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete
rx_interval(0.7)->
pipe
(
op_take(3),
op_zip_with(
rx_interval(1),
rx_interval(2),
),
)->subscribe(
$observer
);
PROMISE FUNCTIONS
These functions return a promise or a future, and require the existence of a user-selectable promise library which is automatically loaded in runtime. The functions are borrowed from rxjs 7.
You can optionally set the type of promises returned by these functions with the RxPerl::AnyEvent->set_promise_class($promise_class)
class method, unless you're using RxPerl::AnyEvent, in which case it's mandatory.
By default the functions return a Mojo::Promise object (when using with RxPerl::Mojo), or a Future object (when using with RxPerl::IOAsync).
- first_value_from
-
Accepts an observable and returns a promise that resolves with the observable's first emitted value as soon as it gets emitted. If no value is emitted before the observable's completion, the promise is rejected.
RxPerl::AnyEvent->set_promise_class(
'Promise::ES6'
);
# Mojo::Promise works also
my
$o
= ...;
# an observable
first_value_from(
$o
)->then( ... );
- last_value_from
-
Accepts an observable and returns a promise that resolves with the observable's last emitted value as soon as the observable completes. If no value is emitted before the observable's completion, the promise is rejected.
RxPerl::AnyEvent->set_promise_class(
'Promise::ES6'
);
# Mojo::Promise works also
my
$o
= ...;
# an observable
last_value_from(
$o
)->then( ... );
OTHER FUNCTIONS
OBSERVABLE METHODS
- subscribe
-
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-subscribe
$o
->subscribe(
sub
{
say
"next: $_[0]"
},
sub
{
say
"error: $_[0]"
},
sub
{
say
"complete"
},
);
$o
->subscribe(
undef
,
sub
{
say
"error: $_[0]"
},
);
$o
->subscribe({
next
=>
sub
{
say
"next: $_[0]"
},
complete
=>
sub
{
say
"complete"
},
});
- pipe
-
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-pipe
# 2, 6, complete
rx_interval(1)->
pipe
(
op_take(5),
op_filter(
sub
{
$_
[0] % 2 == 1}),
op_map(
sub
{2 *
$_
[0]}),
)->subscribe(...)
CONNECTABLE OBSERVABLE METHODS
Connectable observables are a subclass of observables, which (like Subjects) are multicasting and can start emitting even before anyone subscribes to them, by invoking a method. They are usually created and returned by the "op_multicast" pipeable operator.
SUBJECT METHODS
Subjects multicast, and apart from being observables themselves (with their own subscribers), also have next, error and complete methods of their own, so can be used as the observer argument to another observable's subscribe method. That observable's events will then be "forwarded" to the subject's own subscribers, as if next/error/complete had been called on the subject directly.
- next, error, complete
-
Calling these methods manually will cause the subject's subscribers to receive the corresponding events.
Typically subjects don't emit anything on their own (as opposed to "rx_interval" et al), although it is possible to create a subclass of Subject that behaves differently. An example is a queueing subject that accumulates events from the observable it has been subscribed to, then emits all of them at once to the first subscriber that subscribes to it.
NAMING CONVENTIONS
To prevent naming collisions with Perl’s built-in functions (or the user’s own), as rxjs’s operators are often small english words (such as map
), the names of this module’s operators start with rx_
or op_
.
Functions that in the JS world would be imported from 'rxjs' have their corresponding RxPerl names prepended with rx_
, whereas functions imported from 'rxjs/operators' (namely pipeable opreators) start with op_
in RxPerl.
import
{Observable, Subject, timer, interval} from
'rxjs'
;
import
{
map
, filter, delay} from
'rxjs/operators'
;
becomes:
CAVEATS
Since the rxjs implementation differs from the ReactiveX API at a few points (as do most of the Rx* libraries), RxPerl chose to behave like rxjs rather than ReactiveX to cater for web developers already familiar with rxjs.
LEARNING RESOURCES
Ultimate RxJS courses (paid)
egghead RxJS courses (paid)
SEE ALSO
NOTIFICATIONS FOR NEW RELEASES
You can start receiving emails for new releases of this module, at https://perlmodules.net.
COMMUNITY CODE OF CONDUCT
The Community Code of Conduct can be found here.
LICENSE
Copyright (C) 2020 Karelcom OÜ.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Alexander Karelas <karjala@cpan.org>