NAME
EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV
SYNOPSIS
use EV::Kafka;
my $kafka = EV::Kafka->new(
brokers => '127.0.0.1:9092',
acks => -1,
on_error => sub { warn "kafka: @_" },
on_message => sub {
my ($topic, $partition, $offset, $key, $value, $headers) = @_;
print "$topic:$partition @ $offset $key = $value\n";
},
);
# Producer
$kafka->connect(sub {
$kafka->produce('my-topic', 'key', 'value', sub {
my ($result, $err) = @_;
say "produced at offset " . $result->{topics}[0]{partitions}[0]{base_offset};
});
});
# Consumer (manual assignment)
$kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
my $poll = EV::timer 0, 0.1, sub { $kafka->poll };
# Consumer group
$kafka->subscribe('my-topic',
group_id => 'my-group',
on_assign => sub { ... },
on_revoke => sub { ... },
);
EV::run;
DESCRIPTION
EV::Kafka is a high-performance asynchronous Kafka client that implements the Kafka binary protocol in XS with EV event loop integration. It targets Redpanda and Apache Kafka (protocol version 0.11+).
Two-layer architecture:
EV::Kafka::Conn (XS) -- single broker TCP connection with protocol encoding/decoding, correlation ID matching, pipelining, optional TLS and SASL/PLAIN authentication.
EV::Kafka::Client (Perl) -- cluster management with metadata discovery, broker connection pooling, partition leader routing, producer with key-based partitioning, consumer with manual assignment or consumer groups.
Features:
Binary protocol implemented in pure XS (no librdkafka dependency)
Automatic request pipelining per broker connection
Metadata-driven partition leader routing
Producer: acks modes (-1/0/1), key-based partitioning (murmur2), headers, fire-and-forget (acks=0)
Consumer: manual partition assignment, offset tracking, poll-based message delivery
Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky partition assignment, offset commit/fetch, automatic rebalancing
TLS (OpenSSL) and SASL/PLAIN authentication
Automatic reconnection at the connection layer
Bootstrap broker failover (tries all listed brokers)
ANYEVENT INTEGRATION
AnyEvent can use EV as its event-loop backend, so EV::Kafka integrates seamlessly into AnyEvent applications.
NO UTF-8 SUPPORT
This module handles all values as bytes. Encode your UTF-8 strings before passing them:
use Encode;
$kafka->produce($topic, $key, encode_utf8($val), sub { ... });
CLUSTER CLIENT METHODS
new(%options)
Create a new EV::Kafka client. Returns a blessed EV::Kafka::Client object.
my $kafka = EV::Kafka->new(
brokers => '10.0.0.1:9092,10.0.0.2:9092',
acks => -1,
on_error => sub { warn @_ },
);
Options:
- brokers => 'Str'
  Comma-separated list of bootstrap broker addresses (host:port). Default: 127.0.0.1:9092.
- client_id => 'Str' (default 'ev-kafka')
  Client identifier sent to brokers.
- tls => Bool
  Enable TLS encryption.
- tls_ca_file => 'Str'
  Path to CA certificate file for TLS verification.
- tls_skip_verify => Bool
  Skip TLS certificate verification.
- sasl => \%opts
  Enable SASL authentication. Supports the PLAIN mechanism:
  sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }
- acks => Int (default -1)
  Producer acknowledgment mode: -1 = all in-sync replicas, 0 = no acknowledgment (fire-and-forget), 1 = leader only.
- linger_ms => Int (default 5)
  Time in milliseconds to accumulate records before flushing a batch. Lower values reduce latency; higher values improve throughput.
- batch_size => Int (default 16384)
  Maximum batch size in bytes before a batch is flushed immediately.
- compression => 'Str'
  Compression type for produce batches: 'lz4' (requires liblz4), 'gzip' (requires zlib), or undef for none.
- idempotent => Bool (default 0)
  Enable idempotent producer. Calls InitProducerId on connect and sets producer_id/epoch/sequence in each RecordBatch for exactly-once delivery (broker-side deduplication).
- transactional_id => 'Str'
  Enable transactional producer. Implies idempotent. Required for begin_transaction/commit_transaction/abort_transaction and send_offsets_to_transaction (full EOS).
- partitioner => $cb->($topic, $key, $num_partitions)
  Custom partition selection function. Default: murmur2 hash of the key, or round-robin for null keys.
- on_error => $cb->($errstr)
  Error callback. Default: die.
- on_connect => $cb->()
  Called once after the initial metadata fetch completes.
- on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
  Message delivery callback for consumer operations.
- fetch_max_wait_ms => Int (default 500)
  Maximum time the broker waits for fetch_min_bytes of data.
- fetch_max_bytes => Int (default 1048576)
  Maximum bytes per fetch response.
- fetch_min_bytes => Int (default 1)
  Minimum bytes before the broker responds to a fetch.
- metadata_refresh => Int (default 300)
  Metadata refresh interval in seconds (reserved, not yet wired).
- loop => EV::Loop
  EV loop to use. Default: EV::default_loop.
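As an illustration of the partitioner option, here is a sketch of a custom partitioner that pins keyless messages to partition 0 instead of round-robin; keyed messages hash the same way the default does (EV::Kafka::_murmur2 is the module's own hash, described under UTILITY FUNCTIONS):

```perl
# Sketch: custom partitioner. Keyless messages go to partition 0;
# keyed messages use the same murmur2-mod-N scheme as the default.
my $kafka = EV::Kafka->new(
    brokers     => '127.0.0.1:9092',
    partitioner => sub {
        my ($topic, $key, $num_partitions) = @_;
        return 0 unless defined $key;
        return EV::Kafka::_murmur2($key) % $num_partitions;
    },
);
```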
connect($cb)
Connect to the cluster. Connects to the first available bootstrap broker, fetches cluster metadata, then fires $cb->($metadata).
$kafka->connect(sub {
my $meta = shift;
# $meta->{brokers}, $meta->{topics}
});
produce($topic, $key, $value, [\%opts,] [$cb])
Produce a message. Routes to the correct partition leader automatically.
# with callback (acks=1 or acks=-1)
$kafka->produce('topic', 'key', 'value', sub {
my ($result, $err) = @_;
});
# with headers
$kafka->produce('topic', 'key', 'value',
{ headers => { 'h1' => 'v1' } }, sub { ... });
# fire-and-forget (acks=0)
$kafka->produce('topic', 'key', 'value');
# explicit partition
$kafka->produce('topic', 'key', 'value',
{ partition => 3 }, sub { ... });
produce_many(\@messages, $cb)
Produce multiple messages with a single completion callback. Each message is an arrayref [$topic, $key, $value] or a hashref {topic, key, value}. $cb fires when all messages are acknowledged.
$kafka->produce_many([
['my-topic', 'k1', 'v1'],
['my-topic', 'k2', 'v2'],
], sub {
my $errors = shift;
warn "some failed: @$errors" if $errors;
});
flush([$cb])
Flush all accumulated produce batches and wait for all in-flight requests to complete. $cb fires when all pending responses have been received.
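For example, a fire-and-forget batch can be drained before shutdown (a sketch; the 'audit' topic and @events list are placeholders):

```perl
# queue several fire-and-forget messages, then drain everything
$kafka->produce('audit', $_->{id}, $_->{payload}) for @events;  # @events assumed
$kafka->flush(sub {
    print "all in-flight requests completed\n";
});
```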
assign(\@partitions)
Manually assign partitions for consuming.
$kafka->assign([
{ topic => 'my-topic', partition => 0, offset => 0 },
{ topic => 'my-topic', partition => 1, offset => 100 },
]);
seek($topic, $partition, $offset, [$cb])
Seek a partition to a specific offset. Use -2 for earliest, -1 for latest. Updates the assignment in-place.
$kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });
offsets_for($topic, $cb)
Get earliest and latest offsets for all partitions of a topic.
$kafka->offsets_for('my-topic', sub {
my $offsets = shift;
# { 0 => { earliest => 0, latest => 42 }, 1 => ... }
});
lag($cb)
Get consumer lag for all assigned partitions.
$kafka->lag(sub {
my $lag = shift;
# { "topic:0" => { current => 10, latest => 42, lag => 32 } }
});
error_name($code)
Convert a Kafka numeric error code to its name.
EV::Kafka::Client::error_name(3) # "UNKNOWN_TOPIC_OR_PARTITION"
poll([$cb])
Fetch messages from assigned partitions. Calls on_message for each received record. $cb fires when all fetch responses have arrived.
my $timer = EV::timer 0, 0.1, sub { $kafka->poll };
subscribe($topic, ..., %opts)
Join a consumer group and subscribe to topics. The group protocol handles partition assignment automatically.
$kafka->subscribe('topic-a', 'topic-b',
group_id => 'my-group',
session_timeout => 30000, # ms
rebalance_timeout => 60000, # ms
heartbeat_interval => 3, # seconds
auto_commit => 1, # commit on unsubscribe (default)
auto_offset_reset => 'earliest', # or 'latest'
group_instance_id => 'pod-abc', # KIP-345 static membership
on_assign => sub {
my $partitions = shift;
# [{topic, partition, offset}, ...]
},
on_revoke => sub {
my $partitions = shift;
},
);
commit([$cb])
Commit current consumer offsets to the group coordinator.
$kafka->commit(sub {
my $err = shift;
warn "commit failed: $err" if $err;
});
unsubscribe([$cb])
Leave the consumer group (sends LeaveGroup so the group rebalances promptly) and stop the heartbeat and fetch loops. If auto_commit is enabled, offsets are committed before leaving.
begin_transaction
Start a transaction. Requires transactional_id in constructor.
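A minimal produce-only transaction, sketched from the methods documented here (the commit callback is shown without arguments, since its signature is not specified above):

```perl
# all produced messages become visible atomically on commit
$kafka->begin_transaction;
$kafka->produce('output-topic', 'k1', 'v1');
$kafka->produce('output-topic', 'k2', 'v2');
$kafka->commit_transaction(sub { print "committed\n" });
```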
send_offsets_to_transaction($group_id, [$cb])
Commit consumer offsets within the current transaction via TxnOffsetCommit. This is the key step for exactly-once consume-process-produce pipelines.
$kafka->send_offsets_to_transaction('my-group', sub {
my ($result, $err) = @_;
});
commit_transaction([$cb])
Commit the current transaction. All produced messages and offset commits within the transaction become visible atomically.
abort_transaction([$cb])
Abort the current transaction. All produced messages are discarded and offset commits are rolled back.
close([$cb])
Graceful shutdown: stop timers, disconnect all broker connections.
$kafka->close(sub { EV::break });
LOW-LEVEL CONNECTION METHODS
EV::Kafka::Conn provides direct access to a single broker connection. Useful for custom protocols, debugging, or when cluster-level routing is not needed.
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_error(sub { warn @_ });
$conn->on_connect(sub { ... });
$conn->connect('127.0.0.1', 9092, 5.0);
connect($host, $port, [$timeout])
Connect to a broker. Timeout in seconds (0 = no timeout).
disconnect
Disconnect from broker.
connected
Returns true if the connection is ready (ApiVersions handshake complete).
metadata(\@topics, $cb)
Request cluster metadata. Pass undef for all topics.
$conn->metadata(['my-topic'], sub {
my ($result, $err) = @_;
# $result->{brokers}, $result->{topics}
});
produce($topic, $partition, $key, $value, [\%opts,] [$cb])
Produce a message to a specific partition.
$conn->produce('topic', 0, 'key', 'value', sub {
my ($result, $err) = @_;
});
Options: acks (default 1), headers (hashref), timestamp (epoch ms, default now), compression ('none', 'lz4'; requires LZ4 at build time).
produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
Produce multiple records in a single RecordBatch. Each record is {key, value, headers}. Options: acks, compression, producer_id, producer_epoch, base_sequence.
$conn->produce_batch('topic', 0, [
{ key => 'k1', value => 'v1' },
{ key => 'k2', value => 'v2' },
], sub { my ($result, $err) = @_ });
fetch($topic, $partition, $offset, $cb, [$max_bytes])
Fetch messages from a partition starting at $offset.
$conn->fetch('topic', 0, 0, sub {
my ($result, $err) = @_;
for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
printf "offset=%d key=%s value=%s\n",
$rec->{offset}, $rec->{key}, $rec->{value};
}
});
fetch_multi(\%topics, $cb, [$max_bytes])
Multi-partition fetch in a single request. Groups multiple topic-partitions into one Fetch call to the broker.
$conn->fetch_multi({
'topic-a' => [{ partition => 0, offset => 10 },
{ partition => 1, offset => 20 }],
'topic-b' => [{ partition => 0, offset => 0 }],
}, sub { my ($result, $err) = @_ });
Used internally by poll() to batch fetches by broker leader.
list_offsets($topic, $partition, $timestamp, $cb)
Get offsets by timestamp. Use -2 for earliest, -1 for latest.
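For instance (the per-call response shape is not documented above, so only the error path is shown):

```perl
# latest offset of partition 0 (timestamp -1 = latest)
$conn->list_offsets('my-topic', 0, -1, sub {
    my ($result, $err) = @_;
    warn "list_offsets: $err" if defined $err;
});
```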
find_coordinator($key, $cb, [$key_type])
Find the coordinator broker. $key_type: 0=group (default), 1=transaction.
join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
Join a consumer group. Pass $group_instance_id for KIP-345 static membership.
sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
Synchronize group state after join.
heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
Send heartbeat to group coordinator.
offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
Commit consumer offsets.
offset_fetch($group_id, \@topics, $cb)
Fetch committed offsets for a consumer group.
api_versions
Returns a hashref of supported API keys to max versions, or undef if not yet negotiated.
my $vers = $conn->api_versions;
# { 0 => 7, 1 => 11, 3 => 8, ... }
on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])
Set handler callbacks. Pass undef to clear.
client_id($id)
Set the client identifier.
tls($enable, [$ca_file, $skip_verify])
Configure TLS.
sasl($mechanism, [$username, $password])
Configure SASL authentication.
auto_reconnect($enable, [$delay_ms])
Enable automatic reconnection with delay in milliseconds (default 1000).
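Putting the configuration setters together, a sketch of a TLS + SASL/PLAIN connection with reconnection enabled (host, CA path, and credentials are placeholders):

```perl
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->client_id('my-app');
$conn->tls(1, '/etc/ssl/kafka-ca.pem', 0);   # verify against this CA
$conn->sasl('PLAIN', 'user', 'secret');
$conn->auto_reconnect(1, 2000);              # retry every 2000 ms
$conn->on_connect(sub { print "ready\n" });
$conn->connect('broker.example.com', 9093, 5.0);
```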
leave_group($group_id, $member_id, $cb)
Send LeaveGroup to coordinator for fast partition rebalance.
create_topics(\@topics, $timeout_ms, $cb)
Create topics. Each element: {name, num_partitions, replication_factor}.
$conn->create_topics(
[{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
5000, sub { my ($res, $err) = @_ }
);
delete_topics(\@topic_names, $timeout_ms, $cb)
Delete topics by name.
init_producer_id($transactional_id, $txn_timeout_ms, $cb)
Initialize a producer ID for idempotent/transactional produce. Pass undef for non-transactional idempotent producer.
add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
Register partitions with the transaction coordinator.
end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
Commit ($committed=1) or abort ($committed=0) a transaction.
txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
Commit consumer offsets within a transaction (API 28).
pending
Number of requests awaiting broker response.
state
Connection state as integer (0=disconnected, 6=ready).
UTILITY FUNCTIONS
EV::Kafka::_murmur2($key)
Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.
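Because this is the same hash the default partitioner uses, the target partition of a keyed message can be predicted (a 6-partition topic is assumed here):

```perl
# which partition will messages keyed 'user-42' land on?
my $partition = EV::Kafka::_murmur2('user-42') % 6;
```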
EV::Kafka::_crc32c($data)
CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.
EV::Kafka::_error_name($code)
Convert Kafka error code to string name.
RESULT STRUCTURES
Produce result
$result = {
topics => [{
topic => 'name',
partitions => [{
partition => 0,
error_code => 0,
base_offset => 42,
}],
}],
};
Fetch result
$result = {
topics => [{
topic => 'name',
partitions => [{
partition => 0,
error_code => 0,
high_watermark => 100,
records => [{
offset => 42,
timestamp => 1712345678000,
key => 'key', # or undef
value => 'value', # or undef
headers => { h => 'v' }, # if present
}],
}],
}],
};
Metadata result
$result = {
controller_id => 0,
brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
topics => [{
name => 'topic',
error_code => 0,
partitions => [{
partition => 0,
leader => 0,
error_code => 0,
}],
}],
};
ERROR HANDLING
Errors are delivered through two channels:
- Connection-level errors fire the on_error callback (or croak if none is set). These include connection refused, DNS failure, TLS errors, SASL authentication failure, and protocol violations.
- Request-level errors are delivered as the second argument to the request callback: $cb->($result, $error). If $error is defined, $result may be undef.
Within result structures, per-partition error_code fields use Kafka numeric codes:
0 No error
1 OFFSET_OUT_OF_RANGE
3 UNKNOWN_TOPIC_OR_PARTITION
6 NOT_LEADER_OR_FOLLOWER
15 COORDINATOR_NOT_AVAILABLE
16 NOT_COORDINATOR
25 UNKNOWN_MEMBER_ID
27 REBALANCE_IN_PROGRESS
36 TOPIC_ALREADY_EXISTS
79 MEMBER_ID_REQUIRED
When a broker disconnects mid-flight, all pending callbacks receive (undef, "connection closed by broker") or (undef, "disconnected").
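A typical check combining both channels in a produce callback (a sketch; $result is shaped as described under RESULT STRUCTURES):

```perl
$kafka->produce('topic', 'key', 'value', sub {
    my ($result, $err) = @_;
    return warn "request failed: $err" if defined $err;   # request-level error
    my $p = $result->{topics}[0]{partitions}[0];
    if ($p->{error_code}) {                               # per-partition error
        warn "broker error: " . EV::Kafka::Client::error_name($p->{error_code});
    }
});
```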
ENVIRONMENT VARIABLES
These are used by tests and examples (not by the module itself):
TEST_KAFKA_BROKER broker address for tests (host:port)
KAFKA_BROKER broker address for examples
KAFKA_HOST broker hostname for low-level examples
KAFKA_PORT broker port for low-level examples
KAFKA_TOPIC topic name for examples
KAFKA_GROUP_ID consumer group for examples
KAFKA_LIMIT message limit for consume example
KAFKA_COUNT message count for fire-and-forget
BENCH_BROKER broker for benchmarks
BENCH_MESSAGES message count for benchmarks
BENCH_VALUE_SIZE value size in bytes for benchmarks
BENCH_TOPIC topic name for benchmarks
QUICK START
Minimal producer + consumer lifecycle:
use EV;
use EV::Kafka;
my $kafka = EV::Kafka->new(
brokers => '127.0.0.1:9092',
acks => 1,
on_error => sub { warn "kafka: @_\n" },
on_message => sub {
my ($topic, $part, $offset, $key, $value) = @_;
print "got: $key=$value\n";
},
);
$kafka->connect(sub {
# produce
$kafka->produce('test', 'k1', 'hello', sub {
print "produced\n";
# consume from the beginning
$kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
$kafka->seek('test', 0, -2, sub {
my $t = EV::timer 0, 0.1, sub { $kafka->poll };
$kafka->{cfg}{_t} = $t;
});
});
});
EV::run;
COOKBOOK
Produce JSON with headers
use JSON::PP;
my $json = JSON::PP->new->utf8;
$kafka->produce('events', 'user-42',
$json->encode({ action => 'click', page => '/home' }),
{ headers => { 'content-type' => 'application/json' } },
sub { ... }
);
Consume from latest offset only
$kafka->subscribe('live-feed',
group_id => 'realtime',
auto_offset_reset => 'latest',
on_assign => sub { print "ready\n" },
);
Graceful shutdown
$SIG{INT} = sub {
$kafka->commit(sub {
$kafka->unsubscribe(sub {
$kafka->close(sub { EV::break });
});
});
};
At-least-once processing
$kafka->subscribe('jobs',
group_id => 'workers',
auto_commit => 0,
);
# in the constructor's on_message option: process, then commit periodically
my $count = 0;   # declared before the EV::Kafka->new(...) call
on_message => sub {
    process($_[4]);   # $_[4] is the message value
    $kafka->commit if ++$count % 100 == 0;
},
Batch produce
$kafka->produce_many([
['events', 'k1', 'v1'],
['events', 'k2', 'v2'],
['events', 'k3', 'v3'],
], sub {
my $errs = shift;
print $errs ? "some failed\n" : "all done\n";
});
Exactly-once stream processing (EOS)
my $kafka = EV::Kafka->new(
brokers => '...',
transactional_id => 'my-eos-app',
acks => -1,
on_message => sub {
my ($t, $p, $off, $key, $value) = @_;
my $result = process($value);
$kafka->produce('output-topic', $key, $result);
},
);
# consume-process-produce loop:
$kafka->begin_transaction;
$kafka->poll(sub {
$kafka->send_offsets_to_transaction('my-group', sub {
$kafka->commit_transaction(sub {
$kafka->begin_transaction; # next transaction
});
});
});
Topic administration
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_connect(sub {
$conn->create_topics(
[{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
10000, sub { ... }
);
});
BENCHMARKS
Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl 5.40.2, 50K messages (bench/benchmark.pl):
Pipeline produce (acks=1) 68K msg/sec 7.4 MB/s
Fire-and-forget (acks=0) 100K msg/sec 11.0 MB/s
Fetch throughput 31K msg/sec 3.4 MB/s
Sequential round-trip 19K msg/sec 54 us avg latency
Metadata request 25K req/sec 41 us avg latency
Throughput by value size (pipelined, acks=1):
10 bytes 61K msg/sec 0.9 MB/s
100 bytes 68K msg/sec 7.4 MB/s
1000 bytes 50K msg/sec 50.2 MB/s
10000 bytes 18K msg/sec 178.5 MB/s
Pipeline produce throughput is limited by Perl callback overhead per message. Fire-and-forget mode (acks=0) skips the response cycle entirely, reaching ~100K msg/sec. Sequential round-trip (one produce, wait for ack, repeat) measures raw broker latency at ~54 microseconds.
The fetch path is sequential (fetch, process, fetch again) which introduces one round-trip per batch. With larger max_bytes and dense topics, fetch throughput increases proportionally.
Run perl bench/benchmark.pl for throughput results. Set BENCH_BROKER, BENCH_MESSAGES, BENCH_VALUE_SIZE, and BENCH_TOPIC to customize.
Run perl bench/latency.pl for a latency histogram with percentiles (min, avg, median, p90, p95, p99, max).
KAFKA PROTOCOL
This module implements the Kafka binary protocol directly in XS. All integers are big-endian. Requests use a 4-byte size prefix followed by a header (API key, version, correlation ID, client ID) and a version-specific body.
Responses are matched to requests by correlation ID. The broker guarantees FIFO ordering per connection, so the response queue is a simple FIFO.
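The framing described above can be sketched in plain Perl pack() (a non-flexible v1-style header is assumed; this is illustrative, not the module's internal encoder):

```perl
use strict;
use warnings;

# Encode a Kafka request frame: 4-byte big-endian size prefix, then
# int16 api_key, int16 api_version, int32 correlation_id, a
# length-prefixed (int16) client_id string, and the request body.
sub encode_request {
    my ($api_key, $api_version, $corr_id, $client_id, $body) = @_;
    my $frame = pack('n n N n a*',
        $api_key, $api_version, $corr_id,
        length($client_id), $client_id) . $body;
    return pack('N', length $frame) . $frame;   # size excludes the prefix itself
}

# ApiVersions (api_key 18) v0 has an empty body
my $req = encode_request(18, 0, 1, 'ev-kafka', '');
printf "payload=%d total=%d\n", unpack('N', $req), length $req;
# prints "payload=18 total=22"
```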
RecordBatch encoding (magic=2) is used for produce. CRC32C covers the batch from attributes through the last record. Records use ZigZag-encoded varints for lengths and deltas.
The connection handshake sends ApiVersions (v0) on connect to discover supported protocol versions. SASL authentication uses SaslHandshake (v1) + SaslAuthenticate (v2) with PLAIN mechanism.
Consumer group protocol uses sticky partition assignment with MEMBER_ID_REQUIRED (error 79) retry per KIP-394.
Non-flexible API versions are used throughout (capped below the flexible-version threshold for each API) to avoid the compact encoding complexity.
LIMITATIONS
LZ4 and gzip compression -- supported when built with liblz4 and zlib. snappy and zstd are not implemented.
Transactions / EOS -- begin_transaction, send_offsets_to_transaction, commit_transaction, and abort_transaction provide full exactly-once stream processing. InitProducerId, AddPartitionsToTxn, TxnOffsetCommit, and EndTxn are all wired. Requires transactional_id in the constructor.
No GSSAPI/OAUTHBEARER -- SASL/PLAIN and SCRAM-SHA-256/512 are supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented.
Sticky partition assignment -- assignments are preserved across rebalances where possible. New partitions are distributed to the least-loaded member. Overloaded members shed excess partitions.
Blocking DNS resolution -- getaddrinfo is called synchronously in conn_start_connect. For fully non-blocking operation, use IP addresses instead of hostnames.
No flexible API versions -- all API versions are capped below the flexible-version threshold to avoid compact string/array encoding. This limits interoperability with very new protocol features but works with all Kafka 0.11+ and Redpanda brokers.
Limited produce retry -- transient errors (NOT_LEADER, COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3 retries with backoff. Non-retriable errors are surfaced to the callback immediately.
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.