NAME
EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV
SYNOPSIS
use EV;
use EV::Kafka;
my $kafka = EV::Kafka->new(
brokers => '127.0.0.1:9092',
acks => -1,
on_error => sub { warn "kafka: @_" },
on_message => sub {
my ($topic, $partition, $offset, $key, $value, $headers) = @_;
print "$topic:$partition @ $offset $key = $value\n";
},
);
# Producer
$kafka->connect(sub {
$kafka->produce('my-topic', 'key', 'value', sub {
my ($result, $err) = @_;
return warn "produce failed: $err\n" if $err;
my $off = $result->{topics}[0]{partitions}[0]{base_offset};
print "produced at offset $off\n";
});
});
# Consumer (manual assignment)
$kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
my $poll = EV::timer 0, 0.1, sub { $kafka->poll };
# Consumer group
$kafka->subscribe('my-topic',
group_id => 'my-group',
on_assign => sub { ... },
on_revoke => sub { ... },
);
EV::run;
DESCRIPTION
EV::Kafka is a high-performance asynchronous Kafka client that implements the Kafka binary protocol in XS with EV event loop integration. It targets Redpanda and Apache Kafka (protocol version 0.11+).
Two-layer architecture:
EV::Kafka::Conn (XS) -- single broker TCP connection with protocol encoding/decoding, correlation ID matching, pipelining, optional TLS and SASL (PLAIN, SCRAM-SHA-256/512) authentication.
EV::Kafka::Client (Perl) -- cluster management with metadata discovery, broker connection pooling, partition leader routing, producer with key-based partitioning, consumer with manual assignment or consumer groups.
Features:
Binary protocol implemented in pure XS (no librdkafka dependency)
Automatic request pipelining per broker connection
Metadata-driven partition leader routing
Producer: acks modes (-1/0/1), key-based partitioning (murmur2), headers, fire-and-forget (acks=0), idempotent producer with epoch-bump recovery, transactional / exactly-once stream processing
Consumer: manual partition assignment, offset tracking, poll-based message delivery; consumer groups with JoinGroup/SyncGroup/Heartbeat, sticky partition assignment, offset commit/fetch, automatic rebalancing, session-expiry recovery
Compression: lz4, gzip, zstd, snappy (each gated by build-time library detection)
TLS (OpenSSL) and SASL/PLAIN, SCRAM-SHA-256/512 (with full RFC 5802 server-signature verification)
Automatic reconnection at the connection layer; bootstrap-broker failover; periodic metadata refresh
ANYEVENT INTEGRATION
AnyEvent supports EV as one of its backends, so EV::Kafka works in AnyEvent applications as long as AnyEvent is running on EV.
NO UTF-8 SUPPORT
This module handles all values as bytes. Encode your UTF-8 strings before passing them:
use Encode;
$kafka->produce($topic, $key, encode_utf8($val), sub { ... });
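The byte-only rule applies in both directions: values are encoded before produce and decoded after they arrive in on_message. The sketch below shows only the Encode round trip; the helper names to_wire and from_wire are hypothetical and not part of EV::Kafka.

```perl
use strict;
use warnings;
use Encode qw(encode_utf8 decode_utf8);

# Hypothetical helpers (not part of EV::Kafka).
# to_wire:   character string -> UTF-8 bytes, suitable for produce()
# from_wire: UTF-8 bytes (as received in on_message) -> character string
sub to_wire   { return encode_utf8($_[0]) }
sub from_wire { return decode_utf8($_[0]) }

my $text  = "caf\x{e9}";          # 4 characters, the last one is U+00E9
my $bytes = to_wire($text);       # 5 bytes: U+00E9 becomes 0xC3 0xA9

printf "chars=%d bytes=%d round-trip=%s\n",
    length($text), length($bytes),
    (from_wire($bytes) eq $text ? 'ok' : 'mismatch');
```

Keys and header values are handled the same way if they may contain non-ASCII text.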
CLUSTER CLIENT METHODS
new(%options)
Create a new EV::Kafka client. Returns a blessed EV::Kafka::Client object.
my $kafka = EV::Kafka->new(
brokers => '10.0.0.1:9092,10.0.0.2:9092',
acks => -1,
on_error => sub { warn @_ },
);
Options:
- brokers => 'Str'
Comma-separated list of bootstrap broker addresses (host:port). Default: 127.0.0.1:9092.
- client_id => 'Str' (default 'ev-kafka')
Client identifier sent to brokers.
- tls => Bool
Enable TLS encryption.
- tls_ca_file => 'Str'
Path to CA certificate file for TLS verification.
- tls_skip_verify => Bool
Skip TLS certificate verification.
- sasl => \%opts
Enable SASL authentication. Supported mechanisms: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }
- acks => Int (default -1)
Producer acknowledgment mode. -1 = all in-sync replicas, 0 = no acknowledgment (fire-and-forget), 1 = leader only.
- linger_ms => Int (default 5)
Time in milliseconds to accumulate records before flushing a batch. Lower values reduce latency; higher values improve throughput.
- batch_size => Int (default 16384)
Maximum batch size in bytes before a batch is flushed immediately.
- compression => 'Str'
Compression type for produce batches: 'lz4' (requires liblz4), 'gzip' (requires zlib), 'zstd' (requires libzstd), 'snappy' (requires libsnappy), or undef for none.
- idempotent => Bool (default 0)
Enable idempotent producer. Calls InitProducerId on connect and sets producer_id/epoch/sequence in each RecordBatch for exactly-once delivery (broker-side deduplication). Only one batch per (topic, partition) is in-flight at a time when this is enabled, to prevent sequence-number aliasing on retry.
- transactional_id => 'Str'
Enable transactional producer. Implies idempotent. Required for begin_transaction/commit_transaction/abort_transaction and send_offsets_to_transaction (full EOS).
- partitioner => $cb->($topic, $key, $num_partitions)
Custom partition selection function. Default: murmur2 hash of key, or round-robin for null keys.
- on_error => $cb->($errstr)
Error callback. Default: die.
- on_connect => $cb->()
Called once after initial metadata fetch completes.
- on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
Message delivery callback for consumer operations.
- fetch_max_wait_ms => Int (default 500)
Maximum time the broker waits to accumulate fetch_min_bytes of data before returning a fetch response.
- fetch_max_bytes => Int (default 1048576)
Maximum bytes per fetch response.
- fetch_min_bytes => Int (default 1)
Minimum bytes before the broker responds to a fetch.
- metadata_refresh => Int (default 300)
Periodic metadata refresh interval in seconds. Set to 0 to disable. Refreshes happen in the background, so consumers and producers pick up leader changes without waiting for a request to fail first.
- loop => $ev_loop
EV loop object to use. Default: EV::default_loop.
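As an illustration of the partitioner option, here is a minimal sketch of a callback with the documented ($topic, $key, $num_partitions) signature. It assumes the callback is expected to return a partition index in the range 0 .. $num_partitions - 1, which the text above does not state explicitly. The name tenant_partitioner and the "tenant:rest" key layout are hypothetical, and the byte-sum hash is a stand-in chosen for brevity, not the module's murmur2.

```perl
use strict;
use warnings;

my $next = 0;    # round-robin cursor for messages without a key

# Hypothetical partitioner: keys look like "tenant:anything"; all keys
# of one tenant land on the same partition.
sub tenant_partitioner {
    my ($topic, $key, $num_partitions) = @_;

    # No key: spread messages evenly across partitions.
    return $next++ % $num_partitions unless defined $key;

    # Hash only the part before the first colon.
    my ($tenant) = $key =~ /\A([^:]*)/;

    # '%32C*' yields a 32-bit sum of the byte values: a deliberately
    # simple, deterministic hash for illustration only.
    return unpack('%32C*', $tenant) % $num_partitions;
}

printf "acme:1 -> %d, acme:2 -> %d\n",
    tenant_partitioner('orders', 'acme:1', 6),
    tenant_partitioner('orders', 'acme:2', 6);
```

Such a sub would be passed to the constructor as partitioner => \&tenant_partitioner.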
connect([$cb])
Connect to the cluster. Connects to the first available bootstrap broker, fetches cluster metadata, then fires $cb->($metadata). On bootstrap-broker failure the next address is tried; if all fail, the on_error handler fires.
$kafka->connect(sub {
my $meta = shift;
# $meta->{brokers}, $meta->{topics}
});
produce($topic, $key, $value, [\%opts,] [$cb])
Produce a message. Routes to the correct partition leader automatically.
# with callback (acks=1 or acks=-1)
$kafka->produce('topic', 'key', 'value', sub {
my ($result, $err) = @_;
});
# with headers
$kafka->produce('topic', 'key', 'value',
{ headers => { 'h1' => 'v1' } }, sub { ... });
# fire-and-forget (acks=0)
$kafka->produce('topic', 'key', 'value');
# explicit partition
$kafka->produce('topic', 'key', 'value',
{ partition => 3 }, sub { ... });
produce_many(\@messages, $cb)
Produce multiple messages with a single completion callback. Each message is an arrayref [$topic, $key, $value] or a hashref {topic, key, value}. $cb fires when all messages are acknowledged.
$kafka->produce_many([
['my-topic', 'k1', 'v1'],
['my-topic', 'k2', 'v2'],
], sub {
my $errors = shift;
warn "some failed: @$errors" if $errors;
});
flush([$cb])
Flush all accumulated produce batches and wait for all in-flight requests to complete. $cb fires when all pending responses have been received.
assign(\@partitions)
Manually assign partitions for consuming.
$kafka->assign([
{ topic => 'my-topic', partition => 0, offset => 0 },
{ topic => 'my-topic', partition => 1, offset => 100 },
]);
seek($topic, $partition, $offset, [$cb])
Seek a partition to a specific offset. Use -2 for earliest, -1 for latest. Updates the assignment in-place.
$kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });
offsets_for($topic, $cb)
Get earliest and latest offsets for all partitions of a topic.
$kafka->offsets_for('my-topic', sub {
my $offsets = shift;
# { 0 => { earliest => 0, latest => 42 }, 1 => ... }
});
lag($cb)
Get consumer lag for all assigned partitions.
$kafka->lag(sub {
my $lag = shift;
# { "topic:0" => { current => 10, latest => 42, lag => 32 } }
});
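The hash passed to the lag callback can be reduced to a couple of numbers for monitoring. The sketch below assumes exactly the structure shown in the comment above ("topic:partition" keys, each with a lag field); summarize_lag is a hypothetical helper, not a module function.

```perl
use strict;
use warnings;
use List::Util qw(sum0);

# Hypothetical helper: total lag and the most-lagging partition.
sub summarize_lag {
    my ($lag) = @_;
    my $total = sum0 map { $_->{lag} } values %$lag;
    # Highest lag first; ties broken by key name for a stable result.
    my ($worst) = sort { $lag->{$b}{lag} <=> $lag->{$a}{lag} || $a cmp $b }
                  keys %$lag;
    return { total => $total, worst => $worst };
}

# Sample data in the documented shape.
my $summary = summarize_lag({
    'my-topic:0' => { current => 10, latest => 42, lag => 32 },
    'my-topic:1' => { current => 40, latest => 45, lag => 5  },
});
print "total lag $summary->{total}, worst partition $summary->{worst}\n";
```

Inside a real lag callback, the same helper would be called on the callback's first argument.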
error_name($code)
Convert a Kafka numeric error code to its name. Callable as a method or class function.
$kafka->error_name(3); # "UNKNOWN_TOPIC_OR_PARTITION"
EV::Kafka::Client::error_name(3); # same
poll([$cb])
Fetch messages from assigned partitions. Calls on_message for each received record. $cb fires when all fetch responses have arrived.
my $timer = EV::timer 0, 0.1, sub { $kafka->poll };
subscribe(@topics, %opts)
Join a consumer group and subscribe to one or more topics. The list of topic names comes first, followed by option key/value pairs. The group protocol handles partition assignment automatically.
$kafka->subscribe('topic-a', 'topic-b',
group_id => 'my-group',
session_timeout => 30000, # ms
rebalance_timeout => 60000, # ms
heartbeat_interval => 3, # seconds
auto_commit => 1, # commit on unsubscribe (default)
auto_offset_reset => 'earliest', # or 'latest'
group_instance_id => 'pod-abc', # KIP-345 static membership
on_assign => sub {
my $partitions = shift;
# [{topic, partition, offset}, ...]
},
on_revoke => sub {
my $partitions = shift;
},
);
commit([$cb])
Commit current consumer offsets to the group coordinator.
$kafka->commit(sub {
my $err = shift;
warn "commit failed: $err" if $err;
});
unsubscribe([$cb])
Leave the consumer group (sends LeaveGroup for fast rebalance), stop heartbeat and fetch loop. If auto_commit is enabled, commits offsets before leaving.
begin_transaction
Start a transaction. Requires transactional_id in constructor.
send_offsets_to_transaction($group_id, [$cb])
Commit consumer offsets within the current transaction via TxnOffsetCommit. This is the key step for exactly-once consume-process-produce pipelines.
$kafka->send_offsets_to_transaction('my-group', sub {
my ($result, $err) = @_;
});
commit_transaction([$cb])
Commit the current transaction. All produced messages and offset commits within the transaction become visible atomically.
abort_transaction([$cb])
Abort the current transaction. All produced messages are discarded and offset commits are rolled back.
close([$cb])
Graceful shutdown: stop timers, disconnect all broker connections.
$kafka->close(sub { EV::break });
LOW-LEVEL CONNECTION METHODS
EV::Kafka::Conn provides direct access to a single broker connection. Useful for custom protocols, debugging, or when cluster-level routing is not needed.
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_error(sub { warn @_ });
$conn->on_connect(sub { ... });
$conn->connect('127.0.0.1', 9092, 5.0);
connect($host, $port, [$timeout])
Connect to a broker. Timeout in seconds (0 = no timeout).
disconnect
Disconnect from broker.
connected
Returns true if the connection is ready (ApiVersions handshake complete).
metadata(\@topics, $cb)
Request cluster metadata. Pass undef for all topics.
$conn->metadata(['my-topic'], sub {
my ($result, $err) = @_;
# $result->{brokers}, $result->{topics}
});
produce($topic, $partition, $key, $value, [\%opts,] [$cb])
Produce a message to a specific partition.
$conn->produce('topic', 0, 'key', 'value', sub {
my ($result, $err) = @_;
});
Options: acks (default 1), headers (hashref), timestamp (epoch ms, default now), compression ('none', 'lz4', 'gzip', 'zstd', 'snappy'; each requires its respective library at build time).
produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
Produce multiple records in a single RecordBatch. Each record is {key, value, headers}. Options: acks, compression, producer_id, producer_epoch, base_sequence.
$conn->produce_batch('topic', 0, [
{ key => 'k1', value => 'v1' },
{ key => 'k2', value => 'v2' },
], sub { my ($result, $err) = @_ });
fetch($topic, $partition, $offset, [\%opts,] $cb)
Fetch messages from a partition starting at $offset. %opts may set max_bytes (per-partition cap, default 1 MiB), max_wait_ms (broker block-time, default 500), min_bytes (default 1).
$conn->fetch('topic', 0, 0, sub {
my ($result, $err) = @_;
for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
printf "offset=%d key=%s value=%s\n",
$rec->{offset}, $rec->{key}, $rec->{value};
}
});
fetch_multi(\%topics, [\%opts,] $cb)
Multi-partition fetch in a single request. Groups multiple topic-partitions into one Fetch call to the broker. %opts accepts the same keys as fetch.
$conn->fetch_multi({
'topic-a' => [{ partition => 0, offset => 10 },
{ partition => 1, offset => 20 }],
'topic-b' => [{ partition => 0, offset => 0 }],
}, sub { my ($result, $err) = @_ });
Used internally by poll() to batch fetches by broker leader, with max_bytes/max_wait_ms/min_bytes taken from the cluster client config.
list_offsets($topic, $partition, $timestamp, $cb)
Get offsets by timestamp. Use -2 for earliest, -1 for latest.
find_coordinator($key, $cb, [$key_type])
Find the coordinator broker. $key_type: 0=group (default), 1=transaction.
join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
Join a consumer group. Pass $group_instance_id for KIP-345 static membership.
sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
Synchronize group state after join.
heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
Send heartbeat to group coordinator.
offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
Commit consumer offsets.
offset_fetch($group_id, \@topics, $cb)
Fetch committed offsets for a consumer group.
api_versions
Returns a hashref of supported API keys to max versions, or undef if not yet negotiated.
my $vers = $conn->api_versions;
# { 0 => 7, 1 => 11, 3 => 8, ... }
on_error([$cb]), on_connect([$cb]), on_disconnect([$cb])
Set connection-level handler callbacks. Call with no argument or undef to clear.
client_id($id)
Set the client identifier.
tls($enable, [$ca_file, $skip_verify])
Configure TLS.
sasl($mechanism, [$username, $password])
Configure SASL authentication.
auto_reconnect($enable, [$delay_ms])
Enable automatic reconnection with delay in milliseconds (default 1000).
leave_group($group_id, $member_id, $cb)
Send LeaveGroup to coordinator for fast partition rebalance.
create_topics(\@topics, $timeout_ms, $cb)
Create topics. Each element: {name, num_partitions, replication_factor}.
$conn->create_topics(
[{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
5000, sub { my ($res, $err) = @_ }
);
delete_topics(\@topic_names, $timeout_ms, $cb)
Delete topics by name.
init_producer_id($transactional_id, $txn_timeout_ms, $cb)
Initialize a producer ID for idempotent/transactional produce. Pass undef for non-transactional idempotent producer.
add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
Register partitions with the transaction coordinator.
end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
Commit ($committed=1) or abort ($committed=0) a transaction.
txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
Commit consumer offsets within a transaction (API 28).
pending
Number of requests awaiting broker response.
state
Connection state as integer (0=disconnected, 6=ready).
UTILITY FUNCTIONS
EV::Kafka::_murmur2($key)
Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.
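For reference, the hash can be reproduced in pure Perl. The sketch below follows the murmur2 variant used by the Java Kafka client (seed 0x9747b28c) and then masks the sign bit, which matches the "non-negative 31-bit integer" description above; it is an independent illustration, not the module's XS code. It assumes a Perl built with 64-bit integers and a byte-string key, and the sub names are hypothetical.

```perl
use strict;
use warnings;

# Pure-Perl murmur2 as used by the Java Kafka client; returns the
# unsigned 32-bit hash. Assumes 64-bit integer support in perl.
sub murmur2 {
    my ($data) = @_;
    my $len  = length $data;
    my $m    = 0x5bd1e995;
    my $h    = (0x9747b28c ^ $len) & 0xFFFFFFFF;
    my $tail = $len % 4;

    # Body: little-endian 32-bit words.
    for my $word (unpack 'V*', substr($data, 0, $len - $tail)) {
        my $k = ($word * $m) & 0xFFFFFFFF;
        $k ^= $k >> 24;
        $k = ($k * $m) & 0xFFFFFFFF;
        $h = ($h * $m) & 0xFFFFFFFF;
        $h ^= $k;
    }

    # Tail: the remaining 1 to 3 bytes.
    my @t = unpack 'C*', substr($data, $len - $tail);
    $h ^= $t[2] << 16 if $tail == 3;
    $h ^= $t[1] << 8  if $tail >= 2;
    if ($tail >= 1) {
        $h ^= $t[0];
        $h = ($h * $m) & 0xFFFFFFFF;
    }

    # Final avalanche.
    $h ^= $h >> 13;
    $h = ($h * $m) & 0xFFFFFFFF;
    $h ^= $h >> 15;
    return $h;
}

# Clear the sign bit, then reduce to a partition number.
sub murmur2_partition {
    my ($key, $num_partitions) = @_;
    return (murmur2($key) & 0x7FFFFFFF) % $num_partitions;
}

# murmur2('21') is 3321034988 unsigned, i.e. -973932308 as a signed Java int.
printf "hash=%u partition=%d\n", murmur2('21'), murmur2_partition('21', 6);
```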
EV::Kafka::_crc32c($data)
CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.
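To make the checksum concrete, here is a bitwise pure-Perl CRC32C (reflected Castagnoli polynomial 0x82F63B78, initial value and final XOR 0xFFFFFFFF). It is a slow reference sketch for cross-checking, not the module's implementation; the sub name crc32c is chosen for this example.

```perl
use strict;
use warnings;

# Bit-at-a-time CRC32C (Castagnoli). Slow, but easy to verify.
sub crc32c {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte (unpack 'C*', $data) {
        $crc ^= $byte;
        for (1 .. 8) {
            # Shift right; XOR in the reflected polynomial when the
            # bit shifted out was set.
            $crc = ($crc & 1) ? (($crc >> 1) ^ 0x82F63B78) : ($crc >> 1);
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Standard check value for CRC-32C: "123456789" -> e3069283
printf "%08x\n", crc32c('123456789');
```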
EV::Kafka::_error_name($code)
Convert Kafka error code to string name.
RESULT STRUCTURES
Produce result
$result = {
topics => [{
topic => 'name',
partitions => [{
partition => 0,
error_code => 0,
base_offset => 42,
}],
}],
};
Fetch result
$result = {
topics => [{
topic => 'name',
partitions => [{
partition => 0,
error_code => 0,
high_watermark => 100,
records => [{
offset => 42,
timestamp => 1712345678000,
key => 'key', # or undef
value => 'value', # or undef
headers => { h => 'v' }, # if present
}],
}],
}],
};
Metadata result
$result = {
controller_id => 0,
brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
topics => [{
name => 'topic',
error_code => 0,
partitions => [{
partition => 0,
leader => 0,
error_code => 0,
}],
}],
};
ERROR HANDLING
Errors are delivered through two channels:
- Connection-level errors fire the on_error callback (or croak if none set). These include connection refused, DNS failure, TLS errors, SASL auth failure, and protocol violations.
- Request-level errors are delivered as the second argument to the request callback: $cb->($result, $error). If $error is defined, $result may be undef.
Within result structures, per-partition error_code fields use Kafka numeric codes:
0    No error
1    OFFSET_OUT_OF_RANGE
3    UNKNOWN_TOPIC_OR_PARTITION
6    NOT_LEADER_OR_FOLLOWER (retried by the producer)
15   COORDINATOR_NOT_AVAILABLE (retried)
16   NOT_COORDINATOR (retried)
22   ILLEGAL_GENERATION (group rejoin)
25   UNKNOWN_MEMBER_ID (group rejoin)
27   REBALANCE_IN_PROGRESS (group rejoin)
36   TOPIC_ALREADY_EXISTS
45   OUT_OF_ORDER_SEQUENCE_NUMBER (idempotent: epoch bump)
46   DUPLICATE_SEQUENCE_NUMBER (idempotent: epoch bump)
79   MEMBER_ID_REQUIRED (group rejoin with assigned id)
Use EV::Kafka::Client::error_name($code) for the full list.
When a broker disconnects mid-flight, all pending callbacks receive (undef, "connection closed by broker") or (undef, "disconnected").
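A request can succeed at the transport level ($err undefined) and still carry per-partition error codes, so callbacks usually need to check both. The sketch below walks a result shaped like the produce result documented above and collects the failing partitions. partition_errors and the small name table are hypothetical stand-ins for this example; in real code error_name($code) would supply the names.

```perl
use strict;
use warnings;

# A few names copied from the table above, for a self-contained example.
my %ERROR_NAME = (
    1 => 'OFFSET_OUT_OF_RANGE',
    3 => 'UNKNOWN_TOPIC_OR_PARTITION',
    6 => 'NOT_LEADER_OR_FOLLOWER',
);

# Hypothetical helper: list "topic[partition]: NAME" for every partition
# whose error_code is non-zero.
sub partition_errors {
    my ($result) = @_;
    return unless ref $result eq 'HASH';
    my @errors;
    for my $t (@{ $result->{topics} || [] }) {
        for my $p (@{ $t->{partitions} || [] }) {
            my $code = $p->{error_code} or next;
            push @errors, sprintf '%s[%d]: %s',
                $t->{topic}, $p->{partition},
                $ERROR_NAME{$code} // "error $code";
        }
    }
    return @errors;
}

# Sample result: partition 0 succeeded, partition 1 failed.
my @errs = partition_errors({
    topics => [{
        topic      => 'events',
        partitions => [
            { partition => 0, error_code => 0, base_offset => 42 },
            { partition => 1, error_code => 3, base_offset => -1 },
        ],
    }],
});
print "$_\n" for @errs;
```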
ENVIRONMENT VARIABLES
These are used by tests and examples (not by the module itself):
TEST_KAFKA_BROKER   broker address for tests (host:port)
KAFKA_BROKER        broker address for examples
KAFKA_HOST          broker hostname for low-level examples
KAFKA_PORT          broker port for low-level examples
KAFKA_TOPIC         topic name for examples
KAFKA_GROUP_ID      consumer group for examples
KAFKA_LIMIT         message limit for consume example
KAFKA_COUNT         message count for fire-and-forget
BENCH_BROKER        broker for benchmarks
BENCH_MESSAGES      message count for benchmarks
BENCH_VALUE_SIZE    value size in bytes for benchmarks
BENCH_TOPIC         topic name for benchmarks
QUICK START
Minimal producer + consumer lifecycle:
use EV;
use EV::Kafka;
my $kafka = EV::Kafka->new(
brokers => '127.0.0.1:9092',
acks => 1,
on_error => sub { warn "kafka: @_\n" },
on_message => sub {
my ($topic, $part, $offset, $key, $value) = @_;
print "got: $key=$value\n";
},
);
my $poll_timer; # declared outside the callbacks so the watcher stays alive
$kafka->connect(sub {
# produce
$kafka->produce('test', 'k1', 'hello', sub {
my ($result, $err) = @_;
return warn "produce failed: $err\n" if $err;
print "produced\n";
# consume from the beginning
$kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
$kafka->seek('test', 0, -2, sub {
$poll_timer = EV::timer 0, 0.1, sub { $kafka->poll };
});
});
});
EV::run;
COOKBOOK
Produce JSON with headers
use JSON::PP;
my $json = JSON::PP->new->utf8;
$kafka->produce('events', 'user-42',
$json->encode({ action => 'click', page => '/home' }),
{ headers => { 'content-type' => 'application/json' } },
sub { ... }
);
Consume from latest offset only
$kafka->subscribe('live-feed',
group_id => 'realtime',
auto_offset_reset => 'latest',
on_assign => sub { print "ready\n" },
);
Graceful shutdown
$SIG{INT} = sub {
$kafka->commit(sub {
$kafka->unsubscribe(sub {
$kafka->close(sub { EV::break });
});
});
};
At-least-once processing
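my $count = 0;
my $kafka; # declared first so the callback below can refer to it
$kafka = EV::Kafka->new(
brokers => '127.0.0.1:9092',
on_message => sub {
my ($topic, $partition, $offset, $key, $value) = @_;
process($value); # process first ...
$kafka->commit if ++$count % 100 == 0; # ... then commit every 100 messages
},
);
$kafka->subscribe('jobs',
group_id => 'workers',
auto_commit => 0,
);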
Batch produce
$kafka->produce_many([
['events', 'k1', 'v1'],
['events', 'k2', 'v2'],
['events', 'k3', 'v3'],
], sub {
my $errs = shift;
print $errs ? "some failed\n" : "all done\n";
});
Exactly-once stream processing (EOS)
my $kafka; # declared first so on_message can refer to it
$kafka = EV::Kafka->new(
brokers => '...',
transactional_id => 'my-eos-app',
acks => -1,
on_message => sub {
my ($t, $p, $off, $key, $value) = @_;
my $result = process($value);
$kafka->produce('output-topic', $key, $result);
},
);
# consume-process-produce loop:
$kafka->begin_transaction;
$kafka->poll(sub {
$kafka->send_offsets_to_transaction('my-group', sub {
$kafka->commit_transaction(sub {
$kafka->begin_transaction; # next transaction
});
});
});
Topic administration
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
$conn->on_connect(sub {
$conn->create_topics(
[{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
10000, sub { ... }
);
});
BENCHMARKS
Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl 5.40.2, 20K messages (bench/benchmark.pl):
Pipeline produce (acks=1)   100K msg/sec   11.0 MB/s
Fire-and-forget (acks=0)    120K msg/sec   13.2 MB/s
Sequential round-trip        24K msg/sec   42 us avg latency
Metadata request             21K req/sec   47 us avg latency
Throughput by value size (pipelined, acks=1):
   10 bytes   105K msg/sec     1.5 MB/s
  100 bytes   100K msg/sec    10.5 MB/s
 1000 bytes    70K msg/sec    70.7 MB/s
10000 bytes    20K msg/sec   202.0 MB/s
Latency histogram (20K round-trips, acks=1, bench/latency.pl):
median: 39 us p90: 59 us p95: 75 us p99: 122 us
Pipeline produce throughput is limited by Perl callback overhead per message. Fire-and-forget mode (acks=0) skips the response cycle entirely, reaching ~120K msg/sec. Sequential round-trip (one produce, wait for ack, repeat) measures the raw broker round trip, with a median latency of about 39 us.
The fetch path is sequential (fetch, process, fetch again) which introduces one round-trip per batch. With larger max_bytes and dense topics, fetch throughput increases proportionally.
Run perl bench/benchmark.pl for throughput results. Set BENCH_BROKER, BENCH_MESSAGES, BENCH_VALUE_SIZE, and BENCH_TOPIC to customize.
Run perl bench/latency.pl for a latency histogram with percentiles (min, avg, median, p90, p95, p99, max).
KAFKA PROTOCOL
This module implements the Kafka binary protocol directly in XS. All integers are big-endian. Requests use a 4-byte size prefix followed by a header (API key, version, correlation ID, client ID) and a version-specific body.
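The framing described above maps directly onto Perl's pack. The following sketch builds one request frame with the non-flexible header layout (int16 API key, int16 version, int32 correlation ID, int16-length-prefixed client ID) and the 4-byte size prefix. It is an illustration of the wire layout, not the module's XS encoder; frame_request is a hypothetical name, and the example request is ApiVersions (API key 18) at version 0, whose body is empty.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a request body in the Kafka request header
# and size prefix. All integers are big-endian ('n' = 16 bit, 'N' = 32 bit).
sub frame_request {
    my ($api_key, $api_version, $correlation_id, $client_id, $body) = @_;
    my $header = pack 'n n N n/a*',
        $api_key, $api_version, $correlation_id, $client_id;
    # 'N/a*' prepends the 4-byte length of header plus body.
    return pack 'N/a*', $header . ($body // '');
}

my $frame = frame_request(18, 0, 1, 'ev-kafka');
printf "%d bytes: %s\n", length $frame, unpack('H*', $frame);
```

The first four bytes of the frame hold the length of everything that follows, so a reader can pull complete frames off the socket before decoding.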
Responses are matched to requests by correlation ID. The broker guarantees FIFO ordering per connection, so the response queue is a simple FIFO.
RecordBatch encoding (magic=2) is used for produce. CRC32C covers the batch from attributes through the last record. Records use ZigZag-encoded varints for lengths and deltas.
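ZigZag varints are compact enough to show in full. The sketch below encodes and decodes a signed integer the way record lengths and deltas are laid out: ZigZag maps small negative and positive numbers to small unsigned ones, which are then written 7 bits per byte, least-significant group first, with the high bit marking continuation. The sub names are hypothetical and the code assumes a Perl with 64-bit integers.

```perl
use strict;
use warnings;

# ZigZag: 0, -1, 1, -2, 2 ... -> 0, 1, 2, 3, 4 ...
sub zigzag   { my $n = shift; return $n >= 0 ? 2 * $n : -2 * $n - 1 }
sub unzigzag { my $z = shift; return ($z & 1) ? -(($z + 1) >> 1) : ($z >> 1) }

# Encode a signed integer as a ZigZag varint (byte string).
sub encode_varint {
    my $z   = zigzag(shift);
    my $out = '';
    while ($z >= 0x80) {
        $out .= chr(($z & 0x7F) | 0x80);    # low 7 bits + continuation bit
        $z >>= 7;
    }
    return $out . chr($z);
}

# Decode one varint from $buf starting at $pos.
# Returns (value, position after the varint).
sub decode_varint {
    my ($buf, $pos) = @_;
    $pos //= 0;
    my ($z, $shift) = (0, 0);
    while (1) {
        die "truncated varint\n" if $pos >= length $buf;
        my $byte = ord(substr($buf, $pos++, 1));
        $z |= ($byte & 0x7F) << $shift;
        last unless $byte & 0x80;
        $shift += 7;
    }
    return (unzigzag($z), $pos);
}

# 150 zigzags to 300, which encodes as the two bytes ac 02.
print unpack('H*', encode_varint(150)), "\n";
```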
The connection handshake sends ApiVersions (v0) on connect to discover supported protocol versions. SASL authentication uses SaslHandshake (v1) + SaslAuthenticate (v2) with the PLAIN or SCRAM-SHA-256/512 mechanism.
Consumer group protocol uses sticky partition assignment with MEMBER_ID_REQUIRED (error 79) retry per KIP-394.
Non-flexible API versions are used throughout (capped below the flexible-version threshold for each API) to avoid the compact encoding complexity.
LIMITATIONS
Blocking DNS for hostnames -- numeric IPv4/IPv6 literals take a fast path (AI_NUMERICHOST) and never block. Non-literal hostnames call getaddrinfo synchronously, blocking the EV loop until the resolver responds. For fully non-blocking operation against named brokers, pre-resolve in Perl-land.
No GSSAPI/OAUTHBEARER -- only SASL/PLAIN and SCRAM-SHA-256/512 are implemented.
No flexible API versions -- all API versions are capped below the flexible-version threshold to avoid compact string/array encoding. Works with Kafka 0.11+ and Redpanda; loses access to a few newer protocol features.
Producer retry policy -- transient errors (NOT_LEADER, COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3 retries with backoff. Hard idempotent errors (OUT_OF_ORDER_SEQUENCE, DUPLICATE_SEQUENCE) trigger one InitProducerId-with-fresh-epoch recovery attempt. Other broker errors are surfaced to the callback immediately.
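One way to pre-resolve broker names in Perl-land is to run them through the core Socket module before the event loop starts and hand the numeric result to brokers. The sketch below is an assumption-laden illustration: resolve_brokers is a hypothetical helper, it restricts itself to IPv4 so the output stays in plain host:port form, it takes only the first address per name, and the lookup itself still blocks (it just happens before EV::run rather than inside the loop).

```perl
use strict;
use warnings;
use Socket qw(getaddrinfo getnameinfo AF_INET SOCK_STREAM
              NI_NUMERICHOST NIx_NOSERV);

# Hypothetical helper: turn "host1:9092,host2:9092" into the same list
# with each host replaced by its first IPv4 address.
sub resolve_brokers {
    my ($brokers) = @_;
    my @out;
    for my $addr (split /,/, $brokers) {
        my ($host, $port) = $addr =~ /\A(.+):(\d+)\z/
            or die "bad broker address '$addr'\n";
        my ($err, @res) = getaddrinfo($host, $port,
            { family => AF_INET, socktype => SOCK_STREAM });
        die "cannot resolve $host: $err\n" if $err;
        # Convert the packed sockaddr back to a numeric host string.
        my ($nerr, $ip) = getnameinfo($res[0]{addr}, NI_NUMERICHOST, NIx_NOSERV);
        die "getnameinfo failed for $host: $nerr\n" if $nerr;
        push @out, "$ip:$port";
    }
    return join ',', @out;
}

# Numeric literals pass through unchanged.
print resolve_brokers('127.0.0.1:9092'), "\n";
```

The returned string can then be passed as the brokers option. Brokers that advertise hostnames in their metadata would still be resolved by the connection layer, so this covers the bootstrap list only.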
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.