package Mojo::Redis::PubSub;
use Mojo::Base 'Mojo::EventEmitter';

# Connection used for (P)SUBSCRIBE / (P)UNSUBSCRIBE traffic. Built lazily from
# the "redis" attribute the first time it is needed.
has connection     => sub { shift->redis->_connection };

# Object used to create the connections. It has no sensible default, so the
# builder dies with a stack trace when it was not passed to the constructor.
has redis          => sub { Carp::confess('redis is required in constructor') };

# Separate, lazily built connection used for PUBLISH and PUBSUB commands.
has _db_connection => sub { shift->redis->_connection };

# Send "PUBSUB CHANNELS" over the command connection, forwarding any extra
# arguments (for example a glob pattern), and return what write_p() returns.
sub channels_p {
  my ($self, @pattern) = @_;
  return $self->_db_connection->write_p('PUBSUB', 'CHANNELS', @pattern);
}

# Build a keyspace channel name from the leading arguments and register the
# trailing callback for it through listen(). Returns whatever listen() returns.
sub keyspace_listen {
  my $self = shift;
  my $cb   = pop;

  # Everything left in @_ describes the key/op/options of the channel.
  my @channel = $self->_keyspace_key(@_);
  return $self->listen(@channel, $cb);
}

# Counterpart of keyspace_listen(): build the keyspace channel name from the
# arguments and hand it to unlisten(). A trailing code ref, when present, is
# passed on as the callback to remove; otherwise undef is passed.
sub keyspace_unlisten {
  my $self = shift;

  my $cb;
  $cb = pop if ref $_[-1] eq 'CODE';

  my @channel = $self->_keyspace_key(@_);
  return $self->unlisten(@channel, $cb);
}

# Register $cb as a listener for $name and return $cb.
#
# The first listener for a name triggers a SUBSCRIBE (or PSUBSCRIBE when the
# name contains "*") on the subscription connection. Once that write resolves,
# _setup() is called to install the response handler.
sub listen {
  my ($self, $name, $cb) = @_;

  unless (@{$self->{chans}{$name} ||= []}) {
    my $op = $name =~ /\*/ ? 'PSUBSCRIBE' : 'SUBSCRIBE';

    # The promise callback must not keep this object alive, so it only sees a
    # weak copy. The object may be gone by the time the reply arrives, in
    # which case there is nothing left to set up.
    Scalar::Util::weaken(my $weak = $self);
    $self->connection->write_p($op => $name)->then(sub { $weak and $weak->_setup });
  }

  push @{$self->{chans}{$name}}, $cb;

  return $cb;
}

# Send a PUBLISH command with the given arguments over the command connection
# and return what write_p() returns.
sub notify {
  my ($self, @message) = @_;
  return $self->_db_connection->write_p('PUBLISH', @message);
}

# Send "PUBSUB NUMSUB" for the given channels and chain a step that turns the
# flat array-ref reply into a hash ref.
sub numsub_p {
  my ($self, @channels) = @_;

  my $to_hash = sub {
    my ($pairs) = @_;
    return +{@$pairs};
  };

  return $self->_db_connection->write_p('PUBSUB', 'NUMSUB', @channels)->then($to_hash);
}

# Send "PUBSUB NUMPAT" over the command connection and return what write_p()
# returns. Any extra arguments are ignored.
sub numpat_p {
  my $self = shift;
  return $self->_db_connection->write_p('PUBSUB', 'NUMPAT');
}

# Remove listeners for $name and return $self.
#
# With $cb only that callback is removed; without it every listener for the
# name is dropped. When no listener is left, an UNSUBSCRIBE (or PUNSUBSCRIBE
# when the name contains "*") is sent and the bookkeeping entry is deleted.
sub unlisten {
  my ($self, $name, $cb) = @_;

  # Nothing was registered under this name: there is no subscription to tear
  # down, so do not touch the (lazily created) connection at all.
  my $chans = $self->{chans}{$name} or return $self;

  # Callbacks are compared by their stringified reference.
  @$chans = $cb ? grep { $cb ne $_ } @$chans : ();
  unless (@$chans) {
    $self->connection->write_p(($name =~ /\*/ ? 'PUNSUBSCRIBE' : 'UNSUBSCRIBE'), $name);
    delete $self->{chans}{$name};
  }

  return $self;
}

# Build the channel name used for keyspace notifications.
#
# Called as $self->_keyspace_key($key, $op, \%args) where every argument is
# optional. Positional $key/$op win over $args->{key}/$args->{op}, and both
# default to "*". The type defaults to "keyevent" when the key is "*" and to
# "keyspace" otherwise. The database comes from $args->{db}, then the first
# path part of the redis URL, then the empty string. The caller's hash is left
# unmodified.
sub _keyspace_key {
  my $opts = {};
  $opts = pop if ref $_[-1] eq 'HASH';
  my ($self, $key_arg, $op_arg) = @_;

  my $key  = $key_arg // $opts->{key} // '*';
  my $op   = $op_arg  // $opts->{op}  // '*';
  my $type = $opts->{type} || ($key eq '*' ? 'keyevent' : 'keyspace');
  my $db   = $opts->{db} // $self->redis->url->path->[0] // '';

  # "keyevent" channels put the operation first, all other types the key.
  my @suffix = $type eq 'keyevent' ? ($op, $key) : ($key, $op);

  return sprintf '__%s@%s__:%s %s', $type, $db, @suffix;
}

# Install the "response" handler on the subscription connection, once.
#
# The handler dispatches the third element of each response to every callback
# registered under the name found in the second element. The installed handler
# is remembered in $self->{cb} so repeated calls are no-ops.
sub _setup {
  my $self = shift;
  return if $self->{cb};

  # The handler lives on the connection, so it must not keep $self alive.
  Scalar::Util::weaken($self);
  $self->{cb} = $self->connection->on(
    response => sub {
      my ($conn, $res) = @_;    # $res = [$type, $name, $payload]
      return unless $self;

      # Iterate over a snapshot: unlisten() rewrites the listener array in
      # place, so a callback that unlistens itself would otherwise cause the
      # next listener to be skipped. The "|| []" also avoids creating empty
      # entries for names nobody listens to.
      my @cbs = @{$self->{chans}{$res->[1]} || []};
      for my $cb (@cbs) { $self->$cb($res->[2]) }
    }
  );
}

1;

=encoding utf8

=head1 NAME

Mojo::Redis::PubSub - Publish and subscribe to Redis messages

=head1 SYNOPSIS

  use Mojo::Redis;

  my $redis  = Mojo::Redis->new;
  my $pubsub = $redis->pubsub;

  $pubsub->listen("user:superwoman:messages" => sub {
    my ($pubsub, $message) = @_;
    say "superwoman got a message: $message";
  });

  $pubsub->notify("user:batboy:messages", "How are you doing?");

See L<https://github.com/jhthorsen/mojo-redis/blob/master/examples/chat.pl>
for an example L<Mojolicious> application.

=head1 DESCRIPTION

L<Mojo::Redis::PubSub> is an implementation of the Redis Publish/Subscribe
messaging paradigm. This class has the same API as L<Mojo::Pg::PubSub>, so
you can easily switch between the backends.

This object holds one connection for receiving messages, and one connection
for sending messages. They are created lazily the first time L</listen> or
L</notify> is called. These connections do not affect the connection pool
for L<Mojo::Redis>.

See L<pubsub|https://redis.io/topics/pubsub> for more details.

=head1 ATTRIBUTES

=head2 connection

  $conn = $self->connection;
  $self = $self->connection(Mojo::Redis::Connection->new);

Holds a L<Mojo::Redis::Connection> object.

=head2 redis

  $redis = $self->redis;
  $self  = $self->redis(Mojo::Redis->new);

Holds a L<Mojo::Redis> object used to create the connections to talk with Redis.

=head1 METHODS

=head2 channels_p

  $promise = $self->channels_p->then(sub { my $channels = shift });
  $promise = $self->channels_p("pat*")->then(sub { my $channels = shift });

Lists the currently active channels. An active channel is a Pub/Sub channel
with one or more subscribers (not including clients subscribed to patterns).

=head2 keyspace_listen

  $cb = $self->keyspace_listen($key, $op, sub { my ($self, $message) = @_ });
  $cb = $self->keyspace_listen($key, $op, \%args, sub { my ($self, $message) = @_ });

Used to listen for keyspace notifications. See L<https://redis.io/topics/notifications>
for more details.

C<$key>, C<$op> and C<%args> are optional. C<$key> and C<$op> will default to
"*" and C<%args> can have the key values listed below.

The channel that will be subscribed to will look like one of these:

  __keyspace@${db}__:$key $op
  __keyevent@${db}__:$op $key

=over 2

=item * db

Default database to listen for events is the database set in
L<Mojo::Redis/url>. "*" is also a valid value, meaning listen for events
happening in all databases.

=item * key

Alternative to passing in C<$key>. Default value is "*".

=item * op

Alternative to passing in C<$op>. Default value is "*".

=item * type

Will default to "keyevent" if C<$key> is "*", and "keyspace" if not. It can
also be set to "key*" for listening to both "keyevent" and "keyspace" events.

=back

=head2 keyspace_unlisten

  $self = $self->keyspace_unlisten(@args);
  $self = $self->keyspace_unlisten(@args, $cb);

Stop listening for keyspace events. See L</keyspace_listen> for details about
keyspace events and what C<@args> can be.

=head2 listen

  $cb = $self->listen($channel => sub { my ($self, $message) = @_ });

Subscribe to a channel. There is no limit on how many subscribers a channel
can have. The returned code ref can be passed on to L</unlisten>.

=head2 notify

  $self->notify($channel => $message);

Send a plain string message to a channel.

=head2 numpat_p

  $promise = $self->numpat_p->then(sub { my $int = shift });

Returns the number of subscriptions to patterns (that are performed using the
PSUBSCRIBE command). Note that this is not just the count of clients
subscribed to patterns but the total number of patterns all the clients are
subscribed to.

=head2 numsub_p

  $promise = $self->numsub_p(@channels)->then(sub { my $channels = shift });

Returns the number of subscribers (not counting clients subscribed to
patterns) for the specified channels as a hash-ref, where the keys are
channel names.

=head2 unlisten

  $self = $self->unlisten($channel);
  $self = $self->unlisten($channel, $cb);

Unsubscribe from a channel.

=head1 SEE ALSO

L<Mojo::Redis>.

=cut