package Mojo::mysql::PubSub;
use Mojo::Base 'Mojo::EventEmitter';

use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_PUBSUB_DEBUG} || 0;

has 'mysql';

# Destructor: removes the subscription rows registered for the waiting
# connection. Does nothing if no waiting connection was ever opened or the
# "mysql" attribute is already gone.
sub DESTROY {
  my $self = shift;
  return unless $self->{wait_db} and $self->mysql;

  # Fork-safety: a forked child inherits "wait_db" from its parent. The rows
  # keyed by that connection id still belong to the parent, so a child must
  # not delete them when its copy of the object goes away.
  return if ($self->{pid} // $$) ne $$;

  $self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ?', $self->{wait_db}->pid);
}

# Subscribe $cb to $channel.
#   $channel - channel name
#   $cb      - code reference, invoked as $pubsub->$cb($payload)
# Returns $cb so it can later be handed to "unlisten".
sub listen {
  my ($self, $channel, $cb) = @_;

  my $sync_db  = $self->mysql->db;
  my $wait_pid = $self->_wait_db($sync_db)->pid;
  warn qq|[PubSub] (@{[$wait_pid]}) listen "$channel"\n| if DEBUG;

  # Register (or refresh) the subscription for the waiting connection
  $sync_db->query('replace mojo_pubsub_subscribe (pid, channel, ts) values (?, ?, current_timestamp)',
    $wait_pid, $channel);
  push @{$self->{chans}{$channel}}, $cb;

  return $cb;
}

# Publish $payload (empty string if undefined) on $channel by inserting a row
# into mojo_pubsub_notify. Returns the invocant.
sub notify {
  my ($self, $channel, $payload) = @_;

  my $sync_db = $self->mysql->db;
  warn qq|[PubSub] channel:$channel <<< "@{[$payload // '']}"\n| if DEBUG;

  $self->_init($sync_db) unless $self->{init};
  $sync_db->query('insert into mojo_pubsub_notify (channel, payload) values (?, ?)', $channel, $payload // '');

  return $self;
}

# Remove $cb from $channel. When no callbacks remain the subscription row for
# the waiting connection is deleted as well. Returns the invocant.
sub unlisten {
  my ($self, $channel, $cb) = @_;

  # An unknown channel is treated as having no subscribers (no reliance on
  # autovivification of an undefined reference)
  my $chan = $self->{chans}{$channel} || [];
  @$chan = grep { $cb ne $_ } @$chan;
  return $self if @$chan;

  my $sync_db  = $self->mysql->db;
  my $wait_pid = $self->_wait_db($sync_db)->pid;
  warn qq|[PubSub] ($wait_pid) unlisten "$channel"\n| if DEBUG;

  $sync_db->query('delete from mojo_pubsub_subscribe where pid = ? and channel = ?', $wait_pid, $channel);
  delete $self->{chans}{$channel};

  return $self;
}

# One-time setup: run the "pubsub" migration from the DATA section and purge
# notifications older than 10 minutes and subscriptions older than 1 hour.
sub _init {
  my ($self, $sync_db) = @_;

  $self->mysql->migrations->name('pubsub')->from_data->migrate;

  # Cleanup old subscriptions and notifications
  $sync_db->query('delete from mojo_pubsub_notify where ts < date_add(current_timestamp, interval -10 minute)');
  $sync_db->query('delete from mojo_pubsub_subscribe where ts < date_add(current_timestamp, interval -1 hour)');

  $self->{init} = 1;
}

# Fetch every notification with an id greater than "last_id", advance
# "last_id", and dispatch the payload to the callbacks of subscribed channels.
sub _notifications {
  my ($self, $sync_db) = @_;

  my $result
    = $sync_db->query('select id, channel, payload from mojo_pubsub_notify where id > ? order by id',
    $self->{last_id});

  while (my $row = $result->array) {
    my ($id, $channel, $payload) = @$row;
    $self->{last_id} = $id;
    next unless exists $self->{chans}{$channel};
    warn qq/[PubSub] channel:$channel >>> "$payload"\n/ if DEBUG;

    # Dispatch over a snapshot: "unlisten" rewrites the subscriber array in
    # place, and modifying an array while foreach walks it is unsafe
    my @subscribers = @{$self->{chans}{$channel}};
    $self->$_($payload) for @subscribers;
  }
}

# Return the connection used for waiting, creating it on first use, after a
# fork, or after the previous one failed. $sync_db is a second handle used
# for the blocking bookkeeping queries.
sub _wait_db {
  my ($self, $sync_db) = @_;

  # Fork-safety: drop state inherited from another process and record the new
  # owner immediately, so later checks (including DESTROY) see the right pid
  if (($self->{pid} //= $$) ne $$) {
    delete @$self{qw(wait_db chans)};
    $self->{pid} = $$;
  }

  return $self->{wait_db} if $self->{wait_db};

  $self->_init($sync_db) unless $self->{init};

  my $wait_db = $self->{wait_db} = $self->mysql->db;

  # Re-register all known channels under the new connection id
  $sync_db->query('replace mojo_pubsub_subscribe (pid, channel) values (?, ?)', $wait_db->pid, $_)
    for keys %{$self->{chans}};

  # Catch up after a reconnect, otherwise start from the newest notification.
  # The check is on definedness because 0 is a valid "last_id" (empty table).
  if (defined $self->{last_id}) {
    $self->_notifications($sync_db);
  }
  else {
    my $last = $sync_db->query('select id from mojo_pubsub_notify order by id desc limit 1')->array;
    $self->{last_id} = defined $last ? $last->[0] : 0;
  }

  # Weak references; presumably to avoid keeping the Mojo::mysql object and
  # this object alive through the waiting handle and the callback below
  weaken $wait_db->{mysql};
  weaken $self;

  # Invoked whenever the "select sleep(600)" on the waiting connection ends
  my $cb;
  $cb = sub {
    my ($db, $err, $res) = @_;
    return unless $self;
    warn qq|[PubSub] (@{[$db->pid]}) sleep(600) @{[$err ? "!!! $err" : $res->array->[0]]}\n| if DEBUG;

    my $sync_db = $self->mysql->db;

    # On error, discard the waiting connection and set up a fresh one
    return (delete $self->{wait_db}, $self->_wait_db($sync_db)) if $err;

    $res->finish;

    # Go back to sleep first, then refresh the timestamp and deliver
    $db->query('select sleep(600)', $cb);
    $sync_db->query('update mojo_pubsub_subscribe set ts = current_timestamp where pid = ?', $db->pid);

    # Reuse the handle already checked out instead of requesting another one
    $self->_notifications($sync_db);
  };

  warn qq|[PubSub] (@{[$wait_db->pid]}) reconnect\n| if DEBUG;
  $self->emit(reconnect => $wait_db);

  return $wait_db->query('select sleep(600)', $cb);
}

1;

=encoding utf8

=head1 NAME

Mojo::mysql::PubSub - Publish/Subscribe

=head1 SYNOPSIS

  use Mojo::mysql::PubSub;

  my $pubsub = Mojo::mysql::PubSub->new(mysql => $mysql);
  my $cb = $pubsub->listen(foo => sub {
    my ($pubsub, $payload) = @_;
    say "Received: $payload";
  });
  $pubsub->notify(foo => 'bar');
  $pubsub->unlisten(foo => $cb);

=head1 DESCRIPTION

L<Mojo::mysql::PubSub> is an implementation of the publish/subscribe pattern
used by L<Mojo::mysql>. The implementation should be considered an EXPERIMENT
and might be removed without warning!

Although MySQL does not have C<SUBSCRIBE/NOTIFY> like PostgreSQL and other
RDBMSs, this module implements a similar feature.

A single database connection waits for notifications by executing C<SLEEP> on
the server. The C<connection_id> and the subscribed channels are stored in the
C<mojo_pubsub_subscribe> table. Inserting a new row in the
C<mojo_pubsub_notify> table triggers C<KILL QUERY> for all connections waiting
for notification.

C<PROCESS> privilege is needed for MySQL user to see other users processes.
C<SUPER> privilege is needed to be able to execute C<KILL QUERY> for statements
started by other users.
C<SUPER> privilege may be needed to be able to define trigger.

If your applications use this module using different MySQL users it is
important the migration script to be executed by user having C<SUPER>
privilege on the database.

=head1 EVENTS

L<Mojo::mysql::PubSub> inherits all events from L<Mojo::EventEmitter> and can
emit the following new ones.

=head2 reconnect

  $pubsub->on(reconnect => sub {
    my ($pubsub, $db) = @_;
    ...
  });

Emitted after switching to a new database connection for sending and receiving
notifications.

=head1 ATTRIBUTES

L<Mojo::mysql::PubSub> implements the following attributes.

=head2 mysql

  my $mysql = $pubsub->mysql;
  $pubsub   = $pubsub->mysql(Mojo::mysql->new);

L<Mojo::mysql> object this publish/subscribe container belongs to.

=head1 METHODS

L<Mojo::mysql::PubSub> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.

=head2 listen

  my $cb = $pubsub->listen(foo => sub {...});

Subscribe to a channel, there is no limit on how many subscribers a channel can
have.

  # Subscribe to the same channel twice
  $pubsub->listen(foo => sub {
    my ($pubsub, $payload) = @_;
    say "One: $payload";
  });
  $pubsub->listen(foo => sub {
    my ($pubsub, $payload) = @_;
    say "Two: $payload";
  });

=head2 notify

  $pubsub = $pubsub->notify('foo');
  $pubsub = $pubsub->notify(foo => 'bar');

Notify a channel.

=head2 unlisten

  $pubsub = $pubsub->unlisten(foo => $cb);

Unsubscribe from a channel.

=head1 DEBUGGING

You can set the C<MOJO_PUBSUB_DEBUG> environment variable to get some advanced
diagnostics information printed to C<STDERR>.

  MOJO_PUBSUB_DEBUG=1

=head1 SEE ALSO

L<Mojo::mysql>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut

__DATA__

@@ pubsub
-- 1 down
drop table mojo_pubsub_subscribe;
drop table mojo_pubsub_notify;

-- 1 up
drop table if exists mojo_pubsub_subscribe;
drop table if exists mojo_pubsub_notify;

create table mojo_pubsub_subscribe (
  id integer auto_increment primary key,
  pid integer not null,
  channel varchar(64) not null,
  ts timestamp not null default current_timestamp,
  unique key subs_idx(pid, channel),
  key ts_idx(ts)
);

create table mojo_pubsub_notify (
  id integer auto_increment primary key,
  channel varchar(64) not null,
  payload text,
  ts timestamp not null default current_timestamp,
  key channel_idx(channel),
  key ts_idx(ts)
);

delimiter //

create trigger mojo_pubsub_notify_kill after insert on mojo_pubsub_notify
for each row
begin
  declare done boolean default false;
  declare t_pid integer;

  declare subs_c cursor for
    select pid from mojo_pubsub_subscribe where channel = NEW.channel;

  declare continue handler for not found set done = true;

  open subs_c;

  repeat
    fetch subs_c into t_pid;

    if not done and exists (
      select 1
      from INFORMATION_SCHEMA.PROCESSLIST
      where ID = t_pid and STATE = 'User sleep')
    then
      kill query t_pid;
    end if;

  until done end repeat;

  close subs_c;
end
//

delimiter ;