NAME
Kafka::Connection - Object interface to connect to a Kafka cluster.
VERSION
This documentation refers to Kafka::Connection version 0.8007.
SYNOPSIS
use 5.010;
use strict;
use warnings;
use Scalar::Util qw(
    blessed
);
use Try::Tiny;
# A simple example of Kafka::Connection usage:
use Kafka::Connection;
# connect to local cluster with the defaults
my $connection;
try {
    $connection = Kafka::Connection->new( host => 'localhost' );
} catch {
    if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
        warn $_->message, "\n", $_->trace->as_string, "\n";
        exit;
    } else {
        die $_;
    }
};
# Closes the connection and cleans up
undef $connection;
DESCRIPTION
The main features of the Kafka::Connection class are:
Provides an API for communication with a Kafka 0.8 cluster.
Encodes requests and decodes responses, and automatically selects or promotes a leader server from the Kafka cluster.
Provides information about the Kafka cluster.
EXPORT
The following constants are available for export:
%RETRY_ON_ERRORS
These are non-fatal errors. When one of them occurs, the metadata is refreshed from Kafka and another attempt to fetch data is made.
CONSTRUCTOR
new
Creates a Kafka::Connection object for interaction with a Kafka cluster and returns it.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
host => $host
$host is any Apache Kafka cluster host to connect to. It can be a hostname or an IP address in the "xx.xx.xx.xx" form.
Optional. Either host or broker_list must be supplied.
port => $port
Optional, default = $KAFKA_SERVER_PORT.
$port is the port number of the Apache Kafka service to connect to. $port should be an integer.
$KAFKA_SERVER_PORT is the default Apache Kafka server port constant (9092), which can be imported from the Kafka module.
broker_list => $broker_list
Optional. $broker_list is a reference to an array of host:port strings defining the list of Kafka servers. This list is used to locate the new leader if the server specified via the host => $host and port => $port arguments becomes unavailable. Either host or broker_list must be supplied.
timeout => $timeout
Optional, default = $Kafka::REQUEST_TIMEOUT.
$timeout specifies how long to wait for the remote server to respond. $timeout is in seconds and may be a positive integer or a floating-point number no greater than the maximum positive int32 value.
Special behavior when timeout is set to undef:
Alarms are not used internally (namely when performing gethostbyname).
The default $REQUEST_TIMEOUT is used for the rest of the IO operations.
CorrelationId => $correlation_id
Optional, default = undef.
CorrelationId is a user-supplied integer. The server passes it back with the response, unmodified. $correlation_id should be an integer.
An exception is thrown if the CorrelationId in the response does not match the one supplied in the request.
If CorrelationId is not provided, it is set to a random negative integer.
SEND_MAX_RETRIES => $retries
Optional, int32 signed integer, default = $Kafka::SEND_MAX_RETRIES.
In some circumstances (the leader is temporarily unavailable, the metadata is outdated, etc.) sending a message may fail. This property specifies the maximum number of attempts to send a message. $retries should be an integer.
RECEIVE_MAX_RETRIES => $retries
Optional, int32 signed integer, default = $Kafka::RECEIVE_MAX_RETRIES.
In some circumstances (temporary network issues, high server load, socket errors, etc.) receiving a response may fail. This property specifies the maximum number of attempts to receive a message. $retries should be an integer.
RETRY_BACKOFF => $backoff
Optional, default = $Kafka::RETRY_BACKOFF.
Since leader election takes some time, this property specifies how long, in milliseconds, the producer waits before refreshing the metadata. $backoff should be an integer.
AutoCreateTopicsEnable => $mode
Optional, default value is 0 (false).
AutoCreateTopicsEnable controls how this module handles the first access to a non-existent topic when auto.create.topics.enable in the server configuration is true. If AutoCreateTopicsEnable is false (default), the first access to a non-existent topic produces an exception; however, the topic is created and subsequent attempts to access it will succeed.
If AutoCreateTopicsEnable is true, this module waits (according to the SEND_MAX_RETRIES and RETRY_BACKOFF properties) until the topic is created, to avoid errors on the first access to a non-existent topic.
If auto.create.topics.enable in the server configuration is false, this setting has no effect.
MaxLoggedErrors => $number
Optional, default value is 100.
Defines the maximum number of most recent non-fatal errors kept in the log. Use the "nonfatal_errors" method to access them.
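As an illustration of how the constructor arguments fit together, here is a minimal sketch that assembles a key-value argument list from host:port strings. The helpers build_connection_args and connect_to_cluster, the broker names, and all numeric values are assumptions made for this example; they are not part of Kafka::Connection. The wrapper loads the module lazily, so the argument-building part runs without a Kafka installation.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Connection: checks a list of
# host:port strings and returns key-value arguments suitable for new().
sub build_connection_args {
    my ( @brokers ) = @_;

    die "at least one broker is required\n" unless @brokers;
    for my $broker ( @brokers ) {
        die "expected host:port, got '$broker'\n"
            unless $broker =~ /\A[^:\s]+:\d+\z/;
    }

    return (
        broker_list         => [ @brokers ],
        timeout             => 1.5,    # seconds (illustrative value)
        SEND_MAX_RETRIES    => 3,      # attempts (illustrative value)
        RECEIVE_MAX_RETRIES => 3,      # attempts (illustrative value)
        RETRY_BACKOFF       => 200,    # milliseconds (illustrative value)
        MaxLoggedErrors     => 50,     # entries kept for nonfatal_errors
    );
}

# Hypothetical wrapper: loads Kafka::Connection lazily and creates the
# connection object. Calling it needs the Kafka distribution installed.
sub connect_to_cluster {
    my ( @brokers ) = @_;
    require Kafka::Connection;
    return Kafka::Connection->new( build_connection_args( @brokers ) );
}

# The broker names below are placeholders.
my %args = build_connection_args(
    'kafka1.example.com:9092',
    'kafka2.example.com:9092',
);
print join( ', ', @{ $args{broker_list} } ), "\n";
```

Because host is omitted here, broker_list satisfies the requirement that one of the two be supplied. A call such as connect_to_cluster( 'kafka1.example.com:9092' ) would typically be wrapped in the try/catch shown in the SYNOPSIS.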
METHODS
The following methods are defined for the Kafka::Connection class:
get_known_servers
Returns the list of known Kafka servers (in host:port format).
is_server_known( $server )
Returns true if $server (host:port) is known in the cluster.
is_server_alive( $server )
Returns true if a connection with $server (host:port) is successfully established.
receive_response_to_request( $request )
$request is a reference to a hash representing the structure of the request.
This method encodes $request, passes it to the leader of the cluster, receives the reply, decodes it, and returns it as a hash reference.
WARNING:
This method should be considered private and should not be called by an end user.
To achieve better performance, this method does not validate its arguments.
close_connection( $server )
Closes connection with $server (defined as host:port).
close
Closes connection with all known Kafka servers.
cluster_errors
Returns a reference to a hash.
Each hash key is the identifier of the server (host:port), and the value is the last communication error with that server.
An empty hash is returned if there were no communication errors.
nonfatal_errors
Returns a reference to an array of the last non-fatal errors.
The maximum number of entries is set using the MaxLoggedErrors parameter of the constructor.
A reference to an empty array is returned if there were no non-fatal errors or if MaxLoggedErrors is set to 0.
clear_nonfatals
Clears the array of the last non-fatal errors.
A reference to an empty array is returned, because no non-fatal errors remain.
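The informational methods above can be combined into a short status report. The following is a minimal sketch, assuming only the return types described in this section (a list, a hash reference, and an array reference); the helper name cluster_report is hypothetical. Error values are stringified as-is, since their exact form is not specified here.

```perl
use strict;
use warnings;

# Hypothetical helper: summarizes the state of a connection object using
# only the methods documented above. $connection is expected to be a
# Kafka::Connection instance (any object with the same methods works).
sub cluster_report {
    my ( $connection ) = @_;
    my @lines;

    # get_known_servers returns a list of host:port strings.
    my @servers = $connection->get_known_servers;
    for my $server ( sort @servers ) {
        my $state = $connection->is_server_alive( $server ) ? 'alive' : 'not alive';
        push @lines, "$server: $state";
    }

    # cluster_errors returns a hash reference keyed by host:port.
    my $errors = $connection->cluster_errors;
    for my $server ( sort keys %$errors ) {
        push @lines, "last error for $server: $errors->{$server}";
    }

    # nonfatal_errors returns an array reference (possibly empty).
    my $nonfatals = $connection->nonfatal_errors;
    push @lines, 'non-fatal errors logged: ' . scalar @$nonfatals;

    return @lines;
}
```

With a live connection object, print "$_\n" for cluster_report( $connection ); would print one line per known server, one per server with a recorded communication error, and a final count of logged non-fatal errors.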
DIAGNOSTICS
When an error is detected, an exception represented by an object of the Kafka::Exception::Connection class is thrown (see Kafka::Exceptions).
The code and a more descriptive message provide information about the exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.
Here is the list of possible error messages that Kafka::Connection may produce:
Invalid argument
Invalid argument was provided to new constructor or to other method.
Can't send
Request cannot be sent to Kafka.
Can't recv
Response cannot be received from Kafka.
Can't bind
A successful TCP connection can't be established on given host and port.
Can't get metadata
Error detected during parsing of response from Kafka.
Leader not found
Failed to locate leader of Kafka cluster.
Mismatch CorrelationId
Mismatch of CorrelationId of request and response.
There are no known brokers
Failed to locate cluster broker.
Can't get metadata
Received meta data is incorrect or missing.
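To show how the code and message of an exception might be used, here is a minimal sketch that converts whatever was thrown into a one-line summary. The helpers describe_error and run_guarded are hypothetical names introduced for this example, and plain eval is used instead of Try::Tiny only to keep the sketch free of non-core dependencies.

```perl
use strict;
use warnings;
use Scalar::Util qw( blessed );

# Hypothetical helper: turns whatever was thrown into a one-line summary.
# Only the code and message methods named in this section are used.
sub describe_error {
    my ( $error ) = @_;

    if ( blessed( $error ) && $error->isa( 'Kafka::Exception::Connection' ) ) {
        return sprintf 'Kafka connection error %s: %s',
            $error->code, $error->message;
    }
    return "unexpected error: $error";
}

# Hypothetical wrapper: runs $action and returns ( $result, undef ) on
# success, or ( undef, $summary ) when an exception was thrown.
sub run_guarded {
    my ( $action ) = @_;
    my $result;
    my $ok = eval { $result = $action->(); 1 };
    return ( $result, undef ) if $ok;
    return ( undef, describe_error( $@ ) );
}
```

A caller could write my ( $connection, $problem ) = run_guarded( sub { Kafka::Connection->new( host => 'localhost' ) } ); and log $problem when it is defined. Anything that is not a Kafka::Exception::Connection object is reported as unexpected rather than silently discarded.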
Debug mode
Debug output can be enabled by passing the desired level via an environment variable in one of the following ways:
PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.
PERL_KAFKA_DEBUG=Connection:1 - enable debug for Kafka::Connection only.
Kafka::Connection prints to STDERR information about non-fatal errors, re-connection attempts and such when debug level is set to 1 or higher.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.