NAME

Schedule::Easing::Stream - Control timestamp update, queuing, and buffering of input.

SYNOPSIS

my $stream=Schedule::Easing::Stream->new(
	fh    =>\*STDIN,
	input =>sub { my (@lines)=@_; ... },
	update=>sub { my ($epoch)=@_; ... },

	### configuration ###
	lines =>5,
	clock =>10,
	regexp=>qr/(\d+)/,
	batch =>16,
	sleep =>4,
);

$stream->read();

DESCRIPTION

This stream handler permits quick configuration of various forms of line buffering and queuing, coupled with a mechanism to control the associated per-line timestamps. By default, each line read from the stream is passed individually to the input handler, and the update callback, if defined, is then called with the current epoch time. In this default configuration, the handler is equivalent to:

while(<$fh>) {
	input($_);        # the input handler receives the single line
	update(time());   # the update callback receives the current epoch
}

Ideally the input handler is called with batches of lines for better runtime performance. For most applications it is also unnecessary to call the system time() for every new line of input, particularly when processing historical data or when data is arriving quickly.

CONFIGURATION

Calling Update

By default, the update callback, if defined, is called for every single line of input. This can be adjusted by setting one or more of lines, clock, or regexp.

Lines

Setting lines=>N will invoke update every N lines. Subsequent lines will retain the new value until the line counter again reaches N. If clock is used, it will not reset the line counter. If regexp is used and matches a line, it will reset the line counter.

Clock

Setting clock=>T uses alarm() to invoke update every T seconds. This occurs even while waiting for additional input. Subsequent lines retain the new value until another T seconds have passed.

Regular Expression

Setting regexp=>qr/...(re).../ takes the epoch seconds from the input line itself. When re matches, the value in the first capture group is passed to update. Subsequent lines use the captured value unless they also match. If lines is in use, the line counter will be reset. Any clock alarm is not reset.

This option is primarily useful for processing existing, timestamped logs. Be cautious: using regexp in combination with other configurations can result in time running backwards or jumping unpredictably.
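As an illustration of the capture-group rule, the following sketch applies a timestamp pattern to a few log lines by hand. It is standalone Perl and does not call the module. The log format, the pattern, and the helper name stamp_lines are hypothetical, and it assumes that a matching line itself takes the captured value.

```perl
use strict;
use warnings;

# Hypothetical pattern: the first capture group holds the epoch seconds.
my $regexp = qr/^\[(\d+)\]/;

# Pair each line with the most recently captured epoch.  Lines that do
# not match keep the value captured from an earlier line.
sub stamp_lines {
	my ($re, $epoch, @lines) = @_;
	my @stamped;
	foreach my $line (@lines) {
		if($line =~ $re) { $epoch = $1 }
		push @stamped, [$epoch, $line];
	}
	return @stamped;
}

my @stamped = stamp_lines($regexp, 0,
	"[1700000000] service started",
	"  continuation without a timestamp",
	"[1700000060] service stopped",
);
printf "%d %s\n", @$_ foreach @stamped;
```

The second line carries no timestamp of its own, so it is paired with the epoch captured from the first line.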

Batching

Setting batch=>M will collect M lines before calling input(@batch). The update callback is called after the batch is processed, so all lines in the batch will have the same timestamp. This is useful for high-speed data where many lines are read from an existing file, or from a tool producing data, every second.

If lines is set, the line counter is checked only once after M batched lines are processed, after which the line counter will be reset. Therefore, lines>M may only perform an update in some of the batches, whereas lines<=M will call update after every batch.
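The interaction between lines and batch can be worked through with a small model. This is a sketch of the rule as described here, not the module's code: it assumes the counter grows by the batch size, is compared with ">=" once per batch, and is reset only when an update is triggered. The function name batches_with_update is hypothetical.

```perl
use strict;
use warnings;

# Return the numbers of the batches after which update would be called,
# under the assumptions stated above.
sub batches_with_update {
	my ($lines, $batch, $nbatches) = @_;
	my ($counter, @updated) = (0);
	foreach my $n (1..$nbatches) {
		$counter += $batch;                 # one full batch processed
		if($counter >= $lines) {            # checked once per batch
			push @updated, $n;
			$counter = 0;                   # reset after the update
		}
	}
	return @updated;
}

print join(',', batches_with_update(10, 4, 9)), "\n";  # lines>M:  3,6,9
print join(',', batches_with_update(3, 4, 4)), "\n";   # lines<=M: 1,2,3,4
```

With lines=>10 and batch=>4 the model updates only after every third batch, whereas lines=>3 with batch=>4 updates after every batch.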

If clock is set, when the alarm() fires, any existing, partial batch will be passed to input first, and then update will be called. This permits "batching with timeout" to ensure that no batch is held for more than clock seconds, but it can output partial batches.

If regexp is set and a line matches, any existing, partial batch is passed to input first, and the matched line is then passed to input on its own.

To enforce a fixed batch size despite other settings and alarm() interruptions, the input handler can perform its own batching.
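One way an input handler might perform its own batching is to wrap the real handler in a closure that buffers lines until a full batch is available. This is a minimal sketch; the helper name fixed_batches and the batch size of 3 are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a handler so that it only ever receives
# exactly $size lines, no matter how the caller groups them.
sub fixed_batches {
	my ($size, $handler) = @_;
	my @pending;
	return sub {
		push @pending, @_;
		while(@pending >= $size) {
			$handler->(splice(@pending, 0, $size));
		}
		return scalar(@pending);   # lines still waiting
	};
}

my @seen;
my $input = fixed_batches(3, sub { push @seen, scalar(@_) });

$input->('a', 'b');            # partial batch, held back
$input->('c', 'd', 'e', 'f');  # completes two batches of three
$input->('g');                 # held until two more lines arrive
print "@seen\n";               # prints "3 3"
```

The returned code reference could be supplied as input=>$input. Lines still held in the buffer at end of input are not delivered by this sketch, so a real handler would need a final flush.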

Sleeping

If sleep=>S is set, after processing a batch (possibly a single line), the stream handler will wait S seconds if no input is currently available on fh. For high-speed data, setting sleep may introduce a slight delay to check the state of the fh. For mixed speed data, be cautious that the chosen sleep does not result in a full input buffer that blocks incoming data.

When clock is set, the alarm() call may interrupt the sleep, in which case there will be no further delay before the next line is received.

When regexp is set, sleeping will not occur for any existing, partial batch, only after the matched line is processed.
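The idea of sleeping only when no input is waiting can be sketched with the core IO::Select module. Whether the stream handler checks the filehandle this way is an assumption; the helper name sleep_if_idle and the one-second delay are hypothetical. Note that can_read() reports data at the operating system level and does not see lines already held in Perl's own read buffer.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Select;

# Sleep only when nothing is waiting to be read.  Returns 1 if it slept.
sub sleep_if_idle {
	my ($fh, $seconds) = @_;
	my @ready = IO::Select->new($fh)->can_read(0);  # 0 = poll, do not block
	return 0 if(@ready);
	sleep($seconds);
	return 1;
}

pipe(my $reader, my $writer) or die "pipe: $!";
$writer->autoflush(1);

my $idle = sleep_if_idle($reader, 1);   # nothing written yet: sleeps
print {$writer} "line\n";
my $busy = sleep_if_idle($reader, 1);   # data pending: returns at once
print "idle=$idle busy=$busy\n";        # prints "idle=1 busy=0"
```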