NAME
Schedule::Easing::Stream - Control timestamp update, queuing, and buffering of input.
SYNOPSIS
  my $stream=Schedule::Easing::Stream->new(
    fh    =>\*STDIN,
    input =>sub { my (@lines)=@_; ... },
    update=>sub { my ($epoch)=@_; ... },
    ### configuration ###
    lines =>5,
    clock =>10,
    regexp=>qr/(\d+)/,
    batch =>16,
    sleep =>4,
  );
  $stream->read();
DESCRIPTION
This stream handler permits quick configuration for various forms of line buffering and queueing, coupled with a mechanism to control associated per-line timestamps. By default, when passing lines through a stream, each line is processed individually by the input handler, and the update callback, if defined, is called with the current epoch time. In the default configuration, the handler is equivalent to:
  while(<$fh>) {
    &input($_);
    &update(time());
  }
Ideally, input can be called with batches of lines for better runtime performance. Moreover, for most applications it is unnecessary to query the system time() for every new line of input, particularly when processing historical data or when data is arriving quickly.
CONFIGURATION
Calling Update
By default, the update callback, if defined, is called for every single line of input. This can be adjusted by setting one or more of lines, clock, or regexp.
Lines
Setting lines=>N will invoke update every N lines. Subsequent lines retain the new value until the line counter again reaches N. If clock is used, its alarm does not reset the line counter. If regexp is used and matches a line, the line counter is reset.
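The counter rules above can be illustrated with a small standalone simulation. This is a hypothetical sketch, not the module's implementation: update_points is an invented helper that returns the 0-based indices of the lines after which update would be called, assuming the counter restarts from zero after every update and that a regexp match triggers its own update.

```perl
use strict;
use warnings;

# Hypothetical simulation of lines=>N (not Schedule::Easing::Stream code).
# Returns the 0-based indices of lines after which update would be called.
# A line matching the optional regexp triggers its own update and resets
# the counter, as described for regexp.
sub update_points {
  my ($n, $re, @lines) = @_;
  my ($counter, @fired) = (0);
  for my $i (0 .. $#lines) {
    if (defined $re && $lines[$i] =~ $re) {
      push @fired, $i;      # matched line supplies its own epoch
      $counter = 0;         # ...and resets the line counter
      next;
    }
    if (++$counter >= $n) { # N lines seen since the last update
      push @fired, $i;
      $counter = 0;
    }
  }
  return @fired;
}

print join(',', update_points(3, undef, qw(a b c d e f g))), "\n";        # 2,5
print join(',', update_points(3, qr/^T(\d+)/, qw(a T100 b c d))), "\n";   # 1,4
```

In the second call the match on T100 restarts the count, so the next counter-driven update comes three lines later.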
Clock
Setting clock=>T uses alarm() to invoke update every T seconds. This occurs even while waiting for additional input. Subsequent lines retain the new value until another T seconds have passed.
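A periodic alarm of this kind can be sketched with a SIGALRM handler that calls update and re-arms itself. This is a hypothetical illustration, not the module's code: run_with_clock is an invented helper, and Time::HiRes::alarm is used only so the demonstration can tick at 0.2 seconds instead of whole seconds.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Hypothetical sketch of clock=>T (not Schedule::Easing::Stream code):
# a SIGALRM handler passes the current epoch to update and re-arms the
# alarm, so update fires every T seconds even while the main loop is idle.
sub run_with_clock {
  my ($t, $update, $work) = @_;
  my $running = 1;
  local $SIG{ALRM} = sub {
    $update->(time());
    Time::HiRes::alarm($t) if $running;   # re-arm for the next period
  };
  Time::HiRes::alarm($t);
  $work->();
  $running = 0;                           # stop re-arming before cancelling
  Time::HiRes::alarm(0);                  # cancel any pending alarm
}

my @ticks;
run_with_clock(0.2, sub { push @ticks, $_[0] }, sub {
  # stand-in for waiting on input: idle for about one second
  my $end = Time::HiRes::time() + 1;
  Time::HiRes::sleep(0.05) while Time::HiRes::time() < $end;
});
printf "update was called %d times\n", scalar @ticks;
```

The $running flag ensures that a signal dispatched just before cancellation cannot re-arm the alarm after the handler has been restored.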
Regular Expression
Setting regexp=>qr/...(re).../ takes the epoch seconds from the input line itself. When re matches, the value in the first capture group is passed to update. Subsequent lines use the captured value unless they also match. If lines is in use, the line counter will be reset. Any clock alarm is not reset.
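The effect of the first capture group on the per-line timestamp can be sketched as follows. This is a hypothetical helper, captured_epochs, not part of the module, and it assumes that the matching line itself is tagged with its own captured value.

```perl
use strict;
use warnings;

# Hypothetical sketch of regexp=>qr/.../ (not Schedule::Easing::Stream code):
# when a line matches, the first capture group becomes the current epoch;
# non-matching lines keep the most recently captured value.
sub captured_epochs {
  my ($re, $epoch, @lines) = @_;   # $epoch: value before any match
  my @epochs;
  for my $line (@lines) {
    $epoch = $1 if $line =~ $re;   # only a successful match updates it
    push @epochs, $epoch;
  }
  return @epochs;
}

my @log = ('1700000000 start', 'continued', '1700000060 next', 'more');
my @e   = captured_epochs(qr/^(\d+)\s/, undef, @log);
print join(',', @e), "\n";   # 1700000000,1700000000,1700000060,1700000060
```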
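This option is primarily useful for processing existing, timestamped logs. Be cautious that combining regexp with other configurations can make time run backwards or jump erratically: for example, clock supplies the current system time to update, while regexp supplies timestamps taken from the data, which may lie far in the past.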
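If an application cannot tolerate backwards steps, the update callback itself can guard against them. The wrapper below is a hypothetical sketch, not a feature of Schedule::Easing::Stream: monotonic_update is an invented helper that forwards an epoch only when it is not earlier than the last one forwarded.

```perl
use strict;
use warnings;

# Hypothetical wrapper (not part of Schedule::Easing::Stream): forwards an
# epoch to the real update callback only when time does not move backwards.
sub monotonic_update {
  my ($update) = @_;
  my $last;
  return sub {
    my ($epoch) = @_;
    return if defined $last && $epoch < $last;   # drop backwards steps
    $last = $epoch;
    $update->($epoch);
  };
}

my @seen;
my $guarded = monotonic_update(sub { push @seen, $_[0] });
$guarded->($_) for 100, 160, 130, 200;
print "@seen\n";   # 100 160 200
```

Such a wrapper could be passed as update=>monotonic_update(sub { ... }); whether dropping or clamping backwards values is appropriate depends on the application.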
Batching
Setting batch=>M will collect M lines before calling input(@batch). The update callback is called after the batch is processed, so all lines in the batch will have the same timestamp. This is useful for high-speed data, such as many lines per second read from an existing file or produced by another tool.
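The basic batching loop can be sketched as below. This is a hypothetical illustration, not the module's read() method: read_batched is an invented helper, and it assumes that a trailing partial batch is flushed at end of input, which the description above does not state.

```perl
use strict;
use warnings;

# Hypothetical sketch of batch=>M (not Schedule::Easing::Stream code):
# lines are collected until M are buffered, then input(@batch) runs,
# followed by a single update(time()) so the batch shares one timestamp.
# Assumption: a trailing partial batch is flushed at end of input.
sub read_batched {
  my ($fh, $m, $input, $update) = @_;
  my @batch;
  my $flush = sub {
    return unless @batch;
    $input->(@batch);
    $update->(time()) if $update;
    @batch = ();
  };
  while (my $line = <$fh>) {
    push @batch, $line;
    $flush->() if @batch >= $m;
  }
  $flush->();
}

my $data = join '', map { "line $_\n" } 1 .. 10;
open my $fh, '<', \$data or die "open: $!";
my @sizes;
read_batched($fh, 4, sub { push @sizes, scalar @_ }, sub { });
print join(',', @sizes), "\n";   # 4,4,2
```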
If lines is set, the line counter is checked only once, after each batch of M lines has been processed, and it is reset whenever update is called. Therefore, with lines>M an update may occur only in some of the batches, whereas lines<=M will call update after every batch.
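A short simulation makes the lines>M case concrete. This is a hypothetical sketch, not the module's code: batches_with_update is an invented helper that assumes full batches only and a counter reset to zero whenever update fires.

```perl
use strict;
use warnings;

# Hypothetical sketch of lines=>N combined with batch=>M (not
# Schedule::Easing::Stream code): the counter is checked once per full
# batch and reset to zero whenever update fires. Returns 1-based batch
# numbers that end with an update.
sub batches_with_update {
  my ($n, $m, $nbatches) = @_;
  my ($counter, @fired) = (0);
  for my $i (1 .. $nbatches) {
    $counter += $m;            # M lines were just passed to input
    next if $counter < $n;     # not enough lines yet: no update
    push @fired, $i;
    $counter = 0;
  }
  return @fired;
}

print join(',', batches_with_update(10, 4, 6)), "\n";   # 3,6
print join(',', batches_with_update(3, 4, 6)), "\n";    # 1,2,3,4,5,6
```

With lines=>10 and batch=>4 the counter reaches 4, 8, then 12, so only every third batch triggers update.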
If clock is set, when alarm() fires, any pending partial batch is passed to input first, and then update is called. This permits "batching with timeout", ensuring that no batch is held for more than clock seconds, at the cost of sometimes delivering partial batches.
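"Batching with timeout" can be illustrated with a deterministic event simulation instead of real signals. This is a hypothetical sketch: timeout_batches is an invented helper, the alarm is assumed to fire at fixed times T, 2T, 3T, ... (the description says the clock alarm is not reset by other events), and a trailing partial batch is assumed to be flushed at end of input.

```perl
use strict;
use warnings;

# Hypothetical event simulation of batch=>M with clock=>T (not
# Schedule::Easing::Stream code). @arrivals are sorted arrival times in
# seconds; each alarm at T, 2T, ... flushes any partial batch early.
# Assumption: a trailing partial batch is flushed at end of input.
sub timeout_batches {
  my ($m, $t, @arrivals) = @_;
  my (@batch, @calls);
  my $next_alarm = $t;
  for my $at (@arrivals) {
    while ($at >= $next_alarm) {         # alarm fired before this line arrived
      push @calls, [@batch] if @batch;   # partial batch goes to input early
      @batch = ();
      $next_alarm += $t;
    }
    push @batch, $at;
    if (@batch >= $m) {                  # full batch
      push @calls, [@batch];
      @batch = ();
    }
  }
  push @calls, [@batch] if @batch;
  return @calls;
}

my @calls = timeout_batches(3, 10, 1, 2, 3, 4, 12, 25, 26, 27);
print join(' | ', map { "@$_" } @calls), "\n";   # 1 2 3 | 4 | 12 | 25 26 27
```

The lines arriving at 4 and 12 are each delivered alone because an alarm fired before their batches could fill.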
If regexp is set and a line matches, any pending partial batch is processed first, and then the matched line is passed to input on its own.
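The order of input calls when a line matches can be sketched as follows. This is a hypothetical helper, regexp_batches, not the module's code; it assumes that leftover lines are flushed at end of input.

```perl
use strict;
use warnings;

# Hypothetical sketch of batch=>M with regexp (not Schedule::Easing::Stream
# code): a matching line first flushes the pending partial batch, then is
# passed to input on its own. Assumption: leftovers flush at end of input.
sub regexp_batches {
  my ($m, $re, @lines) = @_;
  my (@batch, @calls);
  for my $line (@lines) {
    if ($line =~ $re) {
      push @calls, [@batch] if @batch;   # partial batch first
      @batch = ();
      push @calls, [$line];              # then the matched line alone
      next;
    }
    push @batch, $line;
    if (@batch >= $m) {
      push @calls, [@batch];
      @batch = ();
    }
  }
  push @calls, [@batch] if @batch;
  return @calls;
}

my @calls = regexp_batches(3, qr/^T(\d+)/, qw(a b T100 c d e f));
print join(' | ', map { "@$_" } @calls), "\n";   # a b | T100 | c d e | f
```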
To enforce a fixed batch size despite other settings and alarm() interruptions, the input handler can perform its own batching.
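One way an input handler might re-batch is to keep a private buffer and emit only full chunks. This is a hypothetical sketch: fixed_batcher and its $process callback are invented names, not part of Schedule::Easing::Stream.

```perl
use strict;
use warnings;

# Hypothetical input handler (not part of Schedule::Easing::Stream) that
# re-batches whatever it receives into fixed-size chunks of $k lines,
# holding any remainder until later calls supply more lines.
sub fixed_batcher {
  my ($k, $process) = @_;
  my @pending;
  return sub {
    push @pending, @_;
    $process->(splice(@pending, 0, $k)) while @pending >= $k;
  };
}

my @chunks;
my $input = fixed_batcher(4, sub { push @chunks, [@_] });
$input->(1 .. 3);   # partial batch (e.g. flushed by an alarm): held back
$input->(4 .. 9);   # nine lines pending: two chunks of four are emitted
print join(' | ', map { "@$_" } @chunks), "\n";   # 1 2 3 4 | 5 6 7 8
```

Line 9 remains pending in this sketch; it provides no end-of-stream flush, so an application would need its own way to process the final remainder.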
Sleeping
If sleep=>S is set, after processing a batch (possibly a single line), the stream handler will wait S seconds if no input is currently available on fh. For high-speed data, setting sleep may add a slight delay, because the state of fh must be checked after each batch. For mixed-speed data, be cautious that the chosen sleep does not allow the input buffer to fill up and block incoming data.
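The "sleep only if nothing is readable" check can be sketched with the core IO::Select module. This is a hypothetical helper, maybe_sleep, not the module's code; it requires a real file descriptor (a pipe is used here), since select() does not work on in-memory filehandles.

```perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes ();

# Hypothetical sketch of sleep=>S (not Schedule::Easing::Stream code):
# after a batch, sleep S seconds only when no input is readable on $fh.
# Returns 1 if it slept, 0 if input was already waiting.
sub maybe_sleep {
  my ($fh, $s) = @_;
  my @ready = IO::Select->new($fh)->can_read(0);   # non-blocking poll
  return 0 if @ready;
  Time::HiRes::sleep($s);
  return 1;
}

pipe(my $r, my $w) or die "pipe: $!";
my $slept_empty = maybe_sleep($r, 0.01);   # nothing written yet: sleeps
syswrite($w, "line\n");
my $slept_ready = maybe_sleep($r, 0.01);   # data waiting: returns at once
print "$slept_empty $slept_ready\n";       # 1 0
```

A plain sleep mirrors the description above; waiting with can_read($s) instead would wake as soon as data arrives, which is a different trade-off.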
When clock is set, the alarm() signal may interrupt sleeping, in which case reading resumes immediately rather than waiting out the remainder of S.
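The interaction can be demonstrated directly, since Perl's sleep may be interrupted when the process receives SIGALRM. This standalone demonstration is hypothetical: the 0.2 second Time::HiRes::alarm stands in for the clock alarm and sleep(5) stands in for sleep=>S.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Hypothetical demonstration (not Schedule::Easing::Stream code): a
# SIGALRM delivered during sleep ends the sleep early, so a read loop
# could resume immediately.
my $fired = 0;
local $SIG{ALRM} = sub { $fired++ };
Time::HiRes::alarm(0.2);                 # stands in for the clock=>T alarm
my $t0 = Time::HiRes::time();
sleep(5);                                # stands in for sleep=>S
my $elapsed = Time::HiRes::time() - $t0;
printf "alarm fired %d time(s); slept %.1f s instead of 5\n", $fired, $elapsed;
```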
When regexp is set and a line matches, no sleep occurs after the pending partial batch is processed; sleeping occurs only after the matched line itself has been processed.