#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include <stdint.h>
#include "define.h"
#include "type.h"
#include "proto.h"
#include "decode.h"
#include "encode.h"
typedef struct {
int column_count;
int uniq_column_count;
struct cc_column *columns;
} Cassandra__Client__RowMeta;
MODULE = Cassandra::Client PACKAGE = Cassandra::Client::Protocol
PROTOTYPES: DISABLE
void
unpack_metadata(protocol_version, is_result, data)
int protocol_version
int is_result
SV *data
PPCODE:
STRLEN pos, size;
unsigned char *ptr;
int32_t flags, column_count, uniq_column_count;
Cassandra__Client__RowMeta *row_meta;
ST(0) = &PL_sv_undef; /* Will have our RowMeta instance */
ST(1) = &PL_sv_undef; /* Will have our paging state */
ptr = (unsigned char*)SvPV(data, size);
pos = 0;
if (UNLIKELY(!ptr))
croak("Missing data argument to unpack_metadata");
if (UNLIKELY(protocol_version != 3 && protocol_version != 4))
croak("Invalid protocol version");
flags = unpack_int(aTHX_ ptr, size, &pos);
column_count = unpack_int(aTHX_ ptr, size, &pos);
if (protocol_version >= 4 && !is_result) {
int i, pk_count;
pk_count = unpack_int(aTHX_ ptr, size, &pos);
if (UNLIKELY(pk_count < 0))
croak("Protocol error: pk_count<0");
for (i = 0; i < pk_count; i++) {
// Read the short, but throw it away for now.
unpack_short(aTHX_ ptr, size, &pos);
}
}
if (UNLIKELY(flags < 0 || flags > 7))
croak("Invalid protocol data passed to unpack_metadata (reason: invalid flags)");
if (UNLIKELY(column_count < 0))
croak("Invalid protocol data passed to unpack_metadata (reason: invalid column count)");
if (flags & CC_METADATA_FLAG_HAS_MORE_PAGES) {
ST(1) = unpack_bytes_sv(aTHX_ ptr, size, &pos);
sv_2mortal(ST(1));
}
if (!(flags & CC_METADATA_FLAG_NO_METADATA)) {
int i, have_global_spec;
SV *global_keyspace, *global_table;
HV *name_hash;
have_global_spec = flags & CC_METADATA_FLAG_GLOBAL_TABLES_SPEC;
if (have_global_spec) {
global_keyspace = unpack_string_sv(aTHX_ ptr, size, &pos);
sv_2mortal(global_keyspace);
global_table = unpack_string_sv(aTHX_ ptr, size, &pos);
sv_2mortal(global_table);
}
Newxz(row_meta, 1, Cassandra__Client__RowMeta);
ST(0) = sv_newmortal();
sv_setref_pv(ST(0), "Cassandra::Client::RowMetaPtr", (void*)row_meta);
if (UNLIKELY(column_count > size))
croak("Invalid protocol data passed to unpack_metadata (reason: column count unlikely)");
row_meta->column_count = column_count;
Newxz(row_meta->columns, column_count, struct cc_column);
name_hash = (HV*)sv_2mortal( (SV*)newHV() );
uniq_column_count = 0;
for (i = 0; i < column_count; i++) {
struct cc_column *column = &(row_meta->columns[i]);
if (have_global_spec) {
column->keyspace = global_keyspace;
SvREFCNT_inc(column->keyspace);
column->table = global_table;
SvREFCNT_inc(column->table);
} else {
column->keyspace = unpack_string_sv(aTHX_ ptr, size, &pos);
column->table = unpack_string_sv(aTHX_ ptr, size, &pos);
}
column->name = unpack_string_sv_hash(aTHX_ ptr, size, &pos, &column->name_hash);
unpack_type(aTHX_ ptr, size, &pos, &column->type);
if (!hv_exists_ent(name_hash, column->name, column->name_hash)) {
uniq_column_count++;
hv_store_ent(name_hash, column->name, &PL_sv_undef, column->name_hash);
}
}
row_meta->uniq_column_count = uniq_column_count;
}
sv_chop(data, (char*)ptr+pos);
XSRETURN(2);
MODULE = Cassandra::Client PACKAGE = Cassandra::Client::RowMetaPtr
AV*
decode(self, data, use_hashes)
Cassandra::Client::RowMeta *self
SV *data
int use_hashes
CODE:
STRLEN size, pos;
unsigned char *ptr;
int32_t row_count;
int i, j, col_count;
struct cc_column *columns;
RETVAL = newAV();
sv_2mortal((SV*)RETVAL); /* work around a bug in perl */
ptr = (unsigned char*)SvPV(data, size);
pos = 0;
if (UNLIKELY(!ptr))
croak("Invalid input to decode()");
col_count = self->column_count;
columns = self->columns;
row_count = unpack_int(aTHX_ ptr, size, &pos);
/* This came up while fuzzing: when we have 1000000 rows but no columns, we
* just flood the memory with empty arrays/hashes. Let's just reject this
* corner case. If you need this, please contact the author! */
if (UNLIKELY(row_count > 1000 && !col_count))
croak("Refusing to decode %d rows without known column information", row_count);
for (i = 0; i < row_count; i++) {
if (use_hashes) {
HV *this_row = newHV();
av_push(RETVAL, newRV_noinc((SV*)this_row));
for (j = 0; j < col_count; j++) {
SV *decoded = newSV(0);
hv_store_ent(this_row, columns[j].name, decoded, columns[j].name_hash);
decode_cell(aTHX_ ptr, size, &pos, &columns[j].type, decoded);
}
} else {
AV *this_row = newAV();
av_push(RETVAL, newRV_noinc((SV*)this_row));
for (j = 0; j < col_count; j++) {
SV *decoded = newSV(0);
av_push(this_row, decoded);
decode_cell(aTHX_ ptr, size, &pos, &columns[j].type, decoded);
}
}
}
OUTPUT:
RETVAL
SV*
encode(self, row)
Cassandra::Client::RowMeta *self
SV* row
CODE:
int column_count, i, use_hash;
STRLEN size_estimate;
AV *row_a;
HV *row_h;
if (UNLIKELY(row == NULL))
croak("row must be passed");
if (UNLIKELY(!SvROK(row)))
croak("encode: argument must be a reference");
column_count = self->column_count;
if (SvTYPE(SvRV(row)) == SVt_PVAV) {
row_a = (AV*)SvRV(row);
use_hash = 0;
if (UNLIKELY((av_len(row_a)+1) != column_count))
croak("row encoder expected %d column(s), but got %d", column_count, ((int)av_len(row_a))+1);
} else if (SvTYPE(SvRV(row)) == SVt_PVHV) {
row_h = (HV*)SvRV(row);
use_hash = 1;
if (UNLIKELY(HvUSEDKEYS(row_h) != self->uniq_column_count))
croak("row encoder expected %d column(s), but got %d", self->uniq_column_count, (int)HvUSEDKEYS(row_h));
} else {
croak("encode: argument must be an ARRAY or HASH reference");
}
/* Rough estimate. We only use it to predict Sv size, we don't rely on it being accurate.
If we overshoot, we waste some memory, and if we undershoot we copy a bit too often. */
size_estimate = 2 + (column_count * 12);
if (size_estimate <= 0) /* overflows aren't impossible, I guess */
size_estimate = 0; /* wing it */
RETVAL = newSV(size_estimate);
sv_setpvn(RETVAL, "", 0);
pack_short(aTHX_ RETVAL, column_count);
if (!use_hash) {
for (i = 0; i < column_count; i++) {
SV **maybe_cell = av_fetch(row_a, i, 0);
if (UNLIKELY(maybe_cell == NULL))
croak("row encoder error. bailing out");
encode_cell(aTHX_ RETVAL, *maybe_cell, &self->columns[i].type);
}
} else {
for (i = 0; i < column_count; i++) {
struct cc_column *column;
HE *ent;
column = &self->columns[i];
ent = hv_fetch_ent(row_h, column->name, 0, column->name_hash);
if (UNLIKELY(!ent)) {
croak("missing value for required entry <%s>", SvPV_nolen(column->name));
}
encode_cell(aTHX_ RETVAL, HeVAL(ent), &column->type);
}
}
OUTPUT:
RETVAL
AV*
column_names(self)
Cassandra::Client::RowMeta *self
CODE:
int i;
RETVAL = newAV();
sv_2mortal((SV*)RETVAL); /* work around a bug in perl */
for (i = 0; i < self->column_count; i++) {
av_push(RETVAL, SvREFCNT_inc(self->columns[i].name));
}
OUTPUT:
RETVAL
void
DESTROY(self)
Cassandra::Client::RowMeta *self
CODE:
int i;
for (i = 0; i < self->column_count; i++) {
struct cc_column *column = &(self->columns[i]);
SvREFCNT_dec(column->keyspace);
SvREFCNT_dec(column->table);
SvREFCNT_dec(column->name);
cc_type_destroy(aTHX_ &column->type);
}
Safefree(self->columns);
Safefree(self);