NAME
Kafka::Consumer - object interface to the consumer client
VERSION
This documentation refers to Kafka::Consumer version 0.01
SYNOPSIS
Setting up:
    #-- IO
    use Kafka qw(
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
    );
    use Kafka::IO;

    my $io;
    $io = Kafka::IO->new(
        host       => "localhost",
        port       => KAFKA_SERVER_PORT,
        timeout    => DEFAULT_TIMEOUT, # Optional, default = DEFAULT_TIMEOUT
        RaiseError => 0                # Optional, default = 0
    );
Consumer:
    #-- Consumer
    use Kafka::Consumer;

    my $consumer = Kafka::Consumer->new(
        IO         => $io,
        RaiseError => 0    # Optional, default = 0
    );

    # Get a list of valid offsets (up to max_number) before the given time
    my $offsets = $consumer->offsets(
        "test",                 # topic
        0,                      # partition
        TIMESTAMP_EARLIEST,     # time
        DEFAULT_MAX_OFFSETS     # max_number
    );
    if ( $offsets )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    if ( !$offsets or $consumer->last_error )
    {
        print STDERR
            "(", $consumer->last_errorcode, ") ",
            $consumer->last_error, "\n";
    }
    # Consuming messages
    my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
    );
    if ( $messages )
    {
        foreach my $message ( @$messages )
        {
            if ( $message->valid )
            {
                print "payload : ", $message->payload, "\n";
                print "offset : ", $message->offset, "\n";
                print "next_offset: ", $message->next_offset, "\n";
            }
            else
            {
                print "error : ", $message->error, "\n";
            }
        }
    }

    # Closes the consumer and cleans up
    $consumer->close;
Use only one Kafka::Consumer object at a time.
DESCRIPTION
The Kafka consumer API is implemented by the Kafka::Consumer class.
The main features of the Kafka::Consumer class are:
Provides an object oriented model of communication.
Supports parsing the Apache Kafka Wire Format protocol.
Supports Apache Kafka Requests and Responses (currently FETCH without the compression codec attribute). This module currently supports the FETCH Request, OFFSETS Request, FETCH Response and OFFSETS Response.
Supports working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems.
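The 64-bit support matters because offsets can exceed what a 32-bit Perl integer holds. The following is only a sketch of the general idea using the core Math::BigInt module; it does not show how Kafka::Int64 works internally, and the helper name advance_offset is hypothetical.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper, not part of the Kafka package.
# Adds a byte count to an offset without losing precision, whether the
# offset arrives as a plain integer, a string, or a Math::BigInt object.
sub advance_offset {
    my ( $offset, $bytes ) = @_;
    return Math::BigInt->new("$offset")->badd($bytes);
}

# 2**32 does not fit in a 32-bit integer, but stays exact here.
print advance_offset( "4294967296", 10 ), "\n";    # prints 4294967306
```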
The offsets and fetch methods both return an ARRAY reference. For offsets, the response array holds the offset integers in descending order. For fetch, the response array holds Kafka::Message objects.
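As a minimal sketch of working with the offsets response, assuming only what is stated above (an ARRAY reference of offset integers in descending order), the hypothetical helper offset_bounds picks out the newest and oldest offsets. It is not part of Kafka::Consumer.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Consumer.
# Takes the ARRAY reference returned by offsets() and returns
# ( $newest, $oldest ), relying on the documented descending order.
# Returns an empty list if the argument is not a non-empty ARRAY reference.
sub offset_bounds {
    my ($offsets) = @_;
    return unless ref $offsets eq 'ARRAY' && @$offsets;
    return ( $offsets->[0], $offsets->[-1] );
}
```

For example, offset_bounds( [ 300, 200, 100 ] ) gives ( 300, 100 ).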
CONSTRUCTOR
new
Creates a new consumer client object. Returns the created Kafka::Consumer object.
An error will cause the program to halt or the constructor will return the undefined value, depending on the value of the RaiseError attribute. You can use the Kafka::Consumer class methods "last_errorcode" and "last_error" to get information about the error.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
IO => $io
-
$io is the Kafka::IO object that allows you to communicate with the Apache Kafka server without using the Apache ZooKeeper service.
RaiseError => $mode
-
Optional, default = 0.
An error will cause the program to halt if RaiseError is true: confess if the argument is not valid, or die in any other error case (this can always be trapped with eval).
It must be a non-negative integer, that is, a positive integer or zero.
You should always check for errors when RaiseError is not set to true.
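When RaiseError is true, errors surface as exceptions, so calls are usually wrapped in eval. The sketch below shows one way to do that; trap_error is a hypothetical helper (not part of the Kafka package), and the commented usage assumes $io is an already created Kafka::IO object.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the Kafka package.
# Runs a code reference that may die and returns ( $result, $error ).
# $error is undef on success.
sub trap_error {
    my ($code) = @_;
    my $result;
    my $ok = eval { $result = $code->(); 1 };
    return $ok ? ( $result, undef ) : ( undef, $@ || 'unknown error' );
}

# Usage, assuming $io is a Kafka::IO object:
# my ( $consumer, $error ) = trap_error( sub {
#     Kafka::Consumer->new( IO => $io, RaiseError => 1 );
# } );
# warn "cannot create consumer: $error" if defined $error;
```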
METHODS
The arguments described below (offset and max_size, or time and max_number) are additional information that specifies which messages or offsets we want to access.
The following methods are defined for the Kafka::Consumer class:
offsets( $topic, $partition, $time, $max_number )
Get a list of valid offsets (up to $max_number) before the given time.
Returns the offsets response array of offset integers, in descending order (Math::BigInt integers on 32-bit systems). If there is an error and RaiseError is not true, returns the undefined value.
offsets() takes the following arguments:
$topic
-
The $topic must be a normal non-false string of non-zero length.
$partition
-
The $partition must be a non-negative integer (of any length), that is, a positive integer or zero.
$time
-
$time is a timestamp in milliseconds since the UNIX Epoch; offsets before this time are returned. The argument must be a positive number, that is, it is defined and Perl thinks it's a number. The argument may be a Math::BigInt integer on 32-bit systems.
The special values -1 (latest) and -2 (earliest) are allowed.
$max_number
-
$max_number is the maximum number of offsets to retrieve. The argument must be a positive integer (of any length).
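The special $time values can be used to find the ends of a partition. The sketch below assumes that requesting a single offset with -2 or -1 yields the earliest or latest offset respectively; partition_bounds is a hypothetical helper, and $consumer is assumed to be a Kafka::Consumer object.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Consumer.
# Returns ( $earliest, $latest ) for a topic partition, or an empty
# list if either offsets() request fails or returns nothing.
sub partition_bounds {
    my ( $consumer, $topic, $partition ) = @_;

    # -2 (earliest) and -1 (latest) are the special $time values;
    # a $max_number of 1 asks for a single offset each time.
    my $earliest = $consumer->offsets( $topic, $partition, -2, 1 );
    my $latest   = $consumer->offsets( $topic, $partition, -1, 1 );

    return unless $earliest && @$earliest && $latest && @$latest;
    return ( $earliest->[0], $latest->[0] );
}
```

A call such as partition_bounds( $consumer, "test", 0 ) would then give the range of offsets worth passing to fetch.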
fetch( $topic, $partition, $offset, $max_size )
Get a list of messages to consume one by one, up to $max_size bytes.
Returns a reference to an array of Kafka::Message objects. If there is an error and RaiseError is not true, returns the undefined value.
fetch() takes the following arguments:
$topic
-
The $topic must be a normal non-false string of non-zero length.
$partition
-
The $partition must be a non-negative integer (of any length), that is, a positive integer or zero.
$offset
-
Offset in topic and partition to start from (64 bits).
The argument must be a non-negative integer (of any length), that is, a positive integer or zero. The argument may be a Math::BigInt integer on 32-bit systems.
$max_size
-
$max_size is the maximum size of the message set to return. The argument must be a positive integer (of any length). The maximum size of a request is limited by MAX_SOCKET_REQUEST_BYTES, which can be imported from the Kafka module.
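A typical consumer calls fetch repeatedly, feeding each message's next_offset into the following call. The sketch below shows one batch of that loop; consume_batch is a hypothetical helper, $consumer is assumed to be a Kafka::Consumer object, and stopping at the first invalid message is a design choice of this sketch rather than documented behaviour.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Consumer.
# Fetches one batch starting at $offset, passes each valid payload to
# $handler, and returns the offset to use for the next fetch.
sub consume_batch {
    my ( $consumer, $topic, $partition, $offset, $max_size, $handler ) = @_;

    my $messages = $consumer->fetch( $topic, $partition, $offset, $max_size );
    return $offset unless $messages;    # error or nothing received

    foreach my $message (@$messages) {
        if ( $message->valid ) {
            $handler->( $message->payload );
            $offset = $message->next_offset;
        }
        else {
            # Stop here so that the next fetch retries from this offset.
            warn "stopping at invalid message: ", $message->error, "\n";
            last;
        }
    }
    return $offset;
}
```

Calling it in a loop, with the returned value as the next $offset, walks through the partition without re-reading messages that were already handled.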
close
Closes the Kafka::Consumer object and cleans up.
last_errorcode
This method returns an error code that gives the position of the error description in the @Kafka::ERROR array. Use this code to determine the cause of the error.
The server or the resource might not be available, access to the resource might be denied or other things might have failed for some reason.
last_error
This method returns an error message that contains information about the encountered failure. Messages returned from this method may contain additional details and may not match the entries of the @Kafka::ERROR array.
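The two methods are often reported together, as in the SYNOPSIS. The sketch below wraps that pattern in a hypothetical helper, describe_last_error; it assumes last_error returns a false value when no error is recorded, which is how the SYNOPSIS treats it.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Consumer.
# Builds a one-line "(code) message" report from last_errorcode and
# last_error. Returns nothing if no error message is set.
sub describe_last_error {
    my ($consumer) = @_;
    my $message = $consumer->last_error;
    return unless $message;
    my $code = $consumer->last_errorcode;
    return sprintf "(%s) %s", defined $code ? $code : '?', $message;
}

# Usage, assuming $consumer is a Kafka::Consumer object:
# if ( my $report = describe_last_error($consumer) ) {
#     print STDERR $report, "\n";
# }
```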
DIAGNOSTICS
Look at the RaiseError description for additional information on error handling.
The methods for analysing a possible error are "last_errorcode" and the more descriptive "last_error".
Mismatch argument
-
This means that you didn't give the right argument to the new constructor or to another method.
Nothing to receive
-
This means that there are no messages matching your request.
Response contains an error in 'ERROR_CODE'
-
This means that the response to a request contains an error code in the ERROR_CODE field. The error description is available through the "last_error" method.
Can't send
-
This means that the request can't be sent on a Kafka::IO object socket.
Can't recv
-
This means that the response can't be received on a Kafka::IO object socket.
IO errors
-
Look at the Kafka::IO DIAGNOSTICS section to obtain information about IO errors.
For a fuller error description, always look at the message from the "last_error" method or from the Kafka::Consumer::last_error class method.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
Kafka::Mock - object interface to the TCP mock server for testing
A wealth of detail about Apache Kafka and the Wire Format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.