NAME
Kafka::Protocol - functions to process messages in the Apache Kafka Wire Format
VERSION
This documentation refers to Kafka::Protocol version 0.01
SYNOPSIS
Setting up:
#-- Export
use Kafka::Protocol qw(
DEFAULT_MAX_SIZE
REQUESTTYPE_PRODUCE
REQUESTTYPE_FETCH
REQUESTTYPE_MULTIFETCH
REQUESTTYPE_MULTIPRODUCE
REQUESTTYPE_OFFSETS
produce_request
fetch_request
offsets_request
fetch_response
offsets_response
);
print "REQUEST_TYPE(s):\n";
print
REQUESTTYPE_PRODUCE, " ",
REQUESTTYPE_FETCH, " ",
REQUESTTYPE_MULTIFETCH, " ",
REQUESTTYPE_MULTIPRODUCE, " ",
REQUESTTYPE_OFFSETS, "\n";
#-- declaration of variables to test
my $topic = "test";
my $partition = 0;
my $single_message = "The first message";
my $series_of_messages = [
"The first message",
"The second message",
"The third message",
];
my $offset = 0;
my $max_size = DEFAULT_MAX_SIZE;
my $time = -2;
my $max_number = 100;
my ( $str, $hsh_ref, $arr_ref );
Requests:
#-- Producer request:
$str = unpack( "H*",
produce_request( $topic, $partition, $single_message ) );
$str = unpack( "H*",
produce_request( $topic, $partition, $series_of_messages ) );
#-- Offsets request:
$str = unpack( "H*",
offsets_request( $topic, $partition, $time, $max_number ) );
#-- Fetch request:
$str = unpack( "H*",
fetch_request( $topic, $partition, $offset, $max_size ) );
Responses (see the Sample Data section of the Kafka::Mock module for an example of %responses):
#-- Offsets response
$arr_ref = offsets_response( \$responses{4} );
#-- Fetch response
$hsh_ref = fetch_response( \$responses{1} );
An error:
eval { fetch_response( [] ) }; # expecting to die
# 'Mismatch argument'
print STDERR
"(", Kafka::Protocol::last_errorcode(), ") ",
Kafka::Protocol::last_error(), "\n";
DESCRIPTION
When producing messages, the driver has to specify what topic and partition to send the message to. When requesting messages, the driver has to specify what topic, partition, and offset it wants them pulled from.
While you can request "old" messages if you know their topic, partition, and offset, Kafka does not have a message index. You cannot efficiently query Kafka for the N-1000th message, or ask for all messages written between 30 and 35 minutes ago.
The main features of the Kafka::Protocol module are:
Supports parsing the Apache Kafka Wire Format protocol.
Supports Apache Kafka Requests and Responses (currently PRODUCE and FETCH without the compression codec attribute). Within this package we currently support access to the PRODUCE Request, FETCH Request, OFFSETS Request, FETCH Response, and OFFSETS Response.
Supports working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems.
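On a perl without native 64-bit integers, an 8-byte big-endian field such as an offset cannot be handled with the "Q>" pack template. A minimal sketch of the general idea, assuming the core Math::BigInt module: the value is split into two 32-bit big-endian words. encode_uint64() and decode_uint64() are hypothetical helpers for illustration, not functions exported by Kafka::Protocol or Kafka::Int64.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper: split an unsigned 64-bit value into two 32-bit
# big-endian words, so it can be packed without native 64-bit integers.
sub encode_uint64 {
    my ($value) = @_;
    my $n  = Math::BigInt->new($value);
    my $hi = $n->copy->brsft(32);           # upper 32 bits
    my $lo = $n->copy->band(0xFFFFFFFF);    # lower 32 bits
    return pack( "NN", $hi->numify, $lo->numify );
}

# Hypothetical helper: rebuild the value from the two 32-bit words.
sub decode_uint64 {
    my ($bytes) = @_;
    my ( $hi, $lo ) = unpack( "NN", $bytes );
    return Math::BigInt->new($hi)->blsft(32)->badd($lo);
}

my $offset = decode_uint64( encode_uint64("4294967296") );    # 2**32
print "$offset\n";    # prints 4294967296
```

Returning a Math::BigInt object keeps full precision; it stringifies to the decimal value, so it can be printed or compared as a string.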
FUNCTIONS
The following functions are available in the Kafka::Protocol module.
$offset and $max_size (for fetch_request) or $time and $max_number (for offsets_request) are additional parameters that specify which messages or offsets we want to access.
produce_request( $topic, $partition, $messages )
Returns a binary PRODUCE request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the function raises an error that halts the program (confess).
produce_request() takes the following arguments:
$topic
- The $topic must be a normal, non-false string of non-zero length.
$partition
- The $partition must be a non-negative integer (of any length), that is, a positive integer or zero.
$messages
- The $messages argument is an arbitrary amount of data: either a simple data string or a reference to an array of data strings.
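To show what such a request contains, here is a minimal sketch of the byte layout as we understand the 0.7-era Wire Format page listed under SEE ALSO: a request header (REQUEST_LENGTH, REQUEST_TYPE, TOPIC_LENGTH, TOPIC, PARTITION), then MESSAGES_LENGTH and the message set, where each message is LENGTH, MAGIC, CHECKSUM (CRC32 of the payload) and PAYLOAD. It is not the module's implementation: sketch_produce_request() and crc32() are hypothetical names, and it assumes magic byte 0 (no compression attribute) and byte strings rather than decoded UTF-8 text.

```perl
use strict;
use warnings;

use constant REQUESTTYPE_PRODUCE => 0;

# Hypothetical helper: standard reflected CRC32 (polynomial 0xEDB88320).
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack( "C*", $data ) ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical sketch of a PRODUCE request, not the module's code.
sub sketch_produce_request {
    my ( $topic, $partition, $messages ) = @_;
    $messages = [$messages] unless ref $messages;    # accept a single string too

    my $message_set = '';
    for my $payload (@$messages) {
        # MAGIC (0 = no compression attribute), CHECKSUM, PAYLOAD ...
        my $body = pack( "C N", 0, crc32($payload) ) . $payload;
        # ... prefixed by the 32-bit message LENGTH
        $message_set .= pack( "N", length $body ) . $body;
    }

    my $request = pack( "n n", REQUESTTYPE_PRODUCE, length $topic ) . $topic
        . pack( "N", $partition )
        . pack( "N", length $message_set ) . $message_set;

    # REQUEST_LENGTH counts every byte that follows it
    return pack( "N", length $request ) . $request;
}

my $request = sketch_produce_request( "test", 0, "The first message" );
print unpack( "H*", $request ), "\n";
```

Because REQUEST_LENGTH is computed last, the same pattern works whether $messages is a single string or an array reference.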
fetch_request( $topic, $partition, $offset, $max_size )
Returns a binary FETCH request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the function raises an error that halts the program (confess).
fetch_request() takes the following arguments:
$topic
- The $topic must be a normal, non-false string of non-zero length.
$partition
- The $partition must be a non-negative integer (of any length), that is, a positive integer or zero.
$offset
- The offset in the topic and partition to start fetching from (64 bits). The argument must be a non-negative integer (of any length), that is, a positive integer or zero. On a 32-bit system it may be a Math::BigInt integer.
$max_size
- $max_size is the maximum size of the message set to return. The argument must be a positive integer (of any length).
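A FETCH request reuses the same header and appends a 64-bit OFFSET and a 32-bit MAX_SIZE. A minimal sketch, assuming that 0.7-era layout and a perl built with 64-bit integers (for the "Q>" template); sketch_fetch_request() is a hypothetical name, not part of the module.

```perl
use strict;
use warnings;

use constant REQUESTTYPE_FETCH => 1;

# Hypothetical sketch of a FETCH request; assumes a 64-bit perl for "Q>".
sub sketch_fetch_request {
    my ( $topic, $partition, $offset, $max_size ) = @_;
    my $request = pack(
        "n n/a* N Q> N",
        REQUESTTYPE_FETCH,    # REQUEST_TYPE (16 bits)
        $topic,               # TOPIC_LENGTH (16 bits) + TOPIC, via "n/a*"
        $partition,           # PARTITION (32 bits)
        $offset,              # OFFSET (64 bits)
        $max_size,            # MAX_SIZE (32 bits)
    );
    return pack( "N", length $request ) . $request;    # prepend REQUEST_LENGTH
}

print unpack( "H*", sketch_fetch_request( "test", 0, 0, 1048576 ) ), "\n";
```

The "n/a*" template writes the 16-bit length followed by the string, which matches the TOPIC_LENGTH/TOPIC pair in one step.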
offsets_request( $topic, $partition, $time, $max_number )
Returns a binary OFFSETS request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the function raises an error that halts the program (confess).
offsets_request() takes the following arguments:
$topic
- The $topic must be a normal, non-false string of non-zero length.
$partition
- The $partition must be a non-negative integer (of any length), that is, a positive integer or zero.
$time
- $time selects the offsets of messages written before this time, in milliseconds since the UNIX Epoch. The argument must be a positive number, that is, it is defined and Perl considers it numeric. On a 32-bit system it may be a Math::BigInt integer. The special values -1 (latest) and -2 (earliest) are also allowed.
$max_number
- $max_number is the maximum number of offsets to retrieve. The argument must be a positive integer (of any length).
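An OFFSETS request has the same header followed by a signed 64-bit TIME and a 32-bit MAX_NUMBER, so the special values -1 and -2 are sent in two's complement. A minimal sketch, assuming that 0.7-era layout and a 64-bit perl (for "q>"); sketch_offsets_request() is a hypothetical name.

```perl
use strict;
use warnings;

use constant REQUESTTYPE_OFFSETS => 4;

# Hypothetical sketch of an OFFSETS request; assumes a 64-bit perl for "q>".
sub sketch_offsets_request {
    my ( $topic, $partition, $time, $max_number ) = @_;
    my $request = pack(
        "n n/a* N q> N",
        REQUESTTYPE_OFFSETS,    # REQUEST_TYPE (16 bits)
        $topic,                 # TOPIC_LENGTH (16 bits) + TOPIC
        $partition,             # PARTITION (32 bits)
        $time,                  # TIME (signed 64 bits): -1 latest, -2 earliest
        $max_number,            # MAX_NUMBER (32 bits)
    );
    return pack( "N", length $request ) . $request;    # prepend REQUEST_LENGTH
}

# Ask for up to 100 offsets starting from the earliest one
print unpack( "H*", sketch_offsets_request( "test", 0, -2, 100 ) ), "\n";
```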
offsets_response( $response )
Decodes the argument and returns a reference to a hash representing the structure of the OFFSETS Response. On a 32-bit system the offsets are Math::BigInt integers. The hash additionally contains an {error} item describing a possible error in the structure of the argument (currently the only possible message is "Amount received offsets does not match 'NUMBER of OFFSETS'"). If the argument is not valid, the function raises an error that halts the program (confess).
offsets_response() takes the following argument:
$response
- $response is a reference to the OFFSETS Response buffer. The buffer must be a non-empty string at least 6 bytes long.
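The 6-byte minimum corresponds to RESPONSE_LENGTH (32 bits) plus ERROR_CODE (16 bits); in the 0.7-era layout these are followed by NUMBER_OF_OFFSETS (32 bits) and the 64-bit offsets. A minimal decoding sketch under that assumption, on a 64-bit perl; sketch_offsets_response() and its hash keys are hypothetical and need not match the structure the module returns.

```perl
use strict;
use warnings;

# Hypothetical decoder sketch; hash keys are illustrative only.
# Assumes a 64-bit perl so "Q>" can unpack the offsets.
sub sketch_offsets_response {
    my ($response) = @_;    # reference to the raw response buffer
    die "Mismatch argument"
        unless ref($response) eq 'SCALAR'
        && defined $$response
        && length($$response) >= 6;

    # RESPONSE_LENGTH (32 bits) and ERROR_CODE (16 bits)
    my ( $length, $error_code ) = unpack( "N n", $$response );
    my %decoded = (
        response_length => $length,
        error_code      => $error_code,
        offsets         => [],
        error           => '',
    );
    return \%decoded if $error_code || length($$response) < 10;

    # NUMBER_OF_OFFSETS (32 bits), then one 64-bit offset per entry
    my $number  = unpack( "x6 N", $$response );
    my @offsets = unpack( "Q>*", substr( $$response, 10 ) );
    $decoded{offsets} = \@offsets;
    $decoded{error}   = "Amount received offsets does not match 'NUMBER of OFFSETS'"
        if @offsets != $number;
    return \%decoded;
}

# Sample buffer: ERROR_CODE 0 and two offsets, 100 and 0
my $payload = pack( "n N Q> Q>", 0, 2, 100, 0 );
my $buffer  = pack( "N", length $payload ) . $payload;
my $decoded = sketch_offsets_response( \$buffer );
print "@{ $decoded->{offsets} }\n";    # prints "100 0"
```

A count mismatch is reported in {error} rather than by dying, mirroring the behaviour described above for structural problems in an otherwise valid buffer.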
fetch_response( $response )
Decodes the argument and returns a reference to a hash representing the structure of the FETCH Response. If the argument is not valid, the function raises an error that halts the program (confess).
fetch_response() takes the following argument:
$response
- $response is a reference to the FETCH Response buffer. The buffer must be a non-empty string at least 6 bytes long.
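After the 6-byte RESPONSE_LENGTH/ERROR_CODE prefix, a FETCH response carries a message set, and the last message may be cut short when the set reaches MAX_SIZE. A minimal decoding sketch, assuming the 0.7-era message layout (LENGTH, MAGIC, an extra compression byte only when MAGIC is 1, CHECKSUM, PAYLOAD); sketch_fetch_response(), sample_message() and the hash keys are hypothetical, and checksums are extracted but not verified.

```perl
use strict;
use warnings;

# Hypothetical decoder sketch; hash keys are illustrative only.
sub sketch_fetch_response {
    my ($response) = @_;    # reference to the raw response buffer
    die "Mismatch argument"
        unless ref($response) eq 'SCALAR'
        && defined $$response
        && length($$response) >= 6;

    my ( $length, $error_code ) = unpack( "N n", $$response );
    my @messages;
    my $pos = 6;    # the message set starts after RESPONSE_LENGTH and ERROR_CODE
    while ( $pos + 4 <= length $$response ) {
        my $msg_length = unpack( "N", substr( $$response, $pos, 4 ) );
        # Stop at a malformed or truncated final message
        last if $msg_length < 5 || $pos + 4 + $msg_length > length $$response;

        my $msg    = substr( $$response, $pos + 4, $msg_length );
        my $magic  = unpack( "C", $msg );
        my $header = $magic == 1 ? 6 : 5;    # magic 1 adds a compression byte
        push @messages, {
            magic    => $magic,
            checksum => unpack( "N", substr( $msg, $header - 4, 4 ) ),
            payload  => substr( $msg, $header ),
        };
        $pos += 4 + $msg_length;
    }
    return {
        response_length => $length,
        error_code      => $error_code,
        messages        => \@messages,
    };
}

# Hypothetical helper for the sample: one magic-0 message, dummy CHECKSUM 0
sub sample_message {
    my ($payload) = @_;
    my $body = pack( "C N", 0, 0 ) . $payload;
    return pack( "N", length $body ) . $body;
}

my $set = sample_message("The first message") . sample_message("The second message");
$set .= pack( "N", 100 ) . "trunc";    # claims 100 bytes, carries only 5
my $buffer  = pack( "N n", 2 + length $set, 0 ) . $set;
my $decoded = sketch_fetch_response( \$buffer );
print "$_->{payload}\n" for @{ $decoded->{messages} };
```

Dropping the incomplete tail instead of dying lets a consumer fetch again from the position just after the last complete message.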
last_errorcode
This function returns the error code of the last failure, that is, the position of its description in the @Kafka::ERROR array. Use this information to determine the cause of the error.
The server or the resource might not be available, access to the resource might be denied, or something else might have failed.
last_error
This function returns an error message that contains information about the encountered failure. Messages returned by this function may contain additional details and may not match the entries of the @Kafka::ERROR array.
EXPORT
None by default.
It has additional constants available for import, which are used by the module functions and identify the REQUEST types (see the "SEE ALSO" section):
0 - REQUESTTYPE_PRODUCE
1 - REQUESTTYPE_FETCH
2 - REQUESTTYPE_MULTIFETCH
3 - REQUESTTYPE_MULTIPRODUCE
4 - REQUESTTYPE_OFFSETS
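Since REQUEST_TYPE is the 16-bit field right after the 32-bit REQUEST_LENGTH, these values can be used to label raw requests while debugging. A minimal sketch, assuming that header layout; the constants are redeclared here only so the example is self-contained (in real code they would be imported from Kafka::Protocol), and request_type_name() is a hypothetical helper.

```perl
use strict;
use warnings;

# Redeclared for a self-contained example; normally imported from Kafka::Protocol
use constant {
    REQUESTTYPE_PRODUCE      => 0,
    REQUESTTYPE_FETCH        => 1,
    REQUESTTYPE_MULTIFETCH   => 2,
    REQUESTTYPE_MULTIPRODUCE => 3,
    REQUESTTYPE_OFFSETS      => 4,
};

# The "()" stops "=>" from quoting the constant names as plain strings
my %name_of_type = (
    REQUESTTYPE_PRODUCE()      => 'PRODUCE',
    REQUESTTYPE_FETCH()        => 'FETCH',
    REQUESTTYPE_MULTIFETCH()   => 'MULTIFETCH',
    REQUESTTYPE_MULTIPRODUCE() => 'MULTIPRODUCE',
    REQUESTTYPE_OFFSETS()      => 'OFFSETS',
);

# Hypothetical helper: read REQUEST_TYPE, which follows REQUEST_LENGTH
sub request_type_name {
    my ($request) = @_;
    my $type = unpack( "x4 n", $request );
    return $name_of_type{$type} // 'UNKNOWN';
}

print request_type_name( pack( "N n", 2, REQUESTTYPE_OFFSETS ) ), "\n";    # prints OFFSETS
```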
DIAGNOSTICS
Kafka::Protocol is not a user module, and any error in its functions is FATAL. FATAL errors cause the program to halt (confess), since the problem is so severe that it would be dangerous to continue. (This can always be trapped with eval. Under the circumstances, dying is the best thing to do.)
Mismatch argument
- This means that you did not pass a valid argument to one of the functions.
Invalid message
- This means that the array of messages contains a reference instead of a simple data string.
For a more detailed error description, always check the message returned by the Kafka::Protocol::last_error function.
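Because every error is raised with confess (message plus stack trace), a caller that must keep running can wrap calls in eval. A minimal sketch of that pattern using only the core Carp module; check_partition() is a hypothetical validator written in the style described above, not a Kafka::Protocol function.

```perl
use strict;
use warnings;
use Carp qw( confess );

# Hypothetical validator: dies via confess when the argument is not valid
sub check_partition {
    my ($partition) = @_;
    confess "Mismatch argument"
        unless defined $partition && $partition =~ /^\d+\z/;
    return $partition;
}

# Trap the FATAL error with eval instead of letting it halt the program
my $ok = eval { check_partition(-1); 1 };
unless ($ok) {
    my ($first_line) = split /\n/, $@;    # drop the stack trace for the log line
    print STDERR "Trapped: $first_line\n";
}
```

The "eval { ...; 1 }" form tests the return value rather than $@ alone, so a failure is detected even if $@ were somehow cleared during unwinding.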
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
Kafka::Mock - object interface to the TCP mock server for testing
A wealth of detail about the Apache Kafka and Wire Format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.