NAME

EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV

SYNOPSIS

use EV::Kafka;

my $kafka = EV::Kafka->new(
    brokers  => '127.0.0.1:9092',
    acks     => -1,
    on_error => sub { warn "kafka: @_" },
    on_message => sub {
        my ($topic, $partition, $offset, $key, $value, $headers) = @_;
        print "$topic:$partition @ $offset  $key = $value\n";
    },
);

# Producer
$kafka->connect(sub {
    $kafka->produce('my-topic', 'key', 'value', sub {
        my ($result, $err) = @_;
        say "produced at offset " . $result->{topics}[0]{partitions}[0]{base_offset};
    });
});

# Consumer (manual assignment)
$kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
my $poll = EV::timer 0, 0.1, sub { $kafka->poll };

# Consumer group
$kafka->subscribe('my-topic',
    group_id  => 'my-group',
    on_assign => sub { ... },
    on_revoke => sub { ... },
);

EV::run;

DESCRIPTION

EV::Kafka is a high-performance asynchronous Kafka client that implements the Kafka binary protocol in XS with EV event loop integration. It targets Redpanda and Apache Kafka (protocol version 0.11+).

Two-layer architecture:

  • EV::Kafka::Conn (XS) -- single broker TCP connection with protocol encoding/decoding, correlation ID matching, pipelining, optional TLS and SASL/PLAIN authentication.

  • EV::Kafka::Client (Perl) -- cluster management with metadata discovery, broker connection pooling, partition leader routing, producer with key-based partitioning, consumer with manual assignment or consumer groups.

Features:

  • Binary protocol implemented in pure XS (no librdkafka dependency)

  • Automatic request pipelining per broker connection

  • Metadata-driven partition leader routing

  • Producer: acks modes (-1/0/1), key-based partitioning (murmur2), headers, fire-and-forget (acks=0)

  • Consumer: manual partition assignment, offset tracking, poll-based message delivery

  • Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky partition assignment, offset commit/fetch, automatic rebalancing

  • TLS (OpenSSL) and SASL/PLAIN authentication

  • Automatic reconnection at the connection layer

  • Bootstrap broker failover (tries all listed brokers)

ANYEVENT INTEGRATION

AnyEvent uses EV as its preferred backend when EV is installed, so EV::Kafka can be used seamlessly inside AnyEvent applications.

NO UTF-8 SUPPORT

This module handles all values as bytes. Encode your UTF-8 strings before passing them:

use Encode;

$kafka->produce($topic, $key, encode_utf8($val), sub { ... });

CLUSTER CLIENT METHODS

new(%options)

Create a new EV::Kafka client. Returns a blessed EV::Kafka::Client object.

my $kafka = EV::Kafka->new(
    brokers  => '10.0.0.1:9092,10.0.0.2:9092',
    acks     => -1,
    on_error => sub { warn @_ },
);

Options:

brokers => 'Str'

Comma-separated list of bootstrap broker addresses (host:port). Default: 127.0.0.1:9092.

client_id => 'Str' (default 'ev-kafka')

Client identifier sent to brokers.

tls => Bool

Enable TLS encryption.

tls_ca_file => 'Str'

Path to CA certificate file for TLS verification.

tls_skip_verify => Bool

Skip TLS certificate verification.

sasl => \%opts

Enable SASL authentication. Supports PLAIN mechanism:

sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }

acks => Int (default -1)

Producer acknowledgment mode. -1 = all in-sync replicas, 0 = no acknowledgment (fire-and-forget), 1 = leader only.

linger_ms => Int (default 5)

Time in milliseconds to accumulate records before flushing a batch. Lower values reduce latency; higher values improve throughput.

batch_size => Int (default 16384)

Maximum batch size in bytes before a batch is flushed immediately.

compression => 'Str'

Compression type for produce batches: 'lz4' (requires liblz4), 'gzip' (requires zlib), or undef for none.

idempotent => Bool (default 0)

Enable idempotent producer. Calls InitProducerId on connect and sets producer_id/epoch/sequence in each RecordBatch for exactly-once delivery (broker-side deduplication).

transactional_id => 'Str'

Enable transactional producer. Implies idempotent. Required for begin_transaction/commit_transaction/abort_transaction and send_offsets_to_transaction (full EOS).

partitioner => $cb->($topic, $key, $num_partitions)

Custom partition selection function. Default: murmur2 hash of key, or round-robin for null keys.
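
As an illustration, a custom partitioner could hash keys with FNV-1a instead of the default murmur2 (a sketch; the FNV-1a hash and the random null-key fallback are illustrative choices, not part of the module, whose default null-key behavior is round-robin):

```perl
use strict;
use warnings;

# Hypothetical partitioner: 32-bit FNV-1a hash of the key instead of
# the default murmur2. Null keys fall back to a random partition here
# (the module's default is round-robin).
my $fnv_partitioner = sub {
    my ($topic, $key, $num_partitions) = @_;
    return int rand $num_partitions unless defined $key;
    my $h = 0x811c9dc5;
    for my $byte (unpack 'C*', $key) {
        $h = (($h ^ $byte) * 0x01000193) & 0xffffffff;
    }
    return $h % $num_partitions;
};

# Passed to the constructor as: partitioner => $fnv_partitioner
```

The callback receives the topic, the (possibly undef) key, and the current partition count from metadata, and must return a partition index in 0 .. $num_partitions - 1.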

on_error => $cb->($errstr)

Error callback. Default: die.

on_connect => $cb->()

Called once after initial metadata fetch completes.

on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)

Message delivery callback for consumer operations.

fetch_max_wait_ms => Int (default 500)

Maximum time the broker waits for fetch_min_bytes of data.

fetch_max_bytes => Int (default 1048576)

Maximum bytes per fetch response.

fetch_min_bytes => Int (default 1)

Minimum bytes before the broker responds to a fetch.

metadata_refresh => Int (default 300)

Metadata refresh interval in seconds (reserved, not yet wired).

loop => EV::Loop

EV loop to use. Default: EV::default_loop.

connect($cb)

Connect to the cluster. Connects to the first available bootstrap broker, fetches cluster metadata, then fires $cb->($metadata).

$kafka->connect(sub {
    my $meta = shift;
    # $meta->{brokers}, $meta->{topics}
});

produce($topic, $key, $value, [\%opts,] [$cb])

Produce a message. Routes to the correct partition leader automatically.

# with callback (acks=1 or acks=-1)
$kafka->produce('topic', 'key', 'value', sub {
    my ($result, $err) = @_;
});

# with headers
$kafka->produce('topic', 'key', 'value',
    { headers => { 'h1' => 'v1' } }, sub { ... });

# fire-and-forget (acks=0)
$kafka->produce('topic', 'key', 'value');

# explicit partition
$kafka->produce('topic', 'key', 'value',
    { partition => 3 }, sub { ... });

produce_many(\@messages, $cb)

Produce multiple messages with a single completion callback. Each message is an arrayref [$topic, $key, $value] or a hashref {topic, key, value}. $cb fires when all messages are acknowledged.

$kafka->produce_many([
    ['my-topic', 'k1', 'v1'],
    ['my-topic', 'k2', 'v2'],
], sub {
    my $errors = shift;
    warn "some failed: @$errors" if $errors;
});

flush([$cb])

Flush all accumulated produce batches and wait for all in-flight requests to complete. $cb fires when all pending responses have been received.

assign(\@partitions)

Manually assign partitions for consuming.

$kafka->assign([
    { topic => 'my-topic', partition => 0, offset => 0 },
    { topic => 'my-topic', partition => 1, offset => 100 },
]);

seek($topic, $partition, $offset, [$cb])

Seek a partition to a specific offset. Use -2 for earliest, -1 for latest. Updates the assignment in-place.

$kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });

offsets_for($topic, $cb)

Get earliest and latest offsets for all partitions of a topic.

$kafka->offsets_for('my-topic', sub {
    my $offsets = shift;
    # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
});

lag($cb)

Get consumer lag for all assigned partitions.

$kafka->lag(sub {
    my $lag = shift;
    # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
});

error_name($code)

Convert a Kafka numeric error code to its name.

EV::Kafka::Client::error_name(3)  # "UNKNOWN_TOPIC_OR_PARTITION"

poll([$cb])

Fetch messages from assigned partitions. Calls on_message for each received record. $cb fires when all fetch responses have arrived.

my $timer = EV::timer 0, 0.1, sub { $kafka->poll };

subscribe($topic, ..., %opts)

Join a consumer group and subscribe to topics. The group protocol handles partition assignment automatically.

$kafka->subscribe('topic-a', 'topic-b',
    group_id           => 'my-group',
    session_timeout    => 30000,      # ms
    rebalance_timeout  => 60000,      # ms
    heartbeat_interval => 3,          # seconds
    auto_commit        => 1,          # commit on unsubscribe (default)
    auto_offset_reset  => 'earliest', # or 'latest'
    group_instance_id  => 'pod-abc', # KIP-345 static membership
    on_assign => sub {
        my $partitions = shift;
        # [{topic, partition, offset}, ...]
    },
    on_revoke => sub {
        my $partitions = shift;
    },
);

commit([$cb])

Commit current consumer offsets to the group coordinator.

$kafka->commit(sub {
    my $err = shift;
    warn "commit failed: $err" if $err;
});

unsubscribe([$cb])

Leave the consumer group (sends LeaveGroup for fast rebalance), stop heartbeat and fetch loop. If auto_commit is enabled, commits offsets before leaving.

begin_transaction

Start a transaction. Requires transactional_id in constructor.

send_offsets_to_transaction($group_id, [$cb])

Commit consumer offsets within the current transaction via TxnOffsetCommit. This is the key step for exactly-once consume-process-produce pipelines.

$kafka->send_offsets_to_transaction('my-group', sub {
    my ($result, $err) = @_;
});

commit_transaction([$cb])

Commit the current transaction. All produced messages and offset commits within the transaction become visible atomically.

abort_transaction([$cb])

Abort the current transaction. All produced messages are discarded and offset commits are rolled back.

close([$cb])

Graceful shutdown: stop timers, disconnect all broker connections.

$kafka->close(sub { EV::break });

LOW-LEVEL CONNECTION METHODS

EV::Kafka::Conn provides direct access to a single broker connection. Useful for custom protocols, debugging, or when cluster-level routing is not needed.

my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_error(sub { warn @_ });
$conn->on_connect(sub { ... });
$conn->connect('127.0.0.1', 9092, 5.0);

connect($host, $port, [$timeout])

Connect to a broker. Timeout in seconds (0 = no timeout).

disconnect

Disconnect from broker.

connected

Returns true if the connection is ready (ApiVersions handshake complete).

metadata(\@topics, $cb)

Request cluster metadata. Pass undef for all topics.

$conn->metadata(['my-topic'], sub {
    my ($result, $err) = @_;
    # $result->{brokers}, $result->{topics}
});

produce($topic, $partition, $key, $value, [\%opts,] [$cb])

Produce a message to a specific partition.

$conn->produce('topic', 0, 'key', 'value', sub {
    my ($result, $err) = @_;
});

Options: acks (default 1), headers (hashref), timestamp (epoch ms, default now), compression ('none', 'lz4'; requires LZ4 at build time).

produce_batch($topic, $partition, \@records, [\%opts,] [$cb])

Produce multiple records in a single RecordBatch. Each record is {key, value, headers}. Options: acks, compression, producer_id, producer_epoch, base_sequence.

$conn->produce_batch('topic', 0, [
    { key => 'k1', value => 'v1' },
    { key => 'k2', value => 'v2' },
], sub { my ($result, $err) = @_ });

fetch($topic, $partition, $offset, $cb, [$max_bytes])

Fetch messages from a partition starting at $offset.

$conn->fetch('topic', 0, 0, sub {
    my ($result, $err) = @_;
    for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
        printf "offset=%d key=%s value=%s\n",
            $rec->{offset}, $rec->{key}, $rec->{value};
    }
});

fetch_multi(\%topics, $cb, [$max_bytes])

Multi-partition fetch in a single request. Groups multiple topic-partitions into one Fetch call to the broker.

$conn->fetch_multi({
    'topic-a' => [{ partition => 0, offset => 10 },
                  { partition => 1, offset => 20 }],
    'topic-b' => [{ partition => 0, offset => 0 }],
}, sub { my ($result, $err) = @_ });

Used internally by poll() to batch fetches by broker leader.

list_offsets($topic, $partition, $timestamp, $cb)

Get offsets by timestamp. Use -2 for earliest, -1 for latest.

find_coordinator($key, $cb, [$key_type])

Find the coordinator broker. $key_type: 0=group (default), 1=transaction.

join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])

Join a consumer group. Pass $group_instance_id for KIP-345 static membership.

sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])

Synchronize group state after join.

heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])

Send heartbeat to group coordinator.

offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)

Commit consumer offsets.

offset_fetch($group_id, \@topics, $cb)

Fetch committed offsets for a consumer group.

api_versions

Returns a hashref of supported API keys to max versions, or undef if not yet negotiated.

my $vers = $conn->api_versions;
# { 0 => 7, 1 => 11, 3 => 8, ... }

on_error([$cb])

on_connect([$cb])

on_disconnect([$cb])

Set handler callbacks. Pass undef to clear.

client_id($id)

Set the client identifier.

tls($enable, [$ca_file, $skip_verify])

Configure TLS.

sasl($mechanism, [$username, $password])

Configure SASL authentication.

auto_reconnect($enable, [$delay_ms])

Enable automatic reconnection with delay in milliseconds (default 1000).

leave_group($group_id, $member_id, $cb)

Send LeaveGroup to coordinator for fast partition rebalance.

create_topics(\@topics, $timeout_ms, $cb)

Create topics. Each element: {name, num_partitions, replication_factor}.

$conn->create_topics(
    [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
    5000, sub { my ($res, $err) = @_ }
);

delete_topics(\@topic_names, $timeout_ms, $cb)

Delete topics by name.

init_producer_id($transactional_id, $txn_timeout_ms, $cb)

Initialize a producer ID for idempotent/transactional produce. Pass undef for non-transactional idempotent producer.

add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)

Register partitions with the transaction coordinator.

end_txn($txn_id, $producer_id, $epoch, $committed, $cb)

Commit ($committed=1) or abort ($committed=0) a transaction.

txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)

Commit consumer offsets within a transaction (API 28).

pending

Number of requests awaiting broker response.

state

Connection state as integer (0=disconnected, 6=ready).

UTILITY FUNCTIONS

EV::Kafka::_murmur2($key)

Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.

EV::Kafka::_crc32c($data)

CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.

EV::Kafka::_error_name($code)

Convert Kafka error code to string name.

RESULT STRUCTURES

Produce result

$result = {
    topics => [{
        topic      => 'name',
        partitions => [{
            partition   => 0,
            error_code  => 0,
            base_offset => 42,
        }],
    }],
};

Fetch result

$result = {
    topics => [{
        topic      => 'name',
        partitions => [{
            partition      => 0,
            error_code     => 0,
            high_watermark => 100,
            records => [{
                offset    => 42,
                timestamp => 1712345678000,
                key       => 'key',         # or undef
                value     => 'value',       # or undef
                headers   => { h => 'v' },  # if present
            }],
        }],
    }],
};

Metadata result

$result = {
    controller_id => 0,
    brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
    topics  => [{
        name       => 'topic',
        error_code => 0,
        partitions => [{
            partition  => 0,
            leader     => 0,
            error_code => 0,
        }],
    }],
};

ERROR HANDLING

Errors are delivered through two channels:

  • Connection-level errors fire the on_error callback (or croak if none is set). These include connection refused, DNS failure, TLS errors, SASL auth failure, and protocol violations.

  • Request-level errors are delivered as the second argument to the request callback: $cb->($result, $error). If $error is defined, $result may be undef.

Within result structures, per-partition error_code fields use Kafka numeric codes:

0   No error
1   OFFSET_OUT_OF_RANGE
3   UNKNOWN_TOPIC_OR_PARTITION
6   NOT_LEADER_OR_FOLLOWER
15  COORDINATOR_NOT_AVAILABLE
16  NOT_COORDINATOR
25  UNKNOWN_MEMBER_ID
27  REBALANCE_IN_PROGRESS
36  TOPIC_ALREADY_EXISTS
79  MEMBER_ID_REQUIRED
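
User code that inspects per-partition error_code fields can classify them before deciding to retry. A minimal sketch (the retriable set below is an assumption based on the module's documented retry behavior, not an exported API):

```perl
use strict;
use warnings;

# Sketch: classifying per-partition error codes from the table above.
# The retriable set is an assumption, not an exported API.
my %RETRIABLE = map { $_ => 1 } (
    6,   # NOT_LEADER_OR_FOLLOWER
    15,  # COORDINATOR_NOT_AVAILABLE
    16,  # NOT_COORDINATOR
    27,  # REBALANCE_IN_PROGRESS
);

sub should_retry {
    my ($error_code) = @_;
    return 0 if $error_code == 0;            # success, nothing to retry
    return $RETRIABLE{$error_code} ? 1 : 0;  # transient vs. permanent
}
```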

When a broker disconnects mid-flight, all pending callbacks receive (undef, "connection closed by broker") or (undef, "disconnected").

ENVIRONMENT VARIABLES

These are used by tests and examples (not by the module itself):

TEST_KAFKA_BROKER    broker address for tests (host:port)
KAFKA_BROKER         broker address for examples
KAFKA_HOST           broker hostname for low-level examples
KAFKA_PORT           broker port for low-level examples
KAFKA_TOPIC          topic name for examples
KAFKA_GROUP_ID       consumer group for examples
KAFKA_LIMIT          message limit for consume example
KAFKA_COUNT          message count for fire-and-forget
BENCH_BROKER         broker for benchmarks
BENCH_MESSAGES       message count for benchmarks
BENCH_VALUE_SIZE     value size in bytes for benchmarks
BENCH_TOPIC          topic name for benchmarks

QUICK START

Minimal producer + consumer lifecycle:

use EV;
use EV::Kafka;

my $kafka = EV::Kafka->new(
    brokers    => '127.0.0.1:9092',
    acks       => 1,
    on_error   => sub { warn "kafka: @_\n" },
    on_message => sub {
        my ($topic, $part, $offset, $key, $value) = @_;
        print "got: $key=$value\n";
    },
);

$kafka->connect(sub {
    # produce
    $kafka->produce('test', 'k1', 'hello', sub {
        print "produced\n";

        # consume from the beginning
        $kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
        $kafka->seek('test', 0, -2, sub {
            my $t = EV::timer 0, 0.1, sub { $kafka->poll };
            $kafka->{cfg}{_t} = $t;   # keep the timer watcher alive
        });
    });
});

EV::run;

COOKBOOK

Produce JSON with headers

use JSON::PP;
my $json = JSON::PP->new->utf8;

$kafka->produce('events', 'user-42',
    $json->encode({ action => 'click', page => '/home' }),
    { headers => { 'content-type' => 'application/json' } },
    sub { ... }
);

Consume from latest offset only

$kafka->subscribe('live-feed',
    group_id          => 'realtime',
    auto_offset_reset => 'latest',
    on_assign         => sub { print "ready\n" },
);

Graceful shutdown

$SIG{INT} = sub {
    $kafka->commit(sub {
        $kafka->unsubscribe(sub {
            $kafka->close(sub { EV::break });
        });
    });
};

At-least-once processing

$kafka->subscribe('jobs',
    group_id    => 'workers',
    auto_commit => 0,
);

# in the constructor's on_message handler: process, then commit
# periodically ($count is a counter declared in the enclosing scope)
on_message => sub {
    process($_[4]);                        # $_[4] is the message value
    $kafka->commit if ++$count % 100 == 0;
},

Batch produce

$kafka->produce_many([
    ['events', 'k1', 'v1'],
    ['events', 'k2', 'v2'],
    ['events', 'k3', 'v3'],
], sub {
    my $errs = shift;
    print $errs ? "some failed\n" : "all done\n";
});

Exactly-once stream processing (EOS)

my $kafka = EV::Kafka->new(
    brokers          => '...',
    transactional_id => 'my-eos-app',
    acks             => -1,
    on_message => sub {
        my ($t, $p, $off, $key, $value) = @_;
        my $result = process($value);
        $kafka->produce('output-topic', $key, $result);
    },
);

# consume-process-produce loop:
$kafka->begin_transaction;
$kafka->poll(sub {
    $kafka->send_offsets_to_transaction('my-group', sub {
        $kafka->commit_transaction(sub {
            $kafka->begin_transaction;  # next transaction
        });
    });
});

Topic administration

my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_connect(sub {
    $conn->create_topics(
        [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
        10000, sub { ... }
    );
});

BENCHMARKS

Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl 5.40.2, 50K messages (bench/benchmark.pl):

Pipeline produce (acks=1)    68K msg/sec     7.4 MB/s
Fire-and-forget (acks=0)    100K msg/sec    11.0 MB/s
Fetch throughput             31K msg/sec     3.4 MB/s
Sequential round-trip        19K msg/sec    54 us avg latency
Metadata request             25K req/sec    41 us avg latency

Throughput by value size (pipelined, acks=1):

   10 bytes    61K msg/sec      0.9 MB/s
  100 bytes    68K msg/sec      7.4 MB/s
 1000 bytes    50K msg/sec     50.2 MB/s
10000 bytes    18K msg/sec    178.5 MB/s

Pipeline produce throughput is limited by Perl callback overhead per message. Fire-and-forget mode (acks=0) skips the response cycle entirely, reaching ~100K msg/sec. Sequential round-trip (one produce, wait for ack, repeat) measures raw broker latency at ~54 microseconds.

The fetch path is sequential (fetch, process, fetch again) which introduces one round-trip per batch. With larger max_bytes and dense topics, fetch throughput increases proportionally.

Run perl bench/benchmark.pl for throughput results. Set BENCH_BROKER, BENCH_MESSAGES, BENCH_VALUE_SIZE, and BENCH_TOPIC to customize.

Run perl bench/latency.pl for a latency histogram with percentiles (min, avg, median, p90, p95, p99, max).

KAFKA PROTOCOL

This module implements the Kafka binary protocol directly in XS. All integers are big-endian. Requests use a 4-byte size prefix followed by a header (API key, version, correlation ID, client ID) and a version-specific body.
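
As a concrete illustration of this framing, a non-flexible request header can be built with pack (a sketch; the API key, version, and empty body are placeholder values):

```perl
use strict;
use warnings;

# Sketch of the non-flexible request framing described above:
# 4-byte big-endian size prefix, then INT16 api_key, INT16 api_version,
# INT32 correlation_id, an INT16-length-prefixed client_id, then the body.
my ($api_key, $api_version, $correlation_id) = (3, 5, 42);  # placeholder values
my $client_id = 'ev-kafka';
my $body      = '';  # version-specific request body would go here

my $header = pack 'n n N n/a*', $api_key, $api_version, $correlation_id, $client_id;
my $frame  = pack 'N/a*', $header . $body;   # prepend the 4-byte size
```

The `n/a*` template packs the INT16 length prefix and the string in one step; `N/a*` does the same for the outer 32-bit size prefix.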

Responses are matched to requests by correlation ID. The broker guarantees FIFO ordering per connection, so the response queue is a simple FIFO.

RecordBatch encoding (magic=2) is used for produce. CRC32C covers the batch from attributes through the last record. Records use ZigZag-encoded varints for lengths and deltas.
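
The ZigZag varint encoding used for record lengths and deltas can be sketched in pure Perl (illustrative only; the module does this in XS):

```perl
use strict;
use warnings;

# ZigZag maps signed integers to unsigned so small magnitudes stay small:
# 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
sub zigzag { my ($n) = @_; return $n >= 0 ? 2 * $n : -2 * $n - 1 }

# Varint: 7 payload bits per byte, high bit set on continuation bytes.
sub varint {
    my ($u) = @_;            # non-negative integer
    my $out = '';
    while ($u >= 0x80) {
        $out .= chr(($u & 0x7f) | 0x80);
        $u >>= 7;
    }
    return $out . chr($u);
}

# e.g. a null value length of -1 encodes as zigzag(-1) = 1 -> "\x01"
```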

The connection handshake sends ApiVersions (v0) on connect to discover supported protocol versions. SASL authentication uses SaslHandshake (v1) + SaslAuthenticate (v2) with PLAIN mechanism.

Consumer group protocol uses sticky partition assignment with MEMBER_ID_REQUIRED (error 79) retry per KIP-394.

Non-flexible API versions are used throughout (capped below the flexible-version threshold for each API) to avoid the compact encoding complexity.

LIMITATIONS

  • LZ4 and gzip compression -- supported when built with liblz4 and zlib. snappy and zstd are not implemented.

  • Transactions / EOS -- begin_transaction, send_offsets_to_transaction, commit_transaction, abort_transaction provide full exactly-once stream processing. InitProducerId, AddPartitionsToTxn, TxnOffsetCommit, EndTxn are all wired. Requires transactional_id in constructor.

  • No GSSAPI/OAUTHBEARER -- SASL/PLAIN and SCRAM-SHA-256/512 are supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented.

  • Sticky partition assignment -- assignments are preserved across rebalances where possible. New partitions are distributed to the least-loaded member. Overloaded members shed excess partitions.

  • Blocking DNS resolution -- getaddrinfo is called synchronously in conn_start_connect. For fully non-blocking operation, use IP addresses instead of hostnames.

  • No flexible API versions -- all API versions are capped below the flexible-version threshold to avoid compact string/array encoding. This limits interoperability with very new protocol features but works with all Kafka 0.11+ and Redpanda brokers.

  • Limited produce retry -- transient errors (NOT_LEADER, COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3 retries with backoff. Non-retriable errors are surfaced to the callback immediately.

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.