use 5.008000;
use strict;
use warnings;

package AnyEvent::Redis::RipeRedis;

use base qw( Exporter );

our $VERSION = '1.62';

use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

# Error codes. The table is filled in a BEGIN block because it is needed at
# compile time twice: as the export list and as part of "use constant" below.
my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = (
    E_CANT_CONN                  => 1,
    E_LOADING_DATASET            => 2,
    E_IO                         => 3,
    E_CONN_CLOSED_BY_REMOTE_HOST => 4,
    E_CONN_CLOSED_BY_CLIENT      => 5,
    E_NO_CONN                    => 6,
    E_OPRN_ERROR                 => 9,
    E_UNEXPECTED_DATA            => 10,
    E_NO_SCRIPT                  => 11,
    E_READ_TIMEDOUT              => 12,
    E_BUSY                       => 13,
    E_MASTER_DOWN                => 14,
    E_MISCONF                    => 15,
    E_READONLY                   => 16,
    E_OOM                        => 17,
    E_EXEC_ABORT                 => 18,
    E_NO_AUTH                    => 19,
    E_WRONG_TYPE                 => 20,
    E_NO_REPLICAS                => 21,
    E_BUSY_KEY                   => 22,
    E_CROSS_SLOT                 => 23,
    E_TRY_AGAIN                  => 24,
    E_ASK                        => 25,
    E_MOVED                      => 26,
    E_CLUSTER_DOWN               => 27,
  );
}

BEGIN {
  # Every error code can be imported by name or all at once via ":err_codes".
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK, );
}

use constant {
  # Default values
  D_HOST     => 'localhost',
  D_PORT     => 6379,
  D_DB_INDEX => 0,

  %ERROR_CODES,

  # Operation status
  S_NEED_PERFORM => 1,
  S_IN_PROGRESS  => 2,
  S_IS_DONE      => 3,

  # String terminator
  EOL     => "\r\n",
  EOL_LEN => 2,
};

# Commands that open a subscription (they require an "on_message" callback).
my %SUB_CMDS = (
  subscribe  => 1,
  psubscribe => 1,
);

# All commands of the subscribe/unsubscribe family.
my %SUBUNSUB_CMDS = (
  %SUB_CMDS,
  unsubscribe  => 1,
  punsubscribe => 1,
);

# Commands whose successful reply is post-processed in _process_cmd_success().
my %NEED_POSTPROCESS = (
  %SUBUNSUB_CMDS,
  info   => 1,
  select => 1,
  quit   => 1,
);

# First element of a multi-bulk reply that marks a published message.
my %MSG_TYPES = (
  message  => 1,
  pmessage => 1,
);

# Maps the upper-case prefix of a server error line to an error code.
my %ERR_PREFS_MAP = (
  LOADING     => E_LOADING_DATASET,
  NOSCRIPT    => E_NO_SCRIPT,
  BUSY        => E_BUSY,
  MASTERDOWN  => E_MASTER_DOWN,
  MISCONF     => E_MISCONF,
  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
);

# Script text => SHA1 hex digest, shared by all client objects (eval_cached).
my %EVAL_CACHE;


# Constructor
#
# Accepts a flat list of named parameters: host, port, password, database,
# reconnect, lazy, encoding, connection_timeout, read_timeout,
# min_reconnect_interval, on_connect, on_disconnect, on_connect_error,
# on_error, handle_params, linger, autocork.
# Connects immediately unless "lazy" is true. Croaks (through the accessors)
# on an unknown encoding or an invalid timeout/interval value.
sub new {
  my $proto  = shift;
  my %params = @_;

  # When invoked on an existing object, that object is re-initialized.
  my $self = ref( $proto ) ? $proto : bless {}, $proto;

  $self->{host}     = $params{host} || D_HOST;
  $self->{port}     = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{on_connect}       = $params{on_connect};
  $self->{on_disconnect}    = $params{on_disconnect};
  $self->{on_connect_error} = $params{on_connect_error};

  # These go through the accessors to get validation and defaults.
  $self->encoding( $params{encoding} );
  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->min_reconnect_interval( $params{min_reconnect_interval} );
  $self->on_error( $params{on_error} );

  # "linger" and "autocork" may be given at the top level; an explicit entry
  # in "handle_params" takes precedence over them.
  my $hdl_params = $params{handle_params} || {};
  foreach my $name ( qw( linger autocork ) ) {
    if ( !exists $hdl_params->{$name} && defined $params{$name} ) {
      $hdl_params->{$name} = $params{$name};
    }
  }
  $self->{handle_params} = $hdl_params;

  $self->{_handle}           = undef;
  $self->{_connected}        = 0;
  $self->{_lazy_conn_st}     = $params{lazy};
  $self->{_auth_st}          = S_NEED_PERFORM;
  $self->{_select_db_st}     = S_NEED_PERFORM;
  $self->{_ready_to_write}   = 0;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_txn_lock}         = 0;
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_reconnect_timer}  = undef;

  unless ( $self->{_lazy_conn_st} ) {
    $self->_connect();
  }

  return $self;
}

# Sends MULTI and sets the transaction lock, which makes the
# subscribe/unsubscribe methods fail until the transaction is finalized.
sub multi {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'multi', [ @_ ] );

  $self->{_txn_lock} = 1;
  $self->_execute_cmd( $cmd );

  return;
}

# Sends EXEC and releases the transaction lock.
sub exec {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'exec', [ @_ ] );

  $self->{_txn_lock} = 0;
  $self->_execute_cmd( $cmd );

  return;
}

# Sends DISCARD and releases the transaction lock. Without this explicit
# method the command went through AUTOLOAD and the lock set by multi() stayed
# in place, so subscribe-family commands kept being rejected.
sub discard {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'discard', [ @_ ] );

  $self->{_txn_lock} = 0;
  $self->_execute_cmd( $cmd );

  return;
}

# Sends EVALSHA with the SHA1 digest of the script given as first argument.
# The script text is kept in the command so that _process_cmd_error() can
# fall back to EVAL when the server answers NOSCRIPT.
sub eval_cached {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'evalsha', [ @_ ] );

  $cmd->{script} = $cmd->{args}[0];
  unless ( exists $EVAL_CACHE{ $cmd->{script} } ) {
    $EVAL_CACHE{ $cmd->{script} } = sha1_hex( $cmd->{script} );
  }
  $cmd->{args}[0] = $EVAL_CACHE{ $cmd->{script} };

  $self->_execute_cmd( $cmd );

  return;
}

# Closes the connection; unfinished commands are aborted.
sub disconnect {
  my $self = shift;

  $self->_disconnect();

  return;
}

# Get/set the encoding. The setter takes an encoding name (resolved with
# Encode::find_encoding, croaks when not found) or undef to disable
# encoding/decoding. Returns the stored encoding object.
sub encoding {
  my $self = shift;

  if ( @_ ) {
    my $enc = shift;

    if ( defined $enc ) {
      $self->{encoding} = find_encoding( $enc );

      unless ( defined $self->{encoding} ) {
        croak "Encoding \"$enc\" not found";
      }
    }
    else {
      undef $self->{encoding};
    }
  }

  return $self->{encoding};
}

# Get/set the "on_error" callback. Setting undef installs a default callback
# that prints the error message with warn().
sub on_error {
  my $self = shift;

  if ( @_ ) {
    my $on_error = shift;

    if ( defined $on_error ) {
      $self->{on_error} = $on_error;
    }
    else {
      $self->{on_error} = sub {
        my $err_msg = shift;

        warn "$err_msg\n";
      };
    }
  }

  return $self->{on_error};
}

# Returns the database index stored in the object (updated after each
# successful SELECT).
sub selected_database {
  my $self = shift;

  return $self->{database};
}

# Generate additional methods and accessors
{
  no strict 'refs';

  # subscribe, psubscribe, unsubscribe, punsubscribe
  foreach my $kwd ( keys %SUBUNSUB_CMDS ) {
    *{$kwd} = sub {
      my $self = shift;
      my $cmd  = $self->_prepare_cmd( $kwd, [ @_ ] );

      if ( exists $SUB_CMDS{ $cmd->{kwd} } && !defined $cmd->{on_message} ) {
        croak "\"on_message\" callback must be specified";
      }

      if ( $self->{_txn_lock} ) {
        # The error is reported asynchronously, like any other command error.
        AE::postpone(
          sub {
            $self->_process_cmd_error( $cmd,
                "Command \"$cmd->{kwd}\" not allowed after \"multi\" command."
                    . ' First, the transaction must be finalized.',
                E_OPRN_ERROR );
          }
        );

        return;
      }

      # One reply is expected per channel/pattern argument.
      $cmd->{replies_cnt} = scalar @{ $cmd->{args} };

      # "on_done" receives the reply elements as a flat list.
      if ( defined $cmd->{on_done} ) {
        my $on_done = $cmd->{on_done};

        $cmd->{on_done} = sub {
          $on_done->( @{ $_[0] } );
        };
      }

      $self->_execute_cmd( $cmd );

      return;
    };
  }

  # Accessors for values given in seconds; undef is accepted, any other value
  # must look like a number and must not be negative.
  foreach my $name (
    qw( connection_timeout read_timeout min_reconnect_interval ) )
  {
    *{$name} = sub {
      my $self = shift;

      if ( @_ ) {
        my $seconds = shift;

        if ( defined $seconds
          && ( !looks_like_number($seconds) || $seconds < 0 ) )
        {
          croak "\"$name\" must be a positive number";
        }
        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  # Plain get/set accessors without validation.
  foreach my $name (
    qw( reconnect on_connect on_disconnect on_connect_error ) )
  {
    *{$name} = sub {
      my $self = shift;

      if ( @_ ) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

# Creates the AnyEvent::Handle and starts connecting. User supplied
# "handle_params" come first, so the keys set below override them.
sub _connect {
  my $self = shift;

  $self->{_handle} = AnyEvent::Handle->new(
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_get_on_prepare(),
    on_connect       => $self->_get_on_connect(),
    on_connect_error => $self->_get_on_connect_error(),
    on_rtimeout      => $self->_get_on_rtimeout(),
    on_eof           => $self->_get_on_eof(),
    on_error         => $self->_get_handle_on_error(),
    on_read          => $self->_get_on_read(),
  );

  return;
}

# The handle callbacks below hold only a weak reference to the client, so the
# handle stored in the object does not keep the object alive.

# Returns the "on_prepare" callback; it returns the connection timeout when
# one is configured.
sub _get_on_prepare {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

# Returns the "on_connect" callback. It performs AUTH and/or SELECT when
# needed, otherwise marks the client ready and flushes queued commands, and
# finally calls the user "on_connect" callback.
sub _get_on_connect {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->{_connected} = 1;

    # Nothing to authenticate without a password; database 0 needs no SELECT.
    unless ( defined $self->{password} ) {
      $self->{_auth_st} = S_IS_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_select_db_st} = S_IS_DONE;
    }

    if ( $self->{_auth_st} == S_NEED_PERFORM ) {
      $self->_auth();
    }
    elsif ( $self->{_select_db_st} == S_NEED_PERFORM ) {
      $self->_select_db();
    }
    else {
      $self->{_ready_to_write} = 1;
      $self->_flush_input_queue();
    }

    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

# Returns the "on_connect_error" callback; it tears the client down with
# E_CANT_CONN. The error message is the last callback argument.
sub _get_on_connect_error {
  my $self = shift;

  weaken( $self );

  return sub {
    my $err_msg = pop;

    $self->_disconnect(
      "Can't connect to $self->{host}:$self->{port}: $err_msg",
      E_CANT_CONN
    );
  };
}

# Returns the "on_rtimeout" callback. A timeout is an error only while
# replies are awaited; otherwise the read timeout is switched off.
sub _get_on_rtimeout {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( @{ $self->{_processing_queue} } ) {
      $self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
    }
    else {
      $self->{_handle}->rtimeout( undef );
    }
  };
}

# Returns the "on_eof" callback.
sub _get_on_eof {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->_disconnect( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
  };
}

# Returns the handle "on_error" callback; the error message is the last
# callback argument.
sub _get_handle_on_error {
  my $self = shift;

  weaken( $self );

  return sub {
    my $err_msg = pop;

    $self->_disconnect( $err_msg, E_IO );
  };
}

# Returns the "on_read" callback: an incremental parser of server replies.
# Parser state lives in the closure:
#   $str_len  - length of the bulk string body that is being awaited;
#   @bufs     - stack of multi-bulk replies under construction;
#   $bufs_num - depth of that stack.
# Every complete top-level reply is passed to _process_reply().
sub _get_on_read {
  my $self = shift;

  weaken( $self );

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while ( 1 ) {
      # Reply processing below may have destroyed the handle.
      if ( $handle->destroyed() ) {
        return;
      }

      my $reply;
      my $err_code;

      if ( defined $str_len ) {
        # Body of a bulk string: wait until body and terminator have arrived.
        if ( length( $handle->{rbuf} ) < $str_len + EOL_LEN ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
        substr( $handle->{rbuf}, 0, EOL_LEN, '' );
        if ( defined $self->{encoding} ) {
          $reply = $self->{encoding}->decode( $reply );
        }

        undef $str_len;
      }
      else {
        # Header line: wait for a complete line.
        my $eol_pos = index( $handle->{rbuf}, EOL );

        if ( $eol_pos < 0 ) {
          return;
        }

        # Cut the line off the buffer and split it into type char and payload.
        $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
        my $type = substr( $reply, 0, 1, '' );
        substr( $handle->{rbuf}, 0, EOL_LEN, '' );

        # "+" and ":" lines are complete replies as they are.
        if ( $type ne '+' && $type ne ':' ) {
          if ( $type eq '$' ) {
            if ( $reply >= 0 ) {
              # The body follows; read it on the next iteration.
              $str_len = $reply;

              next;
            }

            # Negative length: the reply is undef.
            undef $reply;
          }
          elsif ( $type eq '*' ) {
            if ( $reply > 0 ) {
              # Open a new multi-bulk buffer and read its elements.
              push( @bufs,
                { data       => [],
                  err_code   => undef,
                  chunks_cnt => $reply,
                }
              );
              $bufs_num++;

              next;
            }
            elsif ( $reply == 0 ) {
              $reply = [];
            }
            else {
              undef $reply;
            }
          }
          elsif ( $type eq '-' ) {
            # Generic error code unless the message prefix is a known one.
            $err_code = E_OPRN_ERROR;
            if ( $reply =~ m/^([A-Z]{3,}) / ) {
              if ( exists $ERR_PREFS_MAP{$1} ) {
                $err_code = $ERR_PREFS_MAP{$1};
              }
            }
          }
          else {
            $self->_disconnect( 'Unexpected reply received.',
                E_UNEXPECTED_DATA );

            return;
          }
        }
      }

      # Put the value into the innermost open multi-bulk buffer. A buffer
      # that becomes complete is itself the value for the enclosing one.
      while ( $bufs_num > 0 ) {
        my $curr_buf = $bufs[-1];

        if ( defined $err_code ) {
          # An error line inside a multi-bulk reply becomes an error object;
          # the enclosing reply is flagged as completed with errors.
          unless ( ref($reply) ) {
            $reply = AnyEvent::Redis::RipeRedis::Error->new( $reply,
                $err_code );
          }
          $curr_buf->{err_code} = E_OPRN_ERROR;
        }
        push( @{ $curr_buf->{data} }, $reply );

        if ( --$curr_buf->{chunks_cnt} > 0 ) {
          next MAIN;
        }

        $reply    = $curr_buf->{data};
        $err_code = $curr_buf->{err_code};
        pop @bufs;
        $bufs_num--;
      }

      $self->_process_reply( $reply, $err_code );
    }

    return;
  };
}

# Builds a command hash from the keyword and the argument list (array ref).
# A trailing hash ref is taken as the command hash itself (callbacks given by
# the caller); a trailing code ref becomes "on_message" for subscribe
# commands and "on_reply" for all others.
sub _prepare_cmd {
  my $self = shift;
  my $kwd  = shift;
  my $args = shift;

  my $cmd;
  if ( ref( $args->[-1] ) eq 'HASH' ) {
    $cmd = pop @{$args};
  }
  else {
    $cmd = {};
    if ( ref( $args->[-1] ) eq 'CODE' ) {
      if ( exists $SUB_CMDS{$kwd} ) {
        $cmd->{on_message} = pop @{$args};
      }
      else {
        $cmd->{on_reply} = pop @{$args};
      }
    }
  }
  $cmd->{kwd}  = $kwd;
  $cmd->{args} = $args;

  return $cmd;
}

# Writes the command when the client is ready. Otherwise the command is
# queued and, depending on the state, AUTH/SELECT or a (re)connection is
# started. Without a connection and with "reconnect" off the command fails
# asynchronously with E_NO_CONN.
sub _execute_cmd {
  my $self = shift;
  my $cmd  = shift;

  unless ( $self->{_ready_to_write} ) {
    if ( defined $self->{_handle} ) {
      # Connected but not ready: resume the AUTH/SELECT sequence if it is
      # not already in progress.
      if ( $self->{_connected} ) {
        if ( $self->{_auth_st} == S_IS_DONE ) {
          if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
            $self->_select_db();
          }
        }
        elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
          $self->_auth();
        }
      }
    }
    elsif ( $self->{_lazy_conn_st} ) {
      # First command of a lazily connected client.
      $self->{_lazy_conn_st} = 0;
      $self->_connect();
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{min_reconnect_interval}
        && $self->{min_reconnect_interval} > 0 )
      {
        # Reconnect after the interval; only one timer at a time.
        unless ( defined $self->{_reconnect_timer} ) {
          $self->{_reconnect_timer} = AE::timer(
            $self->{min_reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect();
            }
          );
        }
      }
      else {
        $self->_connect();
      }
    }
    else {
      AE::postpone(
        sub {
          $self->_process_cmd_error( $cmd,
              "Operation \"$cmd->{kwd}\" aborted:"
                  . ' No connection to the server.',
              E_NO_CONN );
        }
      );

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write( $cmd );

  return;
}

# Serializes the command as a multi-bulk request, appends it to the
# processing queue and writes it to the handle.
sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_str = '';
  # $token aliases the keyword/argument, so the normalized value (empty
  # string for undef, encoded bytes for character strings) is stored back
  # into the command.
  foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    if ( !defined $token ) {
      $token = '';
    }
    elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
      $token = $self->{encoding}->encode( $token );
    }
    $cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
  }
  $cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;

  my $handle = $self->{_handle};

  # Arm the read timeout when the first outstanding command is written.
  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write( $cmd_str );

  return;
}

# Sends AUTH. On success continues with SELECT when needed, otherwise marks
# the client ready and flushes the input queue. On failure all commands are
# aborted with the received error.
sub _auth {
  my $self = shift;

  weaken( $self );
  $self->{_auth_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'auth',
      args => [ $self->{password} ],

      on_done => sub {
        $self->{_auth_st} = S_IS_DONE;

        if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
          $self->_select_db();
        }
        else {
          $self->{_ready_to_write} = 1;
          $self->_flush_input_queue();
        }
      },

      on_error => sub {
        $self->{_auth_st} = S_NEED_PERFORM;
        $self->_abort_all( @_ );
      },
    }
  );

  return;
}

# Sends SELECT for the configured database. On success marks the client
# ready and flushes the input queue. On failure all commands are aborted
# with the received error.
sub _select_db {
  my $self = shift;

  weaken( $self );
  $self->{_select_db_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'select',
      args => [ $self->{database} ],

      on_done => sub {
        $self->{_select_db_st} = S_IS_DONE;

        $self->{_ready_to_write} = 1;
        $self->_flush_input_queue();
      },

      on_error => sub {
        $self->{_select_db_st} = S_NEED_PERFORM;
        $self->_abort_all( @_ );
      },
    }
  );

  return;
}

# Writes all queued commands. The queue is moved to "_temp_queue" first, so
# _abort_all() still sees the commands that are not written yet.
sub _flush_input_queue {
  my $self = shift;

  $self->{_temp_queue}  = $self->{_input_queue};
  $self->{_input_queue} = [];

  while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
    $self->_push_write( $cmd );
  }

  return;
}

# Dispatches a complete reply: an error goes to the oldest outstanding
# command, a published message to the callback of its channel/pattern, and
# anything else is a success reply for the oldest outstanding command.
# Data that cannot be assigned leads to a disconnect with E_UNEXPECTED_DATA.
sub _process_reply {
  my $self     = shift;
  my $reply    = shift;
  my $err_code = shift;

  if ( defined $err_code ) {
    my $cmd = shift @{ $self->{_processing_queue} };

    unless ( defined $cmd ) {
      $self->_disconnect(
        "Don't know how process error message. Processing queue is empty.",
        E_UNEXPECTED_DATA,
      );

      return;
    }

    # For a multi-bulk reply that contains errors, the whole reply is passed
    # along after a generic message.
    $self->_process_cmd_error( $cmd,
      ref($reply)
      ? ( "Operation \"$cmd->{kwd}\" completed with errors.", $err_code,
          $reply )
      : $reply,
      $err_code
    );
  }
  elsif ( $self->{_channel_cnt} > 0
    && ref( $reply )
    && defined $reply->[0]    # avoids an "uninitialized" warning
    && exists $MSG_TYPES{ $reply->[0] } )
  {
    my $cmd = $self->{_channels}{ $reply->[1] };

    unless ( defined $cmd ) {
      $self->_disconnect(
        "Don't know how process published message."
            . " Unknown channel or pattern \"$reply->[1]\".",
        E_UNEXPECTED_DATA
      );

      return;
    }

    # Pick the callback arguments out of the reply by position.
    $cmd->{on_message}->(
      $reply->[0] eq 'pmessage'
      ? @{$reply}[ 2, 3, 1 ]
      : @{$reply}[ 1, 2 ]
    );
  }
  else {
    my $cmd = $self->{_processing_queue}[0];

    unless ( defined $cmd ) {
      $self->_disconnect(
        "Don't know how process reply. Processing queue is empty.",
        E_UNEXPECTED_DATA
      );

      return;
    }

    # A command with "replies_cnt" stays in the queue until all of its
    # replies have arrived.
    if ( !defined $cmd->{replies_cnt} || --$cmd->{replies_cnt} <= 0 ) {
      shift @{ $self->{_processing_queue} };
    }
    $self->_process_cmd_success( $cmd, $reply );
  }

  return;
}

# Reports a command error. Arguments after the command: error message, error
# code and, optionally, the reply. A NOSCRIPT error for a command created by
# eval_cached() is not reported: the command is resent as EVAL with the
# script text. Otherwise the first available callback is used: the command's
# "on_error", the command's "on_reply" (called with reply, message, code) or
# the client's "on_error".
sub _process_cmd_error {
  my $self = shift;
  my $cmd  = shift;

  if ( $_[1] == E_NO_SCRIPT && defined $cmd->{script} ) {
    $cmd->{kwd}     = 'eval';
    $cmd->{args}[0] = delete $cmd->{script};

    $self->_push_write( $cmd );

    return;
  }

  if ( defined $cmd->{on_error} ) {
    $cmd->{on_error}->( @_ );
  }
  elsif ( defined $cmd->{on_reply} ) {
    $cmd->{on_reply}->( @_[ 2, 0, 1 ] );
  }
  else {
    $self->{on_error}->( @_ );
  }

  return;
}

# Post-processes a successful reply where needed and passes it to the
# command's "on_done" or "on_reply" callback.
sub _process_cmd_success {
  my $self  = shift;
  my $cmd   = shift;
  my $reply = shift;

  if ( exists $NEED_POSTPROCESS{ $cmd->{kwd} } ) {
    my $kwd = $cmd->{kwd};

    if ( exists $SUBUNSUB_CMDS{$kwd} ) {
      # Drop the first element; the next two are used as the channel or
      # pattern name and the new channel counter.
      shift @{$reply};

      if ( exists $SUB_CMDS{$kwd} ) {
        $self->{_channels}{ $reply->[0] } = $cmd;
      }
      else {    # unsubscribe or punsubscribe
        delete $self->{_channels}{ $reply->[0] };
      }

      $self->{_channel_cnt} = $reply->[1];
    }
    elsif ( $kwd eq 'info' ) {
      $reply = $self->_parse_info( $reply );
    }
    elsif ( $kwd eq 'select' ) {
      $self->{database} = $cmd->{args}[0];
    }
    else {    # quit
      $self->_disconnect();
    }
  }

  if ( defined $cmd->{on_done} ) {
    $cmd->{on_done}->( $reply );
  }
  elsif ( defined $cmd->{on_reply} ) {
    $cmd->{on_reply}->( $reply );
  }

  return;
}

# Converts the text of an INFO reply into a hash ref: lines are split at the
# first ":"; empty lines and lines starting with "#" are skipped.
sub _parse_info {
  return {
    map { split( m/:/, $_, 2 ) }
    grep { m/^[^#]/ } split( EOL, $_[1] )
  };
}

# Destroys the handle, resets the connection state and aborts all unfinished
# commands. With an error message and code the abort is reported as that
# error. The "on_disconnect" callback is called only when the connection had
# been established.
sub _disconnect {
  my $self     = shift;
  my $err_msg  = shift;
  my $err_code = shift;

  my $was_connected = $self->{_connected};

  if ( defined $self->{_handle} ) {
    $self->{_handle}->destroy();
    undef $self->{_handle};
  }
  $self->{_connected}      = 0;
  $self->{_auth_st}        = S_NEED_PERFORM;
  $self->{_select_db_st}   = S_NEED_PERFORM;
  $self->{_ready_to_write} = 0;
  $self->{_txn_lock}       = 0;

  # Cancel a pending delayed reconnect; otherwise it would reopen the
  # connection after the commands that requested it have been aborted.
  undef $self->{_reconnect_timer};

  $self->_abort_all( $err_msg, $err_code );

  if ( $was_connected && defined $self->{on_disconnect} ) {
    $self->{on_disconnect}->();
  }

  return;
}

# Empties all queues and the subscription table and reports the abort.
# Without an error message nothing is reported unless unfinished commands
# exist; in that case E_CONN_CLOSED_BY_CLIENT is used.
sub _abort_all {
  my $self     = shift;
  my $err_msg  = shift;
  my $err_code = shift;

  # Take copies first: the callbacks below may issue new commands.
  my @unfin_cmds = (
    @{ $self->{_processing_queue} },
    @{ $self->{_temp_queue} },
    @{ $self->{_input_queue} },
  );
  my %channels = %{ $self->{_channels} };

  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;

  if ( !defined $err_msg && @unfin_cmds ) {
    $err_msg  = 'Connection closed by client prematurely.';
    $err_code = E_CONN_CLOSED_BY_CLIENT;
  }

  if ( defined $err_msg ) {
    # Connection failures go to "on_connect_error" when it is set.
    if ( defined $self->{on_connect_error} && $err_code == E_CANT_CONN ) {
      $self->{on_connect_error}->( $err_msg );
    }
    else {
      $self->{on_error}->( $err_msg, $err_code );
    }

    # Lost subscriptions are not reported when the client itself closed the
    # connection.
    if ( %channels && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
      foreach my $name ( keys %channels ) {
        my $cmd = $channels{$name};

        $self->_process_cmd_error( $cmd,
            "Subscription \"$name\" lost: " . $err_msg, $err_code );
      }
    }

    foreach my $cmd ( @unfin_cmds ) {
      $self->_process_cmd_error( $cmd,
          "Operation \"$cmd->{kwd}\" aborted: " . $err_msg, $err_code );
    }
  }

  return;
}

# Turns any unknown method into a Redis command. The lower-cased method name
# is split at "_": the first part is the command keyword, the remaining
# parts are prepended to the arguments. The generated sub is installed under
# the method name, so AUTOLOAD runs once per name.
sub AUTOLOAD {
  our $AUTOLOAD;
  my $method = $AUTOLOAD;
  $method =~ s/^.+:://;
  my ( $kwd, @args ) = split( m/_/, lc( $method ) );

  my $sub = sub {
    my $self = shift;
    my $cmd  = $self->_prepare_cmd( $kwd, [ @args, @_ ] );

    $self->_execute_cmd( $cmd );

    return;
  };

  do {
    no strict 'refs';
    *{$method} = $sub;
  };

  goto &{$sub};
}

# Warns about every unfinished command when the object is destroyed while it
# still holds a handle.
sub DESTROY {
  my $self = shift;

  if ( defined $self->{_handle} ) {
    my @unfin_cmds = (
      @{ $self->{_processing_queue} },
      @{ $self->{_temp_queue} },
      @{ $self->{_input_queue} },
    );

    foreach my $cmd ( @unfin_cmds ) {
      warn "Operation \"$cmd->{kwd}\" aborted:"
          . " Client object destroyed prematurely.\n";
    }
  }

  return;
}


# Error object used for error lines found inside multi-bulk replies.
package AnyEvent::Redis::RipeRedis::Error;

# Constructor
#
# Takes the error message and the error code.
sub new {
  my $class    = shift;
  my $err_msg  = shift;
  my $err_code = shift;

  my $self = bless {}, $class;

  $self->{message} = $err_msg;
  $self->{code}    = $err_code;

  return $self;
}

# Returns the error message.
sub message {
  my $self = shift;

  return $self->{message};
}

# Returns the error code.
sub code {
  my $self = shift;

  return $self->{code};
}

1;
__END__

=head1 NAME

AnyEvent::Redis::RipeRedis - DEPRECATED. Use AnyEvent::RipeRedis instead

=head1 SYNOPSIS

  use AnyEvent;
  use AnyEvent::Redis::RipeRedis;

  my $cv = AE::cv();

  my $redis = AnyEvent::Redis::RipeRedis->new(
    host     => 'localhost',
    port     => 6379,
    password => 'yourpass',
  );

  $redis->incr( 'foo',
    sub {
      my $reply = shift;

      if (@_) {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";

        return;
      }

      print "$reply\n";
    }
  );

  $redis->set( 'bar', 'string',
    { on_error => sub {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      }
    }
  );

  $redis->get( 'bar',
    { on_done => sub {
        my $reply = shift;

        print "$reply\n";
      },

      on_error => sub {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      },
    }
  );

  $redis->quit(
    sub {
      my $reply = shift;

      if (@_) {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      }

      $cv->send();
    }
  );

  $cv->recv();

=head1 DESCRIPTION

MODULE IS DEPRECATED. Use L<AnyEvent::RipeRedis> instead.
The interface of L<AnyEvent::RipeRedis> has several differences from the
interface of AnyEvent::Redis::RipeRedis. For more information see
documentation.

AnyEvent::Redis::RipeRedis is a flexible non-blocking Redis client with
reconnect feature. The client supports subscriptions, transactions and
connection via UNIX-socket.

Requires Redis 1.2 or higher, and any supported event loop.

=head1 CONSTRUCTOR

=head2 new( %params )

  my $redis = AnyEvent::Redis::RipeRedis->new(
    host                   => 'localhost',
    port                   => 6379,
    password               => 'yourpass',
    database               => 7,
    lazy                   => 1,
    connection_timeout     => 5,
    read_timeout           => 5,
    reconnect              => 1,
    min_reconnect_interval => 5,
    encoding               => 'utf8',

    on_connect => sub {
      # handling...
    },

    on_disconnect => sub {
      # handling...
    },

    on_connect_error => sub {
      my $err_msg = shift;

      # error handling...
    },

    on_error => sub {
      my $err_msg  = shift;
      my $err_code = shift;

      # error handling...
    },
  );

=over

=item host => $host

Server hostname (default: localhost)

=item port => $port

Server port (default: 6379)

=item password => $password

If the password is specified, the C<AUTH> command is sent to the server
after connection.

=item database => $index

Database index. If the index is specified, the client is switched to
the specified database after connection. You can also switch to another
database after connection by using C