=head1 NAME
Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
=head1 VERSION
This documentation refers to C<Kafka::Protocol> version 1.08.
=cut
use 5.010;
use strict;
use warnings;
our $VERSION = 'v1.08';
use Exporter qw(
import
);
our @EXPORT_OK = qw(
decode_fetch_response
decode_metadata_response
decode_offset_response
decode_produce_response
decode_api_versions_response
decode_find_coordinator_response
decode_offsetcommit_response
decode_offsetfetch_response
decode_saslhandshake_response
encode_fetch_request
encode_metadata_request
encode_offset_request
encode_produce_request
encode_api_versions_request
encode_find_coordinator_request
encode_offsetcommit_request
encode_offsetfetch_request
encode_saslhandshake_request
_decode_MessageSet_template
_decode_MessageSet_array
_encode_Message
_encode_MessageSet_array
_encode_string
_pack64
_unpack64
_verify_string
$DEFAULT_APIVERSION
$IMPLEMENTED_APIVERSIONS
$BAD_OFFSET
$COMPRESSION_CODEC_MASK
$CONSUMERS_REPLICAID
$NULL_BYTES_LENGTH
$_int64_template
);
use Const::Fast;    # provides the 'const' declarations used below
use Gzip::Faster qw( gzip gunzip );
use Params::Util qw(
_ARRAY
_HASH
_SCALAR
_STRING
);
use Scalar::Util qw(
dualvar
);
use Kafka qw(
$BITS64
$BLOCK_UNTIL_IS_COMMITTED
$COMPRESSION_GZIP
$COMPRESSION_NONE
$COMPRESSION_SNAPPY
$COMPRESSION_LZ4
$DEFAULT_MAX_WAIT_TIME
%ERROR
$ERROR_COMPRESSION
$ERROR_MISMATCH_ARGUMENT
$ERROR_REQUEST_OR_RESPONSE
$NOT_SEND_ANY_RESPONSE
$WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
$APIKEY_FETCH
$APIKEY_METADATA
$APIKEY_OFFSET
$APIKEY_PRODUCE
$APIKEY_APIVERSIONS
$APIKEY_FINDCOORDINATOR
$APIKEY_OFFSETCOMMIT
$APIKEY_OFFSETFETCH
$APIKEY_SASLHANDSHAKE
$PRODUCER_ANY_OFFSET
format_message
debug_level
);
=head1 SYNOPSIS
use 5.010;
use strict;
use warnings;
use Data::Compare;
use Kafka qw(
$COMPRESSION_NONE
$ERROR_NO_ERROR
$REQUEST_TIMEOUT
$WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
$PRODUCER_ANY_OFFSET
);
use Kafka::Protocol qw(
decode_produce_response
encode_produce_request
);
# an encoded produce request as a hex stream
my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
# a decoded produce request
my $decoded = {
CorrelationId => 4,
ClientId => q{},
RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
Timeout => $REQUEST_TIMEOUT * 1000, # seconds to ms (0x05dc = 1500 in the hex stream above)
topics => [
{
TopicName => 'mytopic',
partitions => [
{
Partition => 0,
MessageSet => [
{
Offset => $PRODUCER_ANY_OFFSET,
MagicByte => 0,
Attributes => $COMPRESSION_NONE,
Key => q{},
Value => 'Hello!',
},
],
},
],
},
],
};
my $encoded_request = encode_produce_request( $decoded );
say 'encoded correctly' if $encoded_request eq $encoded;
# an encoded produce response as a hex stream
$encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
# a decoded produce response
$decoded = {
CorrelationId => 4,
topics => [
{
TopicName => 'mytopic',
partitions => [
{
Partition => 0,
ErrorCode => $ERROR_NO_ERROR,
Offset => 0,
},
],
},
],
};
my $decoded_response = decode_produce_response( \$encoded );
say 'decoded correctly' if Compare( $decoded_response, $decoded );
# for more examples, see t/*_decode_encode.t
=head1 DESCRIPTION
This module is not intended for direct use by end users.
For performance reasons,
its functions do not validate their arguments.
The main features of the C<Kafka::Protocol> module are:
=over 3
=item *
Supports parsing the Apache Kafka protocol.
=item *
Supports encoding Apache Kafka Requests and decoding Responses.
The package currently supports the PRODUCE, FETCH, OFFSET, METADATA,
APIVERSIONS, FINDCOORDINATOR, OFFSETCOMMIT, OFFSETFETCH and SASLHANDSHAKE
Requests and Responses.
=item *
Supports working with 64-bit elements of the Kafka protocol on 32-bit systems.
=back
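To illustrate the last point, here is a minimal sketch of one way to encode and decode a signed 64-bit big-endian value without native 64-bit integers. It relies on the core C<Math::BigInt> module. The helper names C<pack_int64_portable> and C<unpack_int64_portable> are hypothetical, and this is not necessarily how C<Kafka::Int64> does it.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helpers for illustration only; not part of Kafka::Protocol.

# Encode a signed 64-bit integer as 8 big-endian bytes.
sub pack_int64_portable {
    my ( $value ) = @_;
    my $n = Math::BigInt->new( $value );
    # Map negative values to their two's complement representation.
    $n->badd( Math::BigInt->new( 2 )->bpow( 64 ) ) if $n->is_neg;
    # Split into high and low 32-bit halves (quotient and remainder).
    my ( $high, $low ) = $n->bdiv( Math::BigInt->new( 2 )->bpow( 32 ) );
    return pack( 'N N', $high->numify, $low->numify );
}

# Decode 8 big-endian bytes into a signed Math::BigInt value.
sub unpack_int64_portable {
    my ( $bytes ) = @_;
    my ( $high, $low ) = unpack( 'N N', $bytes );
    my $n = Math::BigInt->new( $high )->blsft( 32 )->badd( $low );
    # A set top bit means the value is negative.
    $n->bsub( Math::BigInt->new( 2 )->bpow( 64 ) ) if $high >= 0x80000000;
    return $n;
}

my $bytes = pack_int64_portable( -2 );
print unpack( 'H*', $bytes ), "\n";             # fffffffffffffffe
print unpack_int64_portable( $bytes ), "\n";    # -2
```

On a 64-bit perl the same bytes come from C<pack( 'qE<gt>', -2 )>, which is the route this module takes when C<$BITS64> is true.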
=cut
# A Guide To The Kafka Protocol 0.8:
#
# -- Protocol Primitive Types
# int8, int16, int32, int64
# Signed integers
# stored in big endian order.
# bytes, string
# consist of a signed integer
# giving a length N
# followed by N bytes of content.
# A length of -1 indicates null.
# string uses an int16 for its size,
# and bytes uses an int32.
# Arrays
# These will always be encoded as an int32 size containing the length N
# followed by N repetitions of the structure
# which can itself be made up of other primitive types.
#
# -- N.B.
# - The response will always match the paired request
# - One structure common to both the produce and fetch requests is the message set format.
# - MessageSets are not preceded by an int32 like other array elements in the protocol.
# - A message set is also the unit of compression in Kafka,
# and we allow messages to recursively contain compressed message sets.
#
# -- Protocol Fields
# ApiKey => int16 That identifies the API being invoked
# ApiVersion => int16 This is a numeric version number for this api.
# Currently the supported version for all APIs is 0.
# Attributes => int8 Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
# ClientId => string This is a user supplied identifier for the client application.
# CorrelationId => int32 This is a user-supplied integer.
# It will be passed back in the response by the server, unmodified.
# It is useful for matching request and response between the client and server.
# Crc => int32 The CRC32 of the remainder of the message bytes.
# ErrorCode => int16 The error from this partition, if any.
# Errors are given on a per-partition basis
# because a given partition may be unavailable or maintained on a different host,
# while others may have successfully accepted the produce request.
# FetchOffset => int64 The offset to begin this fetch from.
# HighwaterMarkOffset => int64 The offset at the end of the log for this partition.
# This can be used by the client to determine how many messages behind the end of the log they are.
# - 0.8 documents: Replication design
# The high watermark is the offset of the last committed message.
# Each log is periodically synced to disks.
# Data before the flushed offset is guaranteed to be persisted on disks.
# As we will see, the flush offset can be before or after high watermark.
# - 0.7 documents: Wire protocol
# If the last segment file for the partition is not empty and was modified earlier than TIME,
# it will return both the first offset for that segment and the high water mark.
# The high water mark is not the offset of the last message,
# but rather the offset that the next message sent to the partition will be written to.
# Host => string The broker's hostname
# Isr => [ReplicaId] The subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
# Key => bytes An optional message key
# The key can be null.
# Leader => int32 The node id for the kafka broker currently acting as leader for this partition.
# If no leader exists because we are in the middle of a leader election this id will be -1.
# MagicByte => int8 A version id used to allow backwards compatible evolution of the message binary format.
# MaxBytes => int32 The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets => int32 The maximum number of offsets Kafka returns
# MaxWaitTime => int32 The maximum amount of time (ms)
# to block waiting
# if insufficient data is available at the time the request is issued.
# MessageSetSize => int32 The size in bytes of the message set for this partition
# MessageSize => int32 The size of the subsequent request or response message in bytes
# MinBytes => int32 The minimum number of bytes of messages that must be available to give a response.
# If the client sets this to 0 the server will always respond immediately.
# If this is set to 1,
# the server will respond as soon
# as at least one partition
# has at least 1 byte of data
# or the specified timeout occurs.
# By setting higher values
# in combination with the timeout,
# the consumer can trade a little additional latency
# for reading only large chunks of data.
# NodeId => int32 The id of the broker.
# This must be set to a unique integer for each broker.
# Offset => int64 The offset used in kafka as the log sequence number.
# When the producer is sending messages it doesn't actually know the offset
# and can fill in any value here it likes.
# Partition => int32 The id of the partition the fetch is for
# or the partition that data is being published to
# or the partition this response entry corresponds to.
# Port => int32 The broker's port
# ReplicaId => int32 Indicates the node id of the replica initiating this request.
# Normal client consumers should always specify this as -1 as they have no node id.
# Replicas => [ReplicaId] The set of alive nodes that currently act as slaves for the leader for this partition.
# RequiredAcks => int16 Indicates how many acknowledgements the servers should receive
# before responding to the request.
# If it is 0 the server does not send any response.
# If it is 1, the server will wait until the data is written to the local log before sending a response.
# If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
# Size => int32 The size of the subsequent request or response message in bytes
# Time => int64 Used to ask for all messages before a certain time (ms).
# There are two special values.
# Specify -1 to receive the latest offset (this will only ever return one offset).
# Specify -2 to receive the earliest available offsets.
# Timeout => int32 This provides a maximum time (ms) the server can await the receipt
# of the number of acknowledgements in RequiredAcks.
# The timeout is not an exact limit on the request time for a few reasons:
# (1) it does not include network latency,
# (2) the timer begins at the beginning of the processing of this request
# so if many requests are queued due to server overload
# that wait time will not be included,
# (3) we will not terminate a local write
# so if the local write time exceeds this timeout it will not be respected.
# To get a hard timeout of this type the client should use the socket timeout.
# TopicName => string The name of the topic.
# Value => bytes The actual message contents
# Kafka supports recursive messages in which case this may itself contain a message set.
# The message can be null.
our $_int64_template; # Used to unpack a 64 bit value
if ( $BITS64 ) {
$_int64_template = q{q>};
# unpack a big-endian signed quad (64-bit) value on 64 bit systems.
*_unpack64 = sub { $_[0] };
# pack a big-endian signed quad (64-bit) value on 64 bit systems.
*_pack64 = sub { pack( q{q>}, $_[0] ) };
} else {
eval q{ require Kafka::Int64; } ## no critic
or die "Cannot load Kafka::Int64 : $@";
$_int64_template = q{a[8]};
# unpack a big-endian signed quad (64-bit) value on 32 bit systems.
*_unpack64 = \&Kafka::Int64::unpackq;
# pack a big-endian signed quad (64-bit) value on 32 bit systems.
*_pack64 = \&Kafka::Int64::packq;
}
=head2 EXPORT
The following constants are available for export.
=cut
=head3 C<$DEFAULT_APIVERSION>
The default API version, used as a fallback when it is not possible to
detect what the Kafka server supports. Only Kafka servers 0.10.0.0 or newer can be
queried for the API versions they implement. On Kafka servers 0.8.x and
0.9.x, the protocol defaults to C<$DEFAULT_APIVERSION>. Currently its
value is C<0>.
=cut
const our $DEFAULT_APIVERSION => 0;
# HashRef, keys are APIKEYs, values are the api version that this protocol
# implements. If not populated, the connection will use the $DEFAULT_APIVERSION
our $IMPLEMENTED_APIVERSIONS = {};
# Attributes
# According to Apache Kafka documentation:
# Attributes - Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
const our $COMPRESSION_CODEC_MASK => 0b11;
=head3 C<$CONSUMERS_REPLICAID>
According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
=cut
const our $CONSUMERS_REPLICAID => -1;
=head3 C<$NULL_BYTES_LENGTH>
According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
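As a rough sketch of that rule, the helpers below encode the C<string> (int16 length) and C<bytes> (int32 length) primitives, writing a length of -1 for an undefined value. The names C<encode_kafka_string> and C<encode_kafka_bytes> are hypothetical and are not functions of this module.

```perl
use strict;
use warnings;

# Hypothetical helpers for illustration only; not part of Kafka::Protocol.

# 'string' uses an int16 length prefix; undef is encoded as length -1.
sub encode_kafka_string {
    my ( $value ) = @_;
    return pack( 's>', -1 ) unless defined $value;
    return pack( 's> a*', length( $value ), $value );
}

# 'bytes' uses an int32 length prefix; undef is encoded as length -1.
sub encode_kafka_bytes {
    my ( $value ) = @_;
    return pack( 'l>', -1 ) unless defined $value;
    return pack( 'l> a*', length( $value ), $value );
}

print unpack( 'H*', encode_kafka_string( 'mytopic' ) ), "\n";    # 00076d79746f706963
print unpack( 'H*', encode_kafka_bytes( 'Hello!' ) ), "\n";      # 0000000648656c6c6f21
print unpack( 'H*', encode_kafka_bytes( undef ) ), "\n";         # ffffffff
```

The first two outputs match the C<TopicName> and C<Value> fragments of the hex stream in the SYNOPSIS.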
=cut
const our $NULL_BYTES_LENGTH => -1;
=head3 C<$BAD_OFFSET>
According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
and can fill in any value here it likes.'
=cut
const our $BAD_OFFSET => -1;
my ( $_Request_header_template, $_Request_header_length ) = (
q{l>s>s>l>s>}, # Size
# 2 ApiKey
# 2 ApiVersion
# 4 CorrelationId
# 2 ClientId length
10 # 'Size' is not included in the calculation of length
);
my ( $_ProduceRequest_header_template, $_ProduceRequest_header_length ) = (
q{s>l>l>}, # 2 RequiredAcks
# 4 Timeout
# 4 topics array size
10
);
my ( $_MessageSet_template, $_MessageSet_length ) = (
# qq{${_int64_template}l>},
q{a[8]l>},
# 8 Offset
# 4 MessageSize
12
);
my ( $_FetchRequest_header_template, $_FetchRequest_header_length ) = (
q{l>l>l>l>},
# 4 ReplicaId
# 4 MaxWaitTime
# 4 MinBytes
# 4 topics array size
16
);
my ( $_FetchRequest_header_template_v3, $_FetchRequest_header_length_v3 ) = (
q{l>l>l>l>l>},
# 4 ReplicaId
# 4 MaxWaitTime
# 4 MinBytes
# 4 MaxBytes
# 4 topics array size
20
);
my ( $_FetchRequest_body_template, $_FetchRequest_body_length ) = (
# qq{l>${_int64_template}l>},
q{l>a[8]l>},
# 4 Partition
# 8 FetchOffset
# 4 MaxBytes
16
);
my ( $_OffsetRequest_header_template, $_OffsetRequest_header_length ) = (
q{l>l>}, # 4 ReplicaId
# 4 topics array size
8
);
my ( $_OffsetRequest_body_template, $_OffsetRequest_body_length ) = (
# qq{l>${_int64_template}l>},
q{l>a[8]l>},
# 4 Partition
# 8 Time
# 4 MaxNumberOfOffsets
16
);
my ( $_OffsetRequest_body_template_v1, $_OffsetRequest_body_length_v1 ) = (
# qq{l>${_int64_template}},
q{l>a[8]},
# 4 Partition
# 8 Time
12
);
my ( $_FetchResponse_header_template, $_FetchResponse_header_length ) = (
q{x[l]l>l>}, # Size (skip)
# 4 CorrelationId
# 4 topics array size
8
);
my ( $_FetchResponse_header_template_v1, $_FetchResponse_header_length_v1 ) = (
q{x[l]l>l>l>}, # Size (skip)
# 4 CorrelationId
# 4 throttle_time_ms
# 4 topics array size
12
);
my ( $_Message_template, $_Message_length ) = (
qq(${_int64_template}l>l>ccl>),
# 8 Offset
# MessageSize
# Crc
# MagicByte
# Attributes
# Key length
8 # Only Offset length
);
my ( $_Message_template_with_timestamp, $_Message_length_with_timestamp ) = (
qq(${_int64_template}l>l>cc${_int64_template}l>),
# 8 Offset
# MessageSize
# Crc
# MagicByte
# Attributes
# Timestamp
# Key length
8 # Only Offset length
);
my ( $_FetchResponse_topic_body_template, $_FetchResponse_topic_body_length )= (
qq(s>/al>l>s>${_int64_template}),
# TopicName
# partitions array size
# 4 Partition
# 2 ErrorCode
# 8 HighwaterMarkOffset
14 # without TopicName and partitions array size
);
my $_Key_or_Value_template = q{X[l]l>/a}; # Key or Value
#-- public functions -----------------------------------------------------------
=head2 FUNCTIONS
The following functions are available for the C<Kafka::Protocol> module.
=cut
=head3 C<encode_api_versions_request( $ApiVersions_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$ApiVersions_Request>
C<$ApiVersions_Request> is a reference to the hash representing the structure
of the APIVERSIONS Request. It contains CorrelationId, ClientId (can be an empty
string), and ApiVersion (must be 0).
=back
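For orientation, the sketch below builds the same kind of buffer by hand. It assumes the general request header layout (Size, ApiKey, ApiVersion, CorrelationId, ClientId) and ApiKey 18 for ApiVersions, as given in the Kafka protocol guide. C<build_api_versions_request> is a hypothetical name; the module itself goes through its internal C<_encode_request_header>.

```perl
use strict;
use warnings;

# Hypothetical helper for illustration only; not part of Kafka::Protocol.
sub build_api_versions_request {
    my ( $correlation_id, $client_id ) = @_;
    my $api_key     = 18;    # ApiVersions, per the Kafka protocol guide
    my $api_version = 0;
    my $payload = pack( 's> s> l> s> a*',
        $api_key, $api_version, $correlation_id,
        length( $client_id ), $client_id,
    );
    # Size covers everything after the Size field itself.
    return pack( 'l>', length( $payload ) ) . $payload;
}

print unpack( 'H*', build_api_versions_request( 4, q{} ) ), "\n";
# 0000000a00120000000000040000
```

With an empty ClientId the payload is 10 bytes long, which is the fixed header length this module uses for the same fields.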
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_APIVERSIONS} = 0;
sub encode_api_versions_request {
my ( $ApiVersions_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
_encode_request_header( $request, $APIKEY_APIVERSIONS, $ApiVersions_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
return pack( $request->{template}, $request->{len}, @data );
}
my $_decode_api_version_response_template = q{x[l]l>s>l>X[l]l>/(s>s>s>)};
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # ApiVersions array size
# X[l]
# l>/( # ApiVersions array
# s> # ApiKey
# s> # MinVersion
# s> # MaxVersion
# )
=head3 C<decode_api_versions_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the APIVERSIONS Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
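The sketch below shows what the C<unpack> template used for this response yields on a small hand-built buffer. The sample values are invented for illustration, and the variable names are not part of the module.

```perl
use strict;
use warnings;

# Layout: Size (skipped), CorrelationId, ErrorCode, then an int32-counted
# array of ( ApiKey, MinVersion, MaxVersion ) triples. The count is read
# once as a value, then re-read (after X[l]) as the group repeat count.
my $template = q{x[l]l>s>l>X[l]l>/(s>s>s>)};

# Hand-built sample response: CorrelationId 4, no error, two entries.
my $payload = pack( 'l> s> l> (s> s> s>)2', 4, 0, 2,  0, 0, 2,  1, 0, 3 );
my $buffer  = pack( 'l>', length( $payload ) ) . $payload;

my ( $correlation_id, $error_code, $count, @triples ) = unpack( $template, $buffer );

my @api_versions;
while ( my ( $api_key, $min, $max ) = splice( @triples, 0, 3 ) ) {
    push @api_versions, { ApiKey => $api_key, MinVersion => $min, MaxVersion => $max };
}

printf "ApiKey %d supports versions %d..%d\n", @{$_}{qw( ApiKey MinVersion MaxVersion )}
    for @api_versions;
```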
=cut
sub decode_api_versions_response {
my ( $bin_stream_ref ) = @_;
my @data = unpack( $_decode_api_version_response_template, $$bin_stream_ref );
my $i = 0;
my $ApiVersions_Response = {};
$ApiVersions_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
$ApiVersions_Response->{ErrorCode} = $data[ $i++ ]; # ErrorCode
my $ApiVersions_array = $ApiVersions_Response->{ApiVersions} = [];
my $ApiVersions_array_size = $data[ $i++ ]; # ApiVersions array size
while ( $ApiVersions_array_size-- ) {
push( @$ApiVersions_array, {
ApiKey => $data[ $i++ ], # ApiKey
MinVersion => $data[ $i++ ], # MinVersion
MaxVersion => $data[ $i++ ], # MaxVersion
}
);
}
return $ApiVersions_Response;
}
=head3 C<encode_find_coordinator_request( $FindCoordinator_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$FindCoordinator_Request>
C<$FindCoordinator_Request> is a reference to the hash representing the structure
of the FINDCOORDINATOR Request. It contains CorrelationId, ClientId (can be an empty
string), CoordinatorKey and CoordinatorType (for version 1 of the protocol).
=back
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_FINDCOORDINATOR} = 1;
sub encode_find_coordinator_request {
my ( $FindCoordinator_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
my $api_version =
_encode_request_header( $request, $APIKEY_FINDCOORDINATOR, $FindCoordinator_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $coordinator_key = $FindCoordinator_Request->{CoordinatorKey};
my $coordinator_type = $FindCoordinator_Request->{CoordinatorType};
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $coordinator_key ); # CoordinatorKey (GroupId for version 0)
if ($api_version >= 1) {
# CoordinatorType (0 for groups)
$request->{template} .= q{c}; # int8; byte-order modifiers are not allowed on 'c'
$request->{len} += 1;
push( @{ $request->{data} }, $coordinator_type );
}
return pack( $request->{template}, $request->{len}, @data );
}
my $_decode_find_protocol_response_template = q{x[l]l>s>l>s>/al>};
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # NodeId
# s>/a # Host
# l> # Port
my $_decode_find_protocol_response_template_v1 = q{x[l]l>l>s>s>/al>s>/al>};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # throttle_time_ms
# s> # ErrorCode
# s>/a # ErrorMessage
# l> # NodeId
# s>/a # Host
# l> # Port
=head3 C<decode_find_coordinator_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the FINDCOORDINATOR Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_find_coordinator_response {
my ( $bin_stream_ref, $api_version ) = @_;
$api_version //= $DEFAULT_APIVERSION;
my $is_v1 = $api_version == 1;
my @data = unpack( $is_v1 ? $_decode_find_protocol_response_template_v1
: $_decode_find_protocol_response_template, $$bin_stream_ref );
my $i = 0;
my $FindCoordinator_Response = {};
$FindCoordinator_Response->{CorrelationId} = $data[ $i++ ];
if ($is_v1) {
$FindCoordinator_Response->{ThrottleTimeMs} = $data[ $i++ ]; # only v1
}
$FindCoordinator_Response->{ErrorCode} = $data[ $i++ ];
if ($is_v1) {
$FindCoordinator_Response->{ErrorMessage} = $data[ $i++ ]; # only v1
}
$FindCoordinator_Response->{NodeId} = $data[ $i++ ];
$FindCoordinator_Response->{Host} = $data[ $i++ ];
$FindCoordinator_Response->{Port} = $data[ $i++ ];
return $FindCoordinator_Response;
}
# PRODUCE Request --------------------------------------------------------------
=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Produce_Request>
C<$Produce_Request> is a reference to the hash representing
the structure of the PRODUCE Request (for examples, see C<t/*_decode_encode.t>).
=item C<$compression_codec>
Optional.
C<$compression_codec> sets the compression type for the messages,
if compression is desired.
Supported codecs:
L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.
NOTE: C<$COMPRESSION_LZ4> requires Kafka 0.10 or higher, as the initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
=back
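The codec chosen for a message is stored in the lowest 2 bits of its C<Attributes> byte. The sketch below reads it back with the same mask value as C<$COMPRESSION_CODEC_MASK>. The codec ids come from the Kafka message format and are assumed here to match the C<$COMPRESSION_*> constants; C<codec_of> is a hypothetical helper.

```perl
use strict;
use warnings;

# Codec ids as defined by the Kafka message format; assumed here to match
# the $COMPRESSION_* constants exported by Kafka.
my %codec_name = ( 0 => 'none', 1 => 'gzip', 2 => 'snappy', 3 => 'lz4' );
my $codec_mask = 0b11;    # same value as $COMPRESSION_CODEC_MASK

# Hypothetical helper for illustration only; not part of Kafka::Protocol.
sub codec_of {
    my ( $attributes ) = @_;
    return $codec_name{ $attributes & $codec_mask };
}

print codec_of( 0 ), "\n";         # none
print codec_of( 0b0001 ), "\n";    # gzip
print codec_of( 0b1011 ), "\n";    # lz4 (higher bits are ignored)
```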
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_PRODUCE} = 2;
sub encode_produce_request {
my ( $Produce_Request, $compression_codec ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
_encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $topics_array = $Produce_Request->{topics};
push( @data,
$Produce_Request->{RequiredAcks}, # RequiredAcks
$Produce_Request->{Timeout}, # Timeout
scalar( @$topics_array ), # topics array size
);
$request->{template} .= $_ProduceRequest_header_template;
$request->{len} += $_ProduceRequest_header_length;
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $topic->{TopicName} ); # TopicName
my $partitions_array = $topic->{partitions};
push( @data, scalar( @$partitions_array ) );
$request->{template} .= q{l>}; # partitions array size
$request->{len} += 4;
foreach my $partition ( @$partitions_array ) {
push( @data, $partition->{Partition} );
$request->{template} .= q{l>}; # Partition
$request->{len} += 4;
_encode_MessageSet_array( $request, $partition->{MessageSet}, $compression_codec );
}
}
return pack( $request->{template}, $request->{len}, @data );
}
# PRODUCE Response -------------------------------------------------------------
my $_decode_produce_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# )
# )
my $_decode_produce_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))l>};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# )
# )
# l> # Throttle_Time_Ms
my $_decode_produce_response_template_v2 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))l>};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# $_int64_template # Log_Append_Time
# )
# )
# l> # Throttle_Time_Ms
=head3 C<decode_produce_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_produce_response {
my ( $bin_stream_ref, $api_version ) = @_;
$api_version //= $DEFAULT_APIVERSION;
my $is_v1 = $api_version == 1;
my $is_v2 = $api_version == 2;
my @data = unpack( $is_v1 ? $_decode_produce_response_template_v1
: $is_v2 ? $_decode_produce_response_template_v2
: $_decode_produce_response_template, $$bin_stream_ref );
my $i = 0;
my $Produce_Response = {};
$Produce_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
my $topics_array = $Produce_Response->{topics} = [];
my $topics_array_size = $data[ $i++ ]; # topics array size
while ( $topics_array_size-- ) {
my $topic = {
TopicName => $data[ $i++ ],
};
my $partitions_array = $topic->{partitions} = [];
my $partitions_array_size = $data[ $i++ ]; # partitions array size
while ( $partitions_array_size-- ) {
my $partition = {
Partition => $data[ $i++ ], # Partition
ErrorCode => $data[ $i++ ], # ErrorCode
Offset => _unpack64( $data[ $i++ ] ), # Offset
( $is_v2 ? (Log_Append_Time => _unpack64( $data[ $i++ ] )) : () ), # Log_Append_Time
};
push( @$partitions_array, $partition );
}
push( @$topics_array, $topic );
}
defined $data[ $i ]
and $Produce_Response->{Throttle_Time_Ms} = $data[ $i++ ];
return $Produce_Response;
}
# FETCH Request ----------------------------------------------------------------
=head3 C<encode_fetch_request( $Fetch_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Fetch_Request>
C<$Fetch_Request> is a reference to the hash representing
the structure of the FETCH Request (for examples, see C<t/*_decode_encode.t>).
=back
=cut
# Versions 0, 1 and 2 share the same request format.
$IMPLEMENTED_APIVERSIONS->{$APIKEY_FETCH} = 3;
sub encode_fetch_request {
my ( $Fetch_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
my $api_version = _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $is_v3 = $api_version == 3;
push( @data, $CONSUMERS_REPLICAID ); # ReplicaId
my $topics_array = $Fetch_Request->{topics};
push( @data,
$Fetch_Request->{MaxWaitTime}, # MaxWaitTime
$Fetch_Request->{MinBytes}, # MinBytes
( $is_v3 ? $Fetch_Request->{MaxBytes} : () ), # MaxBytes (version 3 only)
scalar( @$topics_array ), # topics array size
);
if ($is_v3) {
$request->{template} .= $_FetchRequest_header_template_v3;
$request->{len} += $_FetchRequest_header_length_v3;
} else {
$request->{template} .= $_FetchRequest_header_template;
$request->{len} += $_FetchRequest_header_length;
}
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $topic->{TopicName} ); # TopicName
my $partitions_array = $topic->{partitions};
push( @data, scalar( @$partitions_array ) );
$request->{template} .= q{l>}; # partitions array size
$request->{len} += 4;
foreach my $partition ( @$partitions_array ) {
push( @data,
$partition->{Partition}, # Partition
_pack64( $partition->{FetchOffset} ), # FetchOffset
$partition->{MaxBytes}, # MaxBytes
);
$request->{template} .= $_FetchRequest_body_template;
$request->{len} += $_FetchRequest_body_length;
}
}
return pack( $request->{template}, $request->{len}, @data );
}
# FETCH Response ---------------------------------------------------------------
=head3 C<decode_fetch_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_fetch_response {
my ( $bin_stream_ref, $api_version ) = @_;
$api_version //= $DEFAULT_APIVERSION;
my $is_v1 = $api_version == 1;
my $is_v2 = $api_version == 2;
my $is_v3 = $api_version == 3;
# According to Apache Kafka documentation:
# As an optimization the server is allowed to return a partial message at the end of the message set.
# Clients should handle this case.
# NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array
my @data;
my $response = {
# template => '...',
# stream_offset => ...,
data => \@data,
bin_stream => $bin_stream_ref,
};
_decode_fetch_response_template( $response, $api_version );
@data = unpack( $response->{template}, $$bin_stream_ref );
my $i = 0;
my $Fetch_Response = {};
$Fetch_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
if ($is_v1 || $is_v2 || $is_v3) {
$Fetch_Response->{ThrottleTimeMs} = $data[ $i++ ]; # ( ThrottleTimeMs ) only v1 and above
}
my $topics_array = $Fetch_Response->{topics} = [];
my $topics_array_size = $data[ $i++ ]; # topics array size
while ( $topics_array_size-- ) {
my $topic = {
TopicName => $data[ $i++ ], # TopicName
};
my $partitions_array = $topic->{partitions} = [];
my $partitions_array_size = $data[ $i++ ]; # partitions array size
my ( $MessageSetSize, $MessageSet_array );
while ( $partitions_array_size-- ) {
my $partition = {
Partition => $data[ $i++ ], # Partition
ErrorCode => $data[ $i++ ], # ErrorCode
HighwaterMarkOffset => _unpack64( $data[ $i++ ] ), # HighwaterMarkOffset
};
$MessageSetSize = $data[ $i++ ]; # MessageSetSize
$MessageSet_array = $partition->{MessageSet} = [];
_decode_MessageSet_array( $response, $MessageSetSize, \$i, $MessageSet_array );
push( @$partitions_array, $partition );
}
push( @$topics_array, $topic );
}
return $Fetch_Response;
}
# OFFSET Request ---------------------------------------------------------------
=head3 C<encode_offset_request( $Offset_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Offset_Request>
C<$Offset_Request> is a reference to the hash representing
the structure of the OFFSET Request (for examples, see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSET} = 1;
sub encode_offset_request {
my ( $Offset_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
my $api_version = _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $is_v1 = $api_version == 1;
my $topics_array = $Offset_Request->{topics};
push( @data,
$CONSUMERS_REPLICAID, # ReplicaId
scalar( @$topics_array ), # topics array size
);
$request->{template} .= $_OffsetRequest_header_template;
$request->{len} += $_OffsetRequest_header_length;
my $body_template = $is_v1 ? $_OffsetRequest_body_template_v1 : $_OffsetRequest_body_template;
my $body_length = $is_v1 ? $_OffsetRequest_body_length_v1 : $_OffsetRequest_body_length;
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $topic->{TopicName} ); # TopicName
my $partitions_array = $topic->{partitions};
push( @data, scalar( @$partitions_array ) );
$request->{template} .= q{l>}; # partitions array size
$request->{len} += 4; # [l] partitions array size
foreach my $partition ( @$partitions_array ) {
push( @data,
$partition->{Partition}, # Partition
_pack64( $partition->{Time} ), # Time
$is_v1 ? () : $partition->{MaxNumberOfOffsets}, # MaxNumberOfOffsets
);
$request->{template} .= $body_template;
$request->{len} += $body_length;
}
}
# say STDERR $request->{template};
# say STDERR HexDump($_) foreach @data; use Data::HexDump;
return pack( $request->{template}, $request->{len}, @data );
}
# OFFSET Response --------------------------------------------------------------
my $_decode_offset_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template})))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # PartitionOffsets array size
# X[l]
# l>/( # PartitionOffsets array
# l> # Partition
# s> # ErrorCode
# l> # Offset array size
# X[l]
# l>/( # Offset array
# $_int64_template # Offset
# )
# )
# )
my $_decode_offset_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # PartitionOffsets array size
# X[l]
# l>/( # PartitionOffsets array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Timestamp
# $_int64_template # Offset
# )
# )
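=head3 C<decode_offset_response( $bin_stream_ref, $api_version )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=item C<$api_version>
Optional. The API version the response was requested with; defaults to
C<$DEFAULT_APIVERSION>. With version 1, each partition entry carries a
C<Timestamp> and a single C<Offset>; otherwise C<Offset> is a reference to
an array of offsets.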
=back
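The decoded C<Offset> field has a different shape depending on the API version: an array reference for version 0, a single value for version 1. A caller that handles both might normalize it as in the sketch below. C<offsets_of> is a hypothetical helper, and the two entries are hand-written stand-ins rather than real broker output.

```perl
use strict;
use warnings;

# Hypothetical helper: returns the offsets of one PartitionOffsets entry as a
# list, whichever API version produced it.
sub offsets_of {
    my ( $partition_offset ) = @_;
    my $offset = $partition_offset->{Offset};
    return ref( $offset ) eq 'ARRAY' ? @$offset : ( $offset );
}

# Hand-written stand-ins for decoded entries (not real broker output).
my $v0_entry = { Partition => 0, ErrorCode => 0, Offset => [ 120, 0 ] };
my $v1_entry = { Partition => 0, ErrorCode => 0, Timestamp => -1, Offset => 120 };

print join( ',', offsets_of( $v0_entry ) ), "\n";
print join( ',', offsets_of( $v1_entry ) ), "\n";
```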
=cut
sub decode_offset_response {
my ( $bin_stream_ref, $api_version ) = @_;
$api_version //= $DEFAULT_APIVERSION;
my $is_v1 = $api_version == 1;
my $template = $is_v1 ? $_decode_offset_response_template_v1 : $_decode_offset_response_template;
my @data = unpack( $template, $$bin_stream_ref );
my $i = 0;
my $Offset_Response = { };
$Offset_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
my $topics_array = $Offset_Response->{topics} = [];
my $topics_array_size = $data[ $i++ ]; # topics array size
while ( $topics_array_size-- ) {
my $topic = {
TopicName => $data[ $i++ ], # TopicName
};
my $PartitionOffsets_array = $topic->{PartitionOffsets} = [];
my $PartitionOffsets_array_size = $data[ $i++ ]; # PartitionOffsets array size
my ( $PartitionOffset, $Offset_array, $Offset_array_size );
while ( $PartitionOffsets_array_size-- ) {
$PartitionOffset = {
Partition => $data[ $i++ ], # Partition
ErrorCode => $data[ $i++ ], # ErrorCode
};
if ($is_v1) {
$PartitionOffset->{Timestamp} = _unpack64($data[ $i++ ]); # Timestamp (v1 only)
$PartitionOffset->{Offset} = _unpack64($data[ $i++ ]); # Offset (v1 only)
} else {
$Offset_array = $PartitionOffset->{Offset} = [];
$Offset_array_size = $data[ $i++ ]; # Offset array size
while ( $Offset_array_size-- ) {
push( @$Offset_array, _unpack64( $data[ $i++ ] ) ); # Offset
}
}
push( @$PartitionOffsets_array, $PartitionOffset );
}
push( @$topics_array, $topic );
}
return $Offset_Response;
}
# METADATA Request -------------------------------------------------------------
=head3 C<encode_metadata_request( $Metadata_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Metadata_Request>
C<$Metadata_Request> is a reference to the hash representing
the structure of the METADATA Request (for examples, see C<t/*_decode_encode.t>).
=back
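The encoders all follow the same pattern: a C<pack> template, a running byte count and a data list grow side by side, and the byte count is finally packed as the leading Size field. The sketch below reproduces that pattern for a METADATA request. It is not the module's code: the header template and header length are assumed stand-ins for the private constants (which are not shown in this section), and C<add_string> and C<sketch_metadata_request> are hypothetical names.

```perl
use strict;
use warnings;

# Assumed stand-ins for the module's private header constants:
# Size, ApiKey, ApiVersion, CorrelationId, ClientId length.
my $HEADER_TEMPLATE = 'l> s> s> l> s>';
my $HEADER_LENGTH   = 10;    # bytes after the Size field: 2 + 2 + 4 + 2
my $APIKEY_METADATA = 3;     # Metadata API key in the Kafka protocol

# Mirrors the bookkeeping of _encode_string: the length goes into the data
# list, the bytes are appended only for non-empty strings.
sub add_string {
    my ( $request, $string ) = @_;
    if ( $string eq q{} ) {
        push @{ $request->{data} }, 0;
    } else {
        push @{ $request->{data} }, length( $string ), $string;
        $request->{template} .= 'a*';
        $request->{len}      += length $string;
    }
    return;
}

sub sketch_metadata_request {
    my ( $args ) = @_;
    my $request = {
        template => $HEADER_TEMPLATE,
        len      => $HEADER_LENGTH,
        data     => [ $APIKEY_METADATA, 0, $args->{CorrelationId} ],
    };
    add_string( $request, $args->{ClientId} );

    push @{ $request->{data} }, scalar @{ $args->{topics} };    # topics array size
    $request->{template} .= 'l>';
    $request->{len}      += 4;
    foreach my $topic ( @{ $args->{topics} } ) {
        $request->{template} .= 's>';                           # string length
        $request->{len}      += 2;
        add_string( $request, $topic );
    }

    # The accumulated length becomes the leading Size field.
    return pack( $request->{template}, $request->{len}, @{ $request->{data} } );
}

my $bin = sketch_metadata_request(
    { CorrelationId => 7, ClientId => 'cli', topics => ['t1'] } );
printf "Size field: %d, total bytes: %d\n", unpack( 'l>', $bin ), length $bin;
```

Under these assumptions the Size field is always four less than the total length, because Size does not count itself.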
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_METADATA} = 0;
sub encode_metadata_request {
my ( $Metadata_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
_encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $topics_array = $Metadata_Request->{topics};
push( @data, scalar( @$topics_array ) ); # topics array size
$request->{template} .= q{l>};
$request->{len} += 4;
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $topic ); # TopicName
}
return pack( $request->{template}, $request->{len}, @data );
}
# METADATA Response ------------------------------------------------------------
my $_decode_metadata_response_template = q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # Broker array size
# X[l]
# l>/( # Broker array
# l> # NodeId
# s>/a # Host
# l> # Port
# )
# l> # TopicMetadata array size
# X[l]
# l>/( # TopicMetadata array
# s> # ErrorCode
# s>/a # TopicName
# l> # PartitionMetadata array size
# X[l]
# l>/( # PartitionMetadata array
# s> # ErrorCode
# l> # Partition
# l> # Leader
# l> # Replicas array size
# X[l]
# l>/( # Replicas array
# l> # ReplicaId
# )
# l> # Isr array size
# X[l]
# l>/( # Isr array
# l> # ReplicaId
# )
# )
# )
=head3 C<decode_metadata_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_metadata_response {
my ( $bin_stream_ref ) = @_;
my @data = unpack( $_decode_metadata_response_template, $$bin_stream_ref );
my $i = 0;
my $Metadata_Response = {};
$Metadata_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
my $Broker_array = $Metadata_Response->{Broker} = [];
my $Broker_array_size = $data[ $i++ ]; # Broker array size
while ( $Broker_array_size-- ) {
push( @$Broker_array, {
NodeId => $data[ $i++ ], # NodeId
Host => $data[ $i++ ], # Host
Port => $data[ $i++ ], # Port
}
);
}
my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} = [];
my $TopicMetadata_array_size = $data[ $i++ ]; # TopicMetadata array size
while ( $TopicMetadata_array_size-- ) {
my $TopicMetadata = {
ErrorCode => $data[ $i++ ], # ErrorCode
TopicName => $data[ $i++ ], # TopicName
};
my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} = [];
my $PartitionMetadata_array_size = $data[ $i++ ]; # PartitionMetadata array size
while ( $PartitionMetadata_array_size-- ) {
my $PartitionMetadata = {
ErrorCode => $data[ $i++ ], # ErrorCode
Partition => $data[ $i++ ], # Partition
Leader => $data[ $i++ ], # Leader
};
my $Replicas_array = $PartitionMetadata->{Replicas} = [];
my $Replicas_array_size = $data[ $i++ ]; # Replicas array size
while ( $Replicas_array_size-- ) {
push( @$Replicas_array, $data[ $i++ ] ); # ReplicaId
}
my $Isr_array = $PartitionMetadata->{Isr} = [];
my $Isr_array_size = $data[ $i++ ]; # Isr array size
while ( $Isr_array_size-- ) {
push( @$Isr_array, $data[ $i++ ] ); # ReplicaId
}
push( @$PartitionMetadata_array, $PartitionMetadata );
}
push( @$TopicMetadata_array, $TopicMetadata );
}
return $Metadata_Response;
}
# OffsetCommit Request -------------------------------------------------------------
=head3 C<encode_offsetcommit_request( $OffsetCommit_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$OffsetCommit_Request>
C<$OffsetCommit_Request> is a reference to the hash representing
the structure of the OffsetCommit Request (for examples, see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETCOMMIT} = 1;
sub encode_offsetcommit_request {
my ( $OffsetCommit_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
my $api_version = _encode_request_header( $request, $APIKEY_OFFSETCOMMIT, $OffsetCommit_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my $is_v1 = $api_version == 1;
$request->{template} .= q{s>};
$request->{len} += 2;
_encode_string( $request, $OffsetCommit_Request->{GroupId} ); # GroupId
if ($is_v1) {
$request->{template} .= q{l>}; # GroupGenerationId
$request->{len} += 4;
### WARNING TODO: we don't support consumer groups properly, so we set
### generation id to -1 and member id null (see
### https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol )
push( @data, -1 );
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, '' ); # MemberId
}
my $topics_array = $OffsetCommit_Request->{topics};
push( @data, scalar( @$topics_array ) ); # topics array size
$request->{template} .= q{l>};
$request->{len} += 4;
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>};
$request->{len} += 2;
_encode_string( $request, $topic->{TopicName} ); # TopicName
my $partitions_array = $topic->{partitions};
push( @data, scalar( @$partitions_array ) ); # partitions array size
$request->{template} .= q{l>};
$request->{len} += 4;
foreach my $partition ( @$partitions_array ){
push( @data,
$partition->{Partition}, # Partition
$partition->{Offset}, # Offset
);
$request->{template} .= qq{l>${_int64_template}};
$request->{len} += 12;
if ($is_v1) { # timestamp
### WARNING TODO: currently we hardcode "now"
push( @data, time() * 1000);
$request->{template} .= qq{${_int64_template}};
$request->{len} += 8;
}
$request->{template} .= q{s>};
$request->{len} += 2;
_encode_string( $request, $partition->{Metadata} ); # Metadata
}
}
return pack( $request->{template}, $request->{len}, @data );
}
# OFFSETCOMMIT Response --------------------------------------------------------------
my $_decode_offsetcommit_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# )
# )
=head3 C<decode_offsetcommit_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETCOMMIT Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
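The response templates rely on one C<unpack> idiom: C<l>> returns an array size, C<X[l]> steps back four bytes, and C<l>/( ... )> then reads the same size again as a repeat count for the group, since a length item used with C</> is not returned by itself. The sketch below applies the OFFSETCOMMIT response layout to a hand-built buffer. The buffer contents are made up for illustration and are not real broker output.

```perl
use strict;
use warnings;

# Same layout as the OFFSETCOMMIT response template, spaced out for reading.
my $template = 'x[l] l> l> X[l] l>/( s>/a l> X[l] l>/( l> s> ) )';

# Hand-built response: one topic with two partitions.
my $topic = 'mytopic';
my $body  = pack( 'l> l>', 42, 1 )                           # CorrelationId, topics array size
    . pack( 's> a* l>', length( $topic ), $topic, 2 )        # TopicName, partitions array size
    . pack( 'l> s>', 0, 0 )                                  # Partition 0, ErrorCode 0
    . pack( 'l> s>', 1, 3 );                                 # Partition 1, ErrorCode 3
my $bin_stream = pack( 'l>', length( $body ) ) . $body;      # Size prefix

# Flat list in wire order; the decoder walks it with an index.
my @data = unpack( $template, $bin_stream );
print join( ', ', @data ), "\n";
```

The flat list contains each array size immediately before the elements of that array, which is what lets the decoder drive its C<while> loops from the sizes it reads along the way.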
=cut
sub decode_offsetcommit_response {
my ( $bin_stream_ref ) = @_;
my @data = unpack( $_decode_offsetcommit_response_template, $$bin_stream_ref );
my $i = 0;
my $OffsetCommit_Response = {};
$OffsetCommit_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
my $topics_array = $OffsetCommit_Response->{topics} = [];
my $topics_array_size = $data[ $i++ ]; # topics array size
while ( $topics_array_size-- ) {
my $topic = {
TopicName => $data[ $i++ ], # TopicName
};
my $Partition_array = $topic->{partitions} = [];
my $Partition_array_size = $data[ $i++ ]; # Partitions array size
while ( $Partition_array_size-- ) {
my $Partition = {
Partition => $data[ $i++ ], # Partition
ErrorCode => $data[ $i++ ], # ErrorCode
};
push( @$Partition_array, $Partition);
}
push( @$topics_array, $topic);
}
return $OffsetCommit_Response;
}
# OffsetFetch Request -------------------------------------------------------------
=head3 C<encode_offsetfetch_request( $OffsetFetch_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$OffsetFetch_Request>
C<$OffsetFetch_Request> is a reference to the hash representing
the structure of the OffsetFetch Request (for examples, see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETFETCH} = 1;
sub encode_offsetfetch_request {
my ( $OffsetFetch_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
_encode_request_header( $request, $APIKEY_OFFSETFETCH, $OffsetFetch_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
$request->{template} .= q{s>};
$request->{len} += 2;
_encode_string( $request, $OffsetFetch_Request->{GroupId} ); # GroupId
my $topics_array = $OffsetFetch_Request->{topics};
push( @data, scalar( @$topics_array ) ); # topics array size
$request->{template} .= q{l>};
$request->{len} += 4;
foreach my $topic ( @$topics_array ) {
$request->{template} .= q{s>};
$request->{len} += 2;
_encode_string( $request, $topic->{TopicName} ); # TopicName
my $partitions_array = $topic->{partitions};
push( @data, scalar( @$partitions_array ) ); # partitions array size
$request->{template} .= q{l>};
$request->{len} += 4;
foreach my $partition ( @$partitions_array ){
push( @data,
$partition->{Partition}, # Partition
);
$request->{template} .= qq{l>};
$request->{len} += 4;
}
}
return pack( $request->{template}, $request->{len}, @data );
}
# OFFSETFETCH Response --------------------------------------------------------------
my $_decode_offsetfetch_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}s>/as>))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# $_int64_template # Offset
# s>/a # Metadata
# s> # ErrorCode
# )
# )
=head3 C<decode_offsetfetch_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETFETCH Response (for examples, see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_offsetfetch_response {
my ( $bin_stream_ref ) = @_;
my @data = unpack( $_decode_offsetfetch_response_template, $$bin_stream_ref );
my $i = 0;
my $OffsetFetch_Response = {};
$OffsetFetch_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
my $topics_array = $OffsetFetch_Response->{topics} = [];
my $topics_array_size = $data[ $i++ ]; # topics array size
while ( $topics_array_size-- ) {
my $topic = {
TopicName => $data[ $i++ ], # TopicName
};
my $Partition_array = $topic->{partitions} = [];
my $Partition_array_size = $data[ $i++ ]; # Partitions array size
while ( $Partition_array_size-- ) {
my $Partition = {
Partition => $data[ $i++ ], # Partition
Offset => $data[ $i++ ], # Offset
Metadata => $data[ $i++ ], # Metadata
ErrorCode => $data[ $i++ ], # ErrorCode
};
push( @$Partition_array, $Partition);
}
push( @$topics_array, $topic);
}
return $OffsetFetch_Response;
}
# SaslHandshake Request -------------------------------------------------------------
=head3 C<encode_saslhandshake_request( $SaslHandshake_Request )>
Encodes the argument and returns the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$SaslHandshake_Request>
C<$SaslHandshake_Request> is a reference to the hash representing
the structure of the SaslHandshake Request.
=back
=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_SASLHANDSHAKE} = 0;
sub encode_saslhandshake_request {
my ( $SaslHandshake_Request ) = @_;
my @data;
my $request = {
# template => '...',
# len => ...,
data => \@data,
};
_encode_request_header( $request, $APIKEY_SASLHANDSHAKE, $SaslHandshake_Request );
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
$request->{template} .= q{s>}; # string length
$request->{len} += 2;
_encode_string( $request, $SaslHandshake_Request->{Mechanism} ); # SASL Mechanism
return pack( $request->{template}, $request->{len}, @data );
}
# SASLHANDSHAKE Response --------------------------------------------------------------
my $_decode_saslhandshake_response_template = q{x[l]l>s>l>X[l]l>/(s>/a)};
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # Mechanisms array size
# X[l]
# l>/( # Enabled Mechanisms
# s>/a # Mechanism
# )
=head3 C<decode_saslhandshake_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the SaslHandshake Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub decode_saslhandshake_response {
my ( $bin_stream_ref ) = @_;
my @data = unpack( $_decode_saslhandshake_response_template, $$bin_stream_ref );
my $i = 0;
my $SaslHandshake_Response = {};
$SaslHandshake_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
$SaslHandshake_Response->{ErrorCode} = $data[ $i++ ]; # ErrorCode
my $Mechanisms_array = $SaslHandshake_Response->{Mechanisms} = [];
my $Mechanisms_array_size = $data[ $i++ ]; # Mechanisms array size
while ( $Mechanisms_array_size-- ) {
push( @$Mechanisms_array, $data[ $i++ ] ); # Mechanism
}
return $SaslHandshake_Response;
}
#-- private functions ----------------------------------------------------------
# Generates a template to encode the request header
sub _encode_request_header {
my ( $request, $api_key, $request_ref ) = @_;
# we need to find out which API version to use for the request (and the
# response). $request_ref->{ApiVersion} can be specified by the end user,
# or provided by Kafka::Connection ( see
# Kafka::Connection::_get_api_versions() ). If not provided, we
# default to $DEFAULT_APIVERSION
my $api_version = $request_ref->{ApiVersion} // $DEFAULT_APIVERSION;
@{ $request->{data} } = (
# Size
$api_key, # ApiKey
$api_version, # ApiVersion
$request_ref->{CorrelationId}, # CorrelationId
);
$request->{template} = $_Request_header_template;
$request->{len} = $_Request_header_length;
_encode_string( $request, $request_ref->{ClientId} ); # ClientId
return $api_version;
}
# Generates a template to decode the fetch response body
sub _decode_fetch_response_template {
my ( $response, $api_version ) = @_;
my $is_v1 = $api_version == 1;
my $is_v2 = $api_version == 2;
my $is_v3 = $api_version == 3;
if ($is_v1 || $is_v2 || $is_v3) {
$response->{template} = $_FetchResponse_header_template_v1;
$response->{stream_offset} = $_FetchResponse_header_length_v1; # bytes before topics array size
# [l] Size
# [l] CorrelationId
# [l] throttle_time_ms
} else {
$response->{template} = $_FetchResponse_header_template;
$response->{stream_offset} = $_FetchResponse_header_length; # bytes before topics array size
# [l] Size
# [l] CorrelationId
}
my $topics_array_size = unpack(
q{x[}.$response->{stream_offset} . q{]}
.q{l>}, # topics array size
${ $response->{bin_stream} }
);
$response->{stream_offset} += 4; # bytes before TopicName length
# [l] topics array size
my ( $TopicName_length, $partitions_array_size );
while ( $topics_array_size-- ) {
$TopicName_length = unpack(
q{x[}.$response->{stream_offset}
.q{]s>}, # TopicName length
${ $response->{bin_stream} }
);
$response->{stream_offset} += # bytes before partitions array size
2 # [s] TopicName length
+ $TopicName_length # TopicName
;
$partitions_array_size = unpack(
q{x[}.$response->{stream_offset}
.q{]l>}, # partitions array size
${ $response->{bin_stream} }
);
$response->{stream_offset} += 4; # bytes before Partition
# [l] partitions array size
$response->{template} .= $_FetchResponse_topic_body_template;
$response->{stream_offset} += $_FetchResponse_topic_body_length; # (without TopicName and partitions array size)
# bytes before MessageSetSize
# TopicName
# [l] # partitions array size
# [l] Partition
# [s] ErrorCode
# [q] HighwaterMarkOffset
_decode_MessageSet_template( $response );
}
return;
}
# Kafka uses a snappy-java compressor, with its own headers and frame formats
my $XERIAL_SNAPPY_MAGIC_HEADER = "\x82SNAPPY\x00";
my $XERIAL_SNAPPY_BLOCK_SIZE = 0x8000; # 32 KiB
my $XERIAL_SNAPPY_FILE_VERSION = 1;
sub _snappy_xerial_decompress {
my ( $data ) = @_;
my $uncompressed;
my $raw_format_suspected = 1;
if ( length($data) > 16 ) {
my ( $header, $x_version, $x_compatversion) = unpack( q{a[8]L>L>}, $data );
if ( $header eq $XERIAL_SNAPPY_MAGIC_HEADER ) {
$raw_format_suspected = 0;
_error( $ERROR_COMPRESSION, 'snappy: bad compatversion in snappy xerial header' ) unless $x_compatversion == $XERIAL_SNAPPY_FILE_VERSION;
_error( $ERROR_COMPRESSION, 'snappy: bad version in snappy xerial header' ) unless $x_version == $XERIAL_SNAPPY_FILE_VERSION;
$data = substr( $data, 16 );
while ( length($data) > 0 ) {
$uncompressed //= '';
_error( $ERROR_COMPRESSION, 'snappy: bad frame length header' ) if length($data) < 4;
my $compressed_frame_length = unpack( q{L>}, $data );
$data = substr( $data, 4 );
_error( $ERROR_COMPRESSION, 'snappy: partial frame ' ) if length($data) < $compressed_frame_length;
my $compressed_frame = substr( $data, 0, $compressed_frame_length, '');
my $uncompressed_frame = Compress::Snappy::decompress( $compressed_frame );
_error( $ERROR_COMPRESSION, 'snappy: can\'t uncompress frame ' ) if not defined $uncompressed_frame;
$uncompressed .= $uncompressed_frame;
}
}
}
if ( $raw_format_suspected ) {
$uncompressed = Compress::Snappy::decompress( $data );
}
return $uncompressed;
}
sub _snappy_xerial_compress {
my ( $data ) = @_;
my $compressed_data;
while ( length($data) ) {
my $block = substr $data, 0, $XERIAL_SNAPPY_BLOCK_SIZE, '';
my $compressed_block = Compress::Snappy::compress( $block );
_error( $ERROR_COMPRESSION, 'snappy: can\'t compress frame!' ) if not defined $compressed_block;
$compressed_data //= pack( q{a[8]L>L>}, $XERIAL_SNAPPY_MAGIC_HEADER, $XERIAL_SNAPPY_FILE_VERSION, $XERIAL_SNAPPY_FILE_VERSION );
$compressed_data .= pack( q{L>}, length($compressed_block) ) . $compressed_block;
}
return $compressed_data;
}
sub _decompress_data {
my ( $data, $compression_codec ) = @_;
say STDERR format_message( '[%s] decompress_data request, compression_codec: %s, data: %s',
scalar( localtime ),
$compression_codec,
unpack( 'H*', $data ),
) if ( Kafka::Protocol->debug_level // 0 ) >= 3;
my $decompressed;
if ( $compression_codec == $COMPRESSION_GZIP ) {
try {
$decompressed = gunzip( $data );
} catch {
_error( $ERROR_COMPRESSION, format_message( 'gunzip failed: %s', $_ ) );
};
} elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
$decompressed = _snappy_xerial_decompress( $data )
// _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
} elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
# New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
if ( Compress::LZ4Frame::looks_like_lz4frame( $data ) ) {
$decompressed = Compress::LZ4Frame::decompress( $data ) // _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data' );
} else {
_error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data. Frame is not valid' );
}
} else {
_error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
}
_error( $ERROR_COMPRESSION, 'Decompression produced empty result' ) unless defined $decompressed;
return $decompressed;
}
sub _compress_data {
my ( $data, $compression_codec ) = @_;
my $compressed;
# Compression
if ( $compression_codec == $COMPRESSION_GZIP ) {
$compressed = gzip( $data )
// _error( $ERROR_COMPRESSION, 'Unable to compress gzip data' );
} elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
$compressed = _snappy_xerial_compress( $data )
// _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
} elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
# New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
$compressed = Compress::LZ4Frame::compress_checksum( $data )
// _error( $ERROR_COMPRESSION, 'Unable to compress LZ4 data' );
} else {
_error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
}
return $compressed;
}
# Decodes MessageSet
sub _decode_MessageSet_array {
my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
# $_override_offset should not be set manually. It is used when recursing
# to decode an inner compressed message set, whose messages have offsets
# starting at 0, 1, etc.; the offset of the outer wrapping message is used instead.
my $data = $response->{data};
my $data_array_size = scalar @{ $data };
# NOTE: not all messages can be returned
my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
while ( $MessageSetSize && $$i_ref < $data_array_size ) {
my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
if (defined $_override_offset) {
$message_offset = $_override_offset;
}
$Message = {
Offset => $message_offset, # Offset
};
$MessageSize = $data->[ $$i_ref++ ]; # MessageSize
# NOTE: The CRC is the CRC32 of the remainder of the message bytes.
# This is used to check the integrity of the message on the broker and consumer:
# MagicByte + Attributes + Key length + Key + Value length + Value
$Crc = $data->[ $$i_ref++ ]; # Crc
# NOTE: A message set is also the unit of compression in Kafka: a message
# may carry a compressed message set as its Value. Such a nested set is
# decompressed and decoded recursively below.
$Message->{MagicByte} = $data->[ $$i_ref++ ]; # MagicByte
my $is_v1_msg_format = $Message->{MagicByte} == 1;
$Message->{Attributes} = $data->[ $$i_ref++ ]; # Attributes
if ($is_v1_msg_format) {
$Message->{Timestamp} = $data->[ $$i_ref++ ]; # Timestamp
}
$Key_length = $data->[ $$i_ref++ ]; # Key length
$Message->{Key} = $Key_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Key
$Value_length = $data->[ $$i_ref++ ]; # Value length
$Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Value
if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
my $decompressed = _decompress_data($Message->{Value}, $compression_codec);
my @data;
my $Value_length = length $decompressed;
my $resp = {
data => \@data,
bin_stream => \$decompressed,
stream_offset => 0,
};
_decode_MessageSet_sized_template( $Value_length, $resp );
@data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
my $i = 0; # i_ref
my $size = length( $decompressed );
_decode_MessageSet_array(
$resp,
$size, # message set size
\$i, # i_ref
$MessageSet_array_ref,
$message_offset
);
} else {
push( @$MessageSet_array_ref, $Message );
}
$MessageSetSize -= 12
# [q] Offset
# [l] MessageSize
+ $MessageSize # Message
;
}
return;
}
# Generates a template to encode single message within MessageSet
sub _encode_Message {
my ($request, $Key, $Value, $Attributes, $Offset, $v1_format, $Timestamp) = @_;
# v1 format - supported since 0.10.0, introduces Timestamps
my $MagicByte = $v1_format ? 1 : 0;
$Timestamp //= -1;
$Attributes //= $COMPRESSION_NONE; # According to Apache Kafka documentation:
# The lowest 2 bits contain the compression codec used for the message.
# The other bits should be set to 0.
my $key_length = length( $Key );
my $value_length = length( $Value );
my $message_body = pack(
q{cc} # MagicByte
# Attributes
.( $v1_format ? q{a[8]} : q{} ) # Timestamp - v1 only (when MagicByte > 0) (#$_int64_template)
.q{l>} # Key length
.( $key_length ? qq{a[$key_length]} : q{} ) # Key
.q{l>} # Value length
.( $value_length ? qq{a[$value_length]} : q{} ) # Value
,
$MagicByte,
$Attributes,
$v1_format ? ( _pack64( $Timestamp ) ) : (),
$key_length ? ( $key_length, $Key ) : ( $NULL_BYTES_LENGTH ),
$value_length ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
);
my $MessageBodySize = length($message_body);
my $MessageSize = $MessageBodySize + 4;
my $data = $request->{data};
$request->{template} .= $_MessageSet_template . qq{l>a[$MessageBodySize]}; # crc + message_body
$request->{len} += $_MessageSet_length + $MessageSize;
push @$data, (_pack64( $Offset ), $MessageSize, crc32( $message_body ), $message_body );
return;
}
# Generates a template to encode MessageSet
sub _encode_MessageSet_array {
my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;
# not sure if it would be good to mix different formats in different messages in the same set
# so detect if we should use v1 format
my $use_v1_format;
foreach my $MessageSet ( @$MessageSet_array_ref ) {
if ( $MessageSet->{Timestamp} ) {
$use_v1_format = 1;
last;
}
}
if ( $compression_codec ) {
my $subrequest = {
data => [],
template => '',
len => 0
};
my ( $Key, $Value );
foreach my $MessageSet ( @$MessageSet_array_ref ) {
_encode_Message( $subrequest,
$Key = $MessageSet->{Key},
$MessageSet->{Value},
$COMPRESSION_NONE,
$PRODUCER_ANY_OFFSET,
$use_v1_format,
$MessageSet->{Timestamp}
);
}
$Value = pack($subrequest->{template}, @{$subrequest->{data}});
$MessageSet_array_ref = [
{
Offset => $PRODUCER_ANY_OFFSET,
Key => $Key,
Value => _compress_data($Value, $compression_codec),
}
];
}
my $subrequest = {
data => [],
template => '',
len => 0
};
foreach my $MessageSet ( @$MessageSet_array_ref ) {
_encode_Message( $subrequest,
$MessageSet->{Key},
$MessageSet->{Value},
$compression_codec // $COMPRESSION_NONE,
$MessageSet->{Offset},
$use_v1_format,
$MessageSet->{Timestamp}
);
}
my $data = $request->{data};
$request->{template} .= q{l>} . $subrequest->{template};
$request->{len} += 4 + $subrequest->{len};
push @$data, ( $subrequest->{len}, @{ $subrequest->{data} } );
return;
}
# Generates a template to decode MessageSet
sub _decode_MessageSet_template {
my ( $response ) = @_;
my $MessageSetSize = unpack(
q{x[}.$response->{stream_offset}
.q{]l>}, # MessageSetSize
${ $response->{bin_stream} }
);
$response->{template} .= q{l>}; # MessageSetSize
$response->{stream_offset} += 4; # bytes before Offset
return _decode_MessageSet_sized_template($MessageSetSize, $response);
}
sub _decode_MessageSet_sized_template {
my ( $MessageSetSize, $response ) = @_;
my $bin_stream_length = length ${ $response->{bin_stream} };
my ( $local_template, $MessageSize, $MagicByte, $Key_length, $Value_length );
CREATE_TEMPLATE:
while ( $MessageSetSize ) {
# Not the full MessageSet
# 22 is the minimal size of a v0 message format until (and including) the Key
# Length. If the message version is v1, then it's 22 + 8. We'll check later
last CREATE_TEMPLATE if $MessageSetSize < 22;
# [q] Offset
# [l] MessageSize
# [l] Crc
# [c] MagicByte
# [c] Attributes
# ( [q] Timestamp ) message format v1 only
# [l] Key length
# [l] Value length
$local_template = q{};
MESSAGE_SET:
{
$response->{stream_offset} += 8; # the size of the offset data value
($MessageSize, $MagicByte) = unpack(
q{x[}.$response->{stream_offset}.q{]}
.q{l>} # MessageSize
.q{x[l]} # Crc
.q{c}, # MagicByte
${ $response->{bin_stream} }
);
my $is_v1_msg_format = $MagicByte == 1;
if ($is_v1_msg_format && $MessageSetSize < (22+8)) {
# Not the full MessageSet
$local_template = q{};
last MESSAGE_SET;
}
$local_template .= ( $is_v1_msg_format ? $_Message_template_with_timestamp : $_Message_template );
$response->{stream_offset} += ($is_v1_msg_format ? 18: 10);
$Key_length = unpack(
q{x[}.$response->{stream_offset}
.q{]l>}, # Key length
${ $response->{bin_stream} }
);
$response->{stream_offset} += 4; # bytes before Key or Value length
# [l] Key length
$response->{stream_offset} += $Key_length # bytes before Key
if $Key_length != $NULL_BYTES_LENGTH; # Key
if ( $bin_stream_length >= $response->{stream_offset} + 4 ) { # + [l] Value length
$local_template .= $_Key_or_Value_template
if $Key_length != $NULL_BYTES_LENGTH;
} else {
# Not the full MessageSet
$local_template = q{};
last MESSAGE_SET;
}
$local_template .= q{l>}; # Value length
$Value_length = unpack(
q{x[}.$response->{stream_offset}
.q{]l>}, # Value length
${ $response->{bin_stream} }
);
$response->{stream_offset} += # bytes before Value or next Message
4 # [l] Value length
;
$response->{stream_offset} += $Value_length # bytes before next Message
if $Value_length != $NULL_BYTES_LENGTH; # Value
if ( $bin_stream_length >= $response->{stream_offset} ) {
$local_template .= $_Key_or_Value_template
if $Value_length != $NULL_BYTES_LENGTH;
} else {
# Not the full MessageSet
$local_template = q{};
last MESSAGE_SET;
}
}
if ( $local_template ) {
$response->{template} .= $local_template;
$MessageSetSize -= 12
# [q] Offset
# [l] MessageSize
+ $MessageSize # Message
;
} else {
last CREATE_TEMPLATE;
}
}
return;
}
# Generates a template to encode a string
sub _encode_string {
my ( $request, $string ) = @_;
if ( $string eq q{} ) {
push( @{ $request->{data} }, 0 );
} else {
my $string_length = length $string;
push( @{ $request->{data} }, $string_length, $string );
$request->{template} .= q{a*}; # string;
$request->{len} += $string_length;
}
return;
}
# Handler for errors
sub _error {
Kafka::Exception::Protocol->throw( throw_args( @_ ) );
return;
}
1;
__END__
=head1 DIAGNOSTICS
In order to achieve better performance, the functions of this module do not
validate their arguments.
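Because the encoders dereference the request structure directly, a malformed request is likely to fail with a low-level Perl error rather than a descriptive one. A caller that cannot trust its input may want a check of its own first. The sketch below does this for a METADATA request. C<check_metadata_request> is a hypothetical helper, not something this module provides; the keys it inspects are the ones read by C<encode_metadata_request>.

```perl
use strict;
use warnings;

# Hypothetical caller-side check; returns a list of problems (empty if none).
sub check_metadata_request {
    my ( $request ) = @_;
    return ( 'request must be a hash reference' )
        unless ref( $request ) eq 'HASH';

    my @problems;
    push @problems, 'CorrelationId must be an integer'
        unless defined( $request->{CorrelationId} )
            && $request->{CorrelationId} =~ /\A-?\d+\z/;
    push @problems, 'ClientId must be a defined string'
        unless defined( $request->{ClientId} ) && !ref( $request->{ClientId} );

    if ( ref( $request->{topics} ) eq 'ARRAY' ) {
        push @problems, 'topic names must be defined strings'
            if grep { !defined( $_ ) || ref( $_ ) } @{ $request->{topics} };
    } else {
        push @problems, 'topics must be an array reference';
    }
    return @problems;
}

my @problems = check_metadata_request(
    { CorrelationId => 1, ClientId => 'demo', topics => 'oops' } );
print "$_\n" for @problems;
```

Keeping such checks on the caller's side leaves the encoders themselves as fast as they are now.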
=head1 SEE ALSO
The basic operation of the Kafka package modules:
L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
L<Kafka::Producer|Kafka::Producer> - interface for producing client.
L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
L<Kafka::Message|Kafka::Message> - interface to access Kafka message
properties.
L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.
L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka protocol.
L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.
A wealth of detail about Apache Kafka and the Kafka protocol:
Main page at L<http://kafka.apache.org/>
Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
=head1 SOURCE CODE
Kafka package is hosted on GitHub:
=head1 AUTHOR
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
=head1 CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.
=cut