Sponsoring The Perl Toolchain Summit 2025: Help make this important event another success Learn more

use strict;
our $VERSION = "2.23";
$VERSION = eval $VERSION;
=head1 NAME

RedisDB::Parser::PP - Redis protocol parser for RedisDB

=head1 DESCRIPTION

Pure Perl implementation of L<RedisDB::Parser>. You should not use this
module directly. See the L<RedisDB::Parser> documentation for details.

=cut
use Encode qw();
use Carp;
use Scalar::Util qw(weaken);
# Constructor.
#
# Recognized named parameters:
#   utf8             - when true, requests are encoded to and bulk replies
#                      decoded from UTF-8
#   master           - value passed as the first argument to every callback;
#                      stored as a weak reference
#   error_class      - class used to build error replies, defaults to
#                      "RedisDB::Parser::Error"
#   default_callback - callback used when the callback queue is empty
#
# Returns the new parser object.
sub new {
    my $class  = shift;
    my %params = @_;

    my $error_class = $params{error_class} || "RedisDB::Parser::Error";

    my $self = bless {
        _buffer      => '',
        _callbacks   => [],
        _default_cb  => $params{default_callback},
        _error_class => $error_class,
        master       => $params{master},
        utf8         => $params{utf8},
    }, $class;

    # The parser must not keep its owner alive.
    weaken( $self->{master} );

    return $self;
}
# Serializes a command and its arguments into a multi-bulk request string.
#
# The request starts with "*<number of arguments>" and contains one
# "$<length>" bulk per argument, every line terminated by CRLF. With the
# utf8 option on, each argument is encoded to UTF-8 first (croaks if that
# fails); otherwise the length and the payload are taken with byte
# semantics.
#
# Returns the request string.
sub build_request {
    my ( $self, @args ) = @_;

    my $req = '*' . scalar(@args) . "\015\012";

    if ( $self->{utf8} ) {
        for my $arg (@args) {
            my $octets =
              Encode::encode( 'UTF-8', $arg, Encode::FB_CROAK | Encode::LEAVE_SRC );
            $req .= '$' . length($octets) . "\015\012" . $octets . "\015\012";
        }
    }
    else {
        # Both length() and the concatenation must see raw bytes here.
        use bytes;
        for my $arg (@args) {
            $req .= '$' . length($arg) . "\015\012" . $arg . "\015\012";
        }
    }

    return $req;
}
# Appends a callback to the end of the callback queue.
# Returns the number of callbacks in the queue after the push.
sub push_callback {
    my $self = shift;
    my ($cb) = @_;
    return push @{ $self->{_callbacks} }, $cb;
}
# Replaces the default callback, i.e. the one used for replies that arrive
# while the callback queue is empty. Returns the callback just stored.
sub set_default_callback {
    my $self = shift;
    my ($cb) = @_;
    return $self->{_default_cb} = $cb;
}
# Returns the number of callbacks currently waiting in the queue.
sub callbacks {
    my ($self) = @_;
    my $pending = @{ $self->{_callbacks} };
    return $pending;
}
# Delivers the same reply to the default callback (if one is set) and then to
# every queued callback, removing each callback from the queue as it is
# called. Each callback receives the master object and the reply.
# Returns nothing.
sub propagate_reply {
    my ( $self, $reply ) = @_;

    if ( my $default = $self->{_default_cb} ) {
        $default->( $self->{master}, $reply );
    }

    # The queue is re-read on every iteration; a false entry ends the loop.
    while (1) {
        my $cb = shift @{ $self->{_callbacks} } or last;
        $cb->( $self->{master}, $reply );
    }

    return;
}
# Appends $data to the internal buffer and parses as many complete replies
# out of it as possible; callbacks are invoked by _parse_reply for each one.
# Returns the number of replies parsed by this call.
sub parse {
    my $self = shift;
    my ($data) = @_;

    $self->{_buffer} .= $data;

    my $parsed = 0;
    while ( length $self->{_buffer} ) {
        last unless $self->_parse_reply;
        $parsed++;
    }

    return $parsed;
}
# Values of $self->{_parse_state}. A false state means that no reply is
# being parsed at the moment, so every state must be a true value.
my ( $READ_LINE, $READ_ERROR, $READ_NUMBER, $READ_BULK_LEN, $READ_BULK, $READ_MBLK_LEN,
    $WAIT_BUCKS ) = 1 .. 7;

# $self->_parse_reply
#
# Tries to take one complete reply from the front of $self->{_buffer}.
#
# Returns 1 once a complete reply has been parsed and passed to its callback
# by _reply_completed. Returns nothing if the buffer does not hold a complete
# reply yet; the parse position is kept in $self (_parse_state and the other
# _parse_* fields), so the next call continues where this one stopped.
#
# Dies if the data violates the protocol (unknown type byte, malformed
# integer reply, malformed bulk or multi-bulk length). Confesses if the utf8
# option is set and a bulk string is not valid UTF-8.
sub _parse_reply {
    my $self = shift;
    return unless length $self->{_buffer};

    # No reply in progress: the first byte tells which kind of reply follows.
    unless ( $self->{_parse_state} ) {
        my $type = substr( $self->{_buffer}, 0, 1, '' );
        delete $self->{_parse_mblk_level};
        if ( $type eq '+' ) {
            $self->{_parse_state} = $READ_LINE;
        }
        elsif ( $type eq '-' ) {
            $self->{_parse_state} = $READ_ERROR;
        }
        elsif ( $type eq ':' ) {
            $self->{_parse_state} = $READ_NUMBER;
        }
        elsif ( $type eq '$' ) {
            $self->{_parse_state} = $READ_BULK_LEN;
        }
        elsif ( $type eq '*' ) {
            $self->{_parse_state}      = $READ_MBLK_LEN;
            $self->{_parse_mblk_level} = 1;
        }
        else {
            die "Got invalid reply: $type$self->{_buffer}";
        }
    }

    # Run the state machine until a reply completes or the data runs out.
    while (1) {

        # Whatever the state, less than two bytes cannot complete anything.
        return unless length $self->{_buffer} >= 2;

        if ( $self->{_parse_state} == $READ_LINE ) {
            return unless defined( my $line = $self->_read_line );
            return 1 if $self->_reply_completed($line);
        }
        elsif ( $self->{_parse_state} == $READ_ERROR ) {
            return unless defined( my $line = $self->_read_line );
            my $err = $self->{_error_class}->new($line);
            return 1 if $self->_reply_completed($err);
        }
        elsif ( $self->{_parse_state} == $READ_NUMBER ) {
            return unless defined( my $line = $self->_read_line );
            die "Received invalid integer reply :$line" unless $line =~ /^-?[0-9]+$/;
            return 1 if $self->_reply_completed( 0 + $line );
        }
        elsif ( $self->{_parse_state} == $READ_BULK_LEN ) {
            return unless defined( my $len = $self->_read_line );

            # Only -1 (null bulk) and non-negative integers are valid. Anything
            # else must not be silently coerced or skipped.
            die "Invalid bulk reply: \$$len\015\012$self->{_buffer}"
              unless $len =~ /\A-?[0-9]+\z/ && $len >= -1;
            if ( $len >= 0 ) {
                $self->{_parse_state}    = $READ_BULK;
                $self->{_parse_bulk_len} = $len;
            }
            else {
                return 1 if $self->_reply_completed(undef);
            }
        }
        elsif ( $self->{_parse_state} == $READ_BULK ) {

            # Wait until the whole payload and its trailing CRLF have arrived.
            return unless length $self->{_buffer} >= 2 + $self->{_parse_bulk_len};
            my $bulk = substr( $self->{_buffer}, 0, $self->{_parse_bulk_len}, '' );
            substr $self->{_buffer}, 0, 2, '';
            if ( $self->{utf8} ) {

                # $@ is localized only around the eval, so that the exception
                # thrown by confess below is not clobbered when it is restored.
                my $decoded = do {
                    local $@;
                    eval {
                        Encode::decode( 'UTF-8', $bulk,
                            Encode::FB_CROAK | Encode::LEAVE_SRC );
                    };
                };
                confess "Couldn't decode reply from the server, invalid UTF-8: '$bulk'"
                  unless defined $decoded;
                $bulk = $decoded;
            }
            return 1 if $self->_reply_completed($bulk);
        }
        elsif ( $self->{_parse_state} == $READ_MBLK_LEN ) {
            return unless defined( my $len = $self->_read_line );
            die "Invalid multi-bulk reply: *$len\015\012$self->{_buffer}"
              unless $len =~ /\A-?[0-9]+\z/;
            if ( $len > 0 ) {
                $self->{_parse_mblk_len} = $len;
                $self->{_parse_state}    = $WAIT_BUCKS;
                $self->{_parse_reply}    = [];
            }
            elsif ( $len == 0 || $len == -1 ) {

                # An empty (*0) or null (*-1) array has no items, so it is
                # complete immediately. If it is nested, the frame of the
                # enclosing array was saved when its '*' was seen and has to
                # be taken off the stack again here; otherwise a later pop
                # in _mblk_item would restore the wrong frame.
                if ( --$self->{_parse_mblk_level} ) {
                    ( $self->{_parse_mblk_len}, $self->{_parse_reply} ) =
                      @{ pop @{ $self->{_parse_mblk_store} } };
                }
                return 1 if $self->_reply_completed( $len == -1 ? undef : [] );
            }
            else {
                die "Invalid multi-bulk reply: *$len\015\012$self->{_buffer}";
            }
        }
        elsif ( $self->{_parse_state} == $WAIT_BUCKS ) {

            # Next item of a multi-bulk reply: its type byte selects the state.
            my $char = substr( $self->{_buffer}, 0, 1, '' );
            if ( $char eq '$' ) {
                $self->{_parse_state} = $READ_BULK_LEN;
            }
            elsif ( $char eq ':' ) {
                $self->{_parse_state} = $READ_NUMBER;
            }
            elsif ( $char eq '+' ) {
                $self->{_parse_state} = $READ_LINE;
            }
            elsif ( $char eq '-' ) {
                $self->{_parse_state} = $READ_ERROR;
            }
            elsif ( $char eq '*' ) {

                # Nested array: save the enclosing array's remaining item
                # count and collected items until the nested one is done.
                $self->{_parse_state} = $READ_MBLK_LEN;
                $self->{_parse_mblk_level}++;
                push @{ $self->{_parse_mblk_store} },
                  [ $self->{_parse_mblk_len}, $self->{_parse_reply} ];
            }
            else {
                die "Invalid multi-bulk reply. Expected [\$:+-*] but got $char";
            }
        }
    }
}
# Removes the first CRLF-terminated line from the buffer.
#
# Returns the line without its CRLF, or undef (leaving the buffer untouched)
# if the buffer does not contain a CRLF yet.
sub _read_line {
    my ($self) = @_;

    my $eol = index( $self->{_buffer}, "\015\012" );

    # Text before the terminator, or undef while the line is incomplete.
    my $line = $eol < 0 ? undef : substr( $self->{_buffer}, 0, $eol );

    # Drop the line together with its \r\n from the buffer.
    substr( $self->{_buffer}, 0, $eol + 2, '' ) if defined $line;

    return $line;
}
# Adds a parsed item to the multi-bulk reply that is being collected.
#
# Returns 1 if more items are still expected before the outermost reply is
# complete, 0 if the outermost reply is now complete. When a nested array is
# finished, the enclosing array's state is taken from _parse_mblk_store and
# the finished array is added to it as an item in turn.
sub _mblk_item {
    my ( $self, $value ) = @_;

    push @{ $self->{_parse_reply} }, $value;

    # The current array still lacks items: go and wait for the next one.
    if ( --$self->{_parse_mblk_len} ) {
        $self->{_parse_state} = $WAIT_BUCKS;
        return 1;
    }

    # The current array is full; if it was the outermost one, we are done.
    return 0 unless --$self->{_parse_mblk_level};

    # Otherwise restore the enclosing array and hand it the finished one.
    my $finished = $self->{_parse_reply};
    my $frame    = pop @{ $self->{_parse_mblk_store} };
    ( $self->{_parse_mblk_len}, $self->{_parse_reply} ) = @$frame;

    return $self->_mblk_item($finished);
}
# Called whenever a single value (line, error, number, bulk, empty or null
# array) has been parsed.
#
# Inside a multi-bulk reply the value is added to it first; if the
# multi-bulk reply is still incomplete, returns nothing. Otherwise resets the
# parse state, passes the finished reply to the next queued callback (or to
# the default callback if the queue is empty) and returns 1.
sub _reply_completed {
    my ( $self, $reply ) = @_;

    if ( $self->{_parse_mblk_level} ) {
        my $more_expected = $self->_mblk_item($reply);
        return if $more_expected;

        # The whole array is done; it becomes the reply to deliver.
        $reply = delete $self->{_parse_reply};
    }

    $self->{_parse_state} = undef;

    my $handler = shift @{ $self->{_callbacks} };
    $handler = $self->{_default_cb} unless $handler;
    $handler->( $self->{master}, $reply );

    return 1;
}
1;
__END__
=head1 SEE ALSO

L<RedisDB::Parser>

=head1 AUTHOR

Pavel Shaydo, C<< <zwon at cpan.org> >>

=head1 LICENSE AND COPYRIGHT

Copyright 2011-2015,2018 Pavel Shaydo.

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

=cut