NAME

Ryu::Source - base representation for a source of events

SYNOPSIS

my $src = Ryu::Source->new;
my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
$src->emit($_) for 1..5;
$src->finish;

DESCRIPTION

This is probably the module you'd want to start with, if you were going to be using any of this. There's a disclaimer in Ryu that may be relevant at this point.

GLOBALS

$FUTURE_FACTORY

This is a coderef which should return a new Future-compatible instance.

Example overrides might include:

$Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };

METHODS

new

Takes named parameters.

describe

Returns a string describing this source and any parents - typically this will result in a chain like from->combine_latest->count.

from

Creates a new source from things.

The precise details of what this method supports may be somewhat ill-defined at this point in time. It is expected that the interface and internals of this method will vary greatly in versions to come.

encode

Passes each item through an encoder.

The first parameter is the encoder to use, the remainder are used as options for the selected encoder.

Examples:

$src->encode('json')
$src->encode('utf8')
$src->encode('base64')

decode

Passes each item through a decoder.

The first parameter is the decoder to use, the remainder are used as options for the selected decoder.

Examples:

$src->decode('json')
$src->decode('utf8')
$src->decode('base64')

say

Shortcut for ->each(sub { print "$_\n" }).

print

Shortcut for -each(sub { print }) >, except this will also save the initial state of $\ and use that for each call for consistency.

empty

Creates an empty source, which finishes immediately.

never

An empty source that never finishes.

throw

Throws something. I don't know what, maybe a chair.

METHODS - Instance

new_future

Used internally to get a Future.

pause

Does nothing useful.

resume

Is about as much use as "pause".

is_paused

Might return 1 or 0, but is generally meaningless.

debounce

Not yet implemented.

Requires timing support, see implementations such as Ryu::Async instead.

chomp

Chomps all items with the current delimiter.

Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation. Said delimiter follows the usual rules of $/, whatever they happen to be.

map

A bit like "map" in perlfunc.

split

Splits the input into chunks. By default, will split into characters.

chunksize

Splits input into fixed-size chunks.

Note that output is always guaranteed to be a full chunk - if there is partial input at the time the input stream finishes, those extra bytes will be discarded.

sprintf_methods

Convenience method for generating a string from a "sprintf"-style format string and a set of method names to call.

Note that any undef items will be mapped to an empty string.

Example:

$src->sprintf_methods('%d has name %s', qw(id name))
    ->say
    ->await;

as_list

Resolves to a list consisting of all items emitted by this source.

as_arrayref

Resolves to a single arrayref consisting of all items emitted by this source.

as_string

Concatenates all items into a single string.

Returns a Future which will resolve on completion.

combine_latest

with_index

Emits arrayrefs consisting of [ $item, $idx ].

with_latest_from

merge

apply

Used for setting up multiple streams.

Accepts a variable number of coderefs, will call each one and gather Ryu::Source results.

each_as_source

switch_str

Given a condition, will select one of the alternatives based on stringified result.

Example:

$src->switch_str(
 sub { $_->name }, # our condition
 smith => sub { $_->id }, # if this matches the condition, the code will be called with $_ set to the current item
 jones => sub { $_->parent->id },
 sub { undef } # and this is our default case
);

ordered_futures

Given a stream of Futures, will emit the results as each Future is marked ready. If any fail, the stream will fail.

This is a terrible name for a method, expect it to change.

distinct

Emits new distinct items, using string equality with an exception for undef (i.e. undef is treated differently from empty string or 0).

Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.

distinct_until_changed

Removes contiguous duplicates, defined by string equality.

sort_by

Emits items sorted by the given key. This is a stable sort function.

The algorithm is taken from List::UtilsBy.

nsort_by

Emits items numerically sorted by the given key. This is a stable sort function.

See "sort_by".

rev_sort_by

Emits items sorted by the given key. This is a stable sort function.

The algorithm is taken from List::UtilsBy.

rev_nsort_by

Emits items numerically sorted by the given key. This is a stable sort function.

See "sort_by".

extract_all

Expects a regular expression and emits hashrefs containing the named capture buffers.

The regular expression will be applied using the m//gc operator.

Example:

$src->extract_all(qr{/(?<component>[^/]+)})
# emits { component => '...' }, { component => '...' }

skip

Skips the first N items.

skip_last

Skips the last N items.

take

Takes a limited number of items.

Given a sequence of 1,2,3,4,5 and ->take(3), you'd get 1,2,3 and then the stream would finish.

first

Returns a source which provides the first item from the stream.

some

every

count

sum

mean

max

min

statistics

Emits a single hashref of statistics once the source completes.

filter

filter_isa

Emits only the items which ->isa one of the given parameters.

emit

flat_map

Similar to "map", but will flatten out some items:

  • an arrayref will be expanded out to emit the individual elements

  • for a Ryu::Source, passes on any emitted elements

This also means you can "merge" items from a series of sources.

Note that this is not recursive - an arrayref of arrayrefs will be expanded out into the child arrayrefs, but no further.

each

completed

METHODS - Proxied

The following methods are proxied to our completion Future:

  • then

  • is_ready

  • is_done

  • failure

  • is_cancelled

  • else

await

Block until this source finishes.

finish

Mark this source as completed.

METHODS - Internal

chained

Returns a new Ryu::Source chained from this one.

each_while_source

Like "each", but removes the source from the callback list once the parent completes.

AUTHOR

Tom Molesworth <TEAM@cpan.org>

LICENSE

Copyright Tom Molesworth 2011-2017. Licensed under the same terms as Perl itself.