NAME

Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.

VERSION

This documentation refers to Kafka::Protocol version 1.08 .

SYNOPSIS

use 5.010;
use strict;
use warnings;

use Data::Compare;
use Kafka qw(
    $COMPRESSION_NONE
    $ERROR_NO_ERROR
    $REQUEST_TIMEOUT
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
    $PRODUCER_ANY_OFFSET
);
use Kafka::Protocol qw(
    decode_produce_response
    encode_produce_request
);

# a encoded produce request hex stream
my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

# a decoded produce request
my $decoded = {
    CorrelationId                       => 4,
    ClientId                            => q{},
    RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
    Timeout                             => $REQUEST_TIMEOUT * 100,  # ms
    topics                              => [
        {
            TopicName                   => 'mytopic',
            partitions                  => [
                {
                    Partition           => 0,
                    MessageSet              => [
                        {
                            Offset          => $PRODUCER_ANY_OFFSET,
                            MagicByte       => 0,
                            Attributes      => $COMPRESSION_NONE,
                            Key             => q{},
                            Value           => 'Hello!',
                        },
                    ],
                },
            ],
        },
    ],
};

my $encoded_request = encode_produce_request( $decoded );
say 'encoded correctly' if $encoded_request eq $encoded;

# a encoded produce response hex stream
$encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

# a decoded produce response
$decoded = {
    CorrelationId                           => 4,
    topics                                  => [
        {
            TopicName                       => 'mytopic',
            partitions                      => [
                {
                    Partition               => 0,
                    ErrorCode               => $ERROR_NO_ERROR,
                    Offset                  => 0,
                },
            ],
        },
    ],
};

my $decoded_response = decode_produce_response( \$encoded );
say 'decoded correctly' if Compare( $decoded_response, $decoded );

# more examples, see t/*_decode_encode.t

DESCRIPTION

This module is not a user module.

In order to achieve better performance, functions of this module do not perform arguments validation.

The main features of the Kafka::Protocol module are:

  • Supports parsing the Apache Kafka protocol.

  • Supports Apache Kafka Requests and Responses (PRODUCE and FETCH). Within this package we currently support access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.

  • Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.

EXPORT

The following constants are available for export

$DEFAULT_APIVERSION

The default API version that will be used as fallback, if it's not possible to detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be queried to get which API version they implements. On Kafka servers 0.8.x and 0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its value is '0'

$CONSUMERS_REPLICAID

According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

$NULL_BYTES_LENGTH

According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

$BAD_OFFSET

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'

FUNCTIONS

The following functions are available for Kafka::MockProtocol module.

encode_api_versions_request( $ApiVersions_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$ApiVersions_Request

$ApiVersions_Request is a reference to the hash representing the structure of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty string), and ApiVersion (must be 0)

decode_api_versions_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the APIVERSIONS Response.

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_find_coordinator_request( $FindCoordinator_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$FindCoordinator_Request

$FindCoordinator_Request is a reference to the hash representing the structure of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty string), CoordinatorKey and CoordinatorType (for version 1 of protocol)

decode_find_coordinator_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FINDCOORDINATOR Response.

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_produce_request( $Produce_Request, $compression_codec )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$Produce_Request

$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (examples see t/*_decode_encode.t).

$compression_codec

Optional.

$compression_codec sets the required type of $messages compression, if the compression is desirable.

Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY, $COMPRESSION_LZ4.

NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.

decode_produce_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_fetch_request( $Fetch_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$Fetch_Request

$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (examples see t/*_decode_encode.t).

decode_fetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offset_request( $Offset_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$Offset_Request

$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (examples see t/*_decode_encode.t).

decode_offset_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_metadata_request( $Metadata_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$Metadata_Request

$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (examples see t/*_decode_encode.t).

decode_metadata_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offsetcommit_request( $OffsetCommit_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$OffsetCommit_Request

$OffsetCommit_Request is a reference to the hash representing the structure of the OffsetCommit Request (examples see t/*_decode_encode.t).

decode_offsetcommit_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSETCOMMIT Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offsetfetch_request( $OffsetFetch_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$OffsetFetch_Request

$OffsetFetch_Request is a reference to the hash representing the structure of the OffsetFetch Request (examples see t/*_decode_encode.t).

decode_offsetfetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSETFETCH Response (examples see t/*_decode_encode.t).

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_saslhandshake_request( $SaslHandshake_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:

$SaslHandshake_Request

$SaslHandshake_Request is a reference to the hash representing the structure of the SaslHandshake Request.

decode_saslhandshake_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the SaslHandshake Response.

This function takes the following arguments:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

DIAGNOSTICS

In order to achieve better performance, functions of this module do not perform arguments validation.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

SOURCE CODE

Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka

AUTHOR

Sergey Gladkov

Please use GitHub project link above to report problems or contact authors.

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Damien Krotkine

Greg Franklin

COPYRIGHT AND LICENSE

Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.