NAME
Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
VERSION
This documentation refers to Kafka::Protocol version 1.08.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;

    use Data::Compare;
    use Kafka qw(
        $COMPRESSION_NONE
        $ERROR_NO_ERROR
        $REQUEST_TIMEOUT
        $WAIT_WRITTEN_TO_LOCAL_LOG
    );
    use Kafka::Internals qw(
        $PRODUCER_ANY_OFFSET
    );
    use Kafka::Protocol qw(
        decode_produce_response
        encode_produce_request
    );

    # an encoded produce request hex stream
    my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

    # a decoded produce request
    my $decoded = {
        CorrelationId   => 4,
        ClientId        => q{},
        RequiredAcks    => $WAIT_WRITTEN_TO_LOCAL_LOG,
        Timeout         => $REQUEST_TIMEOUT * 100,  # ms
        topics          => [
            {
                TopicName   => 'mytopic',
                partitions  => [
                    {
                        Partition   => 0,
                        MessageSet  => [
                            {
                                Offset      => $PRODUCER_ANY_OFFSET,
                                MagicByte   => 0,
                                Attributes  => $COMPRESSION_NONE,
                                Key         => q{},
                                Value       => 'Hello!',
                            },
                        ],
                    },
                ],
            },
        ],
    };

    my $encoded_request = encode_produce_request( $decoded );
    say 'encoded correctly' if $encoded_request eq $encoded;

    # an encoded produce response hex stream
    $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

    # a decoded produce response
    $decoded = {
        CorrelationId   => 4,
        topics          => [
            {
                TopicName   => 'mytopic',
                partitions  => [
                    {
                        Partition   => 0,
                        ErrorCode   => $ERROR_NO_ERROR,
                        Offset      => 0,
                    },
                ],
            },
        ],
    };

    my $decoded_response = decode_produce_response( \$encoded );
    say 'decoded correctly' if Compare( $decoded_response, $decoded );

    # more examples, see t/*_decode_encode.t
DESCRIPTION
This module is not a user module.
To achieve better performance, the functions of this module do not validate their arguments.
The main features of the Kafka::Protocol module are:
Supports parsing the Apache Kafka protocol.
Supports Apache Kafka Requests and Responses. Within this package we currently support access to PRODUCE, FETCH, OFFSET, and METADATA Requests and Responses.
Supports working with 64-bit elements of the Kafka protocol on 32-bit systems.
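The 64-bit point above can be pictured with a short sketch. It uses only the core Math::BigInt module and is not the code of Kafka::Int64; pack_int64 and unpack_int64 are hypothetical helper names introduced here for illustration. The idea is to treat a signed 64-bit value as two unsigned 32-bit big-endian words, so that no native 64-bit integer is needed.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper: pack a signed 64-bit value as 8 big-endian bytes
# without relying on native 64-bit integers.
sub pack_int64 {
    my ( $value ) = @_;
    my $n = Math::BigInt->new( $value );
    # Map negative values to their two's complement representation.
    $n += Math::BigInt->new( 2 )->bpow( 64 ) if $n->is_neg;
    # In list context bdiv returns ( quotient, remainder ).
    my ( $high, $low ) = $n->bdiv( Math::BigInt->new( 2 )->bpow( 32 ) );
    return pack( 'N2', $high->numify, $low->numify );
}

# Hypothetical helper: the reverse operation, returning a Math::BigInt.
sub unpack_int64 {
    my ( $bytes ) = @_;
    my ( $high, $low ) = unpack( 'N2', $bytes );
    my $n = Math::BigInt->new( $high )->blsft( 32 )->badd( $low );
    # A set top bit means the value is negative.
    $n -= Math::BigInt->new( 2 )->bpow( 64 ) if $high & 0x80000000;
    return $n;
}

print unpack( 'H*', pack_int64( -1 ) ), "\n";                   # ffffffffffffffff
print unpack_int64( pack_int64( '4294967296' ) )->bstr, "\n";   # 4294967296
```

Values are passed as strings or small integers so that the sketch behaves the same on 32-bit and 64-bit builds of Perl.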
EXPORT
The following constants are available for export:
$DEFAULT_APIVERSION
The default API version, used as a fallback when it is not possible to detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be queried for the API versions they implement. On Kafka servers 0.8.x and 0.9.x, the protocol defaults to $DEFAULT_APIVERSION. Its current value is 0.
$CONSUMERS_REPLICAID
According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
$NULL_BYTES_LENGTH
According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
$BAD_OFFSET
According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'
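The null-length convention quoted for $NULL_BYTES_LENGTH can be sketched with core pack and unpack. This is an illustration under stated assumptions, not the module's own code: encode_bytes and decode_bytes are hypothetical helpers, $NULL_LENGTH is a local stand-in for the exported constant, and the sketch covers only the 'bytes' type with its 32-bit length prefix.

```perl
use strict;
use warnings;

my $NULL_LENGTH = -1;    # local stand-in for $NULL_BYTES_LENGTH

# Hypothetical helper: encode a 'bytes' value as a signed 32-bit
# big-endian length followed by the data; undef becomes length -1.
sub encode_bytes {
    my ( $data ) = @_;
    return pack( 'l>', $NULL_LENGTH ) unless defined $data;
    return pack( 'l>a*', length( $data ), $data );
}

# Hypothetical helper: decode the value back; a length of -1 yields undef.
sub decode_bytes {
    my ( $buffer ) = @_;
    my $length = unpack( 'l>', $buffer );
    return if $length == $NULL_LENGTH;
    return substr( $buffer, 4, $length );
}

print unpack( 'H*', encode_bytes( undef ) ), "\n";       # ffffffff
print unpack( 'H*', encode_bytes( 'Hello!' ) ), "\n";    # 0000000648656c6c6f21
```

The same two byte sequences appear near the end of the encoded produce request in the SYNOPSIS, where ffffffff sits in the key position and 0000000648656c6c6f21 carries the value 'Hello!'.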
FUNCTIONS
The following functions are available for the Kafka::Protocol module.
encode_api_versions_request( $ApiVersions_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$ApiVersions_Request
$ApiVersions_Request is a reference to the hash representing the structure of the APIVERSIONS Request. It contains CorrelationId, ClientId (can be an empty string), and ApiVersion (must be 0).
decode_api_versions_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the APIVERSIONS Response.
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_find_coordinator_request( $FindCoordinator_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$FindCoordinator_Request
$FindCoordinator_Request is a reference to the hash representing the structure of the FINDCOORDINATOR Request. It contains CorrelationId, ClientId (can be an empty string), CoordinatorKey, and CoordinatorType (for version 1 of the protocol).
decode_find_coordinator_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the FINDCOORDINATOR Response.
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_produce_request( $Produce_Request, $compression_codec )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$Produce_Request
$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (for examples, see t/*_decode_encode.t).
$compression_codec
Optional. $compression_codec sets the required type of $messages compression, if compression is desired. Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY, $COMPRESSION_LZ4.
NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
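To see what an encoded PRODUCE Request buffer looks like on the wire, the leading fields of the stream from the SYNOPSIS can be read back with core unpack. This is only a sketch for inspection and does not use Kafka::Protocol; the variable names are illustrative, and the field order follows the request layout in the Kafka protocol guide (size, ApiKey, ApiVersion, CorrelationId, ClientId, then RequiredAcks and Timeout for PRODUCE).

```perl
use strict;
use warnings;

# The encoded produce request from the SYNOPSIS.
my $request = pack( 'H*', '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

# N    - 32-bit size of everything after the size field
# n n  - ApiKey and ApiVersion (16 bits each)
# N    - CorrelationId
# n/a* - ClientId: 16-bit length followed by that many bytes
# s>   - RequiredAcks (signed 16-bit, big-endian)
# N    - Timeout in milliseconds
my ( $size, $api_key, $api_version, $correlation_id, $client_id, $required_acks, $timeout_ms )
    = unpack( 'N n n N n/a* s> N', $request );

printf "size=%d ApiKey=%d ApiVersion=%d CorrelationId=%d\n",
    $size, $api_key, $api_version, $correlation_id;
printf "ClientId='%s' RequiredAcks=%d Timeout=%d ms\n",
    $client_id, $required_acks, $timeout_ms;
```

The remaining bytes hold the topic array, the partition array, and the message set, which can be walked in the same way.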
decode_produce_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
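As a rough picture of what such a Response buffer contains, the produce response stream from the SYNOPSIS can be taken apart with core unpack. This sketch does not call decode_produce_response and only handles the single-topic, single-partition case shown there; the variable names are illustrative, and the 64-bit offset is read as two 32-bit words so the code also runs on 32-bit builds of Perl.

```perl
use strict;
use warnings;

# The encoded produce response from the SYNOPSIS.
my $response = pack( 'H*', '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

# N    - 32-bit size of everything after the size field
# N    - CorrelationId
# N    - number of topics
# n/a* - TopicName: 16-bit length followed by that many bytes
# N    - number of partitions
# N    - Partition
# s>   - ErrorCode (signed 16-bit, big-endian)
# N2   - Offset, split into high and low 32-bit words
my ( $size, $correlation_id, $topic_count, $topic_name,
     $partition_count, $partition, $error_code, $offset_high, $offset_low )
    = unpack( 'N N N n/a* N N s> N2', $response );

printf "size=%d CorrelationId=%d topics=%d TopicName=%s\n",
    $size, $correlation_id, $topic_count, $topic_name;
printf "partitions=%d Partition=%d ErrorCode=%d Offset=(%d,%d)\n",
    $partition_count, $partition, $error_code, $offset_high, $offset_low;
```

The values read back this way match the decoded response hash in the SYNOPSIS: CorrelationId 4, topic 'mytopic', partition 0, error code 0, offset 0.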
encode_fetch_request( $Fetch_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$Fetch_Request
$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (for examples, see t/*_decode_encode.t).
decode_fetch_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_offset_request( $Offset_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$Offset_Request
$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (for examples, see t/*_decode_encode.t).
decode_offset_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_metadata_request( $Metadata_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$Metadata_Request
$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (for examples, see t/*_decode_encode.t).
decode_metadata_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_offsetcommit_request( $OffsetCommit_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$OffsetCommit_Request
$OffsetCommit_Request is a reference to the hash representing the structure of the OFFSETCOMMIT Request (for examples, see t/*_decode_encode.t).
decode_offsetcommit_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSETCOMMIT Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_offsetfetch_request( $OffsetFetch_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$OffsetFetch_Request
$OffsetFetch_Request is a reference to the hash representing the structure of the OFFSETFETCH Request (for examples, see t/*_decode_encode.t).
decode_offsetfetch_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSETFETCH Response (for examples, see t/*_decode_encode.t).
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_saslhandshake_request( $SaslHandshake_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$SaslHandshake_Request
$SaslHandshake_Request is a reference to the hash representing the structure of the SaslHandshake Request.
decode_saslhandshake_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the SaslHandshake Response.
This function takes the following arguments:
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
DIAGNOSTICS
To achieve better performance, the functions of this module do not validate their arguments.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.