NAME
Mojo::Pg::PubSub - Publish/Subscribe
SYNOPSIS
use
Mojo::Pg::PubSub;
my
$pubsub
= Mojo::Pg::PubSub->new(
pg
=>
$pg
);
my
$cb
=
$pubsub
->
listen
(
foo
=>
sub
(
$pubsub
,
$payload
) {
say
"Received: $payload"
;
});
$pubsub
->notify(
foo
=>
'I ♥ Mojolicious!'
);
$pubsub
->unlisten(
foo
=>
$cb
);
DESCRIPTION
Mojo::Pg::PubSub is a scalable implementation of the publish/subscribe pattern used by Mojo::Pg. It is based on PostgreSQL notifications and allows many consumers to share the same database connection, to avoid many common scalability problems.
EVENTS
Mojo::Pg::PubSub inherits all events from Mojo::EventEmitter and can emit the following new ones.
disconnect
$pubsub
->on(
disconnect
=>
sub
(
$pubsub
,
$db
) {
...
});
Emitted after the current database connection is lost.
reconnect
$pubsub
->on(
reconnect
=>
sub
(
$pubsub
,
$db
) {
...
});
Emitted after switching to a new database connection for sending and receiving notifications.
ATTRIBUTES
Mojo::Pg::PubSub implements the following attributes.
pg
my
$pg
=
$pubsub
->pg;
$pubsub
=
$pubsub
->pg(Mojo::Pg->new);
Mojo::Pg object this publish/subscribe container belongs to. Note that this attribute is weakened.
reconnect_interval
my
$interval
=
$pubsub
->reconnect_interval;
$pubsub
=
$pubsub
->reconnect_interval(0.1);
Amount of time in seconds to wait to reconnect after disconnecting, defaults to 1
.
METHODS
Mojo::Pg::PubSub inherits all methods from Mojo::EventEmitter and implements the following new ones.
db
my
$db
=
$pubsub
->db;
Build and cache or get cached Mojo::Pg::Database connection from "pg". Used to reconnect if disconnected.
# Reconnect immediately
$pubsub
->unsubscribe(
'disconnect'
)->on(
disconnect
=>
sub
(
$pubsub
,
$db
) { pubsub->db });
json
$pubsub
=
$pubsub
->json(
'foo'
);
Activate automatic JSON encoding and decoding with "to_json" in Mojo::JSON and "from_json" in Mojo::JSON for a channel.
# Send and receive data structures
$pubsub
->json(
'foo'
)->
listen
(
foo
=>
sub
(
$pubsub
,
$payload
) {
say
$payload
->{bar};
});
$pubsub
->notify(
foo
=> {
bar
=>
'I ♥ Mojolicious!'
});
listen
my
$cb
=
$pubsub
->
listen
(
foo
=>
sub
{...});
Subscribe to a channel, there is no limit on how many subscribers a channel can have. Automatic decoding of JSON text to Perl values can be activated with "json".
# Subscribe to the same channel twice
$pubsub
->
listen
(
foo
=>
sub
(
$pubsub
,
$payload
) {
say
"One: $payload"
;
});
$pubsub
->
listen
(
foo
=>
sub
(
$pubsub
,
$payload
) {
say
"Two: $payload"
;
});
new
my
$pubsub
= Mojo::Pg::PubSub->new;
my
$pubsub
= Mojo::Pg::PubSub->new(
pg
=> Mojo::Pg->new);
my
$pubsub
= Mojo::Pg::PubSub->new({
pg
=> Mojo::Pg->new});
Construct a new Mojo::Pg::PubSub object and subscribe to the "disconnect" event with default reconnect logic.
notify
$pubsub
=
$pubsub
->notify(
'foo'
);
$pubsub
=
$pubsub
->notify(
foo
=>
'I ♥ Mojolicious!'
);
$pubsub
=
$pubsub
->notify(
foo
=> {
bar
=>
'baz'
});
Notify a channel. Automatic encoding of Perl values to JSON text can be activated with "json".
reset
$pubsub
->
reset
;
Reset all subscriptions and the database connection. This is usually done after a new process has been forked, to prevent the child process from stealing notifications meant for the parent process.
unlisten
$pubsub
=
$pubsub
->unlisten(
'foo'
);
$pubsub
=
$pubsub
->unlisten(
foo
=>
$cb
);
Unsubscribe from a channel.