NAME
Kafka - Apache Kafka interface for Perl
VERSION
This documentation refers to Kafka package version 0.800_3.
SYNOPSIS
use 5.010;
use strict;
use warnings;
use Kafka qw(
$BITS64
);
# A simple example of Kafka usage:
# common information
say 'This is Kafka package ', $Kafka::VERSION;
say 'You have a ', $BITS64 ? '64' : '32', ' bit system';
use Kafka::Connection;
# connect to local cluster with the defaults
my $connect = Kafka::Connection->new( host => 'localhost' );
# decoding of the error
say STDERR 'last error: ', $connect->last_error
if $connect->last_errorcode;
# To see a brief but working code example of the Kafka package usage
# look at the "An Example" section.
ABSTRACT
The Kafka package is a set of Perl modules which provides a simple and consistent application programming interface (API) to Apache Kafka 0.8, a high-throughput distributed messaging system.
DESCRIPTION
The user modules in this package provide an object oriented API. The IO agents, requests sent, and responses received from the Apache Kafka or mock servers are all represented by objects. This makes a simple and powerful interface to these services.
The main features of the package are:
Contains various reusable components (modules) that can be used separately or together.
Provides an object oriented model of communication.
Supports parsing the Apache Kafka protocol.
Supports the Apache Kafka Requests and Responses. Within this package the following parts of Kafka's protocol are implemented: PRODUCE, FETCH, OFFSETS, and METADATA. Note that PRODUCE and FETCH do not support compression at this point.
Simple producer and consumer clients.
A simple interface to control the test Kafka server cluster (in the test directory).
Simple mock server instance (located in the test directory) for testing without Apache Kafka server.
Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
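The last feature can be illustrated with core Perl alone. The sketch below is not the code of Kafka::Int64; it is a minimal illustration, assuming Math::BigInt is an acceptable tool, of how a signed 64 bit protocol value could be packed into and unpacked from eight big-endian bytes on a perl without native 64 bit integers. The names pack_int64 and unpack_int64 are hypothetical.

```perl
use strict;
use warnings;
use Math::BigInt;

my $TWO_32 = Math::BigInt->new( 2 )->bpow( 32 );
my $TWO_64 = Math::BigInt->new( 2 )->bpow( 64 );

# Hypothetical helper: encode a signed 64 bit integer (a number or a
# decimal string) as 8 bytes in network (big-endian) byte order.
sub pack_int64 {
    my ( $value ) = @_;
    my $big = Math::BigInt->new( "$value" );
    $big->badd( $TWO_64 ) if $big->is_neg;      # two's complement for negatives
    my $high = $big->copy->bdiv( $TWO_32 );     # upper 32 bits
    my $low  = $big->copy->bmod( $TWO_32 );     # lower 32 bits
    return pack( 'N2', $high->numify, $low->numify );
}

# Hypothetical helper: decode 8 big-endian bytes into a decimal string.
sub unpack_int64 {
    my ( $bytes ) = @_;
    my ( $high, $low ) = unpack( 'N2', $bytes );
    my $big = Math::BigInt->new( $high )->bmul( $TWO_32 )->badd( $low );
    $big->bsub( $TWO_64 ) if $high >= 2**31;    # sign bit is set
    return $big->bstr;
}

# -2 is the documented value of $RECEIVE_EARLIEST_OFFSETS
print unpack( 'H*', pack_int64( -2 ) ), "\n";               # fffffffffffffffe
print unpack_int64( pack_int64( '4294967296' ) ), "\n";     # 4294967296
```

Values are returned as decimal strings so that they survive on a 32 bit perl, where a native integer could not hold them.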
APACHE KAFKA'S STYLE COMMUNICATION
The Kafka package is based on Kafka's 0.8 Protocol specification document at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Kafka's protocol is based on a request/response paradigm. A client establishes a connection with a server and sends a request to the server in the form of a request method, followed by messages containing request modifiers. The server responds with a success or error code, followed by messages containing entity meta-information and content.
Messages are the fundamental unit of communication. They are published to a topic by a producer, which means they are physically sent to a server acting as a broker. Some number of consumers subscribe to a topic, and each published message is delivered to all the consumers. The message stream is partitioned on the brokers as a set of distinct partitions. The semantic meaning of these partitions is left up to the producer, and the producer specifies which partition a message belongs to. Within a partition the messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order.
In Apache Kafka, the consumers are responsible for maintaining state information (the offset) on what has been consumed. A consumer can deliberately rewind to an old offset and re-consume data. Each message is uniquely identified by a 64-bit integer offset giving the position of the start of this message in the stream of all messages ever sent to that topic on that partition. Reads are done by giving the 64-bit logical offset of a message and a max chunk size.
A consumer request is passed through the client to a server, and the response we get in return can then be examined. A request is always independent of any previous requests, i.e. the service is stateless: the topic and partition are passed in on every request.
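The offset model described above can be made concrete with a toy, in-memory partition. The sketch below is an illustration only: it is neither the package's implementation nor the way a broker stores data. The names new_partition, append_message and fetch_messages are hypothetical, and offsets are simplified to sequential message numbers.

```perl
use strict;
use warnings;

# Hypothetical toy model of a single topic partition: an append-only list.
sub new_partition { return { messages => [] } }

# Append a message; its offset is its position in the log.
sub append_message {
    my ( $partition, $payload ) = @_;
    push @{ $partition->{messages} }, $payload;
    return $#{ $partition->{messages} };
}

# Stateless read: the caller supplies the offset and a byte budget every time.
sub fetch_messages {
    my ( $partition, $offset, $max_bytes ) = @_;
    my $log = $partition->{messages};
    my @result;
    for my $i ( $offset .. $#$log ) {
        my $size = length $log->[$i];
        last if $size > $max_bytes;             # budget exhausted
        $max_bytes -= $size;
        push @result, { offset => $i, next_offset => $i + 1, payload => $log->[$i] };
    }
    return \@result;
}

my $partition = new_partition();
append_message( $partition, $_ ) for 'first', 'second', 'third';

# The consumer, not the server, remembers how far it has read.
my $position = 0;
for my $message ( @{ fetch_messages( $partition, $position, 11 ) } ) {
    print "$message->{offset}: $message->{payload}\n";
    $position = $message->{next_offset};
}

# Rewinding is simply asking for an older offset again.
my $replayed = fetch_messages( $partition, 0, 1024 );
print scalar( @$replayed ), " messages re-consumed\n";
```

With a budget of 11 bytes the first fetch returns only 'first' and 'second'; the consumer then holds position 2 and can either continue from there or rewind to 0.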
The Connection Object
Clients use the Connection object to communicate with the Apache Kafka cluster. The Connection object is an interface layer between your application code and the Apache Kafka cluster.
A Connection object is required to create instances of the Kafka::Producer or Kafka::Consumer classes.
The Kafka Connection API is implemented by the Kafka::Connection class.
use Kafka::Connection;
# connect to local cluster with the defaults
my $connection = Kafka::Connection->new( host => 'localhost' );
The main attributes of the Connection object are:
host and port are the attributes denoting the server of the Kafka cluster that a client wants to connect to.
timeout specifies how much time the remote servers are given to respond before a disconnection occurs and an internal exception is thrown.
The IO Object
The Kafka::Consumer objects use the internal class Kafka::IO to maintain communication with a particular server of the Kafka cluster. The IO object is an interface layer between the Kafka::Consumer object and the network.
The Kafka IO API is implemented by the Kafka::IO class. Note that end users normally have no need to use Kafka::IO directly and should work with Kafka::Connection instead.
use Kafka::IO;
# connect to local server with the defaults
my $io = Kafka::IO->new( host => 'localhost' );
The main attributes of the IO object are:
host and port are the IO object attributes denoting the host and the port of the Apache Kafka server.
timeout specifies how much time the remote servers are given to respond before the IO object disconnects and generates an internal exception.
The Producer Object
The Kafka producer API is implemented by the Kafka::Producer class.
use Kafka::Producer;
#-- Producer
my $producer = Kafka::Producer->new( Connection => $connection );
# Sending a single message
$producer->send(
'mytopic', # topic
0, # partition
'Single message' # message
);
# Sending a series of messages
$producer->send(
'mytopic', # topic
0, # partition
[ # messages
'The first message',
'The second message',
'The third message',
]
);
The main methods and attributes of the producer request are:
The request method of the producer object is send().
topic and partition define the respective parameters of the messages we want to send.
messages is an arbitrary amount of data (a simple data string or a reference to an array of data strings).
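Because send() accepts either a single string or a reference to an array of strings, code that wraps it may want to normalize the argument first. The following is a sketch of that idea in plain Perl; normalize_messages is a hypothetical helper and not part of the package.

```perl
use strict;
use warnings;

# Hypothetical helper: accept a plain string or an array reference of
# strings and always return an array reference of strings.
sub normalize_messages {
    my ( $messages ) = @_;
    die "messages must be defined\n" unless defined $messages;
    my @list = ref( $messages ) eq 'ARRAY' ? @$messages : ( $messages );
    for my $item ( @list ) {
        die "each message must be a defined, non-reference scalar\n"
            if !defined( $item ) || ref( $item );
    }
    return \@list;
}

my $single = normalize_messages( 'Single message' );
my $series = normalize_messages( [ 'The first message', 'The second message' ] );
printf "%d and %d message(s)\n", scalar @$single, scalar @$series;
```

Rejecting references and undefined values early keeps a malformed argument from reaching the network layer.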
The Consumer Object
The Kafka consumer API is implemented by the Kafka::Consumer class.
use Kafka::Consumer;
my $consumer = Kafka::Consumer->new( Connection => $connection );
The request methods of the consumer object are offsets() and fetch().
The offsets method returns a reference to the list of offsets of received messages.
The fetch method returns a reference to the list of received Kafka::Message objects.
use Kafka qw(
$DEFAULT_MAX_BYTES
$DEFAULT_MAX_NUMBER_OF_OFFSETS
$RECEIVE_EARLIEST_OFFSETS
);
# Get a list of valid offsets up to max_number before the given time
my $offsets; # declared here so that the fetch() call below can use it
if ( $offsets = $consumer->offsets(
'mytopic', # topic
0, # partition
$RECEIVE_EARLIEST_OFFSETS, # specifies time of the first offset we want to retrieve
$DEFAULT_MAX_NUMBER_OF_OFFSETS # max number of offsets to return
) ) {
foreach my $offset ( @$offsets ) {
say "Received offset: $offset";
}
}
# Consuming messages
if ( my $messages = $consumer->fetch(
'mytopic', # topic
0, # partition
$offsets->[0], # message offset
$DEFAULT_MAX_BYTES # limit max amount of data we request
) ) {
foreach my $message ( @$messages ) {
if( $message->valid ) {
say 'payload : ', $message->payload;
say 'offset : ', $message->offset;
say 'next_offset: ', $message->next_offset;
}
}
}
See Kafka::Consumer for additional information and documentation about class methods and arguments.
The Message Object
The Kafka message API is implemented by the Kafka::Message class.
if( $message->valid ) {
say 'payload : ', $message->payload;
say 'offset : ', $message->offset;
say 'next_offset: ', $message->next_offset;
}
else {
say 'error : ', $message->error;
}
Methods available for the Kafka::Message object:
payload
A simple message received from the Apache Kafka server.
valid
True if the message entry is valid.
error
A description of the message inconsistency (currently only for compressed messages).
offset
The offset of the beginning of the message in the Apache Kafka server.
next_offset
The offset of the beginning of the next message in the Apache Kafka server.
Common
The Kafka::Connection, Kafka::Producer, and Kafka::Consumer objects described above also have the following common methods:
RaiseError
- when set to true, instructs Kafka to die when an error during communication is detected.
last_errorcode and last_error
- diagnostic methods. They provide detailed error codes and messages for various cases: when the server or the resource is not available, access to the resource was denied, etc.
EXPORT
None by default.
Additional constants are available for import. They can be used to define certain parameters and to identify various error cases.
$KAFKA_SERVER_PORT
-
default Apache Kafka server port - 9092.
$REQUEST_TIMEOUT
-
1.5 sec - timeout in secs, for gethostbyname, connect, blocking receive and send calls (could be any integer or floating-point type).
$DEFAULT_MAX_BYTES
-
1MB - maximum size of message(s) to receive.
$SEND_MAX_RETRIES
-
3 - The leader may be unavailable transiently, which can fail the sending of a message. This property specifies the number of retries when such failures occur.
$RETRY_BACKOFF
-
100 - (ms) Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
$RECEIVE_LATEST_OFFSET
-
-1 : special value that denotes latest available offset.
$RECEIVE_EARLIEST_OFFSETS
-
-2 : special value that denotes earliest available offset.
$DEFAULT_MAX_NUMBER_OF_OFFSETS
-
100 - maximum number of offsets to retrieve.
$MIN_BYTES_RESPOND_IMMEDIATELY
-
The minimum number of bytes of messages that must be available to give a response.
0 - the server will always respond immediately.
$MIN_BYTES_RESPOND_HAS_DATA
-
The minimum number of bytes of messages that must be available to give a response.
1 - the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs.
$NOT_SEND_ANY_RESPONSE
-
Indicates how many acknowledgements the servers should receive before responding to the request.
0 - the server does not send any response.
$WAIT_WRITTEN_TO_LOCAL_LOG
-
Indicates how long the servers should wait for the data to be written to the local log before responding to the request.
1 - the server will wait until the data is written to the local log before sending a response.
$BLOCK_UNTIL_IS_COMMITTED
-
Wait for the message to be committed by all in-sync replicas.
-1 - the server will block until the message is committed by all in sync replicas before sending a response.
$DEFAULT_MAX_WAIT_TIME
-
The maximum amount of time (ms) to wait when an insufficient amount of data is available at the time the request is dispatched.
100 - the server will wait at most 100 ms for sufficient data to become available before sending a response.
$MESSAGE_SIZE_OVERHEAD
-
26 - size of protocol overhead (data added by protocol) for each message.
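The value 26 can be accounted for field by field. The breakdown below is a sketch based on the message set layout in the Kafka 0.8 protocol guide, assuming a message with a null key. The field names and the encoded_size helper are illustrative and are not identifiers exported by this package.

```perl
use strict;
use warnings;
use List::Util qw( sum );

# Fixed-size fields that surround every payload in a 0.8 message set
# (sizes in bytes; a null key is assumed, so the key adds only its length field).
my @FIELDS = (
    [ offset       => 8 ],
    [ message_size => 4 ],
    [ crc          => 4 ],
    [ magic_byte   => 1 ],
    [ attributes   => 1 ],
    [ key_length   => 4 ],
    [ value_length => 4 ],
);

my $overhead = sum( map { $_->[1] } @FIELDS );

# Hypothetical helper: number of bytes one payload occupies once framed.
sub encoded_size {
    my ( $payload ) = @_;
    return $overhead + length( $payload );
}

print "overhead: $overhead\n";                                              # overhead: 26
print "framed 'Single message': ", encoded_size( 'Single message' ), "\n";  # 40
```

An estimate of this kind may help when choosing a maximum size for a fetch: a limit smaller than the framed size of the next message cannot return that message whole.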
Possible error codes returned by the last_errorcode method (each has a description in the %ERROR hash):
$ERROR_MISMATCH_ARGUMENT
-
-1000 - Invalid argument
$ERROR_CANNOT_SEND
-
-1001 - Can't send
$ERROR_CANNOT_RECV
-
-1002 - Can't receive
$ERROR_CANNOT_BIND
-
-1003 - Can't bind
$ERROR_COMPRESSED_PAYLOAD
-
-1004 - Compressed payload
$ERROR_UNKNOWN_APIKEY
-
-1005 - Unknown ApiKey
$ERROR_CANNOT_GET_METADATA
-
-1006 - Can't get Metadata
$ERROR_LEADER_NOT_FOUND
-
-1007 - Leader not found
$ERROR_MISMATCH_CORRELATIONID
-
-1008 - Mismatch CorrelationId
$ERROR_NO_KNOWN_BROKERS
-
-1009 - There are no known brokers
$ERROR_REQUEST_OR_RESPONSE
-
-1010 - Bad request or response element
$ERROR_TOPIC_DOES_NOT_MATCH
-
-1011 - Topic does not match the requested
$ERROR_PARTITION_DOES_NOT_MATCH
-
-1012 - Partition does not match the requested
$ERROR_NOT_BINARY_STRING
-
-1013 - Not binary string
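A diagnostic routine can turn such codes into readable text with a simple lookup. The sketch below uses a local hash as a stand-in for a few entries of the package's %ERROR hash, with codes and texts copied from the list above. The helpers describe_error and is_client_error are hypothetical, and the rule that codes at or below -1000 originate in the client is an assumption drawn from the numbering used in this document.

```perl
use strict;
use warnings;

# Local stand-in for a few entries of the package's %ERROR hash.
my %DESCRIPTION = (
    -1000 => 'Invalid argument',
    -1001 => "Can't send",
    -1002 => "Can't receive",
    -1007 => 'Leader not found',
    -1013 => 'Not binary string',
);

# Hypothetical helper: turn a code into a printable diagnostic.
sub describe_error {
    my ( $code ) = @_;
    my $text = exists $DESCRIPTION{$code} ? $DESCRIPTION{$code} : 'Unknown error';
    return "($code) $text";
}

# Assumption: codes at or below -1000 are raised by the client package,
# all other codes come from the server's response.
sub is_client_error {
    my ( $code ) = @_;
    return $code <= -1000 ? 1 : 0;
}

for my $code ( -1001, -1013, 3 ) {
    print describe_error( $code ), is_client_error( $code ) ? ' [client]' : ' [server]', "\n";
}
```

In a real program the code would come from last_errorcode and the text from last_error or %ERROR, so the local hash would not be needed.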
Possible error codes obtained from the ERROR_CODE field of an Apache Kafka wire format protocol response:
$ERROR_NO_ERROR
-
0 - q{} (empty description) - No error
$ERROR_UNKNOWN
-
-1 - An unexpected server error
$ERROR_OFFSET_OUT_OF_RANGE
-
1 - The requested offset is outside the range of offsets available at the server for the given topic/partition
$ERROR_INVALID_MESSAGE
-
2 - Message contents do not match the message checksum (CRC)
$ERROR_UNKNOWN_TOPIC_OR_PARTITION
-
3 - Unknown topic or partition
$ERROR_INVALID_MESSAGE_SIZE
-
4 - Message has invalid size
$ERROR_LEADER_NOT_AVAILABLE
-
5 - Unable to write due to ongoing Kafka leader election
This error is thrown if we are in the middle of a leadership election and there is no current leader for this partition, hence it is unavailable for writes.
$ERROR_NOT_LEADER_FOR_PARTITION
-
6 - Server is not a leader for partition
Client attempts to send messages to a replica that is not the leader for given partition. It usually indicates that client's metadata is out of date.
$ERROR_REQUEST_TIMED_OUT
-
7 - Request time-out
Request exceeds the user-specified time limit for the request.
$ERROR_BROKER_NOT_AVAILABLE
-
8 - Broker is not available
This is not a client facing error and is used only internally by intra-cluster broker communication.
$ERROR_REPLICA_NOT_AVAILABLE
-
9 - Replica not available
RTFM: 'What is the difference between this and LeaderNotAvailable?'
$ERROR_MESSAGE_SIZE_TOO_LARGE
-
10 - Message is too big
The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown when client attempts to produce a message larger than possible maximum size.
$ERROR_STALE_CONTROLLER_EPOCH_CODE
-
11 - Stale Controller Epoch Code
RTFM: '???'
$ERROR_OFFSET_METADATA_TOO_LARGE_CODE
-
12 - Specified metadata offset is too big
Returned if you specify a value larger than the configured maximum for offset metadata.
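Some of these codes describe transient conditions, which is what $SEND_MAX_RETRIES and $RETRY_BACKOFF are documented for. The sketch below simulates such a retry loop in plain Perl. It is not the producer's actual code: treating codes 5 and 6 as retriable is an assumption based on their descriptions above, the helper send_with_retries is hypothetical, and the broker is replaced by a canned list of responses.

```perl
use strict;
use warnings;
use Time::HiRes qw( sleep );

# Local copies of the documented defaults; a real program would import
# $SEND_MAX_RETRIES and $RETRY_BACKOFF from Kafka instead.
my $MAX_RETRIES = 3;
my $BACKOFF_MS  = 100;

# Assumption: LEADER_NOT_AVAILABLE (5) and NOT_LEADER_FOR_PARTITION (6)
# are worth retrying after a metadata refresh.
my %RETRIABLE = map { $_ => 1 } 5, 6;

# Hypothetical helper: $attempt returns a server error code (0 is success),
# $refresh is called after the backoff and before each retry.
sub send_with_retries {
    my ( $attempt, $refresh ) = @_;
    my $code = $attempt->();
    for ( 1 .. $MAX_RETRIES ) {
        last unless $RETRIABLE{$code};
        sleep( $BACKOFF_MS / 1000 );    # give the leader election time to finish
        $refresh->();
        $code = $attempt->();
    }
    return $code;
}

# Simulated broker: a leader election is in progress for the first two calls.
my @responses = ( 5, 5, 0 );
my $refreshed = 0;
my $result    = send_with_retries( sub { shift @responses }, sub { $refreshed++ } );
print "final code $result after $refreshed metadata refresh(es)\n";
```

A non-retriable code, such as 10 for a message that is too big, leaves the loop immediately and is returned to the caller unchanged.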
%ERROR
-
Contains the descriptions of the possible error codes returned by the last_error methods and functions of the package modules.
$BITS64
-
True if you are working on a 64 bit system, false on a 32 bit system.
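For comparison, a similar flag can be derived from the core Config module. This is a sketch under the assumption that an integer size of 8 bytes or more is what matters; it is not necessarily how the package computes $BITS64, and the variable name $bits64 is local to the example.

```perl
use strict;
use warnings;
use Config;

# Assumption: perl integers (IV) of 8 bytes or more mean native 64 bit support.
my $bits64 = $Config{ivsize} >= 8 ? 1 : 0;

print 'This perl has ', $bits64 ? '64' : '32', ' bit integers', "\n";
```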
An Example
use 5.010;
use strict;
use warnings;
use Kafka qw(
$KAFKA_SERVER_PORT
$REQUEST_TIMEOUT
$RECEIVE_EARLIEST_OFFSETS
$DEFAULT_MAX_NUMBER_OF_OFFSETS
$DEFAULT_MAX_BYTES
);
use Kafka::Connection;
use Kafka::Producer;
use Kafka::Consumer;
#-- Connection
my $connection = Kafka::Connection->new( host => 'localhost' );
#-- Producer
my $producer = Kafka::Producer->new( Connection => $connection );
# Sending a single message
$producer->send(
'mytopic', # topic
0, # partition
'Single message' # message
);
# Sending a series of messages
$producer->send(
'mytopic', # topic
0, # partition
[ # messages
'The first message',
'The second message',
'The third message',
]
);
undef $producer;
#-- Consumer
my $consumer = Kafka::Consumer->new( Connection => $connection );
# Get a list of valid offsets up to max_number before the given time
my $offsets;
if ( $offsets = $consumer->offsets(
'mytopic', # topic
0, # partition
$RECEIVE_EARLIEST_OFFSETS, # time
$DEFAULT_MAX_NUMBER_OF_OFFSETS, # max_number
) ) {
foreach my $offset ( @$offsets ) {
say "Received offset: $offset";
}
}
if ( !$offsets or $consumer->last_error ) {
say '(', $consumer->last_errorcode, ') ', $consumer->last_error;
}
# Consuming messages
if ( my $messages = $consumer->fetch(
'mytopic', # topic
0, # partition
$offsets->[0], # offset
$DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
) ) {
foreach my $message ( @$messages ) {
if( $message->valid ) {
say 'payload : ', $message->payload;
say 'offset : ', $message->offset;
say 'next_offset: ', $message->next_offset;
}
else {
say 'error : ', $message->error;
}
}
}
undef $consumer;
undef $connection;
DEPENDENCIES
In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:
Const::Fast
List::MoreUtils
Params::Util
Scalar::Util::Numeric
String::CRC32
Sys::SigAction
Kafka package has the following optional dependencies:
Capture::Tiny
Config::IniFiles
Proc::Daemon
Sub::Install
Test::Deep
Test::Exception
Test::TCP
If the optional modules are missing, some "prereq" tests are skipped.
BUGS AND LIMITATIONS
Currently, the package does not implement sending and receiving of compressed messages. Producer and Consumer methods only work with one topic and one partition at a time. Also, the module does not implement the Offset Commit/Fetch API.
Producer's, Consumer's, Connection's string arguments must be binary strings. Using Unicode strings may cause an error or data corruption.
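One way to satisfy this requirement is to encode text explicitly before sending it and to decode it after receiving it. The sketch below uses the core Encode module and assumes that producer and consumer agree on UTF-8; the package itself does not perform this conversion, and the variable names are illustrative.

```perl
use strict;
use warnings;
use Encode qw( encode_utf8 decode_utf8 );

my $text  = "caf\x{e9} \x{263A}";       # a Unicode (character) string
my $bytes = encode_utf8( $text );       # the same text as UTF-8 octets

printf "characters: %d, bytes: %d\n", length( $text ), length( $bytes );    # 6 and 9

# Only the encoded form is a binary string.
print 'text is flagged as characters:  ', ( utf8::is_utf8( $text )  ? 'yes' : 'no' ), "\n";
print 'bytes are flagged as characters: ', ( utf8::is_utf8( $bytes ) ? 'yes' : 'no' ), "\n";

# On the consuming side the payload is turned back into characters.
my $restored = decode_utf8( $bytes );
print 'round trip ', ( $restored eq $text ? 'ok' : 'failed' ), "\n";
```

Here $bytes is what would be passed as the message, and $restored is what the consuming side would reconstruct from the payload.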
This module does not support Kafka protocol versions earlier than 0.8.
The Kafka package was written, tested, and found working on recent Linux distributions.
There are no known bugs in this package.
Please report problems to the "AUTHOR".
Patches are welcome.
MORE DOCUMENTATION
All modules contain detailed information on the interfaces they provide.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low level interface for communication with Kafka server.
Kafka::Internals - Internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.