NAME
Kafka::Connection - Object interface to connect to a Kafka cluster.
VERSION
This documentation refers to Kafka::Connection version 0.8009_1.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;
    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;
    # A simple example of Kafka::Connection usage:
    use Kafka::Connection;
    # connect to local cluster with the defaults
    my $connection;
    try {
        $connection = Kafka::Connection->new( host => 'localhost' );
    } catch {
        if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
            warn $_->message, "\n", $_->trace->as_string, "\n";
            exit;
        } else {
            die $_;
        }
    };
    # Closes the connection and cleans up
    undef $connection;
DESCRIPTION
The main features of the Kafka::Connection class are:
Provides an API for communication with a Kafka 0.8 cluster.
Encodes requests and decodes responses, and automatically selects or promotes a leader server from the Kafka cluster.
Provides information about the Kafka cluster.
EXPORT
The following constants are available for export:
%RETRY_ON_ERRORS
These are non-fatal errors. When one of them occurs, the metadata is refreshed from Kafka and another attempt is made to fetch data.
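The retry behavior described above can be pictured with a small, self-contained sketch. It is not the module's implementation: the error codes, the %retry_on table (a stand-in for %RETRY_ON_ERRORS, whose contents are not listed here), and the fetch_with_retry name and its callbacks are all hypothetical.

```perl
use strict;
use warnings;

# Hypothetical stand-in for %RETRY_ON_ERRORS: error code => description.
# The real constant's contents are not listed in this document.
my %retry_on = (
    5 => 'leader not available',
    6 => 'not leader for partition',
);

# Calls $attempt->() up to $max_attempts times. $attempt returns an error
# code (0 means success). On a retriable code, $refresh->() is called
# (standing in for a metadata refresh) before the next attempt.
# Returns the 1-based number of the attempt that succeeded; dies otherwise.
sub fetch_with_retry {
    my ( $attempt, $refresh, $max_attempts ) = @_;

    for my $n ( 1 .. $max_attempts ) {
        my $code = $attempt->();
        return $n if $code == 0;
        die "fatal error code $code\n" unless exists $retry_on{$code};
        $refresh->();
    }
    die "giving up after $max_attempts attempts\n";
}

# Simulated request: fails twice with a retriable code, then succeeds.
my @outcomes  = ( 5, 6, 0 );
my $refreshes = 0;
my $tries     = fetch_with_retry(
    sub { return shift @outcomes },
    sub { $refreshes++; return },
    3,
);
print "succeeded on attempt $tries after $refreshes metadata refreshes\n";
```

An error code that is not in the table is treated as fatal and ends the loop immediately, which mirrors the distinction between fatal and non-fatal errors drawn above.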
CONSTRUCTOR
new
Creates a Kafka::Connection object for interaction with a Kafka cluster. Returns the created Kafka::Connection object.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
host => $host
$host is any Apache Kafka cluster host to connect to. It can be a hostname or an IP address in the "xx.xx.xx.xx" form.
Optional. Either host or broker_list must be supplied.
port => $port
Optional, default = $KAFKA_SERVER_PORT.
$port is the attribute denoting the port number of the service we want to access (Apache Kafka service). $port should be an integer number.
$KAFKA_SERVER_PORT is the default Apache Kafka server port constant (9092) that can be imported from the Kafka module.
broker_list => $broker_list
Optional. $broker_list is a reference to an array of host:port strings defining the list of Kafka servers. This list will be used to locate the new leader if the server specified via the host => $host and port => $port arguments becomes unavailable.
Either host or broker_list must be supplied.
timeout => $timeout
Optional, default = $Kafka::REQUEST_TIMEOUT.
$timeout specifies how long we wait for the remote server to respond. $timeout is in seconds and can be a positive integer or a floating-point number no bigger than the maximum positive int32 value.
Special behavior when timeout is set to undef:
Alarms are not used internally (namely when performing gethostbyname).
Default $REQUEST_TIMEOUT is used for the rest of IO operations.
CorrelationId => $correlation_id
Optional, default = undef.
CorrelationId is a user-supplied integer. It will be passed back with the response by the server, unmodified. The $correlation_id should be an integer number.
An exception is thrown if the CorrelationId in the response does not match the one supplied in the request.
If CorrelationId is not provided, it is set to a random negative integer.
SEND_MAX_ATTEMPTS => $attempts
Optional, int32 signed integer, default = $Kafka::SEND_MAX_ATTEMPTS.
In some circumstances (leader is temporarily unavailable, outdated metadata, etc.) we may fail to send a message. This property specifies the maximum number of attempts to send a message. The $attempts should be an integer number.
RECEIVE_MAX_ATTEMPTS => $attempts
Optional, int32 signed integer, default = $Kafka::RECEIVE_MAX_ATTEMPTS.
In some circumstances (temporary network issues, high server load, socket error, etc.) we may fail to receive a response. This property specifies the maximum number of attempts to receive a message. The $attempts should be an integer number.
RETRY_BACKOFF => $backoff
Optional, default = $Kafka::RETRY_BACKOFF.
Since leader election takes a bit of time, this property specifies the amount of time, in milliseconds, that the producer waits before refreshing the metadata. The $backoff should be an integer number.
AutoCreateTopicsEnable => $mode
Optional, default value is 0 (false).
AutoCreateTopicsEnable controls how this module handles the first access to a non-existent topic when auto.create.topics.enable in the server configuration is true. If AutoCreateTopicsEnable is false (default), the first access to a non-existent topic produces an exception; however, the topic is created and subsequent attempts to access it will succeed.
If AutoCreateTopicsEnable is true, this module waits (according to the SEND_MAX_ATTEMPTS and RETRY_BACKOFF properties) until the topic is created, to avoid errors on the first access to a non-existent topic.
If auto.create.topics.enable in the server configuration is false, this setting has no effect.
MaxLoggedErrors => $number
Optional, default value is 100.
Defines the maximum number of last non-fatal errors that we keep in the log. Use the nonfatal_errors method to access those errors.
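As an illustration of how the arguments above fit together, the following sketch assembles an argument hash that uses broker_list instead of host. The make_broker_list helper, the example.com host names and the chosen values are hypothetical; only the argument names and the default port come from the descriptions above. The constructor call itself is left as a comment so that the sketch runs without the Kafka package or a reachable broker.

```perl
use strict;
use warnings;

# Default Kafka port as stated above (9092). The Kafka module exports it
# as $KAFKA_SERVER_PORT; it is hard-coded here to keep the sketch standalone.
my $DEFAULT_PORT = 9092;

# Hypothetical helper: turns a list of "host" or "host:port" strings into
# the array reference of host:port strings expected by broker_list.
sub make_broker_list {
    my @servers = @_;
    return [ map { /:\d+\z/ ? $_ : "$_:$DEFAULT_PORT" } @servers ];
}

my %args = (
    broker_list          => make_broker_list( 'kafka1.example.com', 'kafka2.example.com:9093' ),
    timeout              => 1.5,    # seconds; may be fractional
    SEND_MAX_ATTEMPTS    => 4,
    RECEIVE_MAX_ATTEMPTS => 4,
    RETRY_BACKOFF        => 200,    # milliseconds
    MaxLoggedErrors      => 10,
);

# Either host or broker_list must be supplied.
die "host or broker_list is required\n"
    unless defined $args{host} || defined $args{broker_list};

print join( ', ', @{ $args{broker_list} } ), "\n";

# With the Kafka package installed and a broker reachable, the hash would
# then be passed to the constructor:
#   my $connection = Kafka::Connection->new( %args );
```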
METHODS
The following methods are defined for the Kafka::Connection class:
get_known_servers
Returns the list of known Kafka servers (in host:port format).
get_metadata( $topic )
If $topic is present, it must be a non-false string of non-zero length.
If $topic is absent, this method returns metadata for all topics.
Updates the Kafka cluster's metadata description and returns a hash reference to the metadata, which can be schematically described as:
    {
        TopicName => {
            Partition => {
                'Leader'   => ...,
                'Replicas' => [
                    ...,
                ],
                'Isr'      => [
                    ...,
                ],
            },
            ...,
        },
        ...,
    }
Consult Kafka "Wire protocol" documentation for more details about metadata structure.
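The nested structure can be walked with ordinary hash and array dereferencing. The sketch below uses a hand-written sample in the shape shown above rather than a live call to get_metadata; the topic name and the numeric values are made up, and it assumes that Leader, Replicas and Isr hold broker IDs as in the Kafka wire protocol. The describe_metadata helper is hypothetical.

```perl
use strict;
use warnings;

# Hand-written sample in the documented shape. A real structure would come
# from $connection->get_metadata. Names and IDs are made up for illustration.
my $metadata = {
    mytopic => {
        0 => { Leader => 1, Replicas => [ 1, 2 ], Isr => [ 1, 2 ] },
        1 => { Leader => 2, Replicas => [ 2, 1 ], Isr => [ 2 ] },
    },
};

# Returns one "topic/partition leader=N isr=a,b" line per partition,
# sorted by topic name and then numerically by partition.
sub describe_metadata {
    my ($meta) = @_;
    my @lines;
    for my $topic ( sort keys %$meta ) {
        for my $partition ( sort { $a <=> $b } keys %{ $meta->{$topic} } ) {
            my $info = $meta->{$topic}{$partition};
            push @lines, sprintf( '%s/%d leader=%s isr=%s',
                $topic, $partition, $info->{Leader},
                join( ',', @{ $info->{Isr} } ) );
        }
    }
    return @lines;
}

print "$_\n" for describe_metadata($metadata);
```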
is_server_known( $server )
Returns true if $server (host:port) is known in the cluster.
is_server_alive( $server )
Returns true if the known $server (host:port) is accessible. Checks the accessibility of the server.
is_server_connected( $server )
Returns true if a successful connection is established with $server (host:port).
receive_response_to_request( $request, $compression_codec )
$request
$request is a reference to the hash representing the structure of the request.
This method encodes $request, passes it to the leader of the cluster, receives the reply, decodes it, and returns it in the form of a hash reference.
WARNING:
This method should be considered private and should not be called by an end user.
In order to achieve better performance, this method does not perform argument validation.
$compression_codec
Optional. $compression_codec sets the required type of $messages compression, if compression is desirable.
Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY.
exists_topic_partition( $topic, $partition )
Returns true if the metadata contains information about the specified combination of topic and partition. Otherwise returns false.
exists_topic_partition() takes two arguments, $topic and $partition.
close_connection( $server )
Closes the connection with $server (defined as host:port).
close
Closes the connections with all known Kafka servers.
cluster_errors
Returns a reference to a hash.
Each hash key is the identifier of the server (host:port), and the value is the last communication error with that server.
An empty hash is returned if there were no communication errors.
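A short sketch of reporting on such a hash follows. It works on sample data in the documented shape rather than on a live call to cluster_errors; the host names are hypothetical, and the sketch assumes the error values are strings (or objects that stringify usefully). The format_cluster_errors helper is not part of the module.

```perl
use strict;
use warnings;

# Sample data in the documented shape: server (host:port) => last error.
# A real hash reference would come from $connection->cluster_errors.
my $errors = {
    'kafka2.example.com:9092' => "Can't recv",
    'kafka1.example.com:9092' => "Can't bind",
};

# Formats the hash as "server: error" lines sorted by server name.
# Returns an empty list when there were no errors.
sub format_cluster_errors {
    my ($errs) = @_;
    return map { "$_: $errs->{$_}" } sort keys %$errs;
}

my @report = format_cluster_errors($errors);
print @report ? join( "\n", @report ) . "\n" : "no communication errors\n";
```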
nonfatal_errors
Returns a reference to an array of the last non-fatal errors.
The maximum number of entries is set using the MaxLoggedErrors parameter of the constructor.
A reference to an empty array is returned if there were no non-fatal errors or if the MaxLoggedErrors parameter is set to 0.
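The effect of MaxLoggedErrors can be pictured as a bounded log that keeps only the most recent entries. The sketch below illustrates that retention rule and is not the module's own code; the log_nonfatal helper and the error strings are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: appends $error to @$log and trims the log so that
# at most $max entries (the most recent ones) remain. $max == 0 keeps none.
sub log_nonfatal {
    my ( $log, $error, $max ) = @_;
    push @$log, $error;
    splice( @$log, 0, @$log - $max ) if @$log > $max;
    return $log;
}

my @log;
log_nonfatal( \@log, "error $_", 3 ) for 1 .. 5;
print join( ', ', @log ), "\n";
```

With a limit of 3 and five logged errors, only the last three remain; with a limit of 0 the log stays empty, matching the description above.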
clear_nonfatals
Clears the array of the last non-fatal errors.
A reference to an empty array is returned because there are no non-fatal errors now.
DIAGNOSTICS
When an error is detected, an exception, represented by an object of the Kafka::Exception::Connection class, is thrown (see Kafka::Exceptions).
The code and a more descriptive message provide information about the exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.
Here is the list of possible error messages that Kafka::Connection may produce:
Invalid argument - An invalid argument was provided to the new constructor or to another method.
Can't send - Request cannot be sent to Kafka.
Can't recv - Response cannot be received from Kafka.
Can't bind - A successful TCP connection can't be established on the given host and port.
Can't get metadata - Error detected during parsing of response from Kafka.
Leader not found - Failed to locate the leader of the Kafka cluster.
Mismatch CorrelationId - Mismatch of the CorrelationId of request and response.
There are no known brokers - Failed to locate a cluster broker.
Can't get metadata - Received metadata is incorrect or missing.
Debug mode
Debug output can be enabled by passing the desired level via an environment variable in one of the following ways:
PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.
PERL_KAFKA_DEBUG=Connection:1 - enable debug for Kafka::Connection only.
Kafka::Connection prints information about non-fatal errors, re-connection attempts and such to STDERR when the debug level is set to 1 or higher.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.