NAME
Kafka::Consumer - Perl interface for Kafka consumer client.
VERSION
This documentation refers to Kafka::Consumer version 1.08.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka qw(
        $DEFAULT_MAX_BYTES
        $DEFAULT_MAX_NUMBER_OF_OFFSETS
        $RECEIVE_EARLIEST_OFFSET
    );
    use Kafka::Connection;
    use Kafka::Consumer;

    my ( $connection, $consumer );
    try {

        #-- Connection
        $connection = Kafka::Connection->new( host => 'localhost' );

        #-- Consumer
        $consumer = Kafka::Consumer->new( Connection => $connection );

        # Get a valid offset before the given time
        my $offset = $consumer->offset_before_time(
            'mytopic',                  # topic
            0,                          # partition
            ( time() - 3600 ) * 1000,   # time
        );

        if ( defined $offset ) {
            say "Received offset: $offset";
        } else {
            warn "Error: offset was not received\n";
        }

        # Consuming messages
        my $messages = $consumer->fetch(
            'mytopic',              # topic
            0,                      # partition
            0,                      # offset
            $DEFAULT_MAX_BYTES,     # Maximum size of MESSAGE(s) to receive
        );

        if ( $messages ) {
            foreach my $message ( @$messages ) {
                if ( $message->valid ) {
                    say 'payload    : ', $message->payload;
                    say 'key        : ', $message->key;
                    say 'offset     : ', $message->offset;
                    say 'next_offset: ', $message->next_offset;
                } else {
                    say 'error      : ', $message->error;
                }
            }
        }
    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $error->code, ') ', $error->message, "\n";
            exit;
        } else {
            die $error;
        }
    };

    # Closes the consumer and cleans up
    undef $consumer;
    $connection->close;
    undef $connection;
DESCRIPTION
The Kafka consumer API is implemented by the Kafka::Consumer class.

The main features of the Kafka::Consumer class are:

Provides an object-oriented API for consuming messages.

Provides Kafka FETCH and OFFSETS requests.

Supports parsing the Apache Kafka 0.9+ Wire Format protocol.

Works with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems.

The offsets and fetch methods return ARRAY references. The array returned by offsets contains offset integers, and the array returned by fetch contains objects of the Kafka::Message class.
CONSTRUCTOR
new
Creates a new consumer client object. Returns the created Kafka::Consumer object.

new() takes arguments in key-value pairs. The following arguments are recognized:

Connection => $connection

$connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.

ClientId => $client_id

This is a user-supplied identifier (string) for the client application. ClientId will be auto-assigned if not passed in when creating the Kafka::Consumer object.

MaxWaitTime => $max_time

The maximum amount of time (seconds, may be fractional) to wait when insufficient data is available at the time the request is issued. Optional; default is $DEFAULT_MAX_WAIT_TIME, which can be imported from the Kafka module. $max_time must be a positive number.

MinBytes => $min_bytes

The minimum number of bytes of message data that must be available to give a response. If the client sets this to $MIN_BYTES_RESPOND_IMMEDIATELY, the server will always respond immediately. If it is set to $MIN_BYTES_RESPOND_HAS_DATA, the server will respond as soon as at least one partition has at least 1 byte of data, or the specified timeout occurs. Setting higher values in combination with larger timeouts allows reading larger chunks of data. Optional, int32 signed integer; default is $MIN_BYTES_RESPOND_IMMEDIATELY. $MIN_BYTES_RESPOND_IMMEDIATELY and $MIN_BYTES_RESPOND_HAS_DATA can be imported from the Kafka module. $min_bytes must be a non-negative int32 signed integer.

MaxBytes => $max_bytes

The maximum number of bytes to include in the message set for this partition. Optional, int32 signed integer; default is $DEFAULT_MAX_BYTES (1_000_000). $max_bytes must be greater than $MESSAGE_SIZE_OVERHEAD (the size of the protocol overhead, i.e. the data added by the Kafka wire protocol to each message). $DEFAULT_MAX_BYTES and $MESSAGE_SIZE_OVERHEAD can be imported from the Kafka module.

MaxNumberOfOffsets => $max_number

Limits the number of offsets returned by Kafka. Optional, non-negative int32 signed integer; default is $DEFAULT_MAX_NUMBER_OF_OFFSETS (100), which can be imported from the Kafka module.
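As a sketch of how these arguments fit together (assuming a broker reachable on localhost; the ClientId value is a hypothetical identifier), a consumer with explicit tuning options might be created like this:

```perl
use 5.010;
use strict;
use warnings;

use Kafka qw( $DEFAULT_MAX_BYTES );
use Kafka::Connection;
use Kafka::Consumer;

my $connection = Kafka::Connection->new( host => 'localhost' );

# All arguments other than Connection are optional and fall back to
# the defaults described above.
my $consumer = Kafka::Consumer->new(
    Connection         => $connection,
    ClientId           => 'my-consumer',        # hypothetical identifier
    MaxWaitTime        => 0.5,                  # seconds, may be fractional
    MinBytes           => 1,                    # respond once any data is available
    MaxBytes           => $DEFAULT_MAX_BYTES,   # 1_000_000 bytes
    MaxNumberOfOffsets => 100,
);
```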
METHODS
The following methods are defined for the Kafka::Consumer class:
fetch( $topic, $partition, $start_offset, $max_size )
Gets a list of messages to consume one by one, up to $max_size bytes.

Returns a reference to an array of Kafka::Message objects.

fetch() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$start_offset

The offset in the topic and partition to start from (64-bit integer). The argument must be a non-negative integer. The argument may be a Math::BigInt integer on a 32-bit system.

$max_size

$max_size is the maximum size of the message set to return. The argument must be a positive int32 signed integer. The maximum size of a request is limited by $MAX_SOCKET_REQUEST_BYTES, which can be imported from the Kafka module.
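A minimal polling sketch (assuming a local broker and a topic named 'mytopic', both hypothetical) that drains a partition from the beginning, advancing the read position with each message's next_offset:

```perl
use 5.010;
use strict;
use warnings;

use Kafka qw( $DEFAULT_MAX_BYTES );
use Kafka::Connection;
use Kafka::Consumer;

my $connection = Kafka::Connection->new( host => 'localhost' );
my $consumer   = Kafka::Consumer->new( Connection => $connection );

my $offset = 0;     # start from the beginning of the partition
while ( 1 ) {
    my $messages = $consumer->fetch( 'mytopic', 0, $offset, $DEFAULT_MAX_BYTES );
    last unless $messages && @$messages;
    foreach my $message ( @$messages ) {
        next unless $message->valid;
        say $message->payload;
        $offset = $message->next_offset;    # resume after this message
    }
}
```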
offset_at_time( $topic, $partition, $time )
Returns an offset, given a topic, partition and time.

The returned offset is the earliest offset whose timestamp is greater than or equal to the given timestamp. The return value is a HashRef containing timestamp and offset keys.

WARNING: this method requires Kafka 0.10.0 and messages with timestamps. Check the configuration of the brokers or topic, specifically message.timestamp.type, and set it either to LogAppendTime, to have Kafka automatically set message timestamps based on the broker clock, or to CreateTime, in which case the client populating your topic has to set the timestamps when producing messages.

offset_at_time() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$time

The time to look up the offset for (in milliseconds since the UNIX Epoch). The argument must be a positive number. It may be a Math::BigInt integer on a 32-bit system.
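A sketch (assuming a Kafka 0.10.0+ broker with message timestamps enabled, and an already-constructed $consumer as in the SYNOPSIS) that finds the earliest offset at or after one hour ago:

```perl
my $result = $consumer->offset_at_time(
    'mytopic',                  # topic (hypothetical)
    0,                          # partition
    ( time() - 3600 ) * 1000,   # one hour ago, in milliseconds
);

# The HashRef carries both the matched timestamp and the offset.
printf "offset %d has timestamp %d\n",
    $result->{offset}, $result->{timestamp};
```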
offset_before_time( $topic, $partition, $time )
Returns an offset, given a topic, partition and time.

The returned offset is an offset whose timestamp is guaranteed to be earlier than the given timestamp. The return value is a Number.

This method works with all versions of Kafka and doesn't require messages with timestamps.

offset_before_time() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$time

Get the offset before the given time (in milliseconds since the UNIX Epoch). The argument must be a positive number. It may be a Math::BigInt integer on a 32-bit system.
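Unlike offset_at_time(), this returns a plain number and needs no timestamp support on the broker. A sketch (assuming a local broker and a hypothetical topic, with $consumer constructed as in the SYNOPSIS):

```perl
my $offset = $consumer->offset_before_time(
    'mytopic',                  # topic (hypothetical)
    0,                          # partition
    ( time() - 3600 ) * 1000,   # one hour ago, in milliseconds
);

# $offset points at a message written before the given time,
# so it is a safe position to start replaying from.
say "replaying from offset $offset";
```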
offset_earliest( $topic, $partition )
Returns the earliest offset for a given topic and partition.

offset_earliest() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.
offset_latest( $topic, $partition )
Returns the latest offset for a given topic and partition.

offset_latest() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.
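Together the two methods bound the live range of a partition. A sketch (assuming a local broker and a hypothetical topic, with $consumer constructed as in the SYNOPSIS):

```perl
my $earliest = $consumer->offset_earliest( 'mytopic', 0 );
my $latest   = $consumer->offset_latest( 'mytopic', 0 );

# The retained messages lie in the half-open range [$earliest, $latest);
# with compaction or deletion the count is an upper bound.
printf "partition retains at most %d message(s)\n", $latest - $earliest;
```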
offsets( $topic, $partition, $time, $max_number )
WARNING: This method is DEPRECATED; please use one of offset_at_time(), offset_before_time(), offset_earliest() or offset_latest(). It is kept for backward compatibility.

Returns an ArrayRef of offsets.

offsets() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$time

Get offsets before the given time (in milliseconds since the UNIX Epoch). It must be a positive number. It may be a Math::BigInt integer on a 32-bit system. The special values $RECEIVE_LATEST_OFFSETS (-1) and $RECEIVE_EARLIEST_OFFSET (-2) are allowed; they can be imported from the Kafka module.

$max_number

The maximum number of offsets to be returned.
commit_offsets( $topic, $partition, $offset, $group )
Commits offsets using the offset commit/fetch API.

Returns a non-blank value (a reference to a hash with the server response description) if the request was successful.

commit_offsets() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$offset

The offset in the topic and partition to commit. The argument must be a positive number. It may be a Math::BigInt integer on a 32-bit system.

$group

The name of the consumer group. The argument must be a normal non-false string of non-zero length.
fetch_offsets( $topic, $partition, $group )
Fetches committed offsets using the offset commit/fetch API.

Returns a non-blank value (a reference to a hash with the server response description) if the request was successful.

fetch_offsets() takes the following arguments:

$topic

$topic must be a normal non-false string of non-zero length.

$partition

$partition must be a non-negative integer.

$group

The name of the consumer group. The argument must be a normal non-false string of non-zero length.
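A round-trip sketch (assuming a local broker; the topic, offset and group name are hypothetical, and $consumer is constructed as in the SYNOPSIS) that commits a consumed position and later reads it back:

```perl
# Record the position we have consumed up to.
$consumer->commit_offsets(
    'mytopic',      # topic (hypothetical)
    0,              # partition
    42,             # offset to commit (hypothetical)
    'my-group',     # consumer group (hypothetical)
);

# Later, possibly in another process, ask the cluster for the
# committed position; the hash reference describes the server response.
my $response = $consumer->fetch_offsets( 'mytopic', 0, 'my-group' );
```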
DIAGNOSTICS
When an error is detected, an exception, represented by an object of the Kafka::Exception::Consumer class, is thrown (see Kafka::Exceptions).

code and a more descriptive message provide information about the thrown exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.

The authors suggest using Try::Tiny's try and catch to handle exceptions while working with the Kafka package.

Invalid argument

An invalid argument was passed to the new constructor or to another method.

Cannot send

The request cannot be sent.

Cannot receive

The response cannot be received.

Cannot bind

A TCP connection can't be established on the given host and port.

Cannot get metadata

Failed to obtain metadata from Kafka servers.

Leader not found

Missing information about the server-leader in metadata.

Mismatch CorrelationId

The CorrelationId of the response doesn't match the one in the request.

There are no known brokers

The resulting metadata has no information about cluster brokers.

Cannot get metadata

The received metadata has an incorrect internal structure.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for the producer client.
Kafka::Consumer - interface for the consumer client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64-bit elements of the protocol on 32-bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.