NAME
Kafka - constants and messages used by the Kafka package modules
VERSION
This documentation refers to Kafka package version 0.10
SYNOPSIS
An example of Kafka usage:
    use Kafka qw(
        BITS64
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
    );
    # common information
    print "This is Kafka package $Kafka::VERSION\n";
    print "You have a ", BITS64 ? "64" : "32", " bit system\n";
    use Kafka::IO;
    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );
    # decoding of the error code
    unless ( $io )
    {
        print STDERR "last error: ",
            $Kafka::ERROR[Kafka::IO::last_errorcode], "\n";
    }
For a brief but working code example of the Kafka package usage, see the "An Example" section.
ABSTRACT
The Kafka package is a set of Perl modules which provides a simple and consistent application programming interface (API) to Apache Kafka 0.7, a high-throughput distributed messaging system. This is a low-level API implementation which DOES NOT interact with Apache ZooKeeper for consumer coordination and/or load balancing.
DESCRIPTION
The user modules in this package provide an object oriented API. The IO agents, requests sent, and responses received from the Apache Kafka or mock servers are all represented by objects. This provides a simple and powerful interface to these services.
The main features of the package are:
Contains various reusable components (modules) that can be used separately or together.
Provides an object oriented model of communication.
Supports parsing the Apache Kafka Wire Format protocol.
Supports the Apache Kafka requests and responses (PRODUCE and FETCH, currently without the compression codec attribute). The package currently supports the PRODUCE, FETCH and OFFSETS requests and the FETCH and OFFSETS responses.
Simple producer and consumer clients.
Simple mock server instance for testing without Apache Kafka server.
Support for working with 64 bit elements of the Kafka Wire Format protocol on 32 bit systems.
APACHE KAFKA'S STYLE COMMUNICATION
The Kafka package is based on the Kafka 0.7 Wire Format specification at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Kafka's Wire Format protocol is based on a request/response paradigm. A client establishes a connection with a server and sends it a request in the form of a request method, followed by messages containing request modifiers. The server responds with a success or error code, followed by messages containing entity meta-information and content.
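To make the request framing concrete, here is a minimal Perl sketch that wraps a request body in the common request header. The field layout (REQUEST_LENGTH int32, REQUEST_TYPE int16, TOPIC_LENGTH int16, TOPIC, PARTITION int32, all big-endian) and the request type numbers are our reading of the 0.7 Wire Format page, and encode_request is a hypothetical helper, not part of the Kafka::Protocol API.

```perl
use strict;
use warnings;

# Request type codes as listed in the Kafka 0.7 Wire Format description
# (assumed here, not taken from Kafka::Protocol).
my %REQUEST_TYPE = (
    PRODUCE      => 0,
    FETCH        => 1,
    MULTIFETCH   => 2,
    MULTIPRODUCE => 3,
    OFFSETS      => 4,
);

# Hypothetical helper: prepend the common request header to a body.
# Header: REQUEST_LENGTH (int32), REQUEST_TYPE (int16),
#         TOPIC_LENGTH (int16), TOPIC (bytes), PARTITION (int32).
# All integers are big-endian.
sub encode_request {
    my ( $type, $topic, $partition, $body ) = @_;
    die "unknown request type '$type'\n" unless exists $REQUEST_TYPE{$type};
    # 'n/a*' writes the topic length as int16 followed by the topic bytes
    my $rest = pack( 'n n/a* N', $REQUEST_TYPE{$type}, $topic, $partition ) . $body;
    # REQUEST_LENGTH counts every byte that follows it
    return pack( 'N', length $rest ) . $rest;
}

# Assumed FETCH body: OFFSET 0 as two 32-bit words, then MAX_SIZE of 1 MB
my $request = encode_request( 'FETCH', 'test', 0, pack( 'N N N', 0, 0, 1024 * 1024 ) );
print length($request), " bytes: ", unpack( 'H*', $request ), "\n";
```

Because the topic and partition travel in every header, the server needs no per-connection state to interpret a request.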
Communication with Kafka always follows these steps. First, the IO and client objects are created and configured.
The Kafka client is an object of the Kafka::Producer or Kafka::Consumer class.
Messages are the fundamental unit of communication. They are published to a topic by a producer, which means they are physically sent to a server acting as a broker. Some number of consumers subscribe to a topic, and each published message is delivered to all the consumers. The message stream is partitioned on the brokers as a set of distinct partitions. The semantic meaning of these partitions is left up to the producer, and the producer specifies which partition a message belongs to. Within a partition the messages are stored in the order in which they arrive at the broker, and are given out to consumers in that same order. In Apache Kafka, the consumers are responsible for maintaining state information (the offset) about what has been consumed. A consumer can deliberately rewind back to an old offset and re-consume data. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. Reads are done by giving the 64-bit logical offset of a message and a max chunk size.
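Since an offset is a byte position, a consumer can compute the offset of the next message without asking the server: it is the message's offset plus the 4-byte LENGTH field plus LENGTH. The sketch below walks a message set this way. The message layout (LENGTH, MAGIC, ATTRIBUTES, CHECKSUM, PAYLOAD, with no ATTRIBUTES byte when MAGIC is 0) is assumed from the 0.7 Wire Format page, the checksum is left at zero because only the offsets matter here, and both helpers are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: one message in the assumed 0.7 layout
# LENGTH (int32), MAGIC (int8) = 1, ATTRIBUTES (int8) = 0, CHECKSUM (int32), PAYLOAD.
# The checksum is left at 0 because only the offset arithmetic matters here.
sub encode_message {
    my ($payload) = @_;
    my $rest = pack( 'C C N', 1, 0, 0 ) . $payload;
    return pack( 'N', length $rest ) . $rest;
}

# Walk a message set that starts at byte offset $base in the partition.
# next_offset = offset + 4 (the LENGTH field itself) + LENGTH.
sub walk_message_set {
    my ( $buffer, $base ) = @_;
    my @messages;
    my $pos = 0;
    while ( $pos + 4 <= length $buffer ) {
        my $length = unpack( 'N', substr( $buffer, $pos, 4 ) );
        # A size-limited read can end in the middle of a message; stop there.
        last if $pos + 4 + $length > length $buffer;
        my $magic  = unpack( 'C', substr( $buffer, $pos + 4, 1 ) );
        my $header = $magic == 0 ? 5 : 6;    # MAGIC [+ ATTRIBUTES] + CHECKSUM
        push @messages, {
            payload     => substr( $buffer, $pos + 4 + $header, $length - $header ),
            offset      => $base + $pos,
            next_offset => $base + $pos + 4 + $length,
        };
        $pos += 4 + $length;
    }
    return \@messages;
}

my $set = join '', map { encode_message($_) } 'The first message', 'The second message';
for my $m ( @{ walk_message_set( $set, 0 ) } ) {
    print "$m->{offset} -> $m->{next_offset}: $m->{payload}\n";
}
# prints:
# 0 -> 27: The first message
# 27 -> 55: The second message
```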
The request is then passed through the client to a server, and we get back a response that we can examine. Each request is independent of any previous requests, i.e. the service is stateless: the topic and partition are passed in on every request.
The IO Object
The clients use the IO object to communicate with the Apache Kafka server. The IO object is an interface layer between your application code and the network.
An IO object is required to create objects of the Kafka::Producer and Kafka::Consumer classes.
The Kafka IO API is implemented by the Kafka::IO class.
    use Kafka::IO;
    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );
The main attributes of the IO object are:
host and port are the IO object attributes denoting the server name and port of the "documents" (messages) service we want to access (the Apache Kafka server).
timeout specifies how much time we give remote servers to respond before the IO object disconnects and creates an internal exception.
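The IO layer has to turn a byte stream into whole responses. A minimal sketch of that framing, assuming the 0.7 response layout (RESPONSE_LENGTH int32, then ERROR_CODE int16 and the body): read the 4-byte length, then read exactly that many bytes, treating an early end of stream as an error. The timeout handling described above is deliberately left out so the example can run against an in-memory filehandle; read_exactly and read_response are hypothetical names, not Kafka::IO methods.

```perl
use strict;
use warnings;

# Hypothetical helper: read exactly $want bytes, or die if the stream ends early.
sub read_exactly {
    my ( $fh, $want ) = @_;
    my $buf = '';
    while ( length $buf < $want ) {
        # append at the end of $buf
        my $got = read( $fh, $buf, $want - length $buf, length $buf );
        die "read failed: $!\n" unless defined $got;
        die "connection closed after ", length $buf, " bytes\n" if $got == 0;
    }
    return $buf;
}

# Read one length-prefixed response: RESPONSE_LENGTH (int32), then
# ERROR_CODE (signed int16) and the body (assumed 0.7 layout).
sub read_response {
    my ($fh) = @_;
    my $length     = unpack( 'N', read_exactly( $fh, 4 ) );
    my $rest       = read_exactly( $fh, $length );
    my $error_code = unpack( 's>', substr( $rest, 0, 2 ) );
    return ( $error_code, substr( $rest, 2 ) );
}

my $wire = pack( 'N', 6 ) . pack( 's>', 0 ) . 'body';
open my $fh, '<', \$wire or die $!;
my ( $code, $body ) = read_response($fh);
print "error code $code, body '$body'\n";    # error code 0, body 'body'
```

On a real socket each blocking read would additionally be bounded by the timeout attribute.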
The Producer Object
The Kafka producer API is implemented by the Kafka::Producer class.
    use Kafka::Producer;
    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );
    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
    );
    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
    );
The request method of the producer object is send(). Its main arguments are:
topic and partition encode the parameters of the messages we want to send.
messages is an arbitrary amount of data (a simple data string or a reference to an array of data strings).
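The following sketch shows what a producer has to put on the wire for the messages argument, assuming the 0.7 PRODUCE layout: a MESSAGES_LENGTH int32 followed by the message set, each message carrying a CRC32 of its payload. Like send(), it accepts a single string or an array reference. The helper names are hypothetical, and CRC32 is computed in pure Perl instead of with Digest::CRC so the example needs only core Perl.

```perl
use strict;
use warnings;

# Bitwise CRC32 (the IEEE polynomial used by zlib) in pure Perl.
sub crc32 {
    my ($data) = @_;    # expects a byte string
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# One uncompressed message in the assumed 0.7 layout:
# LENGTH (int32), MAGIC (int8) = 1, ATTRIBUTES (int8) = 0, CHECKSUM (int32), PAYLOAD.
sub encode_message {
    my ($payload) = @_;
    my $rest = pack( 'C C N', 1, 0, crc32($payload) ) . $payload;
    return pack( 'N', length $rest ) . $rest;
}

# PRODUCE body: MESSAGES_LENGTH (int32) followed by the message set.
# Accepts either a single string or a reference to an array of strings.
sub encode_produce_body {
    my ($messages) = @_;
    my @list = ref $messages eq 'ARRAY' ? @$messages : ($messages);
    my $set = join '', map { encode_message($_) } @list;
    return pack( 'N', length $set ) . $set;
}

my $single = encode_produce_body('Single message');
my $series = encode_produce_body( [ 'The first message', 'The second message', 'The third message' ] );
printf "single: %d bytes, series: %d bytes\n", length $single, length $series;
# single: 28 bytes, series: 86 bytes
```

In a full PRODUCE request this body would follow the common request header.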
The Consumer Object
The Kafka consumer API is implemented by the Kafka::Consumer class.
    use Kafka::Consumer;
    my $consumer = Kafka::Consumer->new( IO => $io );
The request methods of the consumer object are offsets() and fetch().
The offsets method returns a reference to the list of offsets received from the server.
The fetch method returns a reference to the list of received Kafka::Message objects.
    # Get a list of valid offsets up to max_number before the given time
    if ( my $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # max_size
        ) )
    {
        foreach my $message ( @$messages )
        {
            if ( $message->valid )
            {
                print "payload    : ", $message->payload, "\n";
                print "offset     : ", $message->offset, "\n";
                print "next_offset: ", $message->next_offset, "\n";
            }
        }
    }
The arguments:
topic and partition specify the location of the messages we want to retrieve.
offset and max_size (for fetch), or time and max_number (for offsets), are additional arguments that specify attributes of the messages we want to access.
offset is the 64-bit offset of the message to start reading from, and max_size is the maximum size of the message(s) to receive.
time is a timestamp in milliseconds: the offsets returned are those before this time. max_number is the maximum number of offsets to retrieve. Together, these arguments describe the range of the messages requested.
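The offset and time arguments are 64-bit values, while max_size and max_number are 32-bit. Assuming the 0.7 request bodies (FETCH: OFFSET int64 and MAX_SIZE int32; OFFSETS: TIME int64 and MAX_NUMBER int32), here is a sketch of how they might be encoded portably. The 64-bit value is split into two big-endian 32-bit words, so it also works where pack's 'q>' template is unavailable, and the special times -1 and -2 come out in two's complement. pack_int64, fetch_body and offsets_body are hypothetical helpers, not Kafka::Protocol or Kafka::Int64 functions.

```perl
use strict;
use warnings;
use POSIX qw(floor);

# Encode a signed 64-bit integer as 8 big-endian bytes without pack 'q>'
# (which needs a 64-bit perl). Exact for |$n| < 2**53.
sub pack_int64 {
    my ($n) = @_;
    my $high = floor( $n / 2**32 );
    my $low  = $n - $high * 2**32;     # always in 0 .. 2**32 - 1
    $high += 2**32 if $high < 0;       # two's complement of the high word
    return pack( 'N N', $high, $low );
}

# Assumed 0.7 bodies that follow the common request header.
sub fetch_body {
    my ( $offset, $max_size ) = @_;    # OFFSET (int64), MAX_SIZE (int32)
    return pack_int64($offset) . pack( 'N', $max_size );
}

sub offsets_body {
    my ( $time, $max_number ) = @_;    # TIME (int64, ms), MAX_NUMBER (int32)
    return pack_int64($time) . pack( 'N', $max_number );
}

print unpack( 'H*', offsets_body( -2, 100 ) ), "\n";       # fffffffffffffffe00000064
print unpack( 'H*', fetch_body( 0, 1024 * 1024 ) ), "\n";  # 000000000000000000100000
```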
The Message Object
The Kafka message API is implemented by the Kafka::Message class.
    if ( $message->valid )
    {
        print "payload    : ", $message->payload, "\n";
        print "offset     : ", $message->offset, "\n";
        print "next_offset: ", $message->next_offset, "\n";
    }
    else
    {
        print "error      : ", $message->error, "\n";
    }
The available methods of a Kafka::Message object are:
payload
A simple message received from the Apache Kafka server.
valid
A message entry is valid if the CRC32 of the message payload matches the CRC stored with the message.
error
A description of the message inconsistency (currently only when the message is not valid or is compressed).
offset
The offset of the beginning of the message in the Apache Kafka server.
next_offset
The offset of the beginning of the next message in the Apache Kafka server.
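As a sketch of how valid and error relate to the wire data, the example below decodes one message and compares the stored checksum with the CRC32 of the payload. The message layout, the use of the low two ATTRIBUTES bits as the compression codec, and the helper names are assumptions based on the 0.7 Wire Format page. The error strings mirror the "Checksum error" and "Compressed payload" descriptions listed under EXPORT, but this is not Kafka::Message's actual code.

```perl
use strict;
use warnings;

# Pure-Perl CRC32 (IEEE polynomial, as in zlib).
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical encoder for test messages (MAGIC 1, optional ATTRIBUTES).
sub encode_message {
    my ( $payload, $attributes ) = @_;
    my $rest = pack( 'C C N', 1, $attributes // 0, crc32($payload) ) . $payload;
    return pack( 'N', length $rest ) . $rest;
}

# valid: the stored CRC matches the CRC32 of the payload.
# error: describes a checksum mismatch or a compressed payload.
sub decode_message {
    my ($bytes) = @_;
    my ( $length, $magic ) = unpack( 'N C', $bytes );
    my $message = substr( $bytes, 0, 4 + $length );
    my ( $attributes, $crc, $payload ) =
        $magic == 0
        ? ( 0, unpack( 'x5 N a*', $message ) )    # MAGIC 0: no ATTRIBUTES byte
        : unpack( 'x5 C N a*', $message );
    my $valid = crc32($payload) == $crc ? 1 : 0;
    my $error =
          !$valid            ? 'Checksum error'
        : $attributes & 0x03 ? 'Compressed payload'    # low 2 bits: codec (assumed)
        :                      '';
    return { payload => $payload, valid => $valid, error => $error };
}

my $good = encode_message('Single message');
( my $bad = $good ) =~ s/Single/Simple/;    # corrupt the payload, keep the old CRC
for my $m ( map { decode_message($_) } $good, $bad, encode_message( 'zz', 1 ) ) {
    printf "valid=%d error='%s' payload='%s'\n", @{$m}{qw(valid error payload)};
}
# valid=1 error='' payload='Single message'
# valid=0 error='Checksum error' payload='Simple message'
# valid=1 error='Compressed payload' payload='zz'
```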
Common
Both Kafka::Producer and Kafka::Consumer objects described above also have the following common methods:
RaiseError
is a method which causes Kafka to die if an error is detected.
last_errorcode and last_error
are diagnostic methods. Use them to get a detailed error message if the server or the resource is not available, access to the resource is denied, or something else has failed.
close
terminates the connection with Kafka and cleans up.
    # RaiseError => 0: errors are reported through last_errorcode
    # and last_error instead of causing a die
    my $producer = Kafka::Producer->new( IO => $io, RaiseError => 0 );
    unless ( $producer->send( "test", 0, "Single message" ) )
    {
        print
            "error code       : ", $producer->last_errorcode, "\n",
            "error description: ", $producer->last_error, "\n";
    }
    $producer->close;
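The RaiseError convention (familiar from DBI) can be sketched with a small stand-in class: when RaiseError is set, an error dies; otherwise the method returns false and the error is left in last_errorcode and last_error. My::Client is purely hypothetical, and error code 3 with the description "Can't send" is borrowed from the ERROR_CANNOT_SEND entry under EXPORT.

```perl
use strict;
use warnings;

package My::Client;    # hypothetical stand-in, not part of the Kafka package

sub new {
    my ( $class, %args ) = @_;
    return bless {
        RaiseError     => $args{RaiseError} // 0,
        last_errorcode => 0,
        last_error     => '',
    }, $class;
}

sub last_errorcode { $_[0]{last_errorcode} }
sub last_error     { $_[0]{last_error} }

# Record an error; die instead when RaiseError is set.
sub _error {
    my ( $self, $code, $description ) = @_;
    @{$self}{qw(last_errorcode last_error)} = ( $code, $description );
    die "$description\n" if $self->{RaiseError};
    return;
}

# A request that always fails, to exercise both error modes.
sub send {
    my ($self) = @_;
    return $self->_error( 3, "Can't send" );
}

package main;

my $quiet = My::Client->new( RaiseError => 0 );
unless ( $quiet->send ) {
    print "error code: ", $quiet->last_errorcode, ", description: ", $quiet->last_error, "\n";
}
my $loud = My::Client->new( RaiseError => 1 );
eval { $loud->send; 1 } or print "died with: $@";
```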
EXPORT
None by default.
Additional constants are available for import, which can be used to define some types of parameters and to identify various error cases.
These are the defaults:
KAFKA_SERVER_PORT
-
default Apache Kafka server port - 9092.
DEFAULT_TIMEOUT
-
timeout in secs, for gethostbyname, connect, blocking receive and send calls (could be any integer or floating-point type) - 0.5 sec.
TIMESTAMP_LATEST
-
timestamp of the offsets before this time (ms) special value -1 : latest
TIMESTAMP_EARLIEST
-
timestamp of the offsets before this time (ms) special value -2 : earliest
DEFAULT_MAX_SIZE
-
maximum size of message(s) to receive - 1MB
DEFAULT_MAX_OFFSETS
-
maximum number of offsets to retrieve - 100
MAX_SOCKET_REQUEST_BYTES
-
The maximum size of a request that the socket server will accept (protection against OOM). The default limit (as configured in server.properties) is 104857600 bytes.
Possible error codes returned by the last_errorcode method (they index the array of descriptions @Kafka::ERROR):
ERROR_INVALID_MESSAGE_CODE
-
0 - Invalid message
ERROR_MISMATCH_ARGUMENT
-
1 - Mismatch argument
ERROR_WRONG_CONNECT
-
2 - You must configure a host to connect to!
ERROR_CANNOT_SEND
-
3 - Can't send
ERROR_CANNOT_RECV
-
4 - Can't receive
ERROR_CANNOT_BIND
-
5 - Can't bind
ERROR_CHECKSUM_ERROR
-
6 - Checksum error
ERROR_COMPRESSED_PAYLOAD
-
7 - Compressed payload
ERROR_NUMBER_OF_OFFSETS
-
7 - Amount received offsets does not match 'NUMBER of OFFSETS'
ERROR_NOTHING_RECEIVE
-
8 - Nothing to receive
ERROR_IN_ERRORCODE
-
9 - Response contains an error in 'ERROR_CODE'
Support for working with 64 bit elements of the Kafka Wire Format protocol on 32 bit systems:
BITS64
-
Indicates whether you are working on a 64 or 32 bit system
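On a 32-bit perl, 64-bit protocol fields such as offsets cannot be held in a native integer. Here is a minimal sketch of choosing between native and portable decoding. The BITS64 test shown (checking $Config{ivsize}) is an assumption and may differ from how the package computes its constant, and the portable path yields a floating-point value that is exact only below 2**53.

```perl
use strict;
use warnings;
use Config;

# One plausible 64-bit test (an assumption; the package's own BITS64 may
# be computed differently): native integers of at least 8 bytes.
use constant BITS64 => ( $Config{ivsize} >= 8 ) ? 1 : 0;

# Portable path: combine the two big-endian 32-bit halves of an int64.
# The result is a floating-point number, exact only for |value| < 2**53.
sub unpack_int64_portable {
    my ($bytes) = @_;
    my ( $high, $low ) = unpack( 'N N', $bytes );
    $high -= 2**32 if $high >= 2**31;    # restore the sign of the high word
    return $high * 2**32 + $low;
}

# Use the native 'q>' template when integers are 64 bits wide.
sub unpack_int64 {
    my ($bytes) = @_;
    return BITS64 ? unpack( 'q>', $bytes ) : unpack_int64_portable($bytes);
}

print "This is a ", ( BITS64 ? 64 : 32 ), " bit perl\n";
print unpack_int64("\0\0\0\x01\0\0\0\x05"), "\n";    # 4294967301
```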
GLOBAL VARIABLES
@Kafka::ERROR
-
Contains the descriptions for possible error codes returned by the last_errorcode methods and functions of the package modules.
%Kafka::ERROR_CODE
-
Contains the descriptions for possible error codes in the ERROR_CODE field of Apache Kafka Wire Format protocol responses.
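To illustrate what the ERROR_CODE field carries, this sketch decodes an OFFSETS response body (the part after RESPONSE_LENGTH): ERROR_CODE int16, NUMBER_OFFSETS int32, then that many int64 offsets. The layout and the server error codes (-1 through 4) are our reading of the 0.7 Wire Format page, not a copy of %Kafka::ERROR_CODE; the count check mirrors the ERROR_NUMBER_OFFSETS case listed under EXPORT, and decode_offsets_response is a hypothetical helper.

```perl
use strict;
use warnings;

# Server-side ERROR_CODE values as read from the Kafka 0.7 Wire Format
# page (an assumption; %Kafka::ERROR_CODE is the package's own table).
my %SERVER_ERROR = (
    -1 => 'Unknown error',
     0 => 'No error',
     1 => 'Offset out of range',
     2 => 'Invalid message',
     3 => 'Wrong partition',
     4 => 'Invalid fetch size',
);

# Decode ERROR_CODE (int16), NUMBER_OFFSETS (int32), then the int64 offsets.
# Each offset is combined from two 32-bit halves, so this also runs on 32-bit perls.
sub decode_offsets_response {
    my ($rest) = @_;
    die "short response\n" if length $rest < 6;
    my $code = unpack( 's>', $rest );
    die "server error $code: ", ( $SERVER_ERROR{$code} // 'unrecognized code' ), "\n" if $code;
    my $count = unpack( 'x2 N', $rest );
    die "Amount received offsets does not match 'NUMBER of OFFSETS'\n"
        if length($rest) != 6 + 8 * $count;
    my @halves = unpack( 'x6 N' . ( 2 * $count ), $rest );
    return [ map { $halves[ 2 * $_ ] * 2**32 + $halves[ 2 * $_ + 1 ] } 0 .. $count - 1 ];
}

my $reply = pack( 's> N', 0, 2 ) . pack( 'N N', 0, 100 ) . pack( 'N N', 0, 0 );
print "offset: $_\n" for @{ decode_offsets_response($reply) };    # 100, then 0
my $failed = pack( 's> N', 3, 0 );
eval { decode_offsets_response($failed) } or print $@;    # server error 3: Wrong partition
```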
An Example
    use Kafka qw(
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
    );
    use Kafka::IO;
    use Kafka::Producer;
    use Kafka::Consumer;
    #-- IO
    my $io = Kafka::IO->new( host => "localhost" );
    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );
    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
    );
    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
    );
    $producer->close;
    #-- Consumer
    my $consumer = Kafka::Consumer->new( IO => $io );
    # Get a list of valid offsets up to max_number before the given time
    my $offsets;
    if ( $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    if ( !$offsets or $consumer->last_error )
    {
        print
            "(", $consumer->last_errorcode, ") ",
            $consumer->last_error, "\n";
    }
    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
        ) )
    {
        foreach my $message ( @$messages )
        {
            if ( $message->valid )
            {
                print "payload    : ", $message->payload, "\n";
                print "offset     : ", $message->offset, "\n";
                print "next_offset: ", $message->next_offset, "\n";
            }
            else
            {
                print "error      : ", $message->error, "\n";
            }
        }
    }
    $consumer->close;
$io, $producer, and $consumer are created once when the application starts up.
DEPENDENCIES
In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:
Digest::CRC
Params::Util
Kafka package has the following optional dependencies:
Test::Pod
Test::Pod::Coverage
Test::Exception
CPAN::Meta
Test::Deep
Test::Distribution
Test::Kwalitee
If the optional modules are missing, some "prereq" tests are skipped.
BUGS AND LIMITATIONS
Currently, the package does not implement sending or receiving compressed messages. It also does not implement the MULTIFETCH and MULTIPRODUCE requests.
Use only one Kafka::Mock object at a time (it uses class variables for the exchange between TCP server processes).
The Kafka package was written, tested, and found working on recent Linux distributions.
There are no known bugs in this package.
Please report problems to the "AUTHOR".
Patches are welcome.
MORE DOCUMENTATION
All modules contain detailed information on the interfaces they provide.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
Kafka::Mock - object interface to the TCP mock server for testing
A wealth of detail about the Apache Kafka and Wire Format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.