#include <EXTERN.h>
#include <perl.h>
#include <XSUB.h>
#define NEED_newRV_noinc
#define NEED_sv_2pv_flags
#include "ppport.h"
#include "rdb_parser.h"
RDB_parser* rdb_parser__init(SV* master, SV* error_class,
int
utf8) {
RDB_parser *parser;
Newx(parser, 1, RDB_parser);
if
(parser == NULL) {
croak(
"Couldn't allocate memory for parser"
);
}
if
(SvROK(master)) {
parser->master = SvRV(master);
}
else
{
parser->master = &PL_sv_undef;
}
parser->utf8 = utf8;
parser->callbacks = newAV();
parser->default_cb = NULL;
parser->mblk_reply = NULL;
parser->mblk_store = NULL;
parser->buffer = newSVpvn(
""
, 0);
parser->state = RDBP_CLEAN;
parser->error_class = newSVsv(error_class);
parser->error_class_constructor = newSVsv(error_class);
sv_catpv(parser->error_class_constructor,
"::new"
);
return
parser;
}
void
rdb_parser__free(RDB_parser *parser) {
struct
rdbp_mblk_store *store, *next;
SvREFCNT_dec(parser->callbacks);
SvREFCNT_dec(parser->buffer);
SvREFCNT_dec(parser->error_class);
SvREFCNT_dec(parser->error_class_constructor);
if
(parser->default_cb != NULL)
SvREFCNT_dec(parser->default_cb);
if
(parser->mblk_reply != NULL)
SvREFCNT_dec(parser->mblk_reply);
store = parser->mblk_store;
while
(store != NULL) {
next = store->next;
SvREFCNT_dec(store->mblk_reply);
Safefree(store);
store = next;
}
Safefree(parser);
}
void
rdb_parser__propagate_reply(RDB_parser *parser, SV *reply) {
SV *cb;
while
(1) {
if
(av_len(parser->callbacks) >= 0) {
cb = av_shift(parser->callbacks);
sv_2mortal(cb);
}
else
if
(parser->default_cb != NULL) {
cb = parser->default_cb;
parser->default_cb = NULL;
}
else
{
break
;
}
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(parser->master)));
XPUSHs(sv_2mortal(newSVsv(reply)));
PUTBACK;
call_sv(cb, G_VOID|G_DISCARD);
FREETMPS;
LEAVE;
}
}
}
static
long
_line_length(
char
*start,
size_t
length) {
int
i;
char
*pos = start;
for
(i=0; i < length - 1; i++, pos++) {
if
(*pos ==
'\r'
&& pos[1] ==
'\n'
) {
return
(
long
)(pos - start);
}
}
return
-1;
}
static
SV* _read_line(SV *buffer) {
char
*pv;
long
len;
SV *line = NULL;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if
(len >= 0) {
line = newSVpvn(pv, len);
sv_chop(buffer, pv + len + 2);
}
return
line;
};
static
SV* _read_number(SV *buffer) {
char
*pv;
long
len;
SV *num = NULL;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if
(len >= 0) {
pv[len] = 0;
num = newSViv(
atol
(pv));
sv_chop(buffer, pv + len + 2);
}
return
num;
}
static
long
_read_length(SV *buffer) {
char
*pv;
long
len;
long
num = -2;
pv = SvPVX(buffer);
len = _line_length(pv, sv_len(buffer));
if
(len >= 0) {
pv[len] = 0;
num =
atol
(pv);
sv_chop(buffer, pv + len + 2);
}
return
num;
}
static
SV* _create_rdb_error(RDB_parser *parser, SV *msg) {
int
count;
SV* err;
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(parser->error_class);
XPUSHs(sv_2mortal(msg));
PUTBACK;
count = call_sv(parser->error_class_constructor, G_SCALAR);
if
(count != 1)
croak(
"Expected single return value from new, but got many"
);
SPAGAIN;
err = newSVsv(POPs);
PUTBACK;
FREETMPS;
LEAVE;
return
err;
}
static
void
_mblk_status_store(RDB_parser *parser) {
struct
rdbp_mblk_store *store;
Newx(store, 1,
struct
rdbp_mblk_store);
store->mblk_reply = parser->mblk_reply;
parser->mblk_reply = NULL;
store->mblk_len = parser->mblk_len;
store->next = parser->mblk_store;
parser->mblk_store = store;
}
static
void
_mblk_status_fetch(RDB_parser *parser) {
struct
rdbp_mblk_store *store;
store = parser->mblk_store;
if
(store == NULL)
croak(
"Already at the upper level of multi-bulk reply"
);
parser->mblk_len = store->mblk_len;
parser->mblk_reply = store->mblk_reply;
parser->mblk_store = store->next;
Safefree(store);
}
static
int
_mblk_item(RDB_parser *parser, SV *value) {
SV *tmp;
int
repeat = 0;
av_push(parser->mblk_reply, value);
if
(parser->mblk_len > 1) {
parser->mblk_len--;
parser->state = RDBP_WAIT_BUCKS;
repeat = 1;
}
else
if
(parser->mblk_level > 1) {
parser->mblk_level--;
tmp = newRV_noinc((SV *)(parser->mblk_reply));
_mblk_status_fetch(parser);
repeat = _mblk_item(parser, tmp);
}
return
repeat;
}
static
int
_reply_completed(RDB_parser *parser, SV *value) {
SV *reply, *cb;
if
(parser->mblk_level) {
if
(_mblk_item(parser, value))
return
0;
reply = newRV_noinc((SV *)(parser->mblk_reply));
parser->mblk_reply = NULL;
}
else
reply = value;
parser->state = RDBP_CLEAN;
{
dSP;
ENTER;
SAVETMPS;
if
(av_len(parser->callbacks) >= 0) {
cb = av_shift(parser->callbacks);
sv_2mortal(cb);
}
else
if
(parser->default_cb != NULL) {
cb = parser->default_cb;
}
else
croak(
"No callbacks in the queue and no default callback set"
);
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(parser->master)));
XPUSHs(sv_2mortal(reply));
PUTBACK;
call_sv(cb, G_VOID|G_DISCARD);
FREETMPS;
LEAVE;
}
return
1;
}
int
rdb_parser__parse_reply(RDB_parser *parser) {
char
op;
char
*pv;
SV *line, *err, *bulk, *mblk;
long
length;
if
(sv_len(parser->buffer) == 0)
return
0;
if
(parser->state == RDBP_CLEAN) {
parser->mblk_level = 0;
pv = SvPVX(parser->buffer);
op = *pv;
sv_chop(parser->buffer, pv + 1);
if
(op ==
'+'
)
parser->state = RDBP_READ_LINE;
else
if
(op ==
'-'
)
parser->state = RDBP_READ_ERROR;
else
if
(op ==
':'
)
parser->state = RDBP_READ_NUMBER;
else
if
(op ==
'$'
)
parser->state = RDBP_READ_BULK_LEN;
else
if
(op ==
'*'
) {
parser->state = RDBP_READ_MBLK_LEN;
parser->mblk_level = 1;
}
else
{
croak(
"Got invalid reply"
);
}
}
while
(1) {
if
(sv_len(parser->buffer) < 2)
return
0;
if
(parser->state == RDBP_READ_LINE) {
line = _read_line(parser->buffer);
if
(line == NULL)
return
0;
if
(_reply_completed(parser, line))
return
1;
}
else
if
(parser->state == RDBP_READ_ERROR) {
line = _read_line(parser->buffer);
if
(line == NULL)
return
0;
err = _create_rdb_error(parser, line);
if
(_reply_completed(parser, err))
return
1;
}
else
if
(parser->state == RDBP_READ_NUMBER) {
line = _read_number(parser->buffer);
if
(line == NULL)
return
0;
if
(_reply_completed(parser, line))
return
1;
}
else
if
(parser->state == RDBP_READ_BULK_LEN) {
length = _read_length(parser->buffer);
if
(length >= 0) {
parser->state = RDBP_READ_BULK;
parser->bulk_len = length;
}
else
if
(length == -1) {
if
(_reply_completed(parser, newSVpvn(NULL, 0)))
return
1;
}
else
return
0;
}
else
if
(parser->state == RDBP_READ_BULK) {
if
(sv_len(parser->buffer) < 2 + parser->bulk_len)
return
0;
pv = SvPVX(parser->buffer);
bulk = newSVpvn(pv, parser->bulk_len);
sv_chop(parser->buffer, pv + parser->bulk_len + 2);
if
(parser->utf8) {
if
(!sv_utf8_decode(bulk))
croak(
"Received invalid UTF-8 string from the server"
);
}
if
(_reply_completed(parser, bulk))
return
1;
}
else
if
(parser->state == RDBP_READ_MBLK_LEN) {
length = _read_length(parser->buffer);
if
(length > 0) {
parser->mblk_len = length;
parser->state = RDBP_WAIT_BUCKS;
parser->mblk_reply = newAV();
}
else
if
(length == 0 || length == -1) {
mblk = (length == 0) ? newRV_noinc((SV *)newAV()) : newSVpvn(NULL, 0);
parser->mblk_level--;
if
(parser->mblk_level > 0)
_mblk_status_fetch(parser);
if
(_reply_completed(parser, mblk))
return
1;
}
else
return
0;
}
else
if
(parser->state == RDBP_WAIT_BUCKS) {
pv = SvPVX(parser->buffer);
op = *pv;
sv_chop(parser->buffer, pv + 1);
if
(op ==
'$'
) parser->state = RDBP_READ_BULK_LEN;
else
if
(op ==
':'
) parser->state = RDBP_READ_NUMBER;
else
if
(op ==
'+'
) parser->state = RDBP_READ_LINE;
else
if
(op ==
'-'
) parser->state = RDBP_READ_ERROR;
else
if
(op ==
'*'
) {
parser->state = RDBP_READ_MBLK_LEN;
parser->mblk_level++;
_mblk_status_store(parser);
}
else
croak(
"Invalid multi-bulk reply. Expected [$:+-*] but got something else"
);
}
}
return
0;
}