NAME
Kafka::Consumer - Perl interface for 'consumer' client.
VERSION
This documentation refers to Kafka::Consumer version 0.800_1.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;

    use Kafka qw(
        $DEFAULT_MAX_BYTES
        $DEFAULT_MAX_NUMBER_OF_OFFSETS
        $RECEIVE_EARLIEST_OFFSETS
    );
    use Kafka::Connection;
    use Kafka::Consumer;

    #-- Connection
    my $connect = Kafka::Connection->new( host => 'localhost' );

    #-- Consumer
    my $consumer = Kafka::Consumer->new( Connection => $connect );

    # Get a list of valid offsets up to max_number before the given time
    my $offsets = $consumer->offsets(
        'mytopic',                      # topic
        0,                              # partition
        $RECEIVE_EARLIEST_OFFSETS,      # time
        $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
    );
    if ( $offsets ) {
        say "Received offset: $_" foreach @$offsets;
    }
    say STDERR 'Error: (', $consumer->last_errorcode, ') ', $consumer->last_error
        if !$offsets || $consumer->last_errorcode;

    # Consuming messages
    my $messages = $consumer->fetch(
        'mytopic',                      # topic
        0,                              # partition
        0,                              # offset
        $DEFAULT_MAX_BYTES              # maximum size of MESSAGE(s) to receive
    );
    if ( $messages ) {
        foreach my $message ( @$messages ) {
            if ( $message->valid ) {
                say 'payload    : ', $message->payload;
                say 'key        : ', $message->key;
                say 'offset     : ', $message->offset;
                say 'next_offset: ', $message->next_offset;
            }
            else {
                say 'error      : ', $message->error;
            }
        }
    }

    # Closes the consumer and cleans up
    undef $consumer;
DESCRIPTION
The Kafka consumer API is implemented by the Kafka::Consumer class.
The main features of the Kafka::Consumer class are:
Provides an object oriented model of communication.
Supports parsing the Apache Kafka 0.8 Wire Format protocol.
Supports Apache Kafka Requests and Responses. This module currently provides access to FETCH and OFFSETS Requests and Responses (FETCH without the compression codec attribute for now).
Supports working with 64 bit elements of the Kafka Wire Format protocol on 32 bit systems.
The offsets and fetch methods both return an ARRAY reference. For offsets, the array contains offset integers. For fetch, the array contains Kafka::Message objects.
CONSTRUCTOR
new
Creates a new consumer client object. Returns the created Kafka::Consumer object.
Depending on the value of the RaiseError attribute, an error either causes the program to halt or lets the constructor return the Kafka::Consumer object without halting.
You can use the "last_errorcode" and "last_error" methods of the Kafka::Consumer class to get information about the error.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
Connection => $connect
-
$connect is the Kafka::Connection object that allows you to communicate with the Apache Kafka cluster.
RaiseError => $mode
-
Optional, default is 0.
If "RaiseError" is true, an error causes the program to halt: confess is called if an argument is not valid, and die in any other error case (this can always be trapped with eval).
You should always check for errors when RaiseError is not set to true.
CorrelationId => $correlation_id
-
Optional, default is undef.
CorrelationId is a user-supplied integer. It will be passed back in the response by the server, unmodified. The $correlation_id should be an integer number.
An error is raised if the CorrelationId of the request does not match the CorrelationId in the response.
CorrelationId will be auto-assigned (a random negative number) if it is not provided when creating the Kafka::Consumer object.
ClientId => $client_id
-
This is a user-supplied identifier (string) for the client application.
ClientId will be auto-assigned if it is not passed in when creating the Kafka::Consumer object.
MaxWaitTime => $max_time
-
The maximum amount of time (in milliseconds) to wait when insufficient data is available at the time the request is issued.
Optional, default is $DEFAULT_MAX_WAIT_TIME.
$DEFAULT_MAX_WAIT_TIME is the default time that can be imported from the Kafka module.
The $max_time must be a positive integer.
MinBytes => $min_bytes
-
The minimum number of bytes of messages that must be available to give a response. If the client sets this to $MIN_BYTES_RESPOND_IMMEDIATELY, the server will always respond immediately. If it is set to $MIN_BYTES_RESPOND_HAS_DATA, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. Setting higher values in combination with longer timeouts results in reading larger chunks of data.
Optional, default is $MIN_BYTES_RESPOND_IMMEDIATELY.
$MIN_BYTES_RESPOND_IMMEDIATELY, $MIN_BYTES_RESPOND_HAS_DATA are the defaults that can be imported from the Kafka module.
The $min_bytes must be a non-negative integer.
MaxBytes => $max_bytes
-
The maximum number of bytes to include in the message set for this partition.
Optional, default = $DEFAULT_MAX_BYTES (1_000_000).
The $max_bytes must be more than $MESSAGE_SIZE_OVERHEAD (the size of protocol overhead, that is, data added by the protocol for each message).
$DEFAULT_MAX_BYTES, $MESSAGE_SIZE_OVERHEAD are the defaults that can be imported from the Kafka module.
MaxNumberOfOffsets => $max_number
-
Kafka returns up to $max_number offsets.
The $max_number must be a non-negative integer.
Optional, default = $DEFAULT_MAX_NUMBER_OF_OFFSETS (100).
$DEFAULT_MAX_NUMBER_OF_OFFSETS is the default that can be imported from the Kafka module.
METHODS
The arguments described below (offset and max_size for fetch, time and max_number for offsets) carry additional information that may be encoded as parameters of the messages we want to access.
The following methods are defined for the Kafka::Consumer class:
fetch( $topic, $partition, $offset, $max_size )
Gets a list of messages to consume one by one, up to $max_size bytes.
Returns a reference to an array of Kafka::Message objects. If there is an error and RaiseError is not true, returns a reference to an empty array.
fetch() takes the following arguments:
$topic
-
The $topic must be a normal non-false string of non-zero length.
$partition
-
The $partition must be a non-negative integer.
$offset
-
Offset in topic and partition to start from (64 bits).
Optional. The argument must be a non-negative integer. The argument may be a Math::BigInt integer on a 32 bit system.
$max_size
-
$max_size is the maximum size of the message set to return. The argument must be a positive integer. The maximum size of a request is limited by MAX_SOCKET_REQUEST_BYTES, which can be imported from the Kafka module.
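The fetch() description above can be turned into a simple consume loop. The following is a minimal sketch, not part of the module: consume_from is a hypothetical helper, and it assumes only the fetch method and the Kafka::Message accessors (valid, payload, next_offset) shown in the SYNOPSIS. It takes the consumer object as an argument, so it works with any object that offers the same interface.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by Kafka::Consumer): fetch repeatedly,
# starting at $start_offset, until a fetch yields no valid message or
# $limit payloads have been collected.
# Returns ( \@payloads, $offset_to_resume_from ).
sub consume_from {
    my ( $consumer, $topic, $partition, $start_offset, $max_size, $limit ) = @_;

    my @payloads;
    my $offset = $start_offset;

    while ( @payloads < $limit ) {
        my $messages = $consumer->fetch( $topic, $partition, $offset, $max_size );
        last unless $messages && @$messages;    # error or nothing more to read

        my $progressed = 0;
        foreach my $message ( @$messages ) {
            last unless $message->valid;        # stop at the first invalid message
            push @payloads, $message->payload;
            $offset     = $message->next_offset;  # where the next fetch should start
            $progressed = 1;
            last if @payloads >= $limit;
        }
        last unless $progressed;                # do not loop forever on a bad batch
    }

    return ( \@payloads, $offset );
}
```

With a real consumer this might be called as consume_from( $consumer, 'mytopic', 0, 0, $DEFAULT_MAX_BYTES, 100 ). The returned offset can be stored and passed back in later to resume where the previous call stopped.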
offsets( $topic, $partition, $time, $max_number )
Gets a list of valid offsets, up to $max_number, before the given time.
Returns a reference to an array of offset integers (Math::BigInt integers on a 32 bit system). If there is an error and RaiseError is not true, returns a reference to an empty array.
offsets() takes the following arguments:
$topic
-
The $topic must be a normal non-false string of non-zero length.
$partition
-
The $partition must be a non-negative integer.
$time
-
Used to ask for all messages before a certain time (ms).
$time is a timestamp in milliseconds since the UNIX Epoch; offsets before this time are returned.
The argument must be a positive number. The argument may be a Math::BigInt integer on a 32 bit system.
The special values $RECEIVE_LATEST_OFFSET (-1) and $RECEIVE_EARLIEST_OFFSETS (-2) are allowed.
$RECEIVE_LATEST_OFFSET, $RECEIVE_EARLIEST_OFFSETS can be imported from the Kafka module.
$max_number
-
$max_number is the maximum number of offsets to retrieve.
Optional. The argument must be a positive integer.
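As an illustration of the offsets() arguments, the sketch below asks for a single offset and returns it as a plain value. single_offset is a hypothetical helper, not part of the module. It assumes, as the SYNOPSIS does, that last_errorcode is false after a successful call.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by Kafka::Consumer): request at most one
# offset for the given $time and return it, or nothing on failure.
# $time may be a timestamp in milliseconds or one of the special values
# described above (-1 for $RECEIVE_LATEST_OFFSET, -2 for $RECEIVE_EARLIEST_OFFSETS).
sub single_offset {
    my ( $consumer, $topic, $partition, $time ) = @_;

    my $offsets = $consumer->offsets( $topic, $partition, $time, 1 );

    # An empty array or a non-zero error code both count as failure here.
    return if !$offsets || !@$offsets || $consumer->last_errorcode;

    return $offsets->[0];
}
```

With a real consumer and the constant imported from the Kafka module, a call such as single_offset( $consumer, 'mytopic', 0, $RECEIVE_EARLIEST_OFFSETS ) would give a starting point that can then be passed to fetch().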
RaiseError
This method returns the current value that determines how errors are handled within the Kafka module. If it is set to true, die() is called when an error is detected during communication.
last_errorcode and last_error are diagnostic methods and can be used to get detailed error codes and messages for various cases: when the server or the resource is not available, access to the resource was denied, etc.
last_errorcode
Returns the code of the last error.
last_error
Returns an error message that contains information about the encountered failure.
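The two error-handling modes described above can be covered by one wrapper. This is a minimal sketch under stated assumptions: try_call is a hypothetical helper, not part of the module, and it assumes that last_errorcode is false when the last call succeeded. With RaiseError true the exception is trapped with eval; otherwise last_errorcode and last_error are consulted.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by Kafka::Consumer): call $method on the
# consumer and report failure the same way in both RaiseError modes.
# Returns ( $result, undef ) on success and ( undef, $error_text ) on failure.
sub try_call {
    my ( $consumer, $method, @args ) = @_;

    my $result;
    my $ok = eval { $result = $consumer->$method( @args ); 1 };

    # RaiseError is true: the error arrived as an exception.
    return ( undef, "exception: $@" ) unless $ok;

    # RaiseError is not true: the error is only recorded on the object.
    if ( my $code = $consumer->last_errorcode ) {
        return ( undef, sprintf( 'error (%s): %s', $code, $consumer->last_error ) );
    }

    return ( $result, undef );
}
```

A call might look like my ( $messages, $error ) = try_call( $consumer, 'fetch', 'mytopic', 0, 0, $DEFAULT_MAX_BYTES ), followed by a check of $error before $messages is used.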
DIAGNOSTICS
Review documentation of the "RaiseError" method for additional information about possible errors.
It's advised to always check "last_errorcode" and the more descriptive "last_error" when "RaiseError" is not set.
Invalid argument
-
Invalid argument passed to the new constructor or to another method.
Can't send
-
Message can't be sent using Kafka::IO object socket.
Can't recv
-
Message can't be received using Kafka::IO object socket.
Can't bind
-
TCP connection can't be established on the given host and port.
Can't get metadata
-
Failed to obtain metadata from Kafka servers.
Leader not found
-
Missing information about server-leader in metadata.
Mismatch CorrelationId
-
The CorrelationId of the response doesn't match the one in the request.
There are no known brokers
-
Resulting metadata has no information about cluster brokers.
Can't get metadata
-
Received metadata has incorrect internal structure.
For a more detailed error explanation, call the "last_error" method of the Kafka::Consumer object.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low level interface for communication with Kafka server.
Kafka::Internals - Internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.