NAME

Kafka::Consumer - Perl interface for 'consumer' client.

VERSION

This documentation refers to Kafka::Consumer version 0.800_2.

SYNOPSIS

use 5.010;
use strict;
use warnings;

use Kafka qw(
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $RECEIVE_EARLIEST_OFFSETS
);
use Kafka::Connection;
use Kafka::Consumer;

#-- Connection
my $connect = Kafka::Connection->new( host => 'localhost' );

#-- Consumer
my $consumer = Kafka::Consumer->new( Connection  => $connect );

# Get a list of up to max_number valid offsets before the given time
my $offsets = $consumer->offsets(
    'mytopic',                      # topic
    0,                              # partition
    $RECEIVE_EARLIEST_OFFSETS,      # time
    $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
);
if( $offsets ) {
    say "Received offset: $_" foreach @$offsets;
}
say STDERR 'Error: (', $consumer->last_errorcode, ') ', $consumer->last_error
    if !$offsets || $consumer->last_errorcode;

# Consuming messages
my $messages = $consumer->fetch(
    'mytopic',                      # topic
    0,                              # partition
    0,                              # offset
    $DEFAULT_MAX_BYTES              # Maximum size of MESSAGE(s) to receive
);
if ( $messages ) {
    foreach my $message ( @$messages ) {
        if( $message->valid ) {
            say 'payload    : ', $message->payload;
            say 'key        : ', $message->key;
            say 'offset     : ', $message->offset;
            say 'next_offset: ', $message->next_offset;
        }
        else {
            say 'error      : ', $message->error;
        }
    }
}

# Closes the consumer and cleans up
undef $consumer;

DESCRIPTION

The Kafka consumer API is implemented by the Kafka::Consumer class.

The main features of the Kafka::Consumer class are:

  • Provides an object oriented model of communication.

  • Supports parsing the Apache Kafka 0.8 Wire Format protocol.

  • Supports Apache Kafka Requests and Responses. This module currently supports the FETCH and OFFSETS Requests and Responses (FETCH without the compression codec attribute for now).

  • Supports working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems.

The offsets and fetch methods both return an ARRAY reference. For offsets, the array contains the offset integers.

For fetch, the array contains Kafka::Message objects.

CONSTRUCTOR

new

Creates a new consumer client object. Returns the created Kafka::Consumer object.

Depending on the value of the RaiseError attribute, an error either causes the program to halt or the constructor returns the Kafka::Consumer object without halting.

Use the "last_errorcode" and "last_error" methods of the Kafka::Consumer class to get information about the error.

new() takes arguments in key-value pairs. The following arguments are currently recognized:

Connection => $connect

$connect is the Kafka::Connection object that allows you to communicate with the Apache Kafka cluster.

RaiseError => $mode

Optional, default is 0.

If "RaiseError" is true, an error causes the program to halt: confess is called if an argument is not valid, and die is called in all other error cases (either can be trapped with eval).

You should always check for errors when RaiseError is not set to true.

CorrelationId => $correlation_id

Optional, default is undef.

CorrelationId is a user-supplied integer. The server passes it back in the response unmodified. The $correlation_id should be an integer number.

An error is thrown if the CorrelationId of the request does not match the CorrelationId in the response.

CorrelationId is auto-assigned (a random negative number) if it is not provided when the Kafka::Consumer object is created.

ClientId => $client_id

This is a user-supplied identifier (string) for the client application.

ClientId is auto-assigned if it is not passed in when the Kafka::Consumer object is created.

MaxWaitTime => $max_time

The maximum amount of time (ms) to wait when sufficient data is not available at the time the request is issued.

Optional, default is $DEFAULT_MAX_WAIT_TIME.

$DEFAULT_MAX_WAIT_TIME is the default time that can be imported from the Kafka module.

The $max_time must be a positive integer.

MinBytes => $min_bytes

The minimum number of bytes of messages that must be available to give a response. If the client sets this to $MIN_BYTES_RESPOND_IMMEDIATELY, the server will always respond immediately. If it is set to $MIN_BYTES_RESPOND_HAS_DATA, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. Setting higher values in combination with larger timeouts results in reading larger chunks of data.

Optional, default is $MIN_BYTES_RESPOND_IMMEDIATELY.

$MIN_BYTES_RESPOND_IMMEDIATELY, $MIN_BYTES_RESPOND_HAS_DATA are the defaults that can be imported from the Kafka module.

The $min_bytes must be a non-negative integer.

MaxBytes => $max_bytes

The maximum bytes to include in the message set for this partition.

Optional, default = $DEFAULT_MAX_BYTES (1_000_000).

The $max_bytes must be more than $MESSAGE_SIZE_OVERHEAD (size of protocol overhead - data added by protocol for each message).

$DEFAULT_MAX_BYTES, $MESSAGE_SIZE_OVERHEAD are the defaults that can be imported from the Kafka module.

MaxNumberOfOffsets => $max_number

Kafka returns up to $max_number offsets.

The $max_number must be a non-negative integer.

Optional, default = $DEFAULT_MAX_NUMBER_OF_OFFSETS (100).

$DEFAULT_MAX_NUMBER_OF_OFFSETS is the default that can be imported from the Kafka module.
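
The RaiseError argument described above notes that errors can be trapped with eval. The following is a minimal sketch of that pattern, not part of the Kafka::Consumer API: try_call is a hypothetical helper name, and the usage shown in the trailing comment assumes a consumer created with RaiseError => 1.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Consumer): runs a code reference
# and traps any exception raised by die or confess.
# Returns ( $result, undef ) on success or ( undef, $error_text ) on failure.
sub try_call {
    my ( $code ) = @_;

    my $result;
    my $ok = eval { $result = $code->(); 1 };
    return ( $result, undef ) if $ok;

    my $error = $@ || 'unknown error';
    return ( undef, "$error" );
}

# Assumed usage with a consumer created with RaiseError => 1:
#   my ( $messages, $error ) = try_call(
#       sub { $consumer->fetch( 'mytopic', 0, 0, $DEFAULT_MAX_BYTES ) }
#   );
#   warn "fetch failed: $error" if defined $error;
```

Returning the error text instead of rethrowing keeps the calling code linear; callers that prefer exceptions can simply call the method directly and let die propagate.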

METHODS

The arguments offset and max_size (for fetch), or time and max_number (for offsets), are additional parameters that describe the messages we want to access.

The following methods are defined for the Kafka::Consumer class:

fetch( $topic, $partition, $offset, $max_size )

Gets a list of messages to consume one by one, up to $max_size bytes.

Returns a reference to an array of Kafka::Message objects. If an error occurs and RaiseError is not true, returns a reference to an empty array.

fetch() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer.

$offset

Offset in topic and partition to start from (64 bits).

Optional. The argument must be a non-negative integer. The argument may be a Math::BigInt integer on a 32-bit system.

$max_size

$max_size is the maximum size of the message set to return. The argument must be a positive integer. The maximum size of a request is limited by MAX_SOCKET_REQUEST_BYTES, which can be imported from the Kafka module.
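
The fetch() description above can be turned into a simple consuming loop that resumes each request at the next_offset of the last valid message. This is only a sketch: consume_batches and its $max_batches parameter are hypothetical names, and $consumer is assumed to behave like a Kafka::Consumer object created without RaiseError, so that an error yields an empty array.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Consumer): reads up to $max_batches
# batches from one partition, resuming each fetch at the next_offset of the
# last valid message. Returns the collected payloads and the offset to
# continue from.
sub consume_batches {
    my ( $consumer, $topic, $partition, $offset, $max_size, $max_batches ) = @_;

    my @payloads;
    for ( 1 .. $max_batches ) {
        my $messages = $consumer->fetch( $topic, $partition, $offset, $max_size );
        last unless $messages && @$messages;    # error or no more data

        my $advanced = 0;
        foreach my $message ( @$messages ) {
            unless ( $message->valid ) {
                warn 'skipping invalid message: ', $message->error, "\n";
                next;
            }
            push @payloads, $message->payload;
            $offset   = $message->next_offset;
            $advanced = 1;
        }
        last unless $advanced;                  # avoid refetching the same offset
    }

    return ( \@payloads, $offset );
}

# Assumed usage:
#   my ( $payloads, $next_offset ) =
#       consume_batches( $consumer, 'mytopic', 0, 0, $DEFAULT_MAX_BYTES, 10 );
```

The returned offset can be stored by the caller and passed back in later, which is one way to continue consuming where the previous run stopped.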

offsets( $topic, $partition, $time, $max_number )

Gets a list of up to $max_number valid offsets before the given time.

Returns a reference to an array of offset integers (Math::BigInt integers on a 32-bit system). If an error occurs and RaiseError is not true, returns a reference to an empty array.

offsets() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer.

$time

Used to ask for all messages before a certain time. $time is a timestamp in milliseconds since the UNIX Epoch; offsets before this time are returned.

The argument must be a positive number. The argument may be a Math::BigInt integer on a 32-bit system.

The special values $RECEIVE_LATEST_OFFSET (-1), $RECEIVE_EARLIEST_OFFSETS (-2) are allowed.

$RECEIVE_LATEST_OFFSET, $RECEIVE_EARLIEST_OFFSETS are the defaults that can be imported from the Kafka module.

$max_number

$max_number is the maximum number of offsets to retrieve.

Optional. The argument must be a positive integer.
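
Because $time is expressed in milliseconds while Perl's time() returns seconds, a small conversion is usually needed. The sketch below shows one way to do it; epoch_ms is a hypothetical helper, and the choice to fall back to Math::BigInt only when perl lacks native 64-bit integers is an assumption based on the 32-bit note above.

```perl
use strict;
use warnings;
use Config;
use Math::BigInt;

# Hypothetical helper (not part of Kafka::Consumer): converts a UNIX time in
# seconds to milliseconds since the epoch. A plain integer is returned when
# perl has native 64-bit integers; otherwise a Math::BigInt object is used
# so the value stays an exact integer.
sub epoch_ms {
    my ( $seconds ) = @_;
    return $Config{ivsize} >= 8
        ? $seconds * 1000
        : Math::BigInt->new( $seconds )->bmul( 1000 );
}

# Timestamp for "one hour ago", suitable as the $time argument:
my $time = epoch_ms( time() - 3600 );

# Assumed usage:
#   my $offsets = $consumer->offsets(
#       'mytopic', 0, $time, $DEFAULT_MAX_NUMBER_OF_OFFSETS
#   );
```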

RaiseError

This method returns the current value showing how errors are handled within the Kafka module. If set to true, die() is called when an error is detected during communication.

last_errorcode and last_error are diagnostic methods and can be used to get detailed error codes and messages for various cases: when the server or the resource is not available, when access to the resource was denied, etc.

last_errorcode

Returns the code of the last error.

last_error

Returns an error message that contains information about the encountered failure.

DIAGNOSTICS

Review documentation of the "RaiseError" method for additional information about possible errors.

Always check "last_errorcode" and the more descriptive "last_error" when "RaiseError" is not set.
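
As an illustration of that advice, the check can be wrapped in a small function and called after each request. This is a sketch only: last_error_text is a hypothetical helper, and it assumes that last_errorcode returns a false value when no error has occurred, as the SYNOPSIS usage suggests.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Consumer): returns a formatted
# description of the last error, or undef (in scalar context) when
# last_errorcode is false.
sub last_error_text {
    my ( $consumer ) = @_;

    my $code = $consumer->last_errorcode;
    return unless $code;

    return sprintf 'Error: (%s) %s', $code, $consumer->last_error;
}

# Assumed usage after a call made without RaiseError:
#   my $messages = $consumer->fetch( 'mytopic', 0, 0, $DEFAULT_MAX_BYTES );
#   if ( defined( my $text = last_error_text( $consumer ) ) ) {
#       warn "$text\n";
#   }
```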

Invalid argument

An invalid argument was passed to the new constructor or to another method.

Can't send

Message can't be sent using Kafka::IO object socket.

Can't recv

Message can't be received using Kafka::IO object socket.

Can't bind

TCP connection can't be established on the given host and port.

Can't get metadata

Failed to obtain metadata from Kafka servers.

Leader not found

Missing information about server-leader in metadata.

Mismatch CorrelationId

CorrelationId of response doesn't match one in request.

There are no known brokers

Resulting metadata has no information about cluster brokers.

Can't get metadata

Received metadata has incorrect internal structure.

For a more detailed error explanation, call the "last_error" method of the Kafka::Consumer object.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low level interface for communication with Kafka server.

Kafka::Internals - Internal constants and functions used by several package modules.

A wealth of detail about Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.