NAME
Ryu::Source - base representation for a source of events
SYNOPSIS
my $src = Ryu::Source->new;
my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
$src->emit($_) for 1..5;
$src->finish;
DESCRIPTION
This is probably the module you'd want to start with, if you were going to be using any of this. There's a disclaimer in Ryu that may be relevant at this point.
GLOBALS
$FUTURE_FACTORY
This is a coderef which should return a new Future-compatible instance.
Example overrides might include:
$Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };
%ENCODER
An encoder is a coderef which takes input and returns output.
METHODS
new
Takes named parameters, such as:
label - the label used in descriptions
Note that this is rarely called directly, see "from", "empty" and "never" instead.
from
Creates a new source from things.
The precise details of what this method supports may be somewhat ill-defined at this point in time. It is expected that the interface and internals of this method will vary greatly in versions to come.
empty
Creates an empty source, which finishes immediately.
never
An empty source that never finishes.
METHODS - Instance
describe
Returns a string describing this source and any parents - typically this will result in a chain like from->combine_latest->count.
encode
Passes each item through an encoder.
The first parameter is the encoder to use, the remainder are used as options for the selected encoder.
Examples:
$src->encode('json')
$src->encode('utf8')
$src->encode('base64')
decode
Passes each item through a decoder.
The first parameter is the decoder to use, the remainder are used as options for the selected decoder.
Examples:
$src->decode('json')
$src->decode('utf8')
$src->decode('base64')
print
Shortcut for ->each(sub { print }), except this will also save the initial state of $\ and use that for each call for consistency.
say
Shortcut for ->each(sub { print "$_\n" }).
hexdump
Convert input bytes to a hexdump representation, for example:
00000000 00 00 12 04 00 00 00 00 00 00 03 00 00 00 80 00 >................<
00000010 04 00 01 00 00 00 05 00 ff ff ff 00 00 04 08 00 >................<
00000020 00 00 00 00 7f ff 00 00 >........<
One line is emitted for each 16 bytes.
Takes the following named parameters:
continuous - accumulates data for a continuous stream, and does not reset the offset counter. Note that this may cause the last output to be delayed until the source completes.
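As a rough illustration of the format above, here is a plain-Perl sketch that does not load Ryu at all. The helper name hexdump_lines is hypothetical, and the exact column spacing is an assumption, since the sample output above does not make it clear. The continuous option is not modelled.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: formats a byte string as
# hexdump lines, 16 bytes per line. Column spacing here is an assumption.
sub hexdump_lines {
    my ($bytes) = @_;
    my @lines;
    my $offset = 0;
    while (length $bytes) {
        # Take up to 16 bytes off the front of the input
        my $chunk = substr($bytes, 0, 16, '');
        my $hex   = join ' ', map { sprintf('%02x', ord($_)) } split //, $chunk;
        # Replace anything outside printable ASCII with a dot
        (my $text = $chunk) =~ s/[^\x20-\x7e]/./g;
        push @lines, sprintf('%08x %s >%s<', $offset, $hex, $text);
        $offset += length $chunk;
    }
    return @lines;
}

print "$_\n" for hexdump_lines("\x00\x00\x12\x04Ryu");
# prints: 00000000 00 00 12 04 52 79 75 >....Ryu<
```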
throw
Throws something. I don't know what, maybe a chair.
debounce
Not yet implemented.
Requires timing support, see implementations such as Ryu::Async instead.
chomp
Chomps all items with the given delimiter.
Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation. Said delimiter follows the usual rules of $/, whatever they happen to be.
Example:
$ryu->stdin
    ->chomp("\n")
    ->say
map
A bit like "map" in perlfunc.
Takes a single parameter - the coderef to execute for each item. This should return a scalar value which will be used as the next item.
Often useful in conjunction with a do block to provide a closure.
Examples:
$src->map(do {
    my $idx = 0;
    sub {
        [ @$_, ++$idx ]
    }
})
flat_map
Similar to "map", but will flatten out some items:
an arrayref will be expanded out to emit the individual elements
for a Ryu::Source, passes on any emitted elements
This also means you can "merge" items from a series of sources.
Note that this is not recursive - an arrayref of arrayrefs will be expanded out into the child arrayrefs, but no further.
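The one-level flattening rule can be modelled in plain Perl without Ryu. This is only a sketch of the arrayref case described above: flatten_once is a hypothetical helper name, and the Ryu::Source case is not covered.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: applies $code to each item
# (with $_ set to the item) and expands arrayref results by one level only.
sub flatten_once {
    my ($code, @items) = @_;
    my @out;
    for (@items) {
        my $result = $code->($_);
        push @out, ref($result) eq 'ARRAY' ? @$result : $result;
    }
    return @out;
}

# Each item yields [ $item, [ $item * 10 ] ]; the inner arrayref survives
my @flat = flatten_once(sub { [ $_, [ $_ * 10 ] ] }, 1, 2);
print scalar(@flat), " items\n";
# prints: 4 items
```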
split
Splits the input on the given delimiter.
By default, will split into characters.
Note that each item will be processed separately - the buffer won't be retained across items, see "by_line" for that.
chunksize
Splits input into fixed-size chunks.
Note that output is always guaranteed to be a full chunk - if there is partial input at the time the input stream finishes, those extra bytes will be discarded.
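A plain-Perl sketch of that rule, independent of Ryu: input is buffered across items, complete chunks are emitted, and whatever is left at the end is dropped. The helper name chunks_of is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: models fixed-size chunking
# over a list of input strings.
sub chunks_of {
    my ($size, @items) = @_;
    my $buffer = '';
    my @out;
    for my $item (@items) {
        $buffer .= $item;
        # Emit every complete chunk currently available
        push @out, substr($buffer, 0, $size, '') while length($buffer) >= $size;
    }
    # Anything still in $buffer is a partial chunk and is discarded
    return @out;
}

print join(',', chunks_of(4, 'abc', 'defgh', 'ij')), "\n";
# prints: abcd,efgh
```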
batch
Splits input into arrayref batches of a given size.
Note that the last item emitted may have fewer elements (or none at all).
$src->batch(10)
    ->map(sub { "Next 10 (or fewer) items: @$_" })
    ->say;
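For intuition, the grouping can be modelled in plain Perl with splice. This is a sketch only: batches_of is a hypothetical helper, and unlike the note above it never produces an empty trailing batch.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: groups a list into arrayrefs
# of at most $size elements each.
sub batches_of {
    my ($size, @items) = @_;
    my @out;
    # splice removes up to $size items from the front on each pass
    push @out, [ splice @items, 0, $size ] while @items;
    return @out;
}

print join(' | ', map { "@$_" } batches_of(3, 1 .. 7)), "\n";
# prints: 1 2 3 | 4 5 6 | 7
```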
by_line
Emits one item for each line in the input. Similar to "split" with a \n parameter, except this will accumulate the buffer over successive items and only emit when a complete line has been extracted.
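The buffering described above can be sketched in plain Perl, without Ryu. The helper name lines_from is hypothetical. What happens to a trailing partial line when the source finishes is not stated here, so the sketch simply leaves it in the buffer.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: accumulates input across
# items and returns only the complete lines.
sub lines_from {
    my (@items) = @_;
    my $buffer = '';
    my @out;
    for my $item (@items) {
        $buffer .= $item;
        # Remove each complete line from the front of the buffer
        push @out, $1 while $buffer =~ s/\A(.*?)\n//s;
    }
    return @out;
}

# "bar" is split across two items but still comes out as one line
print join(',', lines_from("foo\nba", "r\nbaz")), "\n";
# prints: foo,bar
```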
prefix
Applies a string prefix to each item.
suffix
Applies a string suffix to each item.
sprintf_methods
Convenience method for generating a string from a "sprintf"-style format string and a set of method names to call.
Note that any undef items will be mapped to an empty string.
Example:
$src->sprintf_methods('%d has name %s', qw(id name))
    ->say
    ->await;
ignore
Receives items, but ignores them entirely.
Emits nothing and eventually completes when the upstream Ryu::Source is done.
Might be useful for keeping a source alive.
buffer
Accumulate items while any downstream sources are paused.
Takes the following named parameters:
high - once at least this many items are buffered, will "pause" the upstream Ryu::Source.
low - if the buffered count drops to this number, will "resume" the upstream Ryu::Source.
as_list
Resolves to a list consisting of all items emitted by this source.
as_arrayref
Resolves to a single arrayref consisting of all items emitted by this source.
as_string
Concatenates all items into a single string.
Returns a Future which will resolve on completion.
combine_latest
Takes the most recent item from one or more Ryu::Sources, and emits an arrayref containing the values in order.
An item is emitted for each update as soon as all sources have provided at least one value. For example, given 2 sources, if the first emits 1 then 2, then the second emits a, this would emit a single [2, 'a'] item.
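The example above can be replayed with a small plain-Perl model that does not use Ryu. Events are given as [ source index, value ] pairs in arrival order. The helper name combine_latest_events is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: replays a sequence of
# [ $source_index, $value ] events and collects the combined arrayrefs.
sub combine_latest_events {
    my ($source_count, @events) = @_;
    my (@latest, %seen, @out);
    for my $event (@events) {
        my ($idx, $value) = @$event;
        $latest[$idx] = $value;
        $seen{$idx}   = 1;
        # Only emit once every source has provided at least one value
        push @out, [ @latest ] if keys(%seen) == $source_count;
    }
    return @out;
}

# First source emits 1 then 2, then the second source emits 'a'
my @combined = combine_latest_events(2, [ 0, 1 ], [ 0, 2 ], [ 1, 'a' ]);
print scalar(@combined), " item: [@{ $combined[0] }]\n";
# prints: 1 item: [2 a]
```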
with_index
Emits arrayrefs consisting of [ $item, $idx ].
with_latest_from
Similar to "combine_latest", but will start emitting as soon as we have any values. The arrayref will contain undef for any sources which have not yet emitted any items.
merge
Emits items as they are generated by the given sources.
Example:
$numbers->merge($letters)->say # 1, 'a', 2, 'b', 3, 'c'...
apply
Used for setting up multiple streams.
Accepts a variable number of coderefs, will call each one and gather Ryu::Source results.
switch_str
Given a condition, will select one of the alternatives based on stringified result.
Example:
$src->switch_str(
    sub { $_->name },                # our condition
    smith => sub { $_->id },         # if this matches the condition, the code will be called with $_ set to the current item
    jones => sub { $_->parent->id },
    sub { undef }                    # and this is our default case
);
ordered_futures
Given a stream of Futures, will emit the results as each Future is marked ready. If any fail, the stream will fail.
This is a terrible name for a method, expect it to change.
concurrent
distinct
Emits new distinct items, using string equality with an exception for undef (i.e. undef is treated differently from empty string or 0).
Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.
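That sequence can be reproduced with a plain-Perl sketch of the same rule: string equality via a hash, with undef tracked separately. It does not use Ryu, and distinct_items is a hypothetical helper name.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: keeps the first occurrence
# of each item, comparing as strings and treating undef as its own value.
sub distinct_items {
    my (@items) = @_;
    my (%seen, $seen_undef, @out);
    for my $item (@items) {
        if (defined $item) {
            push @out, $item unless $seen{$item}++;
        } else {
            push @out, $item unless $seen_undef++;
        }
    }
    return @out;
}

my @result = distinct_items(1, 2, 3, undef, 2, 3, undef, '2', 2, 4, 1, 5);
print join(',', map { defined($_) ? $_ : 'undef' } @result), "\n";
# prints: 1,2,3,undef,4,5
```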
distinct_until_changed
Removes contiguous duplicates, defined by string equality.
sort_by
Emits items sorted by the given key. This is a stable sort function.
The algorithm is taken from List::UtilsBy.
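A plain-Perl sketch in that style, without Ryu or List::UtilsBy: compute each key once, then sort the indices. The helper name sort_by_key is hypothetical, and the explicit index tie-break is added here only to make stability visible.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: sorts items by a string key
# computed once per item, keeping equal keys in their original order.
sub sort_by_key {
    my ($keygen, @items) = @_;
    # Call the key generator with $_ set to each item
    my @keys = map { scalar $keygen->($_) } @items;
    my @order = sort { $keys[$a] cmp $keys[$b] or $a <=> $b } 0 .. $#items;
    return @items[@order];
}

# Sort words by length; equal lengths keep their input order
print join(' ', sort_by_key(sub { length $_ }, qw(ccc a bb dd e))), "\n";
# prints: a e bb dd ccc
```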
nsort_by
Emits items numerically sorted by the given key. This is a stable sort function.
See "sort_by".
rev_sort_by
Emits items sorted by the given key, in reverse order. This is a stable sort function.
The algorithm is taken from List::UtilsBy.
rev_nsort_by
Emits items numerically sorted by the given key, in reverse order. This is a stable sort function.
See "sort_by".
extract_all
Expects a regular expression and emits hashrefs containing the named capture buffers.
The regular expression will be applied using the m//gc operator.
Example:
$src->extract_all(qr{/(?<component>[^/]+)})
# emits { component => '...' }, { component => '...' }
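The same matching loop can be tried in plain Perl, outside Ryu, using m//gc and the %+ hash of named captures. The helper name extract_all_from is hypothetical, and it works on a single string rather than a stream.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Ryu::Source: applies $re repeatedly to
# $text and returns one hashref of named captures per match.
sub extract_all_from {
    my ($re, $text) = @_;
    my @out;
    # Copy %+ each time, since it is overwritten by the next match
    push @out, +{ %+ } while $text =~ m/$re/gc;
    return @out;
}

my @found = extract_all_from(qr{/(?<component>[^/]+)}, '/usr/local/bin');
print join(',', map { $_->{component} } @found), "\n";
# prints: usr,local,bin
```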
skip
Skips the first N items.
skip_last
Skips the last N items.
skip_until
Skips the items that arrive before a given condition is reached.
Takes either a Future instance (items are skipped until it is marked as done) or a coderef, which is called for each item until it first returns true.
take
Takes a limited number of items.
Given a sequence of 1,2,3,4,5 and ->take(3), you'd get 1,2,3 and then the stream would finish.
first
Returns a source which provides the first item from the stream.
some
Applies the given code to each item, and emits a single item:
0 if the code never returned true or no items were received
1 if the code ever returned a true value
every
Similar to "some", except this requires the coderef to return true for all values in order to emit a 1 value.
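The difference between "some" and "every" can be seen in a plain-Perl model that does not use Ryu. The helper names some_of and every_of are hypothetical. Returning 1 from every_of for an empty list is an assumption of this sketch, since the text above does not cover that case.

```perl
use strict;
use warnings;

# Hypothetical helpers, not part of Ryu::Source. Each calls $code with $_
# set to the current item and reduces the results to a single 0 or 1.
sub some_of {
    my ($code, @items) = @_;
    for (@items) { return 1 if $code->($_) }
    return 0;    # never true, or no items at all
}

sub every_of {
    my ($code, @items) = @_;
    for (@items) { return 0 unless $code->($_) }
    return 1;    # assumption: an empty list counts as "all true"
}

my $is_even = sub { $_ % 2 == 0 };
print some_of($is_even, 1, 2, 3), every_of($is_even, 1, 2, 3), "\n";
# prints: 10
```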
count
Emits the count of items seen once the parent source completes.
sum
Emits the numeric sum of items seen once the parent completes.
mean
Emits the mean (average) numerical value of all seen items.
max
Emits the maximum numerical value of all seen items.
min
Emits the minimum numerical value of all seen items.
statistics
Emits a single hashref of statistics once the source completes.
This will contain the following keys:
count
sum
min
max
mean
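As an illustration of the hashref shape, here is a plain-Perl sketch using the core List::Util module rather than Ryu. The helper name statistics_for is hypothetical, and returning an undef mean for empty input is an assumption of the sketch.

```perl
use strict;
use warnings;
use List::Util qw(sum0 min max);

# Hypothetical helper, not part of Ryu::Source: computes the five keys
# listed above for a list of numbers.
sub statistics_for {
    my (@items) = @_;
    my $count = @items;
    my $sum   = sum0(@items);
    return {
        count => $count,
        sum   => $sum,
        min   => min(@items),
        max   => max(@items),
        # Assumption: no mean is defined when there are no items
        mean  => $count ? $sum / $count : undef,
    };
}

my $stats = statistics_for(2, 4, 9);
print join(' ', map { "$_=$stats->{$_}" } qw(count sum min max mean)), "\n";
# prints: count=3 sum=15 min=2 max=9 mean=5
```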
filter
Applies the given parameter to filter values.
The parameter can be a regex or coderef. You can also pass (key, value) pairs to filter hashrefs or objects based on regex or coderef values.
Examples:
$src->filter(name => qr/^[A-Z]/, id => sub { $_ % 2 })
filter_isa
Emits only the items which ->isa one of the given parameters. Will skip non-blessed items.
emit
Emits the given item.
each
each_as_source
completed
Returns a Future indicating completion (or failure) of this stream.
await
Block until this source finishes.
finish
Mark this source as completed.
METHODS - Proxied
The following methods are proxied to our completion Future:
then
is_ready
is_done
failure
is_cancelled
else
METHODS - Internal
prepare_await
Run any pre-completion callbacks (recursively) before we go into an await cycle.
Used for compatibility with sync bridges when there's no real async event loop available.
chained
Returns a new Ryu::Source chained from this one.
each_while_source
Like "each", but removes the source from the callback list once the parent completes.
map_source
Provides a "chained" source which has more control over what it emits than a standard "map" or "filter" implementation.
$original->map_source(sub {
    my ($item, $src) = @_;
    $src->emit('' . reverse $item);
});
new_future
Used internally to get a Future.
INHERITED METHODS
AUTHOR
Tom Molesworth <TEAM@cpan.org>
LICENSE
Copyright Tom Molesworth 2011-2019. Licensed under the same terms as Perl itself.