/* vim: set expandtab sts=4: */
#define PERL_NO_GET_CONTEXT
#include <EXTERN.h>
#include <perl.h>
#include <XSUB.h>
#include "ppport.h"
#include "rdkafkaxs.h"
MODULE = Kafka::Librd PACKAGE = Kafka::Librd PREFIX = krd_
PROTOTYPES: DISABLE
INCLUDE: const_xs.inc
int
krd_rd_kafka_version()
CODE:
RETVAL = rd_kafka_version();
OUTPUT:
RETVAL
const char*
krd_rd_kafka_version_str()
CODE:
RETVAL = rd_kafka_version_str();
OUTPUT:
RETVAL
rdkafka_t*
krd__new(type, params)
int type
HV* params
PREINIT:
rd_kafka_conf_t* conf;
rd_kafka_t* rk;
char errstr[1024];
CODE:
Newx(RETVAL, 1, rdkafka_t);
conf = krd_parse_config(aTHX_ RETVAL, params);
rk = rd_kafka_new(type, conf, errstr, 1024);
if (rk == NULL) {
croak("%s", errstr);
}
RETVAL->rk = rk;
RETVAL->thx = (IV)PERL_GET_THX;
OUTPUT:
RETVAL
int
krd_brokers_add(rdk, brokerlist)
rdkafka_t* rdk
char* brokerlist
CODE:
RETVAL = rd_kafka_brokers_add(rdk->rk, brokerlist);
OUTPUT:
RETVAL
int
krd_subscribe(rdk, topics)
rdkafka_t* rdk
AV* topics
PREINIT:
STRLEN strl;
int i, len;
rd_kafka_topic_partition_list_t* topic_list;
char* topic;
SV** topic_sv;
CODE:
len = av_len(topics) + 1;
topic_list = rd_kafka_topic_partition_list_new(len);
for (i=0; i < len; i++) {
topic_sv = av_fetch(topics, i, 0);
if (topic_sv != NULL) {
topic = SvPV(*topic_sv, strl);
rd_kafka_topic_partition_list_add(topic_list, topic, -1);
}
}
RETVAL = rd_kafka_subscribe(rdk->rk, topic_list);
rd_kafka_topic_partition_list_destroy(topic_list);
OUTPUT:
RETVAL
int
krd_unsubscribe(rdk)
rdkafka_t* rdk
CODE:
RETVAL = rd_kafka_unsubscribe(rdk->rk);
OUTPUT:
RETVAL
SV*
krd_subscription(rdk)
rdkafka_t* rdk
PREINIT:
rd_kafka_topic_partition_list_t* tpar;
rd_kafka_resp_err_t err;
AV* tp;
CODE:
err = rd_kafka_subscription(rdk->rk, &tpar);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
croak("Error retrieving subscriptions: %s", rd_kafka_err2str(err));
}
tp = krd_expand_topic_partition_list(aTHX_ tpar);
rd_kafka_topic_partition_list_destroy(tpar);
RETVAL = newRV_noinc((SV*)tp);
OUTPUT:
RETVAL
int
krd_assign(rdk, tplistsv = NULL)
rdkafka_t* rdk
SV* tplistsv
PREINIT:
AV* tplist;
rd_kafka_topic_partition_list_t* tpar = NULL;
CODE:
if (tplistsv != NULL && SvOK(tplistsv)) {
if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
croak("first argument must be an array reference");
}
tplist = (AV*)SvRV(tplistsv);
tpar = krd_parse_topic_partition_list(aTHX_ tplist);
}
RETVAL = rd_kafka_assign(rdk->rk, tpar);
if (tpar != NULL)
rd_kafka_topic_partition_list_destroy(tpar);
OUTPUT:
RETVAL
SV*
krd_assignment(rdk)
rdkafka_t *rdk
PREINIT:
rd_kafka_topic_partition_list_t* tpar;
rd_kafka_resp_err_t err;
AV* tp;
CODE:
err = rd_kafka_assignment(rdk->rk, &tpar);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
croak("Error retrieving assignments: %s", rd_kafka_err2str(err));
}
tp = krd_expand_topic_partition_list(aTHX_ tpar);
rd_kafka_topic_partition_list_destroy(tpar);
RETVAL = newRV_noinc((SV*)tp);
OUTPUT:
RETVAL
int
krd_commit(rdk, tplistsv = NULL, async = 0)
rdkafka_t* rdk
SV* tplistsv
int async
PREINIT:
AV* tplist;
rd_kafka_topic_partition_list_t* tpar = NULL;
CODE:
if (tplistsv != NULL && SvOK(tplistsv)) {
if(!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
croak("first argument must be an array reference");
}
tplist = (AV*)SvRV(tplistsv);
tpar = krd_parse_topic_partition_list(aTHX_ tplist);
}
RETVAL = rd_kafka_commit(rdk->rk, tpar, async);
if (tpar != NULL)
rd_kafka_topic_partition_list_destroy(tpar);
OUTPUT:
RETVAL
int
krd_commit_message(rdk, msg, async = 0)
rdkafka_t* rdk
rd_kafka_message_t* msg
int async
CODE:
RETVAL = rd_kafka_commit_message(rdk->rk, msg, async);
OUTPUT:
RETVAL
SV*
krd_committed(rdk, tplistsv, timeout_ms)
rdkafka_t* rdk
SV* tplistsv
int timeout_ms
PREINIT:
AV* tplist;
rd_kafka_topic_partition_list_t* tpar = NULL;
rd_kafka_resp_err_t err;
AV* tp;
CODE:
if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
croak("first argument must be an array reference");
}
tplist = (AV*)SvRV(tplistsv);
tpar = krd_parse_topic_partition_list(aTHX_ tplist);
err = rd_kafka_committed(rdk->rk, tpar, timeout_ms);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_topic_partition_list_destroy(tpar);
croak("Error retrieving commited offsets: %s", rd_kafka_err2str(err));
}
tp = krd_expand_topic_partition_list(aTHX_ tpar);
rd_kafka_topic_partition_list_destroy(tpar);
RETVAL = newRV_noinc((SV*)tp);
OUTPUT:
RETVAL
SV*
krd_position(rdk, tplistsv)
rdkafka_t* rdk
SV* tplistsv
PREINIT:
AV* tplist;
rd_kafka_topic_partition_list_t* tpar = NULL;
rd_kafka_resp_err_t err;
AV* tp;
CODE:
if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
croak("first argument must be an array reference");
}
tplist = (AV*)SvRV(tplistsv);
tpar = krd_parse_topic_partition_list(aTHX_ tplist);
err = rd_kafka_position(rdk->rk, tpar);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_topic_partition_list_destroy(tpar);
croak("Error retrieving positions: %s", rd_kafka_err2str(err));
}
tp = krd_expand_topic_partition_list(aTHX_ tpar);
rd_kafka_topic_partition_list_destroy(tpar);
RETVAL = newRV_noinc((SV*)tp);
OUTPUT:
RETVAL
rd_kafka_message_t*
krd_consumer_poll(rdk, timeout_ms)
rdkafka_t* rdk
int timeout_ms
CODE:
RETVAL = rd_kafka_consumer_poll(rdk->rk, timeout_ms);
OUTPUT:
RETVAL
int
krd_consumer_close(rdk)
rdkafka_t* rdk
CODE:
RETVAL = rd_kafka_consumer_close(rdk->rk);
OUTPUT:
RETVAL
rd_kafka_topic_t*
krd_topic(rdk, topic, params)
rdkafka_t* rdk
char *topic
HV* params
PREINIT:
rd_kafka_topic_conf_t* tcon;
char errstr[1024];
CODE:
tcon = krd_parse_topic_config(aTHX_ params, errstr);
if (tcon == NULL)
croak("Couldn't parse topic config: %s", errstr);
RETVAL = rd_kafka_topic_new(rdk->rk, topic, tcon);
tcon = NULL;
OUTPUT:
RETVAL
int
krd_poll(rdk, timeout_ms)
rdkafka_t* rdk
int timeout_ms
CODE:
RETVAL = rd_kafka_poll(rdk->rk, timeout_ms);
OUTPUT:
RETVAL
int
krd_outq_len(rdk)
rdkafka_t* rdk
CODE:
RETVAL = rd_kafka_outq_len(rdk->rk);
OUTPUT:
RETVAL
void
krd_flush(rdk, timeout_ms)
rdkafka_t* rdk
int timeout_ms
CODE:
rd_kafka_flush(rdk->rk, timeout_ms);
void
krd_DESTROY(rdk)
rdkafka_t* rdk
CODE:
if (rdk->thx == (IV)PERL_GET_THX) {
Safefree(rdk);
}
void
krd_destroy(rdk)
rdkafka_t* rdk
CODE:
rd_kafka_destroy(rdk->rk);
void
krd_dump(rdk)
rdkafka_t* rdk
CODE:
rd_kafka_dump(stdout, rdk->rk);
int
krd_rd_kafka_wait_destroyed(timeout_ms)
int timeout_ms
CODE:
RETVAL = rd_kafka_wait_destroyed(timeout_ms);
OUTPUT:
RETVAL
MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Topic PREFIX = krdt_
PROTOTYPES: DISABLE
int
krdt_produce(rkt, partition, msgflags, payload, key)
rd_kafka_topic_t* rkt
int partition
int msgflags
SV* payload
SV* key
PREINIT:
STRLEN plen, klen;
char *plptr, *keyptr;
CODE:
plptr = SvPVbyte(payload, plen);
if (SvOK(key)) {
keyptr = SvPVbyte(key, klen);
} else {
keyptr = NULL;
klen = 0;
}
RETVAL = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY | msgflags, plptr, plen, keyptr, klen, NULL);
OUTPUT:
RETVAL
void
krdt_DESTROY(rkt)
rd_kafka_topic_t* rkt
CODE:
rd_kafka_topic_destroy(rkt);
void
krdt_destroy(rkt)
rd_kafka_topic_t* rkt
CODE:
rd_kafka_topic_destroy(rkt);
MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Message PREFIX = krdm_
PROTOTYPES: DISABLE
int
krdm_err(msg)
rd_kafka_message_t* msg
CODE:
RETVAL = msg->err;
OUTPUT:
RETVAL
int
krdm_partition(msg)
rd_kafka_message_t* msg
CODE:
RETVAL = msg->partition;
OUTPUT:
RETVAL
const char*
krdm_topic(msg)
rd_kafka_message_t* msg
CODE:
RETVAL = rd_kafka_topic_name(msg->rkt);
OUTPUT:
RETVAL
SV*
krdm_payload(msg)
rd_kafka_message_t* msg
CODE:
RETVAL = newSVpvn(msg->payload, msg->len);
OUTPUT:
RETVAL
SV*
krdm_key(msg)
rd_kafka_message_t* msg
CODE:
if (msg->err == 0) {
RETVAL = newSVpvn(msg->key, msg->key_len);
} else {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
long
krdm_offset(msg)
rd_kafka_message_t* msg
CODE:
/* that will truncate offset if perl doesn't support 64bit ints */
RETVAL = msg->offset;
OUTPUT:
RETVAL
long
krdm_timestamp(msg,...)
rd_kafka_message_t* msg
CODE:
rd_kafka_timestamp_type_t tstype;
RETVAL = rd_kafka_message_timestamp(msg, &tstype);
if (items > 1) {
if (!SvROK(ST(1)) || strncmp(sv_reftype(SvRV(ST(1)), 0), "SCALAR", 7)) {
croak("second argument tstype must be a scalar reference");
}
sv_setiv(SvRV(ST(1)), tstype);
}
OUTPUT:
RETVAL
void
krdm_DESTROY(msg)
rd_kafka_message_t* msg
CODE:
rd_kafka_message_destroy(msg);
MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Error PREFIX = krde_
PROTOTYPES: DISABLE
HV*
krde_rd_kafka_get_err_descs()
PREINIT:
const struct rd_kafka_err_desc* descs;
size_t cnt;
int i;
CODE:
rd_kafka_get_err_descs(&descs, &cnt);
RETVAL = newHV();
for (i = 0; i < cnt; i++) {
if (descs[i].name != NULL) {
hv_store(RETVAL, descs[i].name, strnlen(descs[i].name, 1024), newSViv(descs[i].code), 0);
}
}
OUTPUT:
RETVAL
const char*
krde_to_string(code)
int code
CODE:
RETVAL = rd_kafka_err2str(code);
OUTPUT:
RETVAL
const char*
krde_to_name(code)
int code
CODE:
RETVAL = rd_kafka_err2name(code);
OUTPUT:
RETVAL
int
krde_last_error()
CODE:
RETVAL = rd_kafka_last_error();
OUTPUT:
RETVAL