package App::Memcached::CLI::DataSource;

use strict;
use warnings;
use 5.008_001;

use Carp;
use IO::Socket;
use Time::HiRes;

use App::Memcached::CLI::Util qw(is_unixsocket debug);

# Constructor. Blesses the given key/value pairs as the object itself.
# The rest of this class only reads the "socket" entry.
sub new {
    my ($class, %args) = @_;
    return bless \%args, $class;
}

# Class method: opens a connection to $addr and returns a new object that
# wraps the socket.
#   $addr          - UNIX socket path (when is_unixsocket() says so) or an
#                    address handed to IO::Socket::INET as PeerAddr.
#   timeout => $n  - connect timeout for TCP connections (default 1).
# Throws (confess) when the socket cannot be created.
sub connect {
    my ($class, $addr, %opts) = @_;

    my $socket = is_unixsocket($addr)
        ? IO::Socket::UNIX->new(Peer => $addr)
        : IO::Socket::INET->new(
            PeerAddr => $addr,
            Proto    => 'tcp',
            Timeout  => $opts{timeout} || 1,
        );
    confess "Can't connect to $addr" unless $socket;

    return $class->new(socket => $socket);
}

# Sends "version" and reports whether a non-empty reply came back.
# Returns 1 on success, nothing on failure (failures are only debug-logged).
sub ping {
    my $self = shift;

    my $version = eval { $self->query_one('version') };
    # Capture $@ right away: the debug() calls below could overwrite it.
    my $err = $@;

    if ($err or !$version) {
        debug("Ping failed.");
        debug("ERROR: " . $err) if $err;
        return;
    }
    return 1;
}

# get(\@keys) / gets(\@keys): fetch items. See _retrieve() for the result.
sub get {
    my ($self, $keys) = @_;
    return $self->_retrieve('get', $keys);
}

sub gets {
    my ($self, $keys) = @_;
    return $self->_retrieve('gets', $keys);
}

# Sends "$cmd key1 key2 ..." and collects every "VALUE ..." item up to the
# "END" line.
# Returns an array ref of hash refs with the keys: key, flags, length, cas
# (undef when the reply line carries no 4th number) and value.
# Throws on read timeout, on a closed connection, on a short data block and
# on an error reply from the server.
sub _retrieve {
    my ($self, $cmd, $keys) = @_;

    my $key_str = join q{ }, @$keys;
    $self->{socket}->write("$cmd $key_str\r\n");

    my @results;
    while (1) {
        my $response = $self->_readline;

        # getline gives undef once the peer is gone; without this check the
        # loop would spin forever.
        confess "Connection closed while reading response of '$cmd'"
            unless defined $response;

        # The line terminator that follows each data block shows up here as
        # an otherwise empty line.
        next if $response =~ m/^[\r\n]+$/;

        if ($response =~ m/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?/) {
            my %data = (
                key    => $1,
                flags  => $2,
                length => $3,
                cas    => $4,
            );
            $data{value} = $self->_read_bytes($data{length});
            push @results, \%data;
        }
        elsif ($response =~ m/^END/) {
            last;
        }
        elsif ($response =~ m/^(?:CLIENT_|SERVER_)?ERROR/) {
            # Fail with the server's message instead of warning and then
            # blocking until the read timeout fires.
            $response =~ s/[\r\n]+$//;
            confess "Failed to $cmd! ERROR: $response";
        }
        else {
            warn "Unknown response '$response'";
        }
    }

    return \@results;
}

# Reads exactly $length bytes from the socket, guarded by a 3 second alarm.
# Throws on timeout or when fewer bytes than requested could be read.
sub _read_bytes {
    my ($self, $length) = @_;

    my $buffer = q{};
    local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
    alarm 3;
    my $got = $self->{socket}->read($buffer, $length);
    alarm 0;

    confess "Failed to read $length bytes of data from socket"
        unless defined $got && $got == $length;

    return $buffer;
}

# Storage commands: $ds->set($key, $value, flags => N, expire => N), etc.
# All of them return 1 when the server answers STORED, nothing otherwise.
sub set {
    my $self = shift;
    return $self->_store('set', @_);
}

sub add {
    my $self = shift;
    return $self->_store('add', @_);
}

sub replace {
    my $self = shift;
    return $self->_store('replace', @_);
}

# append/prepend take only ($key, $value); extra options are dropped by
# _modify().
sub append {
    my $self = shift;
    return $self->_modify('append', @_);
}

sub prepend {
    my $self = shift;
    return $self->_modify('prepend', @_);
}

sub _modify {
    my ($self, $cmd, $key, $value) = @_;
    return $self->_store($cmd, $key, $value);
}

# Returns a copy of $value as the byte string that is put on the wire.
# Character strings (UTF8 flag on) are encoded to UTF-8 so that the byte
# count announced in the command line always equals the bytes written.
sub _payload_bytes {
    my $payload = shift;
    utf8::encode($payload) if utf8::is_utf8($payload);
    return $payload;
}

# Sends "$cmd $key $flags $expire $bytes" plus the data block and checks the
# reply.
#   flags  => N  (default 0)
#   expire => N  (default 0)
# Returns 1 on "STORED", nothing otherwise. Throws when reading the reply
# dies (e.g. timeout).
sub _store {
    my ($self, $cmd, $key, $value, %option) = @_;

    my $flags   = $option{flags}  || 0;
    my $expire  = $option{expire} || 0;
    my $payload = _payload_bytes($value);
    my $bytes   = length $payload;

    $self->{socket}->write("$cmd $key $flags $expire $bytes\r\n");
    $self->{socket}->write("$payload\r\n");

    my $response = eval { $self->_readline };
    if (my $err = $@) {
        confess qq{Failed to store data by "$cmd"! ($key, $value) ERROR: }
            . $err;
    }

    if (!defined $response or $response !~ m/^STORED/) {
        debug(qq{Failed to $cmd data as ($key, $value)});
        return;
    }
    return 1;
}

# $ds->cas($key, $value, $cas, flags => N, expire => N)
# Same as _store() but sends the "cas" command with the unique value $cas
# appended to the command line.
sub cas {
    my ($self, $key, $value, $cas, %option) = @_;

    my $flags   = $option{flags}  || 0;
    my $expire  = $option{expire} || 0;
    my $payload = _payload_bytes($value);
    my $bytes   = length $payload;

    $self->{socket}->write("cas $key $flags $expire $bytes $cas\r\n");
    $self->{socket}->write("$payload\r\n");

    my $response = eval { $self->_readline };
    if (my $err = $@) {
        confess qq{Failed to store data by "cas"! ($key, $value) ERROR: }
            . $err;
    }

    if (!defined $response or $response !~ m/^STORED/) {
        debug(qq{Failed to set data as ($key, $value) with cas $cas});
        return;
    }
    return 1;
}

# Deletes $key. Returns 1 on "DELETED", otherwise warns and returns nothing.
sub delete {
    my ($self, $key) = @_;

    my $response = $self->query_one("delete $key");
    if (!defined $response or $response !~ m/^DELETED/) {
        warn "Failed to delete '$key'";
        return;
    }
    return 1;
}

# Updates the expiration of $key. Returns 1 on "TOUCHED", nothing otherwise.
sub touch {
    my ($self, $key, $expire) = @_;

    my $response = $self->query_one("touch $key $expire");
    $response = q{} unless defined $response;    # no reply line at all

    if ($response =~ m/^NOT_FOUND/) {
        debug("No such data KEY '$key'");
        return;
    }
    if ($response !~ m/^TOUCHED/) {
        warn "Failed to touch '$key' with EXPIRE '$expire'. RES: $response";
        return;
    }
    return 1;
}

# incr/decr($key, $number): return the leading number of the server reply,
# or nothing (with a warning) on failure.
sub incr {
    my $self = shift;
    return $self->_incr_decr('incr', @_);
}

sub decr {
    my $self = shift;
    return $self->_incr_decr('decr', @_);
}

sub _incr_decr {
    my ($self, $cmd, $key, $number) = @_;

    my $response = $self->query_one("$cmd $key $number");
    $response = q{} unless defined $response;    # no reply line at all

    if ($response =~ m/^NOT_FOUND/) {
        warn "No such data KEY '$key'";
        return;
    }
    if ($response =~ m/^(\d+)/) {
        return $1;
    }

    warn "Failed to $cmd '$key' by number '$number'. RES: $response";
    return;
}

# Sends $query and returns the first reply line.
# NOTE: chomp removes only the trailing newline; a preceding "\r", if the
# server sent one, is kept, as callers have always received it that way.
# Throws when reading the reply dies (e.g. timeout).
sub query_one {
    my ($self, $query) = @_;

    $self->{socket}->write("$query\r\n");

    my $response = eval { $self->_readline };
    if (my $err = $@) {
        confess "Failed to query! query: $query ERROR: " . $err;
    }

    chomp $response if $response;
    return $response;
}

# Sends $query and returns whatever the server sends back as one string,
# for replies without a known terminator line. The socket is switched to
# non-blocking mode while reading; reading stops once three consecutive
# getline attempts (10ms apart) yield nothing. The whole read is limited
# to 5 seconds. Throws on timeout or any other read failure.
sub query_any {
    my ($self, $query) = @_;

    my $socket = $self->{socket};
    $socket->write("$query\r\n");

    # Save blocking mode
    my $blocking_mode = $socket->blocking;

    my $response = eval {
        local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
        alarm 5;
        $socket->blocking(0);

        my $getline_from_sock = sub {
            for my $i (1 .. 3) {
                my $line = $socket->getline;
                return $line if defined $line;
                #debug "failed to getline - $i. query: $query";
                Time::HiRes::sleep(0.01);
            }
            return;
        };

        my $resp = q{};
        while (my $line = $getline_from_sock->()) {
            $resp .= $line;
        }
        alarm 0;
        return $resp;
    };
    my $err = $@;

    # If the eval died for a reason other than the timeout, the alarm is
    # still pending and would fire without our handler installed.
    alarm 0;

    # Restore blocking mode
    $socket->blocking($blocking_mode);

    if ($err) {
        confess "Failed to query! query: $query ERROR: " . $err;
    }
    return $response;
}

# Sends $query and returns an array ref of the reply lines (line endings
# stripped) that precede the terminating "OK"/"END" line.
# Throws (confess) on an error reply, timeout or closed connection.
sub query {
    my ($self, $query) = @_;

    my $response = eval { $self->_query($query) };
    if (my $err = $@) {
        confess "Failed to query! query: $query ERROR: " . $err;
    }
    return $response;
}

# Worker for query(): dies with the server's line on "ERROR",
# "CLIENT_ERROR ..." or "SERVER_ERROR ...".
sub _query {
    my ($self, $query) = @_;

    $self->{socket}->write("$query\r\n");

    my @response;
    while (1) {
        my $line = $self->_readline;

        # undef means the peer closed the connection; never loop on it.
        die 'Connection closed before the end of response.'
            unless defined $line;

        $line =~ s/[\r\n]+$//;
        last     if $line =~ m/^(?:OK|END)/;
        die $line if $line =~ m/^(?:CLIENT_|SERVER_)?ERROR/;
        push @response, $line;
    }

    return \@response;
}

# Reads one line from the socket, guarded by a 3 second alarm.
# Returns the line including its line ending, or undef when getline has
# nothing to return (e.g. connection closed). Dies on timeout.
sub _readline {
    my $self = shift;

    local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
    alarm 3;
    my $line = $self->{socket}->getline;
    alarm 0;

    return $line;
}

# Closes the socket when the object goes away.
sub DESTROY {
    my $self = shift;
    if ($self->{socket}) {
        $self->{socket}->close;
    }
}

1;
__END__

=encoding utf-8

=head1 NAME

App::Memcached::CLI::DataSource - Data Access Interface of Memcached server

=head1 SYNOPSIS

    use App::Memcached::CLI::DataSource;
    my $ds = App::Memcached::CLI::DataSource->connect(
            $params{addr}, timeout => $params{timeout}
        );
    my $stats = $ds->query('stats');

=head1 DESCRIPTION

This provides data access interface for Memcached server.

=head1 LICENSE

Copyright (C) YASUTAKE Kiyoshi.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=head1 AUTHOR

YASUTAKE Kiyoshi E<lt>yasutake.kiyoshi@gmail.comE<gt>

=cut