—————————————————————————package
Kafka::Protocol;
=head1 NAME
Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
=head1 VERSION
This documentation refers to C<Kafka::Protocol> version 1.08 .
=cut
use
5.010;
use
strict;
use
warnings;
our
$VERSION
=
'v1.08'
;
import
)
;
our
@EXPORT_OK
=
qw(
decode_fetch_response
decode_metadata_response
decode_offset_response
decode_produce_response
decode_api_versions_response
decode_find_coordinator_response
decode_offsetcommit_response
decode_offsetfetch_response
decode_saslhandshake_response
encode_fetch_request
encode_metadata_request
encode_offset_request
encode_produce_request
encode_api_versions_request
encode_find_coordinator_request
encode_offsetcommit_request
encode_offsetfetch_request
encode_saslhandshake_request
_decode_MessageSet_template
_decode_MessageSet_array
_encode_Message
_encode_MessageSet_array
_encode_string
_pack64
_unpack64
_verify_string
$DEFAULT_APIVERSION
$IMPLEMENTED_APIVERSIONS
$BAD_OFFSET
$COMPRESSION_CODEC_MASK
$CONSUMERS_REPLICAID
$NULL_BYTES_LENGTH
$_int64_template
)
;
use
Compress::Snappy ();
use
Compress::LZ4Frame ();
use
Const::Fast;
_ARRAY
_HASH
_SCALAR
_STRING
)
;
dualvar
)
;
use
String::CRC32;
use
Try::Tiny;
$BITS64
$BLOCK_UNTIL_IS_COMMITTED
$COMPRESSION_GZIP
$COMPRESSION_NONE
$COMPRESSION_SNAPPY
$COMPRESSION_LZ4
$DEFAULT_MAX_WAIT_TIME
%ERROR
$ERROR_COMPRESSION
$ERROR_MISMATCH_ARGUMENT
$ERROR_REQUEST_OR_RESPONSE
$NOT_SEND_ANY_RESPONSE
$WAIT_WRITTEN_TO_LOCAL_LOG
)
;
use
Kafka::Exceptions;
$APIKEY_FETCH
$APIKEY_METADATA
$APIKEY_OFFSET
$APIKEY_PRODUCE
$APIKEY_APIVERSIONS
$APIKEY_FINDCOORDINATOR
$APIKEY_OFFSETCOMMIT
$APIKEY_OFFSETFETCH
$APIKEY_SASLHANDSHAKE
$PRODUCER_ANY_OFFSET
format_message
debug_level
)
;
=head1 SYNOPSIS
use 5.010;
use strict;
use warnings;
use Data::Compare;
use Kafka qw(
$COMPRESSION_NONE
$ERROR_NO_ERROR
$REQUEST_TIMEOUT
$WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
$PRODUCER_ANY_OFFSET
);
use Kafka::Protocol qw(
decode_produce_response
encode_produce_request
);
# a encoded produce request hex stream
my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
# a decoded produce request
my $decoded = {
CorrelationId => 4,
ClientId => q{},
RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
Timeout => $REQUEST_TIMEOUT * 100, # ms
topics => [
{
TopicName => 'mytopic',
partitions => [
{
Partition => 0,
MessageSet => [
{
Offset => $PRODUCER_ANY_OFFSET,
MagicByte => 0,
Attributes => $COMPRESSION_NONE,
Key => q{},
Value => 'Hello!',
},
],
},
],
},
],
};
my $encoded_request = encode_produce_request( $decoded );
say 'encoded correctly' if $encoded_request eq $encoded;
# a encoded produce response hex stream
$encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
# a decoded produce response
$decoded = {
CorrelationId => 4,
topics => [
{
TopicName => 'mytopic',
partitions => [
{
Partition => 0,
ErrorCode => $ERROR_NO_ERROR,
Offset => 0,
},
],
},
],
};
my $decoded_response = decode_produce_response( \$encoded );
say 'decoded correctly' if Compare( $decoded_response, $decoded );
# more examples, see t/*_decode_encode.t
=head1 DESCRIPTION
This module is not a user module.
In order to achieve better performance,
functions of this module do not perform arguments validation.
The main features of the C<Kafka::Protocol> module are:
=over 3
=item *
Supports parsing the Apache Kafka protocol.
=item *
Supports Apache Kafka Requests and Responses (PRODUCE and FETCH).
Within this package we currently support
access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.
=item *
Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
=back
=cut
# A Guide To The Kafka Protocol 0.8:
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
#
# -- Protocol Primitive Types
# int8, int16, int32, int64
# Signed integers
# stored in big endian order.
# bytes, string
# consist of a signed integer
# giving a length N
# followed by N bytes of content.
# A length of -1 indicates null.
# string uses an int16 for its size,
# and bytes uses an int32.
# Arrays
# These will always be encoded as an int32 size containing the length N
# followed by N repetitions of the structure
# which can itself be made up of other primitive types.
#
# -- N.B.
# - The response will always match the paired request
# - One structure common to both the produce and fetch requests is the message set format.
# - MessageSets are not preceded by an int32 like other array elements in the protocol.
# - A message set is also the unit of compression in Kafka,
# and we allow messages to recursively contain compressed message sets.
#
# -- Protocol Fields
# ApiKey => int16 That identifies the API being invoked
# ApiVersion => int16 This is a numeric version number for this api.
# Currently the supported version for all APIs is 0.
# Attributes => int8 Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
# ClientId => string This is a user supplied identifier for the client application.
# CorrelationId => int32 This is a user-supplied integer.
# It will be passed back in the response by the server, unmodified.
# It is useful for matching request and response between the client and server.
# Crc => int32 The CRC32 of the remainder of the message bytes.
# ErrorCode => int16 The error from this partition, if any.
# Errors are given on a per-partition basis
# because a given partition may be unavailable or maintained on a different host,
# while others may have successfully accepted the produce request.
# FetchOffset => int64 The offset to begin this fetch from.
# HighwaterMarkOffset => int64 The offset at the end of the log for this partition.
# This can be used by the client to determine how many messages behind the end of the log they are.
# - 0.8 documents: Replication design
# The high watermark is the offset of the last committed message.
# Each log is periodically synced to disks.
# Data before the flushed offset is guaranteed to be persisted on disks.
# As we will see, the flush offset can be before or after high watermark.
# - 0.7 documents: Wire protocol
# If the last segment file for the partition is not empty and was modified earlier than TIME,
# it will return both the first offset for that segment and the high water mark.
# The high water mark is not the offset of the last message,
# but rather the offset that the next message sent to the partition will be written to.
# Host => string The brokers hostname
# Isr => [ReplicaId] The set subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
# Key => bytes An optional message key
# The key can be null.
# Leader => int32 The node id for the kafka broker currently acting as leader for this partition.
# If no leader exists because we are in the middle of a leader election this id will be -1.
# MagicByte => int8 A version id used to allow backwards compatible evolution of the message binary format.
# MaxBytes => int32 The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets => int32 Kafka here is return up to 'MaxNumberOfOffsets' of offsets
# MaxWaitTime => int32 The maximum amount of time (ms)
# to block waiting
# if insufficient data is available at the time the request is issued.
# MessageSetSize => int32 The size in bytes of the message set for this partition
# MessageSize => int32 The size of the subsequent request or response message in bytes
# MinBytes => int32 The minimum number of bytes of messages that must be available to give a response.
# If the client sets this to 0 the server will always respond immediately.
# If this is set to 1,
# the server will respond as soon
# as at least one partition
# has at least 1 byte of data
# or the specified timeout occurs.
# By setting higher values
# in combination with the timeout
# for reading only large chunks of data
# NodeId => int32 The id of the broker.
# This must be set to a unique integer for each broker.
# Offset => int64 The offset used in kafka as the log sequence number.
# When the producer is sending messages it doesn't actually know the offset
# and can fill in any value here it likes.
# Partition => int32 The id of the partition the fetch is for
# or the partition that data is being published to
# or the partition this response entry corresponds to.
# Port => int32 The brokers port
# ReplicaId => int32 Indicates the node id of the replica initiating this request.
# Normal client consumers should always specify this as -1 as they have no node id.
# Replicas => [ReplicaId] The set of alive nodes that currently acts as slaves for the leader for this partition.
# RequiredAcks => int16 Indicates how many acknowledgements the servers should receive
# before responding to the request.
# If it is 0 the server does not send any response.
# If it is 1, the server will wait the data is written to the local log before sending a response.
# If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
# Size => int32 The size of the subsequent request or response message in bytes
# Time => int64 Used to ask for all messages before a certain time (ms).
# There are two special values.
# Specify -1 to receive the latest offset (this will only ever return one offset).
# Specify -2 to receive the earliest available offsets.
# Timeout => int32 This provides a maximum time (ms) the server can await the receipt
# of the number of acknowledgements in RequiredAcks.
# The timeout is not an exact limit on the request time for a few reasons:
# (1) it does not include network latency,
# (2) the timer begins at the beginning of the processing of this request
# so if many requests are queued due to server overload
# that wait time will not be included,
# (3) we will not terminate a local write
# so if the local write time exceeds this timeout it will not be respected.
# To get a hard timeout of this type the client should use the socket timeout.
# TopicName => string The name of the topic.
# Value => bytes The actual message contents
# Kafka supports recursive messages in which case this may itself contain a message set.
# The message can be null.
our
$_int64_template
;
# Used to unpack a 64 bit value
if
(
$BITS64
) {
$_int64_template
=
q{q>}
;
# unpack a big-endian signed quad (64-bit) value on 64 bit systems.
*_unpack64
=
sub
{
$_
[0] };
# pack a big-endian signed quad (64-bit) value on 64 bit systems.
*_pack64
=
sub
{
pack
(
q{q>}
,
$_
[0] ) };
}
else
{
eval
q{ require Kafka::Int64; }
## no critic
or
die
"Cannot load Kafka::Int64 : $@"
;
$_int64_template
=
q{a[8]}
;
# unpack a big-endian signed quad (64-bit) value on 32 bit systems.
*_unpack64
= \
&Kafka::Int64::unpackq
;
# pack a big-endian signed quad (64-bit) value on 32 bit systems.
*_pack64
= \
&Kafka::Int64::packq
;
}
=head2 EXPORT
The following constants are available for export
=cut
=head3 C<$DEFAULT_APIVERSION>
The default API version that will be used as fallback, if it's not possible to
detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be
queried to get which API version they implements. On Kafka servers 0.8.x and
0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its
value is '0'
=cut
const
our
$DEFAULT_APIVERSION
=> 0;
# HashRef, keys are APIKEYs, values are the api version that this protocol
# implements. If not populated, the connection will use the $DEFAULT_APIVERSION
our
$IMPLEMENTED_APIVERSIONS
= {};
# Attributes
# According to Apache Kafka documentation:
# Attributes - Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
const
our
$COMPRESSION_CODEC_MASK
=> 0b11;
=head3 C<$CONSUMERS_REPLICAID>
According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
=cut
const
our
$CONSUMERS_REPLICAID
=> -1;
=head3 C<$NULL_BYTES_LENGTH>
According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
=cut
const
our
$NULL_BYTES_LENGTH
=> -1;
=head3 C<$BAD_OFFSET>
According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
and can fill in any value here it likes.'
=cut
const
our
$BAD_OFFSET
=> -1;
my
(
$_Request_header_template
,
$_Request_header_length
) = (
q{l>s>s>l>s>}
,
# Size
# 2 ApiKey
# 2 ApiVersion
# 4 CorrelationId
# 2 ClientId length
10
# 'Size' is not included in the calculation of length
);
my
(
$_ProduceRequest_header_template
,
$_ProduceRequest_header_length
) = (
q{s>l>l>}
,
# 2 RequiredAcks
# 4 Timeout
# 4 topics array size
10
);
my
(
$_MessageSet_template
,
$_MessageSet_length
) = (
# qq{${_int64_template}l>},
q{a[8]l>}
,
# 8 Offset
# 4 MessageSize
12
);
my
(
$_FetchRequest_header_template
,
$_FetchRequest_header_length
) = (
q{l>l>l>l>}
,
# 4 ReplicaId
# 4 MaxWaitTime
# 4 MinBytes
# 4 topics array size
16
);
my
(
$_FetchRequest_header_template_v3
,
$_FetchRequest_header_length_v3
) = (
q{l>l>l>l>l>}
,
# 4 ReplicaId
# 4 MaxWaitTime
# 4 MinBytes
# 4 MaxBytes
# 4 topics array size
20
);
my
(
$_FetchRequest_body_template
,
$_FetchRequest_body_length
) = (
# qq{l>${_int64_template}l>},
q{l>a[8]l>}
,
# 4 Partition
# 8 FetchOffset
# 4 MaxBytes
16
);
my
(
$_OffsetRequest_header_template
,
$_OffsetRequest_header_length
) = (
q{l>l>}
,
# 4 ReplicaId
# 4 topics array size
8
);
my
(
$_OffsetRequest_body_template
,
$_OffsetRequest_body_length
) = (
# qq{l>${_int64_template}l>},
q{l>a[8]l>}
,
# 4 Partition
# 8 Time
# 4 MaxNumberOfOffsets
16
);
my
(
$_OffsetRequest_body_template_v1
,
$_OffsetRequest_body_length_v1
) = (
# qq{l>${_int64_template}},
q{l>a[8]}
,
# 4 Partition
# 8 Time
12
);
my
(
$_FetchResponse_header_template
,
$_FetchResponse_header_length
) = (
q{x[l]l>l>}
,
# Size (skip)
# 4 CorrelationId
# 4 topics array size
8
);
my
(
$_FetchResponse_header_template_v1
,
$_FetchResponse_header_length_v1
) = (
q{x[l]l>l>l>}
,
# Size (skip)
# 4 CorrelationId
# 4 throttle_time_ms
# 4 topics array size
12
);
my
(
$_Message_template
,
$_Message_length
) = (
qq(${_int64_template}l>l>ccl>)
,
# 8 Offset
# MessageSize
# Crc
# MagicByte
# Attributes
# Key length
8
# Only Offset length
);
my
(
$_Message_template_with_timestamp
,
$_Message_length_with_timestamp
) = (
qq(${_int64_template}l>l>cc${_int64_template}l>)
,
# 8 Offset
# MessageSize
# Crc
# MagicByte
# Attributes
# Timestamp
# Key length
8
# Only Offset length
);
my
(
$_FetchResponse_topic_body_template
,
$_FetchResponse_topic_body_length
)= (
qq(s>/al>l>s>${_int64_template})
,
# TopicName
# partitions array size
# 4 Partition
# 2 ErrorCode
# 8 HighwaterMarkOffset
14
# without TopicName and partitions array size
);
my
$_Key_or_Value_template
=
q{X[l]l>/a}
;
# Key or Value
#-- public functions -----------------------------------------------------------
=head2 FUNCTIONS
The following functions are available for C<Kafka::MockProtocol> module.
=cut
=head3 C<encode_api_versions_request( $ApiVersions_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$ApiVersions_Request>
C<$ApiVersions_Request> is a reference to the hash representing the structure
of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty
string), and ApiVersion (must be 0)
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_APIVERSIONS
} = 0;
sub
encode_api_versions_request {
my
(
$ApiVersions_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
_encode_request_header(
$request
,
$APIKEY_APIVERSIONS
,
$ApiVersions_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
my
$_decode_api_version_response_template
=
q{x[l]l>s>l>X[l]l>/(s>s>s>)}
;
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # ApiVersions array size
# X[l]
# l>/( # ApiVersions array
# s> # ApiKey
# s> # MinVersion
# s> # MaxVersion
# )
=head3 C<decode_api_versions_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the APIVERSIONS Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_api_versions_response {
my
(
$bin_stream_ref
) =
@_
;
my
@data
=
unpack
(
$_decode_api_version_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$ApiVersions_Response
= {};
$ApiVersions_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
$ApiVersions_Response
->{ErrorCode} =
$data
[
$i
++ ];
# ErrorCode
my
$ApiVersions_array
=
$ApiVersions_Response
->{ApiVersions} = [];
my
$ApiVersions_array_size
=
$data
[
$i
++ ];
# ApiVersions array size
while
(
$ApiVersions_array_size
-- ) {
push
(
@$ApiVersions_array
, {
ApiKey
=>
$data
[
$i
++ ],
# ApiKey
MinVersion
=>
$data
[
$i
++ ],
# MinVersion
MaxVersion
=>
$data
[
$i
++ ],
# MaxVersion
}
);
}
return
$ApiVersions_Response
;
}
=head3 C<encode_find_coordinator_request( $FindCoordinator_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$FindCoordinator_Request>
C<$FindCoordinator_Request> is a reference to the hash representing the structure
of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty
string), CoordinatorKey and CoordinatorType (for version 1 of protocol)
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_FINDCOORDINATOR
} = 1;
sub
encode_find_coordinator_request {
my
(
$FindCoordinator_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
my
$api_version
=
_encode_request_header(
$request
,
$APIKEY_FINDCOORDINATOR
,
$FindCoordinator_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$coordinator_key
=
$FindCoordinator_Request
->{CoordinatorKey};
my
$coordinator_type
=
$FindCoordinator_Request
->{CoordinatorType};
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$coordinator_key
);
# CoordinatorKey (GroupId for version 0)
if
(
$api_version
>= 1) {
# CoordinatorType (0 for groups)
$request
->{template} .=
q{c>}
;
$request
->{len} += 1;
push
( @{
$request
->{data} },
$coordinator_type
);
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
my
$_decode_find_protocol_response_template
=
q{x[l]l>s>l>s>/al>}
;
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # NodeId
# s>/a # Host
# l> # Port
my
$_decode_find_protocol_response_template_v1
=
q{x[l]l>l>s>l>s>/al>}
;
# x[l] # Size (skip)
# l> # CorrelationId
# l> # throttle_time_ms
# s> # ErrorCode
# s>/a # ErrorMessage
# l> # NodeId
# s>/a # Host
# l> # Port
=head3 C<decode_find_coordinator_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the FINDCOORDINATOR Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_find_coordinator_response {
my
(
$bin_stream_ref
,
$api_version
) =
@_
;
$api_version
//=
$DEFAULT_APIVERSION
;
my
$is_v1
=
$api_version
== 1;
my
@data
=
unpack
(
$is_v1
?
$_decode_find_protocol_response_template_v1
:
$_decode_find_protocol_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$FindCoordinator_Response
= {};
$FindCoordinator_Response
->{CorrelationId} =
$data
[
$i
++ ];
if
(
$is_v1
) {
$FindCoordinator_Response
->{ThrottleTimeMs} =
$data
[
$i
++ ];
# only v1
}
$FindCoordinator_Response
->{ErrorCode} =
$data
[
$i
++ ];
if
(
$is_v1
) {
$FindCoordinator_Response
->{ErrorMessage} =
$data
[
$i
++ ];
# only v1
}
$FindCoordinator_Response
->{NodeId} =
$data
[
$i
++ ];
$FindCoordinator_Response
->{Host} =
$data
[
$i
++ ];
$FindCoordinator_Response
->{Port} =
$data
[
$i
++ ];
return
$FindCoordinator_Response
;
}
# PRODUCE Request --------------------------------------------------------------
=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Produce_Request>
C<$Produce_Request> is a reference to the hash representing
the structure of the PRODUCE Request (examples see C<t/*_decode_encode.t>).
=item C<$compression_codec>
Optional.
C<$compression_codec> sets the required type of C<$messages> compression,
if the compression is desirable.
Supported codecs:
L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.
NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_PRODUCE
} = 2;
sub
encode_produce_request {
my
(
$Produce_Request
,
$compression_codec
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
_encode_request_header(
$request
,
$APIKEY_PRODUCE
,
$Produce_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$topics_array
=
$Produce_Request
->{topics};
push
(
@data
,
$Produce_Request
->{RequiredAcks},
# RequiredAcks
$Produce_Request
->{Timeout},
# Timeout
scalar
(
@$topics_array
),
# topics array size
);
$request
->{template} .=
$_ProduceRequest_header_template
;
$request
->{len} +=
$_ProduceRequest_header_length
;
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$topic
->{TopicName} );
# TopicName
my
$partitions_array
=
$topic
->{partitions};
push
(
@data
,
scalar
(
@$partitions_array
) );
$request
->{template} .=
q{l>}
;
# partitions array size
$request
->{len} += 4;
foreach
my
$partition
(
@$partitions_array
) {
push
(
@data
,
$partition
->{Partition} );
$request
->{template} .=
q{l>}
;
# Partition
$request
->{len} += 4;
_encode_MessageSet_array(
$request
,
$partition
->{MessageSet},
$compression_codec
);
}
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# PRODUCE Response -------------------------------------------------------------
my
$_decode_produce_response_template
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}
))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# )
# )
my
$_decode_produce_response_template_v1
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}
))l>};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# )
# )
# l> # Throttle_Time_Ms
my
$_decode_produce_response_template_v2
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}
${_int64_template}))l>};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Offset
# $_int64_template # Log_Append_Time
# )
# )
# l> # Throttle_Time_Ms
=head3 C<decode_produce_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_produce_response {
my
(
$bin_stream_ref
,
$api_version
) =
@_
;
$api_version
//=
$DEFAULT_APIVERSION
;
my
$is_v1
=
$api_version
== 1;
my
$is_v2
=
$api_version
== 2;
my
@data
=
unpack
(
$is_v1
?
$_decode_produce_response_template_v1
:
$is_v2
?
$_decode_produce_response_template_v2
:
$_decode_produce_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$Produce_Response
= {};
$Produce_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
my
$topics_array
=
$Produce_Response
->{topics} = [];
my
$topics_array_size
=
$data
[
$i
++ ];
# topics array size
while
(
$topics_array_size
-- ) {
my
$topic
= {
TopicName
=>
$data
[
$i
++ ],
};
my
$partitions_array
=
$topic
->{partitions} = [];
my
$partitions_array_size
=
$data
[
$i
++ ];
# partitions array size
while
(
$partitions_array_size
-- ) {
my
$partition
= {
Partition
=>
$data
[
$i
++ ],
# Partition
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
Offset
=> _unpack64(
$data
[
$i
++ ] ),
# Offset
(
$is_v2
? (
Log_Append_Time
=> _unpack64(
$data
[
$i
++ ] )) : () ),
# Log_Append_Time
};
push
(
@$partitions_array
,
$partition
);
}
push
(
@$topics_array
,
$topic
);
}
defined
$data
[
$i
]
and
$Produce_Response
->{Throttle_Time_Ms} =
$data
[
$i
++ ];
return
$Produce_Response
;
}
# FETCH Request ----------------------------------------------------------------
=head3 C<encode_fetch_request( $Fetch_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Fetch_Request>
C<$Fetch_Request> is a reference to the hash representing
the structure of the FETCH Request (examples see C<t/*_decode_encode.t>).
=back
=cut
# version 0, 1, 2 have the same request protocol.
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_FETCH
} = 3;
sub
encode_fetch_request {
my
(
$Fetch_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
my
$api_version
= _encode_request_header(
$request
,
$APIKEY_FETCH
,
$Fetch_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$is_v3
=
$api_version
== 3;
push
(
@data
,
$CONSUMERS_REPLICAID
);
# ReplicaId
my
$topics_array
=
$Fetch_Request
->{topics};
push
(
@data
,
$Fetch_Request
->{MaxWaitTime},
# MaxWaitTime
$Fetch_Request
->{MinBytes},
# MinBytes
(
$is_v3
?
$Fetch_Request
->{MaxBytes} : () ),
# MaxBytes (version 3 only)
scalar
(
@$topics_array
),
# topics array size
);
if
(
$is_v3
) {
$request
->{template} .=
$_FetchRequest_header_template_v3
;
$request
->{len} +=
$_FetchRequest_header_length_v3
;
}
else
{
$request
->{template} .=
$_FetchRequest_header_template
;
$request
->{len} +=
$_FetchRequest_header_length
;
}
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$topic
->{TopicName} );
# TopicName
my
$partitions_array
=
$topic
->{partitions};
push
(
@data
,
scalar
(
@$partitions_array
) );
$request
->{template} .=
q{l>}
;
# partitions array size
$request
->{len} += 4;
foreach
my
$partition
(
@$partitions_array
) {
push
(
@data
,
$partition
->{Partition},
# Partition
_pack64(
$partition
->{FetchOffset} ),
# FetchOffset
$partition
->{MaxBytes},
# MaxBytes
);
$request
->{template} .=
$_FetchRequest_body_template
;
$request
->{len} +=
$_FetchRequest_body_length
;
}
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# FETCH Response ---------------------------------------------------------------
=head3 C<decode_fetch_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_fetch_response {
my
(
$bin_stream_ref
,
$api_version
) =
@_
;
$api_version
//=
$DEFAULT_APIVERSION
;
my
$is_v1
=
$api_version
== 1;
my
$is_v2
=
$api_version
== 2;
my
$is_v3
=
$api_version
== 3;
# According to Apache Kafka documentation:
# As an optimization the server is allowed to return a partial message at the end of the message set.
# Clients should handle this case.
# NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array
my
@data
;
my
$response
= {
# template => '...',
# stream_offset => ...,
data
=> \
@data
,
bin_stream
=>
$bin_stream_ref
,
};
_decode_fetch_response_template(
$response
,
$api_version
);
@data
=
unpack
(
$response
->{template},
$$bin_stream_ref
);
my
$i
= 0;
my
$Fetch_Response
= {};
$Fetch_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
if
(
$is_v1
||
$is_v2
||
$is_v3
) {
$Fetch_Response
->{ThrottleTimeMs} =
$data
[
$i
++ ];
# ( ThrottleTimeMs ) only v1 and above
}
my
$topics_array
=
$Fetch_Response
->{topics} = [];
my
$topics_array_size
=
$data
[
$i
++ ];
# topics array size
while
(
$topics_array_size
-- ) {
my
$topic
= {
TopicName
=> $ data[
$i
++ ],
# TopicName
};
my
$partitions_array
=
$topic
->{partitions} = [];
my
$partitions_array_size
=
$data
[
$i
++ ];
# partitions array size
my
(
$MessageSetSize
,
$MessageSet_array
);
while
(
$partitions_array_size
-- ) {
my
$partition
= {
Partition
=>
$data
[
$i
++ ],
# Partition
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
HighwaterMarkOffset
=> _unpack64(
$data
[
$i
++ ] ),
# HighwaterMarkOffset
};
$MessageSetSize
=
$data
[
$i
++ ];
# MessageSetSize
$MessageSet_array
=
$partition
->{MessageSet} = [];
_decode_MessageSet_array(
$response
,
$MessageSetSize
, \
$i
,
$MessageSet_array
);
push
(
@$partitions_array
,
$partition
);
}
push
(
@$topics_array
,
$topic
);
}
return
$Fetch_Response
;
}
# OFFSET Request ---------------------------------------------------------------
=head3 C<encode_offset_request( $Offset_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Offset_Request>
C<$Offset_Request> is a reference to the hash representing
the structure of the OFFSET Request (examples see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_OFFSET
} = 1;
sub
encode_offset_request {
my
(
$Offset_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
my
$api_version
= _encode_request_header(
$request
,
$APIKEY_OFFSET
,
$Offset_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$is_v1
=
$api_version
== 1;
my
$topics_array
=
$Offset_Request
->{topics};
push
(
@data
,
$CONSUMERS_REPLICAID
,
# ReplicaId
scalar
(
@$topics_array
),
# topics array size
);
$request
->{template} .=
$_OffsetRequest_header_template
;
$request
->{len} +=
$_OffsetRequest_header_length
;
my
$body_template
=
$is_v1
?
$_OffsetRequest_body_template_v1
:
$_OffsetRequest_body_template
;
my
$body_length
=
$is_v1
?
$_OffsetRequest_body_length_v1
:
$_OffsetRequest_body_length
;
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$topic
->{TopicName} );
# TopicName
my
$partitions_array
=
$topic
->{partitions};
push
(
@data
,
scalar
(
@$partitions_array
) );
$request
->{template} .=
q{l>}
;
# partitions array size
$request
->{len} += 4;
# [l] partitions array size
foreach
my
$partition
(
@$partitions_array
) {
push
(
@data
,
$partition
->{Partition},
# Partition
_pack64(
$partition
->{Time} ),
# Time
$is_v1
? () :
$partition
->{MaxNumberOfOffsets},
# MaxNumberOfOffsets
);
$request
->{template} .=
$body_template
;
$request
->{len} +=
$body_length
;
}
}
# say STDERR $request->{template};
# say STDERR HexDump($_) foreach @data; use Data::HexDump;
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# OFFSET Response --------------------------------------------------------------
my
$_decode_offset_response_template
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template}
)))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # PartitionOffsets array size
# X[l]
# l>/( # PartitionOffsets array
# l> # Partition
# s> # ErrorCode
# l> # Offset array size
# X[l]
# l>/( # Offset array
# $_int64_template # Offset
# )
# )
# )
my
$_decode_offset_response_template_v1
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}
${_int64_template}))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # PartitionOffsets array size
# X[l]
# l>/( # PartitionOffsets array
# l> # Partition
# s> # ErrorCode
# $_int64_template # Timestamp
# $_int64_template # Offset
# )
# )
=head3 C<decode_offset_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_offset_response {
my
(
$bin_stream_ref
,
$api_version
) =
@_
;
$api_version
//=
$DEFAULT_APIVERSION
;
my
$is_v1
=
$api_version
== 1;
my
$template
=
$is_v1
?
$_decode_offset_response_template_v1
:
$_decode_offset_response_template
;
my
@data
=
unpack
(
$template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$Offset_Response
= { };
$Offset_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
my
$topics_array
=
$Offset_Response
->{topics} = [];
my
$topics_array_size
=
$data
[
$i
++ ];
# topics array size
while
(
$topics_array_size
-- ) {
my
$topic
= {
TopicName
=>
$data
[
$i
++ ],
# TopicName
};
my
$PartitionOffsets_array
=
$topic
->{PartitionOffsets} = [];
my
$PartitionOffsets_array_size
=
$data
[
$i
++ ];
# PartitionOffsets array size
my
(
$PartitionOffset
,
$Offset_array
,
$Offset_array_size
);
while
(
$PartitionOffsets_array_size
-- ) {
$PartitionOffset
= {
Partition
=>
$data
[
$i
++ ],
# Partition
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
};
if
(
$is_v1
) {
$PartitionOffset
->{Timestamp} = _unpack64(
$data
[
$i
++ ]);
# Timestamp (v1 only)
$PartitionOffset
->{Offset} = _unpack64(
$data
[
$i
++ ]);
# Offset (v1 only)
}
else
{
$Offset_array
=
$PartitionOffset
->{Offset} = [];
$Offset_array_size
=
$data
[
$i
++ ];
# Offset array size
while
(
$Offset_array_size
-- ) {
push
(
@$Offset_array
, _unpack64(
$data
[
$i
++ ] ) );
# Offset
}
}
push
(
@$PartitionOffsets_array
,
$PartitionOffset
);
}
push
(
@$topics_array
,
$topic
);
}
return
$Offset_Response
;
}
# METADATA Request -------------------------------------------------------------
=head3 C<encode_metadata_request( $Metadata_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$Metadata_Request>
C<$Metadata_Request> is a reference to the hash representing
the structure of the METADATA Request (examples see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_METADATA
} = 0;
sub
encode_metadata_request {
my
(
$Metadata_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
_encode_request_header(
$request
,
$APIKEY_METADATA
,
$Metadata_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$topics_array
=
$Metadata_Request
->{topics};
push
(
@data
,
scalar
(
@$topics_array
) );
# topics array size
$request
->{template} .=
q{l>}
;
$request
->{len} += 4;
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$topic
);
# TopicName
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# METADATA Response ------------------------------------------------------------
my
$_decode_metadata_response_template
=
q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))}
;
# x[l] # Size (skip)
# l> # CorrelationId
# l> # Broker array size
# X[l]
# l>/( # Broker array
# l> # NodeId
# s>/a # Host
# l> # Port
# )
# l> # TopicMetadata array size
# X[l]
# l>/( # TopicMetadata array
# s> # ErrorCode
# s>/a # TopicName
# l> # PartitionMetadata array size
# X[l]
# l>/( # PartitionMetadata array
# s> # ErrorCode
# l> # Partition
# l> # Leader
# l> # Replicas array size
# X[l]
# l>/( # Replicas array
# l> # ReplicaId
# )
# l> # Isr array size
# X[l]
# l>/( # Isr array
# l> # ReplicaId
# )
# )
# )
=head3 C<decode_metadata_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_metadata_response {
my
(
$bin_stream_ref
) =
@_
;
my
@data
=
unpack
(
$_decode_metadata_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$Metadata_Response
= {};
$Metadata_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
my
$Broker_array
=
$Metadata_Response
->{Broker} = [];
my
$Broker_array_size
=
$data
[
$i
++ ];
# Broker array size
while
(
$Broker_array_size
-- ) {
push
(
@$Broker_array
, {
NodeId
=>
$data
[
$i
++ ],
# NodeId
Host
=>
$data
[
$i
++ ],
# Host
Port
=>
$data
[
$i
++ ],
# Port
}
);
}
my
$TopicMetadata_array
=
$Metadata_Response
->{TopicMetadata} = [];
my
$TopicMetadata_array_size
=
$data
[
$i
++ ];
# TopicMetadata array size
while
(
$TopicMetadata_array_size
-- ) {
my
$TopicMetadata
= {
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
TopicName
=>
$data
[
$i
++ ],
# TopicName
};
my
$PartitionMetadata_array
=
$TopicMetadata
->{PartitionMetadata} = [];
my
$PartitionMetadata_array_size
=
$data
[
$i
++ ];
# PartitionMetadata array size
while
(
$PartitionMetadata_array_size
-- ) {
my
$PartitionMetadata
= {
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
Partition
=>
$data
[
$i
++ ],
# Partition
Leader
=>
$data
[
$i
++ ],
# Leader
};
my
$Replicas_array
=
$PartitionMetadata
->{Replicas} = [];
my
$Replicas_array_size
=
$data
[
$i
++ ];
# Replicas array size
while
(
$Replicas_array_size
-- ) {
push
(
@$Replicas_array
,
$data
[
$i
++ ] );
# ReplicaId
}
my
$Isr_array
=
$PartitionMetadata
->{Isr} = [];
my
$Isr_array_size
=
$data
[
$i
++ ];
# Isr array size
while
(
$Isr_array_size
-- ) {
push
(
@$Isr_array
,
$data
[
$i
++ ] );
# ReplicaId
}
push
(
@$PartitionMetadata_array
,
$PartitionMetadata
);
}
push
(
@$TopicMetadata_array
,
$TopicMetadata
);
}
return
$Metadata_Response
;
}
# OffsetCommit Request -------------------------------------------------------------
=head3 C<encode_offsetcommit_request( $OffsetCommit_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$OffsetCommit_Request>
C<$OffsetCommit_Request> is a reference to the hash representing
the structure of the OffsetCommit Request (examples see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_OFFSETCOMMIT
} = 1;
sub
encode_offsetcommit_request {
my
(
$OffsetCommit_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
my
$api_version
= _encode_request_header(
$request
,
$APIKEY_OFFSETCOMMIT
,
$OffsetCommit_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
my
$is_v1
=
$api_version
== 1;
$request
->{template} .=
q{s>}
;
$request
->{len} += 2;
_encode_string(
$request
,
$OffsetCommit_Request
->{GroupId} );
# GroupId
if
(
$is_v1
) {
$request
->{template} .=
q{l>}
;
# GroupGenerationId
$request
->{len} += 4;
### WARNING TODO: we don't support consumer groups properly, so we set
### generation id to -1 and member id null (see
### https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol )
push
(
@data
, -1 );
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
''
);
# MemberId
}
my
$topics_array
=
$OffsetCommit_Request
->{topics};
push
(
@data
,
scalar
(
@$topics_array
) );
# topics array size
$request
->{template} .=
q{l>}
;
$request
->{len} += 4;
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
$request
->{len} += 2;
_encode_string(
$request
,
$topic
->{TopicName} );
# TopicName
my
$partitions_array
=
$topic
->{partitions};
push
(
@data
,
scalar
(
@$partitions_array
) );
# partitions array size
$request
->{template} .=
q{l>}
;
$request
->{len} += 4;
foreach
my
$partition
(
@$partitions_array
){
push
(
@data
,
$partition
->{Partition},
# Partition
$partition
->{Offset},
# Offset
);
$request
->{template} .=
qq{l>${_int64_template}
};
$request
->{len} += 12;
if
(
$is_v1
) {
# timestamp
### WARNING TODO: currently we hardcode "now"
push
(
@data
,
time
() * 1000);
$request
->{template} .=
qq{${_int64_template}
};
$request
->{len} += 8;
}
$request
->{template} .=
q{s>}
;
$request
->{len} += 2;
_encode_string(
$request
,
$partition
->{Metadata} ),
# Metadata
}
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# OFFSETCOMMIT Response --------------------------------------------------------------
my
$_decode_offsetcommit_response_template
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))}
;
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# s> # ErrorCode
# )
# )
=head3 C<decode_offsetcommit_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETCOMMIT Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_offsetcommit_response {
my
(
$bin_stream_ref
) =
@_
;
my
@data
=
unpack
(
$_decode_offsetcommit_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$OffsetCommit_Response
= {};
$OffsetCommit_Response
->{CorrelationId} =
$data
[
$i
++ ];
#CorrelationId
my
$topics_array
=
$OffsetCommit_Response
->{topics} = [];
my
$topics_array_size
=
$data
[
$i
++ ];
# topics array size
while
(
$topics_array_size
-- ) {
my
$topic
= {
TopicName
=>
$data
[
$i
++ ],
# TopicName
};
my
$Partition_array
=
$topic
->{partitions} = [];
my
$Partition_array_size
=
$data
[
$i
++ ];
# Partitions array size
while
(
$Partition_array_size
-- ) {
my
$Partition
= {
Partition
=>
$data
[
$i
++ ],
# Partition
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
};
push
(
@$Partition_array
,
$Partition
);
}
push
(
@$topics_array
,
$topic
);
}
return
$OffsetCommit_Response
;
}
# OffsetFetch Request -------------------------------------------------------------
=head3 C<encode_offsetfetch_request( $OffsetFetch_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$OffsetFetch_Request>
C<$OffsetFetch_Request> is a reference to the hash representing
the structure of the OffsetFetch Request (examples see C<t/*_decode_encode.t>).
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_OFFSETFETCH
} = 1;
sub
encode_offsetfetch_request {
my
(
$OffsetFetch_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
_encode_request_header(
$request
,
$APIKEY_OFFSETFETCH
,
$OffsetFetch_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
$request
->{template} .=
q{s>}
;
$request
->{len} += 2;
_encode_string(
$request
,
$OffsetFetch_Request
->{GroupId} );
# GroupId
my
$topics_array
=
$OffsetFetch_Request
->{topics};
push
(
@data
,
scalar
(
@$topics_array
) );
# topics array size
$request
->{template} .=
q{l>}
;
$request
->{len} += 4;
foreach
my
$topic
(
@$topics_array
) {
$request
->{template} .=
q{s>}
;
$request
->{len} += 2;
_encode_string(
$request
,
$topic
->{TopicName} );
# TopicName
my
$partitions_array
=
$topic
->{partitions};
push
(
@data
,
scalar
(
@$partitions_array
) );
# partitions array size
$request
->{template} .=
q{l>}
;
$request
->{len} += 4;
foreach
my
$partition
(
@$partitions_array
){
push
(
@data
,
$partition
->{Partition},
# Partition
);
$request
->{template} .=
qq{l>}
;
$request
->{len} += 4;
}
}
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# OFFSETFETCH Response --------------------------------------------------------------
my
$_decode_offsetfetch_response_template
=
qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}
s>/as>))};
# x[l] # Size (skip)
# l> # CorrelationId
# l> # topics array size
# X[l]
# l>/( # topics array
# s>/a # TopicName
# l> # partitions array size
# X[l]
# l>/( # partitions array
# l> # Partition
# $_int64_template # Offset
# s>/a # Metadata
# s> # ErrorCode
# )
# )
=head3 C<decode_offsetfetch_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETFETCH Response (examples see C<t/*_decode_encode.t>).
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_offsetfetch_response {
my
(
$bin_stream_ref
) =
@_
;
my
@data
=
unpack
(
$_decode_offsetfetch_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$OffsetFetch_Response
= {};
$OffsetFetch_Response
->{CorrelationId} =
$data
[
$i
++ ];
#CorrelationId
my
$topics_array
=
$OffsetFetch_Response
->{topics} = [];
my
$topics_array_size
=
$data
[
$i
++ ];
# topics array size
while
(
$topics_array_size
-- ) {
my
$topic
= {
TopicName
=>
$data
[
$i
++ ],
# TopicName
};
my
$Partition_array
=
$topic
->{partitions} = [];
my
$Partition_array_size
=
$data
[
$i
++ ];
# Partitions array size
while
(
$Partition_array_size
-- ) {
my
$Partition
= {
Partition
=>
$data
[
$i
++ ],
# Partition
Offset
=>
$data
[
$i
++ ],
# Partition
Metadata
=>
$data
[
$i
++ ],
# Partition
ErrorCode
=>
$data
[
$i
++ ],
# ErrorCode
};
push
(
@$Partition_array
,
$Partition
);
}
push
(
@$topics_array
,
$topic
);
}
return
$OffsetFetch_Response
;
}
# SaslHandshake Request -------------------------------------------------------------
=head3 C<encode_saslhandshake_request( $SaslHandshake_Request )>
Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.
This function takes the following arguments:
=over 3
=item C<$SaslHandshake_Request>
C<$SaslHandshake_Request> is a reference to the hash representing
the structure of the SaslHandshake Request.
=back
=cut
$IMPLEMENTED_APIVERSIONS
->{
$APIKEY_SASLHANDSHAKE
} = 0;
sub
encode_saslhandshake_request {
my
(
$SaslHandshake_Request
) =
@_
;
my
@data
;
my
$request
= {
# template => '...',
# len => ...,
data
=> \
@data
,
};
_encode_request_header(
$request
,
$APIKEY_SASLHANDSHAKE
,
$SaslHandshake_Request
);
# Size
# ApiKey
# ApiVersion
# CorrelationId
# ClientId
$request
->{template} .=
q{s>}
;
# string length
$request
->{len} += 2;
_encode_string(
$request
,
$SaslHandshake_Request
->{Mechanism} );
# SASL Mechanism
return
pack
(
$request
->{template},
$request
->{len},
@data
);
}
# SASLHANDSHAKE Response --------------------------------------------------------------
my
$_decode_saslhandshake_response_template
=
q{x[l]l>s>l>X[l]l>/(s>/a)}
;
# x[l] # Size (skip)
# l> # CorrelationId
# s> # ErrorCode
# l> # ApiVersions array size
# X[l]
# l>/( # Enabled Mechanisms
# s>/a # Mechanism
# )
=head3 C<decode_saslhandshake_response( $bin_stream_ref )>
Decodes the argument and returns a reference to the hash representing
the structure of the SaslHandshake Response.
This function takes the following arguments:
=over 3
=item C<$bin_stream_ref>
C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.
=back
=cut
sub
decode_saslhandshake_response {
my
(
$bin_stream_ref
) =
@_
;
my
@data
=
unpack
(
$_decode_saslhandshake_response_template
,
$$bin_stream_ref
);
my
$i
= 0;
my
$SaslHandshake_Response
= {};
$SaslHandshake_Response
->{CorrelationId} =
$data
[
$i
++ ];
# CorrelationId
$SaslHandshake_Response
->{ErrorCode} =
$data
[
$i
++ ];
# ErrorCode
my
$Mechanisms_array
=
$SaslHandshake_Response
->{Mechanisms} = [];
my
$Mechanisms_array_size
=
$data
[
$i
++ ];
# ApiVersions array size
while
(
$Mechanisms_array_size
-- ) {
push
(
@$Mechanisms_array
,
$data
[
$i
++ ] );
# Mechanism
}
return
$SaslHandshake_Response
;
}
#-- private functions ----------------------------------------------------------
# Generates a template to encrypt the request header
sub
_encode_request_header {
my
(
$request
,
$api_key
,
$request_ref
) =
@_
;
# we need to find out which API version to use for the request (and the
# response). $request_ref->{ApiVersion} can be specified by the end user,
# or providede by by Kafka::Connection ( see
# Kafka::Connection::_get_api_versions() ). If not provided, we
# default to $DEFAULT_APIVERSION
my
$api_version
=
$request_ref
->{ApiVersion} //
$DEFAULT_APIVERSION
;
@{
$request
->{data} } = (
# Size
$api_key
,
# ApiKey
$api_version
,
# ApiVersion
$request_ref
->{CorrelationId},
# CorrelationId
);
$request
->{template} =
$_Request_header_template
;
$request
->{len} =
$_Request_header_length
;
_encode_string(
$request
,
$request_ref
->{ClientId} );
# ClientId
return
$api_version
;
}
# Generates a template to decrypt the fetch response body
sub
_decode_fetch_response_template {
my
(
$response
,
$api_version
) =
@_
;
my
$is_v1
=
$api_version
== 1;
my
$is_v2
=
$api_version
== 2;
my
$is_v3
=
$api_version
== 3;
if
(
$is_v1
||
$is_v2
||
$is_v3
) {
$response
->{template} =
$_FetchResponse_header_template_v1
;
$response
->{stream_offset} =
$_FetchResponse_header_length_v1
;
# bytes before topics array size
# [l] Size
# [l] CorrelationId
# [l] throttle_time_ms
}
else
{
$response
->{template} =
$_FetchResponse_header_template
;
$response
->{stream_offset} =
$_FetchResponse_header_length
;
# bytes before topics array size
# [l] Size
# [l] CorrelationId
}
my
$topics_array_size
=
unpack
(
q{x[}
.
$response
->{stream_offset} .
q{]}
.
q{l>}
,
# topics array size
${
$response
->{bin_stream} }
);
$response
->{stream_offset} += 4;
# bytes before TopicName length
# [l] topics array size
my
(
$TopicName_length
,
$partitions_array_size
);
while
(
$topics_array_size
-- ) {
$TopicName_length
=
unpack
(
q{x[}
.
$response
->{stream_offset}
.
q{]s>}
,
# TopicName length
${
$response
->{bin_stream} }
);
$response
->{stream_offset} +=
# bytes before partitions array size
2
# [s] TopicName length
+
$TopicName_length
# TopicName
;
$partitions_array_size
=
unpack
(
q{x[}
.
$response
->{stream_offset}
.
q{]l>}
,
# partitions array size
${
$response
->{bin_stream} }
);
$response
->{stream_offset} += 4;
# bytes before Partition
# [l] partitions array size
$response
->{template} .=
$_FetchResponse_topic_body_template
;
$response
->{stream_offset} +=
$_FetchResponse_topic_body_length
;
# (without TopicName and partitions array size)
# bytes before MessageSetSize
# TopicName
# [l] # partitions array size
# [l] Partition
# [s] ErrorCode
# [q] HighwaterMarkOffset
_decode_MessageSet_template(
$response
);
}
return
;
}
# kafka uses a snappy-java compressor, with it's own headers and frame formats
my
$XERIAL_SNAPPY_MAGIC_HEADER
=
"\x82SNAPPY\x00"
;
my
$XERIAL_SNAPPY_BLOCK_SIZE
= 0x8000;
# 32Kb
my
$XERIAL_SNAPPY_FILE_VERSION
= 1;
sub
_snappy_xerial_decompress {
my
(
$data
) =
@_
;
my
$uncompressed
;
my
$raw_format_suspected
= 1;
if
(
length
(
$data
) > 16 ) {
my
(
$header
,
$x_version
,
$x_compatversion
) =
unpack
(
q{a[8]L>L>}
,
$data
);
if
(
$header
eq
$XERIAL_SNAPPY_MAGIC_HEADER
) {
$raw_format_suspected
= 0;
_error(
$ERROR_COMPRESSION
,
'snappy: bad compatversion in snappy xerial header'
)
unless
$x_compatversion
==
$XERIAL_SNAPPY_FILE_VERSION
;
_error(
$ERROR_COMPRESSION
,
'snappy: bad version in snappy xerial header'
)
unless
$x_version
==
$XERIAL_SNAPPY_FILE_VERSION
;
$data
=
substr
(
$data
, 16 );
while
(
length
(
$data
) > 0 ) {
$uncompressed
//=
''
;
_error(
$ERROR_COMPRESSION
,
'snappy: bad frame length header'
)
if
length
(
$data
) < 4;
my
$compressed_frame_length
=
unpack
(
q{L>}
,
$data
);
$data
=
substr
(
$data
, 4 );
_error(
$ERROR_COMPRESSION
,
'snappy: partial frame '
)
if
length
(
$data
) <
$compressed_frame_length
;
my
$compressed_frame
=
substr
(
$data
, 0,
$compressed_frame_length
,
''
);
my
$uncompressed_frame
= Compress::Snappy::decompress(
$compressed_frame
);
_error(
$ERROR_COMPRESSION
,
'snappy: can\'t uncompress frame '
)
if
not
defined
$uncompressed_frame
;
$uncompressed
.=
$uncompressed_frame
;
}
}
}
if
(
$raw_format_suspected
) {
$uncompressed
= Compress::Snappy::decompress(
$data
);
}
return
$uncompressed
;
}
sub
_snappy_xerial_compress {
my
(
$data
) =
@_
;
my
$compressed_data
;
while
(
length
(
$data
) ) {
my
$block
=
substr
$data
, 0,
$XERIAL_SNAPPY_BLOCK_SIZE
,
''
;
my
$compressed_block
= Compress::Snappy::compress(
$block
);
_error(
$ERROR_COMPRESSION
,
'snappy: can\'t compress frame!'
)
if
not
defined
$compressed_block
;
$compressed_data
//=
pack
(
q{a[8]L>L>}
,
$XERIAL_SNAPPY_MAGIC_HEADER
,
$XERIAL_SNAPPY_FILE_VERSION
,
$XERIAL_SNAPPY_FILE_VERSION
);
$compressed_data
.=
pack
(
q{L>}
,
length
(
$compressed_block
) ) .
$compressed_block
;
}
return
$compressed_data
;
}
sub
_decompress_data {
my
(
$data
,
$compression_codec
) =
@_
;
say
STDERR format_message(
'[%s] decompress_data request, compression_codec: %s, data: %s'
,
scalar
(
localtime
),
$compression_codec
,
unpack
(
'H*'
,
$data
),
)
if
Kafka::Protocol->debug_level // 0 >= 3;
my
$decompressed
;
if
(
$compression_codec
==
$COMPRESSION_GZIP
) {
try
{
$decompressed
= gunzip(
$data
);
}
catch
{
_error(
$ERROR_COMPRESSION
, format_message(
'gunzip failed: %s'
,
$_
) );
};
}
elsif
(
$compression_codec
==
$COMPRESSION_SNAPPY
) {
$decompressed
= _snappy_xerial_decompress(
$data
)
// _error(
$ERROR_COMPRESSION
,
'Unable to decompress snappy compressed data'
);
}
elsif
(
$compression_codec
==
$COMPRESSION_LZ4
) {
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
# New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
if
( Compress::LZ4Frame::looks_like_lz4frame(
$data
) ) {
$decompressed
= Compress::LZ4Frame::decompress(
$data
) // _error(
$ERROR_COMPRESSION
,
'Unable to decompress LZ4 compressed data'
);
}
else
{
_error(
$ERROR_COMPRESSION
,
'Unable to decompress LZ4 compressed data. Frame is not valid'
);
}
}
else
{
_error(
$ERROR_COMPRESSION
,
"Unknown compression codec $compression_codec"
);
}
_error(
$ERROR_COMPRESSION
,
'Decompression produced empty result'
)
unless
defined
$decompressed
;
return
$decompressed
;
}
sub
_compress_data {
my
(
$data
,
$compression_codec
) =
@_
;
my
$compressed
;
# Compression
if
(
$compression_codec
==
$COMPRESSION_GZIP
) {
$compressed
= gzip(
$data
)
// _error(
$ERROR_COMPRESSION
,
'Unable to compress gzip data'
);
}
elsif
(
$compression_codec
==
$COMPRESSION_SNAPPY
) {
$compressed
= _snappy_xerial_compress(
$data
)
// _error(
$ERROR_COMPRESSION
,
'Unable to compress snappy data'
);
}
elsif
(
$compression_codec
==
$COMPRESSION_LZ4
) {
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
# New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
$compressed
= Compress::LZ4Frame::compress_checksum(
$data
)
// _error(
$ERROR_COMPRESSION
,
'Unable to compress LZ4 data'
);
}
else
{
_error(
$ERROR_COMPRESSION
,
"Unknown compression codec $compression_codec"
);
}
return
$compressed
;
}
# Decrypts MessageSet
sub
_decode_MessageSet_array {
my
(
$response
,
$MessageSetSize
,
$i_ref
,
$MessageSet_array_ref
,
$_override_offset
) =
@_
;
# $_override_offset should not be set manually, it's used when recursing,
# to decode internal compressed message, which have offsets starting at 0,
# 1, etc, and instead we need to set the external wrapping message offset.
my
$data
=
$response
->{data};
my
$data_array_size
=
scalar
@{
$data
};
# NOTE: not all messages can be returned
my
(
$Message
,
$MessageSize
,
$Crc
,
$Key_length
,
$Value_length
);
while
(
$MessageSetSize
&&
$$i_ref
<
$data_array_size
) {
my
$message_offset
= _unpack64(
$data
->[
$$i_ref
++ ] );
if
(
defined
$_override_offset
) {
$message_offset
=
$_override_offset
;
}
$Message
= {
Offset
=>
$message_offset
,
# Offset
};
$MessageSize
=
$data
->[
$$i_ref
++ ];
# MessageSize
# NOTE: The CRC is the CRC32 of the remainder of the message bytes.
# This is used to check the integrity of the message on the broker and consumer:
# MagicByte + Attributes + Key length + Key + Value length + Value
$Crc
=
$data
->[
$$i_ref
++ ];
# Crc
# WARNING: The current version of the module does not support the following:
# A message set is also the unit of compression in Kafka,
# and we allow messages to recursively contain compressed message sets to allow batch compression.
$Message
->{MagicByte} =
$data
->[
$$i_ref
++ ];
# MagicByte
my
$is_v1_msg_format
=
$Message
->{MagicByte} == 1;
$Message
->{Attributes} =
$data
->[
$$i_ref
++ ];
# Attributes
if
(
$is_v1_msg_format
) {
$Message
->{Timestamp} =
$data
->[
$$i_ref
++ ];
# Timestamp
}
$Key_length
=
$data
->[
$$i_ref
++ ];
# Key length
$Message
->{Key} =
$Key_length
==
$NULL_BYTES_LENGTH
?
q{}
:
$data
->[
$$i_ref
++ ];
# Key
$Value_length
=
$data
->[
$$i_ref
++ ];
# Value length
$Message
->{Value} =
$Value_length
==
$NULL_BYTES_LENGTH
?
q{}
:
$data
->[
$$i_ref
++ ];
# Value
if
(
my
$compression_codec
=
$Message
->{Attributes} &
$COMPRESSION_CODEC_MASK
) {
my
$decompressed
= _decompress_data(
$Message
->{Value},
$compression_codec
);
my
@data
;
my
$Value_length
=
length
$decompressed
;
my
$resp
= {
data
=> \
@data
,
bin_stream
=> \
$decompressed
,
stream_offset
=> 0,
};
_decode_MessageSet_sized_template(
$Value_length
,
$resp
);
@data
=
unpack
(
$resp
->{template}, ${
$resp
->{bin_stream} } );
my
$i
= 0;
# i_ref
my
$size
=
length
(
$decompressed
);
_decode_MessageSet_array(
$resp
,
$size
,
# message set size
\
$i
,
# i_ref
$MessageSet_array_ref
,
$message_offset
);
}
else
{
push
(
@$MessageSet_array_ref
,
$Message
);
}
$MessageSetSize
-= 12
# [q] Offset
# [l] MessageSize
+
$MessageSize
# Message
;
}
return
;
}
# Generates a template to encode single message within MessageSet
sub
_encode_Message {
my
(
$request
,
$Key
,
$Value
,
$Attributes
,
$Offset
,
$v1_format
,
$Timestamp
) =
@_
;
# v1 format - supported since 0.10.0, introduces Timestamps
my
$MagicByte
=
$v1_format
? 1 : 0;
$Timestamp
//= -1;
$Attributes
//=
$COMPRESSION_NONE
;
# According to Apache Kafka documentation:
# The lowest 2 bits contain the compression codec used for the message.
# The other bits should be set to 0.
my
$key_length
=
length
(
$Key
);
my
$value_length
=
length
(
$Value
);
my
$message_body
=
pack
(
q{cc}
# MagicByte
# Attributes
.(
$v1_format
?
q{a[8]}
:
q{}
)
# Timestamp - v1 only (when MagicByte > 0) (#$_int64_template)
.
q{l>}
# Key length
.(
$key_length
?
qq{a[$key_length]}
:
q{}
)
# Key
.
q{l>}
# Value length
.(
$value_length
?
qq{a[$value_length]}
:
q{}
)
# Value
,
$MagicByte
,
$Attributes
,
$v1_format
? ( _pack64(
$Timestamp
) ) : (),
$key_length
? (
$key_length
,
$Key
) : (
$NULL_BYTES_LENGTH
),
$value_length
? (
$value_length
,
$Value
) : (
$NULL_BYTES_LENGTH
),
);
my
$MessageBodySize
=
length
(
$message_body
);
my
$MessageSize
=
$MessageBodySize
+ 4;
my
$data
=
$request
->{data};
$request
->{template} .=
$_MessageSet_template
.
qq{l>a[$MessageBodySize]}
;
# crc + message_body
$request
->{len} +=
$_MessageSet_length
+
$MessageSize
;
push
@$data
, (_pack64(
$Offset
),
$MessageSize
, crc32(
$message_body
),
$message_body
);
return
;
}
# Generates a template to encode MessageSet
sub
_encode_MessageSet_array {
my
(
$request
,
$MessageSet_array_ref
,
$compression_codec
) =
@_
;
# not sure if it would be good to mix different formats in different messages in the same set
# so detect if we should use v1 format
my
$use_v1_format
;
foreach
my
$MessageSet
(
@$MessageSet_array_ref
) {
if
(
$MessageSet
->{Timestamp} ) {
$use_v1_format
= 1;
last
;
}
}
if
(
$compression_codec
) {
my
$subrequest
= {
data
=> [],
template
=>
''
,
len
=> 0
};
my
(
$Key
,
$Value
);
foreach
my
$MessageSet
(
@$MessageSet_array_ref
) {
_encode_Message(
$subrequest
,
$Key
=
$MessageSet
->{Key},
$MessageSet
->{Value},
$COMPRESSION_NONE
,
$PRODUCER_ANY_OFFSET
,
$use_v1_format
,
$MessageSet
->{Timestamp}
);
}
$Value
=
pack
(
$subrequest
->{template}, @{
$subrequest
->{data}});
$MessageSet_array_ref
= [
{
Offset
=>
$PRODUCER_ANY_OFFSET
,
Key
=>
$Key
,
Value
=> _compress_data(
$Value
,
$compression_codec
),
}
];
}
my
$subrequest
= {
data
=> [],
template
=>
''
,
len
=> 0
};
foreach
my
$MessageSet
(
@$MessageSet_array_ref
) {
_encode_Message(
$subrequest
,
$MessageSet
->{Key},
$MessageSet
->{Value},
$compression_codec
//
$COMPRESSION_NONE
,
$MessageSet
->{Offset},
$use_v1_format
,
$MessageSet
->{Timestamp}
);
}
my
$data
=
$request
->{data};
$request
->{template} .=
q{l>}
.
$subrequest
->{template};
$request
->{len} += 4 +
$subrequest
->{len};
push
@$data
, (
$subrequest
->{len}, @{
$subrequest
->{data} } );
return
;
}
# Generates a template to decrypt MessageSet
sub
_decode_MessageSet_template {
my
(
$response
) =
@_
;
my
$MessageSetSize
=
unpack
(
q{x[}
.
$response
->{stream_offset}
.
q{]l>}
,
# MessageSetSize
${
$response
->{bin_stream} }
);
$response
->{template} .=
q{l>}
;
# MessageSetSize
$response
->{stream_offset} += 4;
# bytes before Offset
return
_decode_MessageSet_sized_template(
$MessageSetSize
,
$response
);
}
sub
_decode_MessageSet_sized_template {
my
(
$MessageSetSize
,
$response
) =
@_
;
my
$bin_stream_length
=
length
${
$response
->{bin_stream} };
my
(
$local_template
,
$MessageSize
,
$MagicByte
,
$Key_length
,
$Value_length
);
CREATE_TEMPLATE:
while
(
$MessageSetSize
) {
# Not the full MessageSet
# 22 is the minimal size of a v0 message format until (and including) the Key
# Length. If the message version is v1, then it's 22 + 8. We'll check later
last
CREATE_TEMPLATE
if
$MessageSetSize
< 22;
# [q] Offset
# [l] MessageSize
# [l] Crc
# [c] MagicByte
# [c] Attributes
# ( [q] Timestamp ) message format v1 only
# [l] Key length
# [l] Value length
$local_template
=
q{}
;
MESSAGE_SET:
{
$response
->{stream_offset} += 8;
# the size of the offset data value
(
$MessageSize
,
$MagicByte
) =
unpack
(
q{x[}
.
$response
->{stream_offset}.
q{]}
.
q{l>}
# MessageSize
.
q{x[l]}
# Crc
.
q{c}
,
# MagicByte
${
$response
->{bin_stream} }
);
my
$is_v1_msg_format
=
$MagicByte
== 1;
if
(
$is_v1_msg_format
&&
$MessageSetSize
< (22+8)) {
# Not the full MessageSet
$local_template
=
q{}
;
last
MESSAGE_SET;
}
$local_template
.= (
$is_v1_msg_format
?
$_Message_template_with_timestamp
:
$_Message_template
);
$response
->{stream_offset} += (
$is_v1_msg_format
? 18: 10);
$Key_length
=
unpack
(
q{x[}
.
$response
->{stream_offset}
.
q{]l>}
,
# Key length
${
$response
->{bin_stream} }
);
$response
->{stream_offset} += 4;
# bytes before Key or Value length
# [l] Key length
$response
->{stream_offset} +=
$Key_length
# bytes before Key
if
$Key_length
!=
$NULL_BYTES_LENGTH
;
# Key
if
(
$bin_stream_length
>=
$response
->{stream_offset} + 4 ) {
# + [l] Value length
$local_template
.=
$_Key_or_Value_template
if
$Key_length
!=
$NULL_BYTES_LENGTH
;
}
else
{
# Not the full MessageSet
$local_template
=
q{}
;
last
MESSAGE_SET;
}
$local_template
.=
q{l>}
;
# Value length
$Value_length
=
unpack
(
q{x[}
.
$response
->{stream_offset}
.
q{]l>}
,
# Value length
${
$response
->{bin_stream} }
);
$response
->{stream_offset} +=
# bytes before Value or next Message
4
# [l] Value length
;
$response
->{stream_offset} +=
$Value_length
# bytes before next Message
if
$Value_length
!=
$NULL_BYTES_LENGTH
;
# Value
if
(
$bin_stream_length
>=
$response
->{stream_offset} ) {
$local_template
.=
$_Key_or_Value_template
if
$Value_length
!=
$NULL_BYTES_LENGTH
;
}
else
{
# Not the full MessageSet
$local_template
=
q{}
;
last
MESSAGE_SET;
}
}
if
(
$local_template
) {
$response
->{template} .=
$local_template
;
$MessageSetSize
-= 12
# [q] Offset
# [l] MessageSize
+
$MessageSize
# Message
;
}
else
{
last
CREATE_TEMPLATE;
}
}
return
;
}
# Generates a template to encrypt string
sub
_encode_string {
my
(
$request
,
$string
) =
@_
;
if
(
$string
eq
q{}
) {
push
( @{
$request
->{data} }, 0 );
}
else
{
my
$string_length
=
length
$string
;
push
( @{
$request
->{data} },
$string_length
,
$string
);
$request
->{template} .=
q{a*}
;
# string;
$request
->{len} +=
$string_length
;
}
return
;
}
# Handler for errors
sub
_error {
Kafka::Exception::Protocol->throw( throw_args(
@_
) );
return
;
}
1;
__END__
=head1 DIAGNOSTICS
In order to achieve better performance, functions of this module do not perform
arguments validation.
=head1 SEE ALSO
The basic operation of the Kafka package modules:
L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
L<Kafka::Producer|Kafka::Producer> - interface for producing client.
L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
L<Kafka::Message|Kafka::Message> - interface to access Kafka message
properties.
L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.
L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's Protocol.
L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at L<http://kafka.apache.org/>
Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
=head1 SOURCE CODE
Kafka package is hosted on GitHub:
=head1 AUTHOR
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
=head1 CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.
=cut