Sponsoring The Perl Toolchain Summit 2025: Help make this important event another success Learn more

#include <EXTERN.h>
#include <perl.h>
#include <XSUB.h>
#define NEED_newRV_noinc
#define NEED_sv_2pv_flags
#include "ppport.h"
#include "rdb_parser.h"
RDB_parser* rdb_parser__init(SV* master, SV* error_class, int utf8) {
RDB_parser *parser;
Newx(parser, 1, RDB_parser);
if (parser == NULL) {
croak("Couldn't allocate memory for parser");
}
if (SvROK(master)) {
parser->master = SvRV(master);
}
else {
parser->master = &PL_sv_undef;
}
parser->utf8 = utf8;
parser->callbacks = newAV();
parser->default_cb = NULL;
parser->mblk_reply = NULL;
parser->mblk_store = NULL;
parser->buffer = newSVpvn("", 0);
parser->state = RDBP_CLEAN;
parser->error_class = newSVsv(error_class);
parser->error_class_constructor = newSVsv(error_class);
sv_catpv(parser->error_class_constructor, "::new");
return parser;
}
void rdb_parser__free(RDB_parser *parser) {
struct rdbp_mblk_store *store, *next;
SvREFCNT_dec(parser->callbacks);
SvREFCNT_dec(parser->buffer);
SvREFCNT_dec(parser->error_class);
SvREFCNT_dec(parser->error_class_constructor);
if (parser->default_cb != NULL)
SvREFCNT_dec(parser->default_cb);
if (parser->mblk_reply != NULL)
SvREFCNT_dec(parser->mblk_reply);
store = parser->mblk_store;
while (store != NULL) {
next = store->next;
SvREFCNT_dec(store->mblk_reply);
Safefree(store);
store = next;
}
Safefree(parser);
}
void rdb_parser__propagate_reply(RDB_parser *parser, SV *reply) {
SV *cb;
while (1) {
if(av_len(parser->callbacks) >= 0) {
cb = av_shift(parser->callbacks);
sv_2mortal(cb);
}
else if (parser->default_cb != NULL) {
cb = parser->default_cb;
parser->default_cb = NULL;
}
else {
break;
}
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(parser->master)));
XPUSHs(sv_2mortal(newSVsv(reply)));
PUTBACK;
call_sv(cb, G_VOID|G_DISCARD);
FREETMPS;
LEAVE;
}
}
}
static
long _line_length(char *start, size_t length) {
int i;
char *pos = start;
for (i=0; i < length - 1; i++, pos++) {
if (*pos == '\r' && pos[1] == '\n') {
return (long)(pos - start);
}
}
return -1;
}
/*
* read line from the buffer and return it as SV
*/
static
SV* _read_line(SV *buffer) {
char *pv;
long len;
SV *line = NULL;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if (len >= 0) {
line = newSVpvn(pv, len);
sv_chop(buffer, pv + len + 2);
}
return line;
};
/*
* read line containing integer number
*/
static
SV* _read_number(SV *buffer) {
char *pv;
long len;
SV *num = NULL;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if (len >= 0) {
pv[len] = 0;
num = newSViv(atol(pv));
sv_chop(buffer, pv + len + 2);
}
return num;
}
/*
* read line containing length
* returns length >= -1, or -2 if line is not finished
*/
static
long _read_length(SV *buffer) {
char *pv;
long len;
long num = -2;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if (len >= 0) {
pv[len] = 0;
num = atol(pv);
sv_chop(buffer, pv + len + 2);
}
return num;
}
/*
* creates RedisDB::Parser::Error object from the message
*/
static
SV* _create_rdb_error(RDB_parser *parser, SV *msg) {
int count;
SV* err;
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(parser->error_class);
XPUSHs(sv_2mortal(msg));
PUTBACK;
count = call_sv(parser->error_class_constructor, G_SCALAR);
if (count != 1)
croak("Expected single return value from new, but got many");
SPAGAIN;
err = newSVsv(POPs);
PUTBACK;
FREETMPS;
LEAVE;
return err;
}
/*
* stores current multibulk reply status in mblk_stack
*/
static
void _mblk_status_store(RDB_parser *parser) {
struct rdbp_mblk_store *store;
Newx(store, 1, struct rdbp_mblk_store);
store->mblk_reply = parser->mblk_reply;
parser->mblk_reply = NULL;
store->mblk_len = parser->mblk_len;
store->next = parser->mblk_store;
parser->mblk_store = store;
}
/*
* fetches status of the multibulk reply from the mblk_stack
*/
static
void _mblk_status_fetch(RDB_parser *parser) {
struct rdbp_mblk_store *store;
store = parser->mblk_store;
if (store == NULL)
croak("Already at the upper level of multi-bulk reply");
parser->mblk_len = store->mblk_len;
parser->mblk_reply = store->mblk_reply;
parser->mblk_store = store->next;
Safefree(store);
}
/*
* process new value of multi-bulk reply
*/
static
int _mblk_item(RDB_parser *parser, SV *value) {
SV *tmp;
int repeat = 0;
av_push(parser->mblk_reply, value);
if (parser->mblk_len > 1) {
parser->mblk_len--;
parser->state = RDBP_WAIT_BUCKS;
repeat = 1;
}
else if (parser->mblk_level > 1) {
parser->mblk_level--;
tmp = newRV_noinc((SV *)(parser->mblk_reply));
_mblk_status_fetch(parser);
repeat = _mblk_item(parser, tmp);
}
return repeat;
}
/*
* check if we finished with this reply, and invoke callback if needed.
* returns 1 if reply completed or 0 otherwise
*/
static
int _reply_completed(RDB_parser *parser, SV *value) {
SV *reply, *cb;
if (parser->mblk_level) {
if (_mblk_item(parser, value))
return 0;
reply = newRV_noinc((SV *)(parser->mblk_reply));
parser->mblk_reply = NULL;
}
else
reply = value;
parser->state = RDBP_CLEAN;
{
dSP;
ENTER;
SAVETMPS;
if (av_len(parser->callbacks) >= 0) {
cb = av_shift(parser->callbacks);
sv_2mortal(cb);
}
else if (parser->default_cb != NULL) {
cb = parser->default_cb;
}
else croak("No callbacks in the queue and no default callback set");
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(parser->master)));
XPUSHs(sv_2mortal(reply));
PUTBACK;
call_sv(cb, G_VOID|G_DISCARD);
FREETMPS;
LEAVE;
}
return 1;
}
int rdb_parser__parse_reply(RDB_parser *parser) {
char op;
char *pv;
SV *line, *err, *bulk, *mblk;
long length;
if (sv_len(parser->buffer) == 0) return 0;
if (parser->state == RDBP_CLEAN) {
parser->mblk_level = 0;
/* remove first character from the buffer */
pv = SvPVX(parser->buffer);
op = *pv;
sv_chop(parser->buffer, pv + 1);
if (op == '+')
parser->state = RDBP_READ_LINE;
else if (op == '-')
parser->state = RDBP_READ_ERROR;
else if (op == ':')
parser->state = RDBP_READ_NUMBER;
else if (op == '$')
parser->state = RDBP_READ_BULK_LEN;
else if (op == '*') {
parser->state = RDBP_READ_MBLK_LEN;
parser->mblk_level = 1;
}
else {
croak("Got invalid reply");
}
}
while (1) {
if (sv_len(parser->buffer) < 2) return 0;
if (parser->state == RDBP_READ_LINE) {
line = _read_line(parser->buffer);
if (line == NULL) return 0;
if (_reply_completed(parser, line)) return 1;
}
else if (parser->state == RDBP_READ_ERROR) {
line = _read_line(parser->buffer);
if (line == NULL) return 0;
err = _create_rdb_error(parser, line);
if (_reply_completed(parser, err)) return 1;
}
else if (parser->state == RDBP_READ_NUMBER) {
line = _read_number(parser->buffer);
if (line == NULL) return 0;
if (_reply_completed(parser, line)) return 1;
}
else if (parser->state == RDBP_READ_BULK_LEN) {
length = _read_length(parser->buffer);
if (length >= 0) {
parser->state = RDBP_READ_BULK;
parser->bulk_len = length;
}
else if (length == -1) {
if (_reply_completed(parser, newSVpvn(NULL, 0))) return 1;
}
else return 0;
}
else if (parser->state == RDBP_READ_BULK) {
if (sv_len(parser->buffer) < 2 + parser->bulk_len) return 0;
pv = SvPVX(parser->buffer);
bulk = newSVpvn(pv, parser->bulk_len);
sv_chop(parser->buffer, pv + parser->bulk_len + 2);
if (parser->utf8) {
if (!sv_utf8_decode(bulk))
croak("Received invalid UTF-8 string from the server");
}
if (_reply_completed(parser, bulk)) return 1;
}
else if (parser->state == RDBP_READ_MBLK_LEN) {
length = _read_length(parser->buffer);
if (length > 0) {
parser->mblk_len = length;
parser->state = RDBP_WAIT_BUCKS;
parser->mblk_reply = newAV();
}
else if (length == 0 || length == -1) {
mblk = (length == 0) ? newRV_noinc((SV *)newAV()) : newSVpvn(NULL, 0);
parser->mblk_level--;
if (parser->mblk_level > 0)
_mblk_status_fetch(parser);
if (_reply_completed(parser, mblk)) return 1;
}
else return 0;
}
else if (parser->state == RDBP_WAIT_BUCKS) {
/* remove first character from the buffer */
pv = SvPVX(parser->buffer);
op = *pv;
sv_chop(parser->buffer, pv + 1);
if (op == '$') parser->state = RDBP_READ_BULK_LEN;
else if (op == ':') parser->state = RDBP_READ_NUMBER;
else if (op == '+') parser->state = RDBP_READ_LINE;
else if (op == '-') parser->state = RDBP_READ_ERROR;
else if (op == '*') {
parser->state = RDBP_READ_MBLK_LEN;
parser->mblk_level++;
_mblk_status_store(parser);
}
else croak("Invalid multi-bulk reply. Expected [$:+-*] but got something else");
}
}
return 0;
}