NAME
Async::Stream - it's convenient way to work with async data flow.
IMPORTANT! PUBLIC INTERFACE ISN'T STABLE, DO NOT USE IN PRODACTION BEFORE VERSION 1.0.
VERSION
Version 0.08
SYNOPSIS
Module helps to organize your async code to stream.
use Async::Stream;
my @urls = qw(
http://ucoz.com
http://ya.ru
http://google.com
);
my $stream = Async::Stream->new_from(@urls);
$stream
->transform(sub {
$return_cb = shift;
http_get $_, sub {
$return_cb->({headers => $_[0], body => $_[0]})
};
}, 'async')
->filter(sub { $_->{headers}->{Status} =~ /^2/ })
->for_each(sub {
my $item = shift;
print $item->{body};
});
SUBROUTINES/METHODS
new($generator)
Constructor creates instance of class. Class method gets 1 arguments - generator subroutine references to generate items. Generator will get a callback which it will use for returning result. If generator is exhausted then returning callback is called without arguments.
my $i = 0;
my $stream = Async::Stream->new(sub {
$return_cb = shift;
if ($i < 10) {
$return_cb->($i++);
} else {
$return_cb->();
}
});
new_from(@array_of_items)
Constructor creates instance of class. Class method gets a list of items which are used for generating streams.
my @domains = qw(
ucoz.com
ya.ru
googl.com
);
my $stream = Async::Stream->new_from(@urls)
head()
Method returns stream's head item. Head is a instance of class Async::Stream::Item.
my $stream_head = $stream->head;
set_prefetch($number)
Method returns stream's head item. Head is a instance of class Async::Stream::Item.
$stream->set_prefetch(4);
iterator()
Method returns stream's iterator. Iterator is a instance of class Async::Stream::Iterator.
my $stream_iterator = $stream->iterator;
CONVEYOR METHODS
peek($action)
This method helps to debug streams data flow. You can use this method for printing or logging steam data and track data mutation between stream's transformations.
$stream->peek(sub { print $_, "\n" })->to_arrayref(sub {print @{$_[0]}});
filter($predicate)
The method filters current stream. Filter works like lazy grep.
$stream->filter(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
transform($transformer)
Method transform current stream. Transform works like lazy map with async response. You can use the method for example for async http request or another async operation.
$stream->transform(sub {
$return_cb = shift;
$return_cb->($_ * 2)
})->to_arrayref(sub {print @{$_[0]}});
append(@list_streams)
The method appends several streams to tail of current stream.
$stream->append($stream1)->to_arrayref(sub {print @{$_[0]}});
skip($number)
The method skips $number items in stream.
$stream->skip(5)->to_arrayref(sub {print @{$_[0]}});
limit($number)
The method limits $number items in stream.
$stream->limit(5)->to_arrayref(sub {print @{$_[0]}});
sorted($comparator)
The method sorts whole stream.
$stream->sorted(sub{$a <=> $b})->to_arrayref(sub {print @{$_[0]}});
cut_sorted($predicate, $comparator)
Sometimes stream can be infinity and you can't you $stream->sorted, you need certain parts of streams for example cut part by length of items.
$stream
->cut_sorted(sub {length $a != length $b},sub {$a <=> $b})
->to_arrayref(sub {print @{$_[0]}});
merge_in($comparator, @list_streams);
Merge additional streams into current stream by comparing each item of stream.
$stream->merge_in(sub{$a <=> $b}, $stream1, $stream2);
$branch_stream branch_out($predicat);
Split stream into 2 stream are divided by predicat. Branch returns 2 streams. First stream will contain "true" items, Second - "false" items;
my $error_stream = $stream->branch_out(sub{$_->{headers}{status} != 200});
distinct($key_generator)
Method discards duplicate items from stream. By default uniqueness of items will be determined by textual representation of item.
$stream->distinct(sub {$_->{name}})->to_arrayref(sub {print @{$_[0]}});
TERMINAL METHODS
to_arrayref($returing_cb)
Method returns stream's iterator.
$stream->to_arrayref(sub {
$array_ref = shift;
#...
});
for_each($action)
Method execute action on each item in stream.
$stream->to_arrayref(sub {
$array_ref = shift;
#...
});
reduce($accumulator, $returing_cb)
Performs a reduction on the items of the stream.
$stream->reduce(
sub{ $a + $b },
sub {
$sum = shift
#...
});
sum($returing_cb)
The method computes sum of all items in stream.
$stream->sum(
sub {
$sum = shift
#...
});
min($returing_cb)
The method finds out minimum item among all items in stream.
$stream->min(
sub {
$sum = shift
#...
});
max($returing_cb)
The method finds out maximum item among all items in stream.
$stream->max(
sub {
$sum = shift
#...
});
count($returing_cb)
The method counts number items in streams.
$stream->count(sub {
$count = shift;
});
any($predicat, $return_cb)
Method look for any equivalent item in steam. if there is any then return that. if there isn't then return nothing.
$stream->any(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
AUTHOR
Kirill Sysoev, <k.sysoev at me.com>
BUGS AND LIMITATIONS
Please report any bugs or feature requests to https://github.com/pestkam/p5-Async-Stream/issues.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Async::Stream::Item
LICENSE AND COPYRIGHT
Copyright 2017 Kirill Sysoev.
This program is free software; you can redistribute it and/or modify it under the terms of the the Artistic License (2.0). You may obtain a copy of the full license at: