NAME
Async::Event::Interval - Scheduled and one-off restartable asynchronous events
SYNOPSIS
Here's an example of a simple asynchronous event that fetches JSON data from a website every two seconds. A shared scalar variable holds the decoded JSON hashref, and the main application continues running in the foreground. Multiple events can be used simultaneously if desired.
See the "SCENARIOS/EXAMPLES" section for further usage examples.
use warnings;
use strict;
use Async::Event::Interval;
use JSON;
my $event = Async::Event::Interval->new(2, \&callback);
my $api_data_href = $event->shared_scalar;
$event->start;
while (1) {
    if ($$api_data_href) {
        print "Element 1 of 'data' dict is $$api_data_href->{data}[1]\n";
        # ...do other things with data
    }
    # ...do other things
    if ($event->error) {
        print $event->error_message;
        $event->restart;
    }
}
sub callback {
    my $api_json = some_web_api_call(); # '{"data": [1, 2, 3]}';
    $$api_data_href = decode_json($api_json);
}
DESCRIPTION
A very basic implementation of asynchronous events triggered by a timed interval. If an interval of zero is specified, the event runs only once, and you can re-run it manually at any time in the future.
Signal handling: The module installs $SIG{INT} and $SIG{TERM} handlers at load time to ensure shared memory segments are cleaned up when the host process is killed by a signal. The handlers stop any running event children, remove all shared memory segments, then re-raise the signal with the default handler so the process exits with the correct status. If you install your own handlers for these signals, call Async::Event::Interval::_end(1) from them before exiting to avoid leaking segments.
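If you install your own handlers, one way to keep the exit status described above is to run your cleanup, restore the default disposition, and re-send the signal to your own process. The following is a minimal, module-free sketch of that pattern and not the module's own code: install_cleanup_handlers and demo_exit_signal are hypothetical helpers, and the cleanup callback stands in for a call to Async::Event::Interval::_end(1).

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: run $cleanup, then re-raise the signal with the
# default disposition so the process still dies "by signal".
sub install_cleanup_handlers {
    my ($cleanup) = @_;
    for my $sig (qw(INT TERM)) {
        $SIG{$sig} = sub {
            $cleanup->($sig);        # e.g. Async::Event::Interval::_end(1)
            $SIG{$sig} = 'DEFAULT';  # restore the default action
            kill $sig, $$;           # re-raise so the default action ends us
        };
    }
    return;
}

# Hypothetical demo: a forked child installs the handlers and signals itself.
# The parent reports what the cleanup wrote and which signal ended the child.
sub demo_exit_signal {
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        close $reader;
        install_cleanup_handlers(sub { syswrite $writer, "cleanup for $_[0]" });
        kill 'TERM', $$;
        sleep 5;             # fallback only; the signal ends the child first
        POSIX::_exit(1);
    }

    close $writer;
    my $message = do { local $/; <$reader> };   # read until the child is gone
    waitpid $pid, 0;
    return ($message, $? & 127);   # low 7 bits hold the terminating signal
}

my ($message, $signal) = demo_exit_signal();
print "child said '$message' and was terminated by signal $signal\n";
```

Because the signal is re-raised rather than followed by a plain exit, the parent of the process can still see that it was terminated by a signal.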
The module also sets $SIG{CHLD} = 'IGNORE' at load time to automatically reap forked event children, preventing zombie processes. If you need to manage child processes manually (e.g. to call waitpid yourself), install your own $SIG{CHLD} handler after the use Async::Event::Interval; line so that it replaces the module's setting.
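If you do take over child management, a conventional reaping handler loops over waitpid with WNOHANG. The sketch below is generic Perl rather than code from the module; %exit_status, reap_children and spawn_and_reap are hypothetical names. Keep in mind that once 'IGNORE' is replaced, your handler becomes responsible for reaping the event children as well.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);
use Time::HiRes qw(sleep time);

my %exit_status;   # pid => exit code, filled in by the handler

# Reap every child that has exited. Several exits can be reported by a
# single SIGCHLD, hence the loop.
sub reap_children {
    local ($!, $?);
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        $exit_status{$pid} = $? >> 8;
    }
}

# Hypothetical demo: fork one child per exit code and wait (with a deadline)
# until the handler has recorded all of them. In a real program you would
# assign $SIG{CHLD} = \&reap_children; once, after loading the module.
sub spawn_and_reap {
    my (@codes) = @_;
    local $SIG{CHLD} = \&reap_children;
    %exit_status = ();

    for my $code (@codes) {
        my $pid = fork;
        die "fork failed: $!" unless defined $pid;
        POSIX::_exit($code) if $pid == 0;   # child: exit immediately
    }

    my $deadline = time + 5;
    sleep 0.01 while keys(%exit_status) < @codes && time < $deadline;
    return join ' ', sort { $a <=> $b } values %exit_status;
}

print "exit codes seen: ", spawn_and_reap(0, 1, 2), "\n";
```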
METHODS - EVENT OPERATION
new($delay, $callback, @params)
Returns a new Async::Event::Interval object. Does not start the event. Use start for that.
Parameters:
$delay
Mandatory: The interval on which to trigger your event callback, in seconds. Represent partial seconds as a floating point number. If zero is specified, we'll simply run the event once and stop.
$callback
Mandatory: A reference to a subroutine that will be called every time the interval expires.
@params
Optional, List: A list of parameters to pass to the callback. Note that these are not shared parameters and are a copy only, so changes to them in the main code will not be seen in the event, and vice-versa. See "shared_scalar" if you'd like to use variables that can be shared between the main application and the events.
Optional: Set a per-callback-execution timeout via timeout() before calling start() to have the event terminate itself if a callback runs longer than the specified number of seconds.
Optional: Set immediate() to have the callback fire immediately on start(), rather than waiting for the first interval.
Also note: These parameters are sent into the event only once. Each time the callback is called, it receives the exact same set of parameters.
To have the event get different values in the params each time the callback is called, see start().
start(@params)
Starts the event timer. Each time the interval is reached, the event callback is executed.
Parameters:
@params
Optional, List: A list of parameters that the callback will receive each time it is called. This is most effective in single-run mode, where you can send in different parameter values on each run. The parameters can be of any type and any complexity. Your callback receives them in the order you send them.
stop
Stops the event from being executed.
Sets a cooperative _stop_requested flag in shared memory so a well-behaved child exits its event loop on the next iteration. If the child is stuck in a long-running callback, escalates: sends SIGTERM and polls for up to STOP_TERM_TIMEOUT seconds, then sends SIGKILL and polls for up to STOP_KILL_TIMEOUT seconds. Croaks if the process survives both signals.
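The signal escalation described above can be illustrated without the module. This is a sketch under stated assumptions, not the module's implementation: child_gone_within, stop_child and spawn_sleeper are hypothetical helpers, the two timeouts are passed as arguments instead of using the STOP_TERM_TIMEOUT and STOP_KILL_TIMEOUT constants, and the cooperative shared-memory flag is left out.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);
use Time::HiRes qw(sleep time);

# Poll until the child is gone or $timeout seconds have passed.
sub child_gone_within {
    my ($pid, $timeout) = @_;
    my $deadline = time + $timeout;
    while (1) {
        my $reaped = waitpid($pid, WNOHANG);
        return 1 if $reaped == $pid || $reaped == -1;  # exited, or no such child
        return 0 if time >= $deadline;
        sleep 0.01;
    }
}

# Escalate TERM -> KILL. Returns the name of the signal that was needed,
# and dies if the process survives both.
sub stop_child {
    my ($pid, $term_timeout, $kill_timeout) = @_;
    kill 'TERM', $pid;
    return 'TERM' if child_gone_within($pid, $term_timeout);
    kill 'KILL', $pid;
    return 'KILL' if child_gone_within($pid, $kill_timeout);
    die "Process $pid survived both SIGTERM and SIGKILL\n";
}

# Hypothetical demo child: sleeps, optionally with SIGTERM ignored. The
# disposition is set before fork so the child inherits it without a race.
sub spawn_sleeper {
    my ($ignore_term) = @_;
    local $SIG{TERM} = $ignore_term ? 'IGNORE' : 'DEFAULT';
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        sleep 30;
        POSIX::_exit(0);
    }
    return $pid;
}

print "cooperative child stopped by ", stop_child(spawn_sleeper(0), 1, 1), "\n";
print "stubborn child stopped by ",    stop_child(spawn_sleeper(1), 0.2, 1), "\n";
```

The second child stands in for a callback that does not respond to SIGTERM, which is the case the SIGKILL step exists for.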
restart
Alias for start(). Re-starts a stop()ped event.
status
Returns the event's process ID (true) if it is running, 0 (false) if it isn't.
Side effect: calling status() probes the event's child process with kill 0 to detect a crashed background process. If the process is gone, the event's internal _started flag is cleared, an internal _crashed flag is set, and pid is cleared (so "pid" subsequently returns undef). Subsequent calls to status(), "error", or "waiting" see the updated state. To clear the crash flag, call start() or "restart".
wait($interval)
Blocks until waiting() returns true, polling at the given interval. Useful for one-shot events where you want to wait for the callback to finish without writing the poll loop by hand.
Parameters:
$interval
Optional, Number: Polling interval in seconds (integer or float). Defaults to 0.01.
Return: Nothing.
Note: wait() returns once the event is dormant for any reason, including crash. Inspect "error" after the call returns if you need to distinguish a clean finish from a crash.
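For comparison, the hand-written poll loop that wait() replaces might look like the sketch below. It is written against a generic predicate so that it runs without the module; poll_until is a hypothetical helper, and with a real event the predicate would be sub { $event->waiting }.

```perl
use strict;
use warnings;
use Time::HiRes qw(sleep);

# Hypothetical helper: call $is_done every $interval seconds until it
# returns true. Returns the number of checks made, purely for illustration
# (wait() itself returns nothing).
sub poll_until {
    my ($is_done, $interval) = @_;
    $interval = 0.01 unless defined $interval;   # same default as wait()
    my $polls = 1;
    until ($is_done->()) {
        sleep $interval;
        $polls++;
    }
    return $polls;
}

# Stand-in for $event->waiting: reports "still running" for three checks.
my $checks = 0;
my $polls  = poll_until(sub { ++$checks > 3 });
print "predicate became true after $polls polls\n";
```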
waiting
Returns true if the event is dormant and is ready for a start() or restart() command. Returns false if the event is already running.
Side effect: calls error() and status() internally, both of which probe the child process (see those methods for details).
The same state is also surfaced as the waiting field in "events" and "info" snapshots, where it can be read without the side effects of this method.
error
Returns true if an event crashed unexpectedly in the background, and is ready for a start() or restart() command. Returns false if the event is not in an error state.
Side effect: calling error() runs the same crash probe documented under "status". The event's internal flags and PID may be mutated as a side effect of this call.
The same state is also surfaced as the error field in "events" and "info" snapshots, where it can be read without the side effects of this method (and without per-object access).
See "errors" for the crash count.
interval($seconds)
Gets/sets the delay time (in seconds) between each execution of the event's callback code. You can use this method to change the delay between event execution during the event's lifecycle.
Parameters:
$seconds
Optional, Number: The number of seconds (integer or floating point) to delay between executions.
Return: Number (integer or float), the number of seconds between execution runs. If the interval was set to zero, the return will be 0.
timeout($seconds)
Sets (or gets) a per-callback-execution timeout in seconds. If the event's callback takes longer than the specified time to complete, the event will terminate itself with an error.
Parameters:
$seconds
Optional, Integer: The number of whole seconds the callback is allowed to execute for before timing out. Must be a non-negative integer; fractional seconds are not supported. Use 0 or undef to disable.
Default: 0
Return: Currently set value.
Note: The timeout is read from shared memory at the start of every callback invocation, so changes made via this setter while an event is running take effect on the next iteration of the interval loop (mirroring interval()).
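A whole-second, per-invocation timeout of this kind is commonly built on alarm. The sketch below shows that general technique in plain Perl; it is an assumption that the module works along these lines, and run_with_timeout is a hypothetical helper. Only the message format is taken from this documentation.

```perl
use strict;
use warnings;

# Hypothetical helper: run $callback under an alarm-based timeout.
# Returns undef on success, or the error message on failure or timeout.
sub run_with_timeout {
    my ($timeout, $callback, @args) = @_;
    my $error;
    eval {
        local $SIG{ALRM} = sub {
            die "Callback timed out after $timeout seconds\n";
        };
        alarm($timeout || 0);  # whole seconds only; 0 or undef means no timeout
        $callback->(@args);
        alarm 0;               # finished in time: cancel the pending alarm
        1;
    } or $error = $@ || "unknown error\n";
    alarm 0;                   # also cancel if the callback itself died
    return $error;
}

my $fast = run_with_timeout(2, sub { return 'ok' });
my $slow = run_with_timeout(1, sub { sleep 5 });

print "fast callback: ", (defined $fast ? $fast : "finished in time\n");
print "slow callback: $slow";
```

Because alarm only accepts whole seconds, this approach is consistent with the restriction that fractional timeouts are not supported.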
immediate($value)
Sets (or gets) whether the callback fires immediately on start(), bypassing the first interval wait. Subsequent invocations follow the normal interval cadence.
Parameters:
$value
Optional, Integer: 1 to enable immediate first execution, 0 or undef to disable. Must be a non-negative integer when defined.
Default: 0
Return: Currently set value.
Note: The flag is read from shared memory on each iteration of the event loop. Changes made before calling the initial start() always take effect. Changes made after start() take effect on the next loop iteration; however, once the first callback has executed, immediate has already served its purpose and further changes will not trigger another immediate execution. Restart the event for a fresh immediate check.
Note: This feature is a no-op when running in single-run mode. In that mode, the event always fires immediately on a call to start().
shared_scalar
Returns a reference to a scalar variable that can be shared between the main process and the events. This reference can be used within multiple events, and multiple shared scalars can be created by each event.
To read from or assign to the returned scalar, dereference it:
$$s = 42; # plain number
$$s = 'some string'; # plain string
$$s = { key => 'v' }; # hashref
$$s = [1, 2, 3]; # arrayref
Supported values: Internally IPC::Shareable serializes to JSON by default, so values must be JSON-representable: scalars (strings/numbers), arrayrefs, hashrefs, and combinations of those. Blessed objects, code references, regex references, and globs are not supported and will be silently lost or corrupt the segment.
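Since unsupported values fail silently, it can be worth checking a value before storing it. The following is a hypothetical pre-flight check, not part of the module or of IPC::Shareable: is_json_representable accepts only plain scalars, arrayrefs and hashrefs, applied recursively. It treats undef as acceptable (JSON null), which is an assumption, and it does not guard against cyclic structures.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed reftype);

# Hypothetical helper: true if $value is built only from plain scalars,
# arrayrefs and hashrefs.
sub is_json_representable {
    my ($value) = @_;

    if (!ref $value) {
        return 0 if ref(\$value) eq 'GLOB';   # a bare glob such as *STDOUT
        return 1;                             # string, number or undef
    }
    return 0 if blessed $value;               # objects, including qr// regexes

    my $type = reftype $value;
    if ($type eq 'ARRAY') {
        for my $element (@$value) {
            return 0 unless is_json_representable($element);
        }
        return 1;
    }
    if ($type eq 'HASH') {
        for my $element (values %$value) {
            return 0 unless is_json_representable($element);
        }
        return 1;
    }
    return 0;                                 # CODE, GLOB, SCALAR refs, ...
}

my %samples = (
    'nested hash' => { config => { db => { host => 'localhost', port => 5432 } } },
    'code ref'    => sub { 1 },
    'regex'       => qr/x/,
);
for my $name (sort keys %samples) {
    printf "%-12s %s\n", $name,
        is_json_representable($samples{$name}) ? 'ok' : 'not supported';
}
```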
Nested references work transparently and cleanup is automatic. Note that under the hood, each nested hashref/arrayref allocates its own child shared-memory segment, so very deeply nested structures consume one shm segment per node:
$$s = { config => { db => { host => 'localhost', port => 5432 } } };
my $host = $$s->{config}{db}{host}; # 'localhost'
Updating a stored hashref: When extending a hashref already in the scalar, mutate through the dereference directly. Do not fetch the reference into a lexical, mutate it, and store it back: that pattern corrupts the segment because the fetched reference still carries IPC::Shareable's tied magic, and re-storing a tied value into its own parent breaks the serialization:
# Recommended: direct dereferenced mutation
$$s->{new_key} = 'val';
# Also works (modern stacks): spread + reassign
$$s = { %{$$s}, new_key => 'val' };
# Unreliable: re-storing a fetched reference corrupts the segment
# my $h = $$s; $h->{new_key} = 'val'; $$s = $h;
The spread idiom replaces the entire stored value, which on older IPC::Shareable versions can lose accumulated cross-process writes (later writers replacing earlier writers' data). The direct dereferenced mutation avoids the nested-segment STORE path and is the more portable choice.
Lifetime: The underlying shared memory segment is owned by the event object that created it. When the event goes out of scope (and its DESTROY runs), every shared_scalar it created is released. Do not dereference the returned scalar reference after the owning event has been destroyed; the segment will no longer exist. If you need a shared scalar whose lifetime is independent of any event, tie it directly with IPC::Shareable.
Hex keys: "info" and "events" return shared_scalars as an arrayref of hex key strings. These identify the underlying IPC segments and can be used to re-attach from another process:
my $info = $event->info;
for my $key (@{ $info->{shared_scalars} }) {
    # Re-attach to the existing segment; the tied variable itself holds the value
    tie my $scalar, 'IPC::Shareable', $key, {};
    print "$scalar\n";
}
In practice, however, it is simpler to retain the reference returned by shared_scalar() and use it directly.
METHODS - EVENT INFORMATION
errors
Returns the number of times a started or restarted event has crashed unexpectedly. See "error" to test whether the event is currently in an error state.
error_message
Returns the error message (if any) that caused the most recent event crash.
If the crash was caused by timeout() firing, the message has the form "Callback timed out after N seconds" (where N is the timeout in whole seconds), which consumers can pattern-match on to distinguish timeouts from other callback failures.
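A small sketch of that pattern match follows. classify_error is a hypothetical helper, not a module method; it only relies on the message format quoted above, and the second sample message is the one shown in the "events" dump.

```perl
use strict;
use warnings;

# Hypothetical helper: classify a string returned by error_message().
# Returns ('timeout', N), ('other', $message) or ('none').
sub classify_error {
    my ($message) = @_;
    return ('none') unless defined $message && length $message;
    if ($message =~ /Callback timed out after (\d+) seconds/) {
        return ('timeout', $1);
    }
    return ('other', $message);
}

for my $message (
    'Callback timed out after 8 seconds',
    'File notes.txt not found at scripts/write_file.pl line 227',
    undef,
) {
    my ($kind, $detail) = classify_error($message);
    printf "%-7s %s\n", $kind, defined $detail ? $detail : '';
}
```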
events
Returns a plain hash reference containing a snapshot of the data for all existing events. The returned hash is a copy; modifying it will not affect the live events. shared_scalars is an arrayref of the hex key strings for each shared scalar created by the event; use the scalar reference returned by "shared_scalar" to read or write values.
The snapshot is taken under a read lock (LOCK_SH) for consistency.
This method can be called as a class method (Async::Event::Interval->events) since it returns data for all events regardless of caller context.
$VAR1 = {
'0' => {
'pid' => 11859,
'runs' => 16,
'errors' => 0,
'error' => 0,
'waiting' => 0,
'interval' => 5,
'shared_scalars' => [
'0x4a3f2c1b5d6e',
'0x7f8e9d0c1b2a'
],
},
'1' => {
'pid' => 11860,
'runs' => 447,
'errors' => 2,
'error' => 1,
'waiting' => 1,
'interval' => 0.6,
'error_message' => 'File notes.txt not found at scripts/write_file.pl line 227',
}
};
error is 1 if the event is currently stopped because its callback died (mirrors "error"), 0 otherwise. waiting is 1 if the event is dormant and ready for a start()/"restart" call (mirrors "waiting"), 0 if it is currently running. See "info" for the full lifecycle state table.
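Because the snapshot is a plain hash, it can be filtered with ordinary Perl. The sketch below uses sample data shaped like the dump above so that it runs on its own; crashed_event_ids is a hypothetical helper, and with the module loaded the hashref would come from Async::Event::Interval->events.

```perl
use strict;
use warnings;

# Hypothetical helper: IDs of events whose snapshot has the error flag set.
# Call it in list context.
sub crashed_event_ids {
    my ($events) = @_;
    return sort { $a <=> $b } grep { $events->{$_}{error} } keys %$events;
}

# Sample data copied from the dump above (shared_scalars omitted).
my $events = {
    0 => {
        pid => 11859, runs => 16, errors => 0, error => 0,
        waiting => 0, interval => 5,
    },
    1 => {
        pid => 11860, runs => 447, errors => 2, error => 1,
        waiting => 1, interval => 0.6,
        error_message => 'File notes.txt not found at scripts/write_file.pl line 227',
    },
};

for my $id (crashed_event_ids($events)) {
    print "Event $id crashed: $events->{$id}{error_message}\n";
}
```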
id
Returns the integer ID of the event.
info
Returns a hash reference containing a snapshot of the event's data. The returned hash is a copy; modifying it will not affect the live event. shared_scalars is an arrayref of hex key strings; use the scalar reference returned by "shared_scalar" to read or write values.
The snapshot is taken under a read lock (LOCK_SH) for consistency.
$VAR1 = {
'pid' => 6841,
'runs' => 4077,
'errors' => 0,
'error' => 0,
'waiting' => 0,
'interval' => 1.4,
'shared_scalars' => [
'0x4a3f2c1b5d6e',
'0x7f8e9d0c1b2a'
],
};
The error and waiting fields mirror the "error" and "waiting" methods but can be read from any process holding a reference to the %events hash without the side effects of those methods. error is a stored flag (written by the child when its callback dies, or by _detect_crash on the next probe for externally-killed children). waiting is derived on every snapshot from pid, error, and a kill(0, $pid) liveness probe, so it always reflects the current state.
The following table summarises the values across the event lifecycle:
State                                      error   waiting
----------------------------------------------------------
Just instantiated, never started           0       1
Currently running                          0       0
Stopped cleanly via stop()                 0       1
One-shot finished cleanly                  0       1
Callback died (interval mode)              1       1
Callback died (one-shot)                   1       1
timeout() fired (callback alarmed out)     1       1
External `kill -9` of the child            1       1
Restarted after a crash                    0       0
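Read row by row, the table reduces to three combinations of the two fields. The sketch below maps a snapshot to a coarse label on that basis; lifecycle_summary and its labels are hypothetical, and the combination error=1/waiting=0 is reported as 'unknown' because it does not appear in the table.

```perl
use strict;
use warnings;

# Hypothetical helper: summarise an info() style snapshot using only its
# error and waiting fields.
sub lifecycle_summary {
    my ($info) = @_;
    my ($error, $waiting) = map { $info->{$_} ? 1 : 0 } qw(error waiting);

    return 'running' if !$error && !$waiting;
    return 'dormant' if !$error &&  $waiting;   # never started, stopped or finished
    return 'crashed' if  $error &&  $waiting;   # died, timed out or was killed
    return 'unknown';                           # not listed in the table
}

for my $snapshot (
    { error => 0, waiting => 0 },
    { error => 0, waiting => 1 },
    { error => 1, waiting => 1 },
) {
    printf "error=%d waiting=%d => %s\n",
        $snapshot->{error}, $snapshot->{waiting}, lifecycle_summary($snapshot);
}
```

The fields alone cannot tell a never-started event from a cleanly stopped one; use "runs" or "pid" if you need that distinction.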
pid
Returns the Process ID the event is running under:
undef before start() has ever been called
undef after a crashed event has been detected (via a call to "error", "status", or "waiting") and until the next start()/restart()
The PID of the most recent child after a clean stop() (a dead process; provided for diagnostic purposes only)
A positive integer (the PID of the currently running child) otherwise
Note: Use "status" and "error" to determine which state applies; do not interpret the PID integer value beyond "some past or current child PID". Prior versions returned the magic value -99 after a crash; that sentinel has been retired in favor of "error".
runs
Returns the number of executions of the event's callback routine.
SCENARIOS/EXAMPLES
Run once
Send in an interval of zero (0) to have your event run a single time. Call start() (or restart()) repeatedly for numerous individual/one-off runs.
use Async::Event::Interval;
my $event = Async::Event::Interval->new(0, sub {print "hey\n";});
$event->start;
# Do other work while the event runs...
# waiting() probes the child process; returns true once the
# one-shot has finished, allowing a clean restart
$event->start if $event->waiting;
Change delay interval during operation
Change the delay interval from 5 to 600 seconds after the event has fired 100 times:
use Async::Event::Interval;
my $event = Async::Event::Interval->new(5, sub {print "hey\n";});
$event->start;
while (1) {
if ($event->runs > 99 && $event->interval != 600) {
$event->interval(600);
}
#... do stuff
}
Closures and lexical variables
When a callback closes over a lexical variable, the child process sees the value that existed at the moment of fork. For one-shot events (interval 0), each start() forks a fresh child, so changes to the lexical between calls are visible:
use Async::Event::Interval;
my $msg = "first run";
my $e = Async::Event::Interval->new(0, sub { print "$msg\n"; });
$e->start; # prints "first run"
select(undef, undef, undef, 0.3);
$msg = "second run";
$e->start; # prints "second run"
Note: For interval events (interval > 0), the child is forked once on the first start() and loops. Parent-side changes to closed-over lexicals will never be seen by the already-running child. Use "shared_scalar" or start(@params) for data that must cross process boundaries mid-run.
Per callback execution parameters
When using an event in a one-off situation where you restart the same event manually, you can send in parameters that differ for each execution.
Send in a list of any data type. The list will be sent as-is to the callback.
NOTE: Parameters sent in to the start() method will override ones sent into the new() method.
For example:
use Async::Event::Interval;
my @params = (
{ a => 1 },
{ b => 2 },
{ c => 3 },
);
my $event = Async::Event::Interval->new(0, \&callback);
my $count = 0;
for my $href (@params) {
    $event->start($count, $href);
    $event->wait; # block until this run has finished
    $count++;
}
sub callback {
    my ($count, $href) = @_;
    my ($k, $v) = each %$href;
    print "$count: $k = $v\n";
}
Global event callback parameters
You can send in a list of parameters to the event callback when instantiating the event. Note that these parameters will remain the same for every call of the callback.
Changing these within the main program will have no effect on the values sent into the event itself. These parameter variables are copies and are not shared. For shared variables, see "shared_scalar".
use Async::Event::Interval;
my @params = qw(1 2 3);
my $event = Async::Event::Interval->new(
1,
\&callback,
@params
);
sub callback {
my ($one, $two, $three) = @_;
print "$one, $two, $three\n";
}
Shared data across events
This software uses IPC::Shareable internally, so it is already installed as a dependency. You can share data across many processes and events and, if you use the same IPC key, even across multiple scripts.
Here's an example that uses a hash that's stored in shared memory, where the parent process (the script) and two other processes (the two events) all share and update the same hash.
Important: keep shared hash values flat (strings, numbers). Nested data structures (e.g. $hash{$$}{key}) cause IPC::Shareable to create child shared-memory segments whose ownership can conflict across forked processes, leading to data loss. For per-event shared data, consider "shared_scalar" instead.
use Async::Event::Interval;
use IPC::Shareable;
tie my %shared_data, 'IPC::Shareable', {
key => '123456789',
create => 1,
destroy => 1
};
$shared_data{$$}++;
my $event_one = Async::Event::Interval->new(0.2, \&update);
my $event_two = Async::Event::Interval->new(1, \&update);
$event_one->start;
$event_two->start;
sleep 10;
$event_one->stop;
$event_two->stop;
for my $pid (keys %shared_data) {
printf(
"Process ID %d executed %d times\n",
$pid,
$shared_data{$pid}
);
}
for my $event ($event_one, $event_two) {
printf(
"Event ID %d with PID %d ran %d times, with %d errors and an interval" .
" of %.2f seconds\n",
$event->id,
$event->pid,
$event->runs,
$event->errors,
$event->interval
);
}
sub update {
# Because each event runs in its own process, $$ will be set to the
# process ID of the calling event, even though they both call this
# same function
$shared_data{$$}++;
}
Event error management
If an event crashes, print out error information and restart the event. If an event crashes five or more times, print the most recent error message and halt the program so you can figure out what's wrong with your callback code.
use Async::Event::Interval;
my $event = Async::Event::Interval->new(5, sub {print "hey\n";});
$event->start;
while (1) {
#... do stuff
if ($event->errors >= 5) {
print $event->error_message;
exit;
}
if ($event->error) {
printf(
"Runs: %d, Runs errored: %d, Last error message: %s\n",
$event->runs,
$event->errors,
$event->error_message
);
$event->restart;
}
}
Event crash: Restart event
use warnings;
use strict;
use Async::Event::Interval;
# kill 9, $$ is a contrived self-kill to demonstrate crash detection
my $event = Async::Event::Interval->new(0.5, sub { kill 9, $$; });
$event->start;
sleep 1; # Do stuff
if ($event->error) {
print "Event crashed, restarting\n";
$event->restart;
}
Event crash: End program
use warnings;
use strict;
use Async::Event::Interval;
# kill 9, $$ is a contrived self-kill to demonstrate crash detection
my $event = Async::Event::Interval->new(0.5, sub { kill 9, $$; });
$event->start;
sleep 1; # Do stuff
die "Event crashed, can't continue" if $event->error;
Immediate first execution
Set immediate to have the callback fire right away on start(), then repeat at the regular interval thereafter:
use Async::Event::Interval;
my $event = Async::Event::Interval->new(5, sub { print "hey\n"; });
$event->immediate(1);
$event->start;
sleep 10;
$event->stop;
Event suicidal timeout
The event can die() on its own if your callback exceeds a timeout threshold. Set the threshold with timeout(). It can be set at any time; a new value is picked up at the start of the next callback invocation.
use Async::Event::Interval;
my $event = Async::Event::Interval->new(60, sub { sleep 9; });
$event->timeout(8);
$event->start;
while (1) {
    # error_message() may be empty until a crash has been recorded
    my $message = $event->error_message // '';
    if ($message =~ /Callback timed out/) {
        print "Event callback timed out... exiting to troubleshoot\n";
        exit;
    }
}
Shared scalar
shared_scalar() returns a tied scalar reference whose value lives in shared memory and is visible to the parent and to event callbacks. The sub-sections below show common usage patterns; see shared_scalar() for the API reference and constraints.
Storing simple types
A shared scalar can hold any JSON-representable value: scalars, arrayrefs, hashrefs, or combinations thereof.
use Async::Event::Interval;
my $event = Async::Event::Interval->new(0, sub {});
my $s = $event->shared_scalar;
$$s = 42;
$$s = 'hello';
$$s = [1, 2, 3];
$$s = { lang => 'Perl' };
print "$$s->{lang}\n";
Event writes, parent reads
An event populates the scalar in the background; the parent reads it after the callback finishes.
use Async::Event::Interval;
my $s;
my $event = Async::Event::Interval->new(0, sub {
$$s = { name => 'alice', score => 42 };
});
$s = $event->shared_scalar;
$event->start;
$event->wait;
print "$$s->{name}: $$s->{score}\n";
Updating a stored hashref
When extending a hashref already in the scalar, mutate through the dereference directly. The spread idiom also works on modern IPC::Shareable stacks but is less portable across forked writers on older versions:
$$s = { a => 1, b => 2 };
# Recommended: direct dereferenced mutation
$$s->{c} = 3;
# Also works (modern stacks): spread + reassign
$$s = { %{$$s}, d => 4 };
Do not fetch the reference into a lexical, mutate it, and store it back (my $h = $$s; $h->{x} = 1; $$s = $h;) - that pattern corrupts the segment. See shared_scalar() for the full rules.
Two events sharing a scalar
Multiple events can read/write the same shared scalar via closure. The segment is owned by the event that created it; other events reference it through the closed-over variable.
use Async::Event::Interval;
my $s;
my $event_a = Async::Event::Interval->new(0, sub {
$$s = { source => 'A', value => 100 };
});
$s = $event_a->shared_scalar;
my $event_b = Async::Event::Interval->new(0, sub {
$$s = { %{$$s}, source => 'B' };
});
$event_a->start; $event_a->wait;
$event_b->start; $event_b->wait;
print "source=$$s->{source} value=$$s->{value}\n";
Multiple scalars on one event
One event can own several shared scalars - for example, one for input the callback reads and one for results the parent reads back.
use Async::Event::Interval;
my ($s_in, $s_out);
my $event = Async::Event::Interval->new(0, sub {
$$s_out = { sum => $$s_in->{a} + $$s_in->{b} };
});
$s_in = $event->shared_scalar;
$s_out = $event->shared_scalar;
$$s_in = { a => 3, b => 4 };
$event->start;
$event->wait;
print "sum=$$s_out->{sum}\n";
Periodic background writes
An interval event writes to the shared scalar on each tick; the parent reads the latest value at its own pace.
use Async::Event::Interval;
my $s;
my $event = Async::Event::Interval->new(1, sub {
my $prev = ($$s && $$s->{count}) || 0;
$$s = { time => scalar localtime, count => $prev + 1 };
});
$s = $event->shared_scalar;
$event->start;
for (1..5) {
sleep 1;
print "$$s->{time} (tick $$s->{count})\n" if $$s;
}
$event->stop;
Wait for one-shot completion
Use wait() to block until the event becomes dormant instead of writing the poll loop by hand. wait() returns regardless of whether the event finished cleanly or crashed, and it has no return value; inspect "error" after it returns to distinguish the two.
use Async::Event::Interval;
my $event = Async::Event::Interval->new(0, sub {
select(undef, undef, undef, 0.5); # simulate work
print "callback done\n";
});
$event->start;
$event->wait; # blocks until the callback finishes
die "callback crashed: " . $event->error_message if $event->error;
Pass a poll interval to tune responsiveness (default is 0.01):
$event->wait(0.001); # fine-grained polling
AUTHOR
Steve Bertrand, <steveb at cpan.org>
LICENSE AND COPYRIGHT
Copyright 2024 Steve Bertrand.
This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.