NAME
PAGI::EventLoops - how PAGI stays loop-agnostic, how not to block, and how a server binds to an event loop
DESCRIPTION
A PAGI application is an async sub taking ($scope, $receive, $send) -- Futures and async/await, nothing more. The protocol names no event loop, and your app must not assume IO::Async, or any loop, is already set up. PAGI::Server is the reference implementation, not the only implementation; other servers are expected, and your app should run unchanged on any of them.
That the protocol names no event loop is a deliberate stance, not an oversight. Perl's event-loop landscape is genuinely fragmented -- IO::Async, AnyEvent, POE, Mojo::IOLoop, and others, each with its own reactor and ecosystem, none of them the standard. PAGI does not try to end that by crowning a winner, and it does not add yet another loop of its own. It embraces the fragmentation instead: because the contract is just Futures and await, which name no loop, an application composes against whichever reactor happens to be running, and the choice of loop is pushed out to the server and its deployment -- where it belongs.
This document has four parts, meant to be read in order:
- Part 1: Why PAGI is loop-agnostic -- what the application contract does, and does not, say about the loop.
- Part 2: Not blocking the loop -- how to do real work (I/O, timers, calls to other services) in a handler without stalling every other connection.
- Part 3: Writing your own event sources -- timers, background jobs, and external triggers, modelled as Futures and rooted so they do not leak.
- Part 4: Binding a server to a loop -- reference-server specifics: how PAGI::Server attaches to its own loop, or embeds in an application that owns the loop.
Parts 1 and 2 are what most applications need; parts 3 and 4 build on them.
PART 1: WHY PAGI IS LOOP-AGNOSTIC
Look at the application contract and notice what is not there:
async sub {
    my ($scope, $receive, $send) = @_;
    ...
}
There is no loop object, no reactor, no run to call. The signature names $scope, $receive, and $send, and that is all. Everything else is expressed with Futures and await.
await is the key. When you write await $some_future, your handler suspends until that Future is resolved. It does not say how the Future gets resolved, or which loop is turning while it waits. That is decided entirely by whatever created the leaf Future at the bottom of the chain -- the socket read inside $receive, the timer inside a Future::IO->sleep, the database driver's query Future. The loop enters your program only at those leaves. Your control flow never refers to it.
This is why the same handler runs unchanged under different servers. One server might drive everything with IO::Async; another might use a different reactor entirely. Your code does not change, because it never named the reactor in the first place. It only ever said "wait for this Future."
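The point that the loop enters only at the leaves can be seen with no loop at all. The following is a minimal sketch, assuming only the Future and Future::AsyncAwait modules; the hand-resolved $leaf is a hypothetical stand-in for whatever a real server would resolve (a socket read, a timer, a query).

```perl
use strict;
use warnings;
use Future;
use Future::AsyncAwait;

# Hypothetical leaf Future: in a real app a server or driver creates this.
# Here nothing but our own code will resolve it, and no loop exists.
my $leaf = Future->new;

my $handler = async sub {
    my $value = await $leaf;    # suspends here; names no loop
    return "handler resumed with: $value";
};

# Calling the async sub runs it up to the first pending await and
# returns a Future for its eventual result. Keep hold of that Future.
my $result = $handler->();
print "pending before the leaf resolves\n" unless $result->is_ready;

# Whoever owns the leaf resolves it; the handler resumes as a consequence.
$leaf->done('hello');
print $result->get, "\n";
```

The handler's control flow is identical whether $leaf is resolved by hand, by IO::Async, or by some other reactor, which is the portability property described above.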
PAGI::Server happens to be built on IO::Async. That is an implementation choice of one reference server, not a requirement of the protocol. Write to the contract -- Futures and await -- and you are portable by construction.
PART 2: NOT BLOCKING THE LOOP
Loop-agnostic does not mean loop-immune. There is exactly one rule that matters for performance: a handler must never block the loop. A single-threaded event loop runs your handler on the same thread that services every other connection. If your handler stops to wait synchronously, every other client stops with it.
What counts as blocking:
- sleep (the builtin) -- stops the whole process, not just your handler.
- Synchronous network clients -- LWP::UserAgent, a blocking DBI query, a plain socket read. They return only when the I/O completes, and nothing else runs in the meantime.
- Synchronous file I/O on slow or remote filesystems, and large reads done in one gulp.
- CPU-bound loops -- a tight computation that runs for hundreds of milliseconds holds the loop just as surely as a blocking read. Yield, offload, or chunk it.
The cure is to use an asynchronous equivalent that returns a Future and to await it.
Use Future::IO
Future::IO is what you want. It provides loop-neutral primitives -- sleep, socket reads and writes, and so on -- that return Futures without your code naming any loop. A handler written against Future::IO runs under any server whose deployment has wired a Future::IO backend (see "Who wires the Future::IO backend" below), so it stays portable across servers and loops.
use Future::AsyncAwait;
use Future::IO;

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    # Non-blocking pause: the loop keeps serving other clients meanwhile.
    await Future::IO->sleep(1);

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [ ['content-type', 'text/plain'] ],
    });
    await $send->({
        type => 'http.response.body',
        body => "waited one second without blocking the loop\n",
    });
};
Write your handlers this way unless you have a specific reason not to.
Who wires the Future::IO backend
Future::IO is loop-neutral until something selects a backend for it, and that selection is the job of the server or the deployer, not the application. Your handler just calls Future::IO->sleep (and friends) and stays backend-agnostic; something underneath it must have pointed Future::IO at the loop that is actually running, before your handlers load.
How that happens is server-specific. The two patterns below use the reference server, PAGI::Server, as the example -- another PAGI server would wire its own loop's backend its own way, and your application would not change:
- The server wires it for you. Under bin/pagi-server the Future::IO-to-IO::Async backend is configured automatically (when Future::IO is installed), before your app loads -- so the app carries no wiring line at all:

    # app.pl
    use Future::AsyncAwait;
    use Future::IO;    # and NO `use Future::IO::Impl::IOAsync`

    my $app = async sub {
        my ($scope, $receive, $send) = @_;
        await Future::IO->sleep(1);    # ticks; the server already wired the backend
        ...
    };
    $app;

  and run it with nothing special:

    pagi-server --app app.pl --port 5000

- You wire it yourself. When you embed a server in your own program -- so you own the loop -- you select the backend, by loading a Future::IO implementation before any Future::IO-based library. For an IO::Async loop that backend is Future::IO::Impl::IOAsync:

    # server.pl -- you own the loop, so you wire the backend
    use Future::AsyncAwait;
    use IO::Async::Loop;
    use Future::IO;                  # the loop-neutral interface your handler calls
    use Future::IO::Impl::IOAsync;   # select its backend -- before any library below
    use Async::Redis;                # a Future::IO-based library (now uses that backend)
    use PAGI::Server;

    my $app = async sub {
        my ($scope, $receive, $send) = @_;
        await Future::IO->sleep(1);    # resolves on the loop created below
        ...
    };

    my $loop = IO::Async::Loop->new;
    my $server = PAGI::Server->new(app => $app, port => 5000);
    $loop->add($server);
    $server->listen->get;
    $loop->run;

  The load order is the load-bearing part: Future::IO::Impl::IOAsync must be in effect before anything calls Future::IO, or a library could bind a different (or missing) backend first. A server built on a different reactor would have you load that reactor's Future::IO backend instead. The embedding mechanics themselves -- add, listen, run, and sharing one loop with a host application -- are covered in Part 4.
The principle is the constant -- a backend must be wired before your handlers run. The patterns are only where that wiring ends up: inside the server's startup, or in your own embedding script. For the PAGI::Server specifics, see "LOOP INTEROPERABILITY" in PAGI::Server and Part 4 below.
A pragmatic alternative: IO::Async directly
Future::IO is the portable choice, but it is also the newer one. Its ecosystem of backends and client libraries is still growing, whereas IO::Async is mature and brings a deep, battle-tested catalogue of Net::Async::* clients. So reaching for IO::Async directly -- its timers and those clients -- is a perfectly reasonable choice today, as long as you accept the trade-off: the handler names a loop, and is no longer portable to a server that is not running IO::Async.
On PAGI::Server that loop is always there -- the reference server is IO::Async -- and a handler reaches it through the ordinary constructor: IO::Async::Loop->new returns the already-running loop (a cached singleton), not a fresh one. So you add a client to the loop that is already turning and await it:
use Future::AsyncAwait;
use IO::Async::Loop;
use Net::Async::HTTP;

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    my $loop = IO::Async::Loop->new;    # the server's running loop, not a new one
    my $http = Net::Async::HTTP->new;
    $loop->add($http);

    # Non-blocking: the loop serves other clients while this request is in flight.
    my $res = await $http->GET('http://example.com/');

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [ ['content-type', 'text/plain'] ],
    });
    await $send->({ type => 'http.response.body', body => $res->content });
};
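The handler above leans on IO::Async::Loop->new handing back the loop that already exists. That behaviour can be checked in isolation; the following is a small sketch, assuming IO::Async is installed. The variable names are illustrative only.

```perl
use strict;
use warnings;
use IO::Async::Loop;
use Scalar::Util qw(refaddr);

# The ordinary constructor is a caching one: the second call is expected
# to return the very same object as the first, not a second reactor.
my $first  = IO::Async::Loop->new;
my $second = IO::Async::Loop->new;

my $same_loop = refaddr($first) == refaddr($second);
print $same_loop ? "same loop object\n" : "different loop objects\n";
```

This is why a handler on an IO::Async-based server can call the constructor freely: it reaches the loop that is already turning rather than creating one that nobody runs.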
And the coupling is gentler than "you named a loop" makes it sound, because IO::Async is itself a kind of super-loop: it ships adapter loops (IO::Async::Loop::EV, IO::Async::Loop::Glib, and others) that let one IO::Async loop drive a foreign reactor. So this handler still runs on a non-IO::Async host -- as long as an IO::Async loop (possibly an adapter sitting over that host's own reactor) is the one actually turning. What it cannot do is run where no IO::Async loop is in play at all.
The preference still stands: where you can, write against Future::IO, and consider contributing a backend or a client to it -- that is how the younger ecosystem catches up. But using IO::Async directly, when its clients save you real work today, is a reasonable call rather than a mistake; just make it knowing which loop you have named.
PART 3: WRITING YOUR OWN EVENT SOURCES
The protocol gives you $receive for client-driven events. But a handler often needs to wait on something the client did not send: a timer firing, a background job finishing, a message arriving on an external queue. The loop-agnostic way to do this is to model the event as a Future and let the handler await it, exactly as it awaits $receive. You introduce no new scope types and need no server cooperation -- the source is just ordinary Future plumbing inside your program. (See "Defining your own events" in PAGI::Spec::Extensions for where this sits in the extension model.)
An app is a tree of futures
Think of an event-driven application as a single tree of futures. A linear await sequence is a chain; concurrent work branches -- a node awaits several children at once. The event loop's only job is to turn the root; everything else is a child held by its parent.
This frame is what makes background work correct. A long-lived source -- a ticker, a queue listener, a health probe -- is a branch of the tree, and it must have a parent that holds it. A future with no parent is an orphan, and an orphan is exactly the "lost its returning future" footgun (see below). In a raw PAGI app the natural parent for app-lifetime work is the lifespan scope: the lifespan handler is invoked once and stays suspended for the whole life of the app, so the server holds it -- and anything it holds -- until shutdown.
Composing branches: from one future to many
A handler usually awaits one thing at a time, but a branch of the tree often needs several at once. Three tools cover the range:
- One future: await $f -- a single timer, request, or query (the Future::IO examples above).
- A fixed set: the Future combinators. Future->wait_any(@f) resolves on the first to finish and cancels the rest; Future->needs_all(@f) waits for all and fails if any one does. Reach for these when you know the futures up front.
- An ongoing set: Future::Selector. It holds a set of generators, each producing a future; when one completes, the selector calls that generator again for the next. $selector->run drives the whole set and fails if any source fails, so a crashing background source surfaces instead of vanishing silently. Reach for it when you have one or more long-lived sources that keep producing.
The periodic source below uses all three: Future::IO timers (one future each), a Future::Selector to keep two sources regenerating and to propagate their failures, and wait_any to race the running selector against shutdown.
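Before the full example, the two fixed-set combinators can be exercised on their own. This is a minimal sketch using only the Future module, with hand-resolved futures standing in for real timers or queries; the variable names and the failure message are made up for illustration.

```perl
use strict;
use warnings;
use Future;

# wait_any: the first component to finish decides the result,
# and the components still pending are cancelled.
my ($fast, $slow) = (Future->new, Future->new);
my $any = Future->wait_any($fast, $slow);
$fast->done('fast result');
print "wait_any result: ", $any->get,
      "; slow cancelled: ", ($slow->is_cancelled ? 'yes' : 'no'), "\n";

# needs_all: succeeds only if every component succeeds,
# and fails as soon as any one of them fails.
my ($ok, $bad) = (Future->new, Future->new);
my $all = Future->needs_all($ok, $bad);
$ok->done(1);
$bad->fail("query failed\n");
print "needs_all failure: ", scalar $all->failure;
```

Note the cancellation in the wait_any half: it is exactly what you want for a discarded timer, and exactly what you do not want for a Future you intend to keep awaiting.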
A periodic source, rooted in lifespan
A worked example: a ticker that fires every couple of seconds, plus a long-poll endpoint that lets a client wait for the next tick. The runnable version is examples/14-periodic-events; the essentials follow.
The source and the request handlers must rendezvous: the source produces ticks, the handlers wait for them. Share that rendezvous as a small object stored in $scope->{state}. This matters because each request scope receives a shallow copy of the lifespan state -- the values (object references) are shared, but the top-level keys are private to each copy. So store one object and reach it through that shared reference; never reassign a top-level state key and expect other scopes to see it (see "Lifespan State" in PAGI::Spec::Lifespan). The hub owns its waiter list and only ever mutates it in place:
package TickHub {
    sub new   { bless { count => 0, beats => 0, waiters => [] }, shift }
    sub count { $_[0]{count} }
    sub beats { $_[0]{beats} }    # heartbeats seen so far
    sub beat  { $_[0]{beats}++ }  # called by the heartbeat source below

    sub next_tick {    # subscribe: a Future for the next tick
        my $self = shift;
        push @{ $self->{waiters} }, my $f = Future->new;
        return $f;
    }

    sub publish {    # advance, then wake everyone waiting
        my $self = shift;
        $self->{count}++;
        # splice drains the list IN PLACE (never reassigns the slot); iterating
        # the drained copy means a subscriber that re-subscribes synchronously
        # cannot grow the list we are walking.
        $_->done($self->{count}) for splice @{ $self->{waiters} };
    }
}
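The shallow-copy rule is easier to trust once seen. The sketch below uses only the Future module and a compact copy of the hub; the two plain hashes are hypothetical stand-ins for the lifespan state and one request's copy of it, not the server's actual data structures.

```perl
use strict;
use warnings;
use Future;

package TickHub {    # compact copy of the hub, repeated so this runs alone
    sub new       { bless { count => 0, waiters => [] }, shift }
    sub count     { $_[0]{count} }
    sub next_tick { my $self = shift; push @{ $self->{waiters} }, my $f = Future->new; return $f }
    sub publish   {
        my $self = shift;
        $self->{count}++;
        $_->done($self->{count}) for splice @{ $self->{waiters} };
    }
}

# Stand-ins: the lifespan state, and the shallow copy a request would see.
my %lifespan_state = (ticks => TickHub->new);
my %request_state  = %lifespan_state;    # copies the keys, shares the object

my $waiting = $request_state{ticks}->next_tick;    # a handler subscribes
$lifespan_state{ticks}->publish;                   # the source publishes

print "handler woke on tick ", $waiting->get, "\n";

# Reassigning a top-level key only changes this copy.
$request_state{ticks} = undef;
print "lifespan copy still holds the hub\n" if $lifespan_state{ticks};
```

The subscriber wakes because both hashes point at one hub object; the reassignment at the end is invisible to the other copy, which is why the hub mutates its waiter list in place rather than replacing it.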
Start the sources in the lifespan handler and hold them in a Future::Selector. Two sources share one selector here -- a fast ticker and a slower heartbeat. Because the selector lives in the lifespan handler's frame, and the server holds that handler, the sources are retained for the app's lifetime with no file-scoped variables:
async sub handle_lifespan {
    my ($scope, $receive, $send) = @_;
    my $hub = ($scope->{state} //= {})->{ticks} = TickHub->new;    # stored once

    while (1) { last if (await $receive->())->{type} eq 'lifespan.startup' }
    await $send->({ type => 'lifespan.startup.complete' });

    my $selector = Future::Selector->new;
    $selector->add(data => 'ticker', gen => async sub {
        await Future::IO->sleep(2);    # a Future::IO timer names no loop
        $hub->publish;
        return;
    });
    $selector->add(data => 'heartbeat', gen => async sub {
        await Future::IO->sleep(5);    # a second, independent source
        $hub->beat;
        return;
    });

    # Run the sources until shutdown. If a source fails, run() fails and the
    # error propagates out of this handler for the server to log -- it does
    # not vanish.
    my $shutdown = (async sub {
        while (1) { return if (await $receive->())->{type} eq 'lifespan.shutdown' }
    })->();
    await Future->wait_any($shutdown, $selector->run);

    await $send->({ type => 'lifespan.shutdown.complete' });
}
The HTTP handler "listens" for the next tick by subscribing to the hub and awaiting the Future it returns. While it waits, the loop serves every other request:
async sub handle_http {
    my ($scope, $receive, $send) = @_;
    my $hub = $scope->{state}{ticks};
    # ... drain the request body ...
    if (($scope->{path} // '/') eq '/next') {
        await reply($send, 200, { tick => await $hub->next_tick });    # wakes on the next tick
    }
    else {
        await reply($send, 200, { count => $hub->count, beats => $hub->beats });
    }
}
That is the whole pattern: a source on a branch of the tree resolving Futures, and a handler that awaits them.
The same hub also powers a long-running stream. Instead of awaiting one tick and replying, a handler can hold the connection open and, in a loop, await $hub->next_tick and emit each one as a response-body chunk (more => 1) until the client disconnects -- a stream whose content is driven entirely by events outside the request. This is the shape of live feeds, progress streams, and notification endpoints. examples/14-periodic-events exposes it at /stream.
One limit to keep in mind, and the reason this is a teaching example: the hub lives in $scope->{state}, which is in-memory and per process. It is shared only within one process on one node. Run more than one process -- multiple workers (pagi-server --workers N), or multiple nodes (scaling pods on Kubernetes, say) -- and each gets its own ticker and hub: there is no tick shared across them, and the count differs between them.
To fan one event source out across every worker and node, publish through an external broker instead of an in-memory hub -- such as the PAGI::Middleware::Channels distribution with its Redis backend, which keeps the same subscribe/publish shape this example hand-rolls. The in-memory pattern above is exactly what such a layer implements for the single-process case.
The easy way, and the right way: deliver events through $receive
The hub above is the easy way, and it works -- but look at how a handler gets a tick: it reaches into $scope->{state} for the hub and calls $hub->next_tick. A handler that must also watch the protocol stream (to notice a disconnect, say) can multiplex the two right in the handler. The one rule: never let the race cancel the live $receive Future -- cancelling it ends the connection (the next $receive->() yields a cancelled future). So race with a small helper that watches both sides and cancels neither:
# resolve as soon as either future is ready -- cancelling neither
async sub await_either {
    my @futures = @_;
    my $first = Future->new;
    $_->on_ready(sub { $first->done unless $first->is_ready }) for @futures;
    await $first;
    return;
}

# a streaming handler: emit each tick, stop on disconnect
my $disconnect = $receive->();
while (1) {
    my $tick_f = $hub->next_tick;
    await await_either($disconnect, $tick_f);
    last if $disconnect->is_ready;    # client gone
    my $tick = $tick_f->get;
    await $send->({ type => 'http.response.body',
                    body => qq({"tick":$tick}\n), more => 1 });
}
That is examples/14-periodic-events's /stream exactly.
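Why a hand-rolled helper rather than Future->wait_any? The difference is cancellation, and it can be shown side by side. This is a sketch using only the Future module; either() is a plain-Future variant of the helper written for this illustration (it returns the racing Future instead of awaiting it), and the futures are resolved by hand.

```perl
use strict;
use warnings;
use Future;

# Illustrative plain-Future variant of the helper: ready when any
# component is ready, and it never cancels the others.
sub either {
    my @futures = @_;
    my $first = Future->new;
    $_->on_ready(sub { $first->done unless $first->is_ready }) for @futures;
    return $first;
}

# Race 1: the non-cancelling helper. The loser stays pending and usable.
my ($disconnect, $tick) = (Future->new, Future->new);
my $raced = either($disconnect, $tick);
$tick->done(1);
printf "either:   ready=%d, disconnect cancelled=%d\n",
    ($raced->is_ready ? 1 : 0), ($disconnect->is_cancelled ? 1 : 0);

# Race 2: wait_any. The loser is cancelled as a side effect.
my ($disconnect2, $tick2) = (Future->new, Future->new);
my $any = Future->wait_any($disconnect2, $tick2);
$tick2->done(1);
printf "wait_any: ready=%d, disconnect cancelled=%d\n",
    ($any->is_ready ? 1 : 0), ($disconnect2->is_cancelled ? 1 : 0);
```

In the first race the disconnect Future survives and can be raced again on the next iteration; in the second it is cancelled, which for a live $receive Future is the failure the rule above warns about.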
But the right shape is to stop reaching for the source at all, and let your own events arrive the way protocol events already do: through $receive. A small middleware owns the source and wraps the $receive it hands to the app -- racing with the same await_either, so the long-lived receive is never cancelled:
# in a middleware that started the ticker on lifespan startup
my $protocol_f;
my $wrapped_receive = async sub {
    $protocol_f //= $receive->();    # one outstanding receive, kept alive
    my $tick_f = $hub->next_tick;
    await await_either($protocol_f, $tick_f);
    if ($protocol_f->is_ready) {     # a protocol event arrived
        my $event = $protocol_f->get;
        undef $protocol_f;           # consumed -- fetch a fresh one next time
        return $event;
    }
    return { type => 'tick', count => $tick_f->get };    # a tick, shaped as an event
};
await $inner_app->($scope, $wrapped_receive, $send);
The runnable, verified version is examples/17-event-middleware.
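The wrapper's behaviour can also be traced without a server. The following is a sketch under stated assumptions: it needs only Future and Future::AsyncAwait, and the $receive and $next_tick closures are hypothetical stand-ins that hand out pending Futures we resolve by hand. It is not the code of the shipped example.

```perl
use strict;
use warnings;
use Future;
use Future::AsyncAwait;

# Hypothetical stand-ins for the server's $receive and the hub's next_tick.
my (@receive_calls, @tick_waiters);
my $receive   = sub { push @receive_calls, my $f = Future->new; $f };
my $next_tick = sub { push @tick_waiters,  my $f = Future->new; $f };

# Non-cancelling race, returned as a plain Future.
sub either {
    my @futures = @_;
    my $first = Future->new;
    $_->on_ready(sub { $first->done unless $first->is_ready }) for @futures;
    return $first;
}

my $protocol_f;
my $wrapped_receive = async sub {
    $protocol_f //= $receive->();    # reuse the outstanding receive if any
    my $tick_f = $next_tick->();
    await either($protocol_f, $tick_f);
    if ($protocol_f->is_ready) {
        my $event = $protocol_f->get;
        undef $protocol_f;           # consumed; ask again next time
        return $event;
    }
    return { type => 'tick', count => $tick_f->get };
};

# First call: a tick fires before any protocol event arrives.
my $first_event = $wrapped_receive->();
$tick_waiters[0]->done(1);

# Second call: the same underlying receive is still outstanding; it fires now.
my $second_event = $wrapped_receive->();
$receive_calls[0]->done({ type => 'http.request' });

print "first:  ", $first_event->get->{type},  "\n";
print "second: ", $second_event->get->{type}, "\n";
print "underlying receive calls: ", scalar(@receive_calls), "\n";
```

Two wrapped calls cost only one call to the underlying $receive: the tick did not consume or cancel it, so the protocol event it eventually carried was delivered intact on the next call.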
Now the handler never sees the hub. It does exactly what it already does for every other event -- await $receive->() and switch on type:
my $event = await $receive->();
if ($event->{type} eq 'http.request') { ... }
elsif ($event->{type} eq 'tick') { ... } # your event, arriving like any other
That indirection earns its keep. Here is why it is the right design, not merely a tidier one:
- It composes -- this is the whole reason. Because your events ride in the $receive stream as ordinary typed events, every other middleware in the stack can act on them: log them, rate-limit them, authorize them, transform them, drop them, or fold in still more events -- the same wrapping it already does to protocol events. A hub in state is a side-channel: it routes around the middleware pipeline, so nothing downstream can see it or compose with it. Events on $receive are first-class members of that pipeline; method calls on a state-stashed object are invisible to it. That is the difference between an event source the whole ecosystem can build on and one only its author can use.
- The handler is decoupled from the source. It depends only on the event contract -- a type and some fields -- never on a concrete hub. Replace the source (an in-memory ticker today, a Redis channel tomorrow) and not one line of the handler changes. With $hub->next_tick wired into the handler, that same swap is a rewrite.
- One model, not two. Your events and the server's events are awaited the same way and dispatched the same way. There is no second API to learn, no shared-state convention to remember, and no decision about when to poll the hub -- the handler is always just awaiting the next event.
- The source's lifetime lives in one place. The middleware starts the ticker on lifespan startup and tears it down on shutdown, rooted in its own frame (the same lifespan-rooting the example uses, moved out of the app). The handler carries none of that bookkeeping.
This is not hypothetical: it is precisely what PAGI::Middleware::Channels does. Its $receive wrapper has the same shape as the wrapper shown above -- a race between $protocol_f and a $channel_f that leaves the outstanding receive alive -- with the source being a channel backend: in-memory, or the Redis backend from the scaling note when the events must cross processes. So writing your event source to deliver through $receive is what lets it drop into that ecosystem: the very middleware that fans events across workers can itself be wrapped, in turn, by every other middleware in your stack.
A lifetime footgun: orphaned futures
If you skip the tree and start a background task at file scope, you get a future with no parent -- and Perl reaps it. Under pagi-server the app is loaded with do $file, which returns the $app closure; that closure does not close over a file-scoped my $ticker, so the only reference to the running task goes out of scope and the task is silently destroyed. The sole symptom is a cryptic
... lost its returning future ...
warning (emitted by Future::AsyncAwait), and your timer simply never fires.
The wrong fix is to pin the orphan in a package variable:
# ANTI-PATTERN: a side-pointer that compensates for a missing parent
package PeriodicEvents;
our $TICKER = (async sub { ... })->();
That stops the garbage collection, but it is a side-pointer outside the tree: the source's failures go nowhere (you must bolt on your own ->on_fail), and nothing coordinates its shutdown. The right fix is to give the future a parent -- root it in the lifespan scope as above, where the selector retains it, propagates its errors, and cancels it cleanly on shutdown.
See also PAGI::Building for using this pattern to build event-driven layers on PAGI.
PART 4: BINDING A SERVER TO A LOOP
Everything above is protocol-level and server-neutral. This section is different: it is about PAGI::Server specifically -- the reference server -- and how it attaches to an IO::Async loop. Other servers will bind differently.
Owning the loop versus embedding
The simple case is to let the server own the loop:
PAGI::Server->new(app => $app, port => 5000)->run;
run creates the loop, listens, and runs until stopped. This is what bin/pagi-server does, and it is all most deployments need.
The other case is embedding: you already have an IO::Async application with its own loop, and you want a PAGI::Server to share it. PAGI::Server is an IO::Async::Notifier, so you add it to your loop, start listening, and run your own loop:
use IO::Async::Loop;
use Future::IO::Impl::IOAsync; # we embed, so WE wire the Future::IO backend
use PAGI::Server;
my $loop = IO::Async::Loop->new;
my $server = PAGI::Server->new(app => $app, port => 5015);
$loop->add($server);
$server->listen->get; # listen without running the loop
$loop->run; # the host app owns the loop
Three points about this pattern:
- You call $loop->add($server) to attach the server to your loop, then $server->listen->get to bind the socket without running the loop, then $loop->run when your host application is ready to turn it.
- There is no start method and no loop => constructor argument. You attach with add and listen with listen; that is the whole API.
- Because you are embedding rather than going through bin/pagi-server, you wire the Future::IO backend with use Future::IO::Impl::IOAsync -- see "Who wires the Future::IO backend" above. Without it, Future::IO-based handlers will not tick.
The runnable version, including a host-app timer that proves the server and the host share one loop, is examples/15-embedded-ioasync.
Running under a foreign (EV-backed) loop
You can run PAGI::Server under a non-default reactor by giving IO::Async an adapter loop. The verified case is EV: PAGI::Server runs under an EV-backed IO::Async loop, and Future::IO ticks correctly there -- a Future::IO->sleep(1) inside a handler resolves and the request returns in about a second.
The wiring has three parts:
BEGIN { $ENV{IO_ASYNC_LOOP} = 'EV' } # before any IO::Async load
use IO::Async::Loop;
use Future::IO;
use Future::IO::Impl::IOAsync; # embedder wires the Future::IO backend
use PAGI::Server;
my $loop = IO::Async::Loop->new; # EV-backed, per IO_ASYNC_LOOP above
my $server = PAGI::Server->new(app => $app, port => 5016);
$loop->add($server);
$server->listen->get;
$loop->run;
The IO_ASYNC_LOOP environment variable is the load-bearing part. It must be set in a BEGIN block, before any IO::Async module is loaded, so that every IO::Async::Loop->new in the process -- both the one you create and the one Future::IO's IO::Async backend uses internally -- resolves to the same EV-backed loop. If the server and the Future::IO implementation ended up on different loops, the handler's Futures would be scheduled on a loop that nobody is turning, and requests would hang. With the env var set up front they share a single EV loop, and everything ticks.
Note: the BEGIN/env-var approach is what you use when you own the loop (embedding). When you let PAGI::Server own the loop via run() instead, there is a native shortcut -- the loop_type constructor option:
PAGI::Server->new(app => $app, port => 5016, loop_type => 'EV')->run;
This tells the server to create an EV-backed loop directly (it loads IO::Async::Loop::EV internally) without requiring a BEGIN block in your code. Future::IO wiring is still handled automatically, as with any pagi-server-owned loop.
The runnable, verified version is examples/16-foreign-loop.
SEE ALSO
- PAGI::Spec - the specification
- PAGI::Spec::Extensions - the extension mechanism and custom events
- PAGI::Building - building a framework or toolkit on PAGI
- "LOOP INTEROPERABILITY" in PAGI::Server - who wires the Future::IO backend, in full
- Future::IO - loop-neutral, portable async I/O primitives
- IO::Async - the reactor the reference server is built on
And the runnable examples: examples/14-periodic-events, examples/15-embedded-ioasync, examples/16-foreign-loop, and examples/17-event-middleware.
AUTHOR
John Napiorkowski <jjnapiork@cpan.org>
COPYRIGHT AND LICENSE
This software is licensed under the same terms as Perl itself.