NAME

Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.

VERSION

This documentation refers to Kafka::Protocol version 0.800_15 .

SYNOPSIS

use 5.010;
use strict;
use warnings;

use Data::Compare;
use Kafka qw(
    $ERROR_NO_ERROR
    $REQUEST_TIMEOUT
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
    $PRODUCER_ANY_OFFSET
);
use Kafka::Protocol qw(
    $COMPRESSION_NONE
    decode_produce_response
    encode_produce_request
);

# a encoded produce request hex stream
my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

# a decoded produce request
my $decoded = {
    CorrelationId                       => 4,
    ClientId                            => q{},
    RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
    Timeout                             => $REQUEST_TIMEOUT * 100,  # ms
    topics                              => [
        {
            TopicName                   => 'mytopic',
            partitions                  => [
                {
                    Partition           => 0,
                    MessageSet              => [
                        {
                            Offset          => $PRODUCER_ANY_OFFSET,
                            MagicByte       => 0,
                            Attributes      => $COMPRESSION_NONE,
                            Key             => q{},
                            Value           => 'Hello!',
                        },
                    ],
                },
            ],
        },
    ],
};

my $encoded_request = encode_produce_request( $decoded );
say 'encoded correctly' if $encoded_request eq $encoded;

# a encoded produce response hex stream
$encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

# a decoded produce response
$decoded = {
    CorrelationId                           => 4,
    topics                                  => [
        {
            TopicName                       => 'mytopic',
            partitions                      => [
                {
                    Partition               => 0,
                    ErrorCode               => $ERROR_NO_ERROR,
                    Offset                  => 0,
                },
            ],
        },
    ],
};

my $decoded_response = decode_produce_response( \$encoded );
say 'decoded correctly' if Compare( $decoded_response, $decoded );

# more examples, see t/??_decode_encode.t

DESCRIPTION

This module is not a user module.

In order to achieve better performance, functions of this module do not perform arguments validation.

The main features of the Kafka::Protocol module are:

  • Supports parsing the Apache Kafka protocol.

  • Supports Apache Kafka Requests and Responses (PRODUCE and FETCH with no compression codec attribute now). Within this package we currently support access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.

  • Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.

EXPORT

The following constants are available for export

$APIVERSION

According to Apache Kafka documentation: 'This is a numeric version number for this api. Currently the supported version for all APIs is 0 .'

$COMPRESSION_NONE

According to Apache Kafka documentation: 'Kafka currently supports two compression codecs for message sets with the following codec numbers: None = 0, ...'

$CONSUMERS_REPLICAID

According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

$NULL_BYTES_LENGTH

According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

$BAD_OFFSET

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'

FUNCTIONS

The following functions are available for Kafka::MockProtocol module.

encode_produce_request( $Produce_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function take argument. The following argument is currently recognized:

$Produce_Request

$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (examples see t/??_decode_encode.t).

decode_produce_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (examples see t/??_decode_encode.t).

This function take argument. The following argument is currently recognized:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_fetch_request( $Fetch_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function take argument. The following argument is currently recognized:

$Fetch_Request

$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (examples see t/??_decode_encode.t).

decode_fetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (examples see t/??_decode_encode.t).

This function take argument. The following argument is currently recognized:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offset_request( $Offset_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function take argument. The following argument is currently recognized:

$Offset_Request

$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (examples see t/??_decode_encode.t).

decode_offset_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (examples see t/??_decode_encode.t).

This function take argument. The following argument is currently recognized:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_metadata_request( $Metadata_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function take argument. The following argument is currently recognized:

$Metadata_Request

$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (examples see t/??_decode_encode.t).

decode_metadata_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (examples see t/??_decode_encode.t).

This function take argument. The following argument is currently recognized:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

DIAGNOSTICS

In order to achieve better performance, functions of this module do not perform arguments validation.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.