NAME
Kafka::Producer - Perl interface for Kafka producer client.
VERSION
This documentation refers to Kafka::Producer version 1.02.
SYNOPSIS
use 5.010;
use strict;
use warnings;
use Scalar::Util qw(
    blessed
);
use Try::Tiny;
use Kafka::Connection;
use Kafka::Producer;
my ( $connection, $producer );
try {
    #-- Connection
    $connection = Kafka::Connection->new( host => 'localhost' );
    #-- Producer
    $producer = Kafka::Producer->new( Connection => $connection );
    # Sending a single message
    my $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        'Single message'    # message
    );
    # Sending a series of messages
    $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        [                   # messages
            'The first message',
            'The second message',
            'The third message',
        ]
    );
} catch {
    my $error = $_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        warn 'Error: (', $error->code, ') ', $error->message, "\n";
        exit;
    } else {
        die $error;
    }
};
# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;
DESCRIPTION
The Kafka producer API is implemented by the Kafka::Producer class.
The main features of the Kafka::Producer class are:
Provides an object-oriented API for producing messages.
Provides Kafka PRODUCE requests.
CONSTRUCTOR
new
Creates a new producer client object.
new() takes arguments as key-value pairs. The following arguments are currently recognized:
Connection => $connection
$connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.
ClientId => $client_id
This is a user-supplied identifier (string) for the client application.
If ClientId is not passed to the constructor, its value is automatically set to the string 'producer'.
RequiredAcks => $acks
$acks should be a signed int16 integer. It indicates how many acknowledgements the servers should receive before responding to the request.
If it is $NOT_SEND_ANY_RESPONSE, the server does not send any response.
If it is $WAIT_WRITTEN_TO_LOCAL_LOG (the default), the server waits until the data is written to the local log before sending a response.
If it is $BLOCK_UNTIL_IS_COMMITTED, the server blocks until the message is committed by all in-sync replicas before sending a response.
$NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG and $BLOCK_UNTIL_IS_COMMITTED can be imported from the Kafka module.
Timeout => $timeout
This is the maximum time the server may wait to receive the number of acknowledgements specified in RequiredAcks.
$timeout is given in seconds and may be any integer or floating-point value no greater than the maximum positive int32 value.
Optional; the default is $REQUEST_TIMEOUT, which can be imported from the Kafka module.
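As a rough illustration of the Timeout constraint described above, the following sketch pre-checks a value before it is passed to new(). The helper name is_valid_timeout and the requirement that the value be strictly positive are assumptions made for this example; neither is part of Kafka::Producer.

```perl
use strict;
use warnings;
use Scalar::Util qw( looks_like_number );

# Largest positive value of a signed 32-bit integer.
my $INT32_MAX = 2**31 - 1;

# Hypothetical helper (not part of the Kafka package): returns 1 when
# $timeout is a number of seconds that fits the documented limit, else 0.
# Assumption: zero and negative values are rejected.
sub is_valid_timeout {
    my ( $timeout ) = @_;
    return 0 unless defined $timeout && looks_like_number( $timeout );
    return ( $timeout > 0 && $timeout <= $INT32_MAX ) ? 1 : 0;
}

# Possible use before constructing the producer:
# die "bad timeout\n" unless is_valid_timeout( 1.5 );
# my $producer = Kafka::Producer->new(
#     Connection => $connection,
#     Timeout    => 1.5,
# );
```

Checking the value up front is optional; new() is documented to throw an exception on invalid arguments in any case.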
METHODS
The following methods are defined for the Kafka::Producer class:
send( $topic, $partition, $messages, $keys, $compression_codec )
Sends one or more messages over a Kafka::Connection object.
Returns a non-blank value (a reference to a hash describing the server response) if the messages are sent successfully.
send() takes the following arguments:
$topic
$topic must be a normal non-false string of non-zero length.
$partition
$partition must be a non-negative integer.
$messages
$messages is an arbitrary amount of data (a simple data string or a reference to an array of data strings).
$keys
$keys are optional message keys used for partitioning. They are stored with each message, so the consumer knows the partitioning key. This argument should be either a single string (a common key for all messages) or an array of strings whose length matches the messages array.
$compression_codec
Optional. $compression_codec sets the type of compression applied to $messages, if compression is desired.
Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY. These constants can be imported from the Kafka module.
To prevent duplicate messages, do not use $Kafka::SEND_MAX_ATTEMPTS with Kafka::Producer->send requests.
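The $keys and $compression_codec arguments can be illustrated with a small sketch. The helper below is hypothetical (send_keyed is not part of the Kafka package); it accepts [ key, message ] pairs and builds the two parallel arrays of equal length that send() expects. It assumes only that $producer is an object with a send() method taking the arguments listed above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka package): takes a reference
# to a list of [ key, message ] pairs and sends them in a single call.
# $producer is any object with a send() method, e.g. a Kafka::Producer.
sub send_keyed {
    my ( $producer, $topic, $partition, $pairs, $codec ) = @_;

    # Split the pairs into parallel arrays, so both have the same length.
    my @keys     = map { $_->[0] } @$pairs;
    my @messages = map { $_->[1] } @$pairs;

    # $codec is optional; it is passed on only when the caller supplied one.
    my @args = ( $topic, $partition, \@messages, \@keys );
    push @args, $codec if defined $codec;

    return $producer->send( @args );
}

# Possible use with a real producer:
# use Kafka qw( $COMPRESSION_GZIP );
# my $response = send_keyed(
#     $producer, 'mytopic', 0,
#     [ [ 'user-1', 'first' ], [ 'user-2', 'second' ] ],
#     $COMPRESSION_GZIP,
# );
```

Keeping each key next to its message in the input makes a length mismatch between the two arrays impossible by construction.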
DIAGNOSTICS
When an error is detected, an exception represented by an object of the Kafka::Exception::Producer class is thrown (see Kafka::Exceptions).
The code and a more descriptive message provide information about the thrown exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.
The authors suggest using Try::Tiny's try and catch to handle exceptions while working with the Kafka package.
Invalid argument
Invalid arguments were provided to the new constructor or to another method.
Cannot send
Request cannot be sent.
Cannot receive
Response cannot be received.
Cannot bind
TCP connection cannot be established on a given host and port.
Cannot get metadata
An IO error occurred, errors were found in the structure of the reply, or the reply contains non-zero error codes.
Description leader not found
Information about the server-leader is missing in metadata.
Mismatch CorrelationId
The CorrelationId of the response doesn't match the one in the request.
There are no known brokers
Information about brokers in the cluster is missing.
Cannot get metadata
The obtained metadata is incorrect, or metadata could not be obtained.
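The exception handling described in this section can be sketched as a small reporting routine. The helper describe_error is hypothetical and not part of the Kafka package; it relies only on the code and message methods and the Kafka::Exception base class that appear in the SYNOPSIS.

```perl
use strict;
use warnings;
use Scalar::Util qw( blessed );

# Hypothetical helper (not part of the Kafka package): turns whatever was
# caught into a one-line description. Kafka exceptions expose code() and
# message(); anything else is stringified as-is.
sub describe_error {
    my ( $error ) = @_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        return sprintf 'Kafka error (%s): %s', $error->code, $error->message;
    }
    return "Unexpected error: $error";
}

# Possible use inside a Try::Tiny catch block:
# try   { $producer->send( 'mytopic', 0, 'Single message' ) }
# catch { warn describe_error( $_ ), "\n" };
```

Separating Kafka exceptions from other errors in one place keeps each catch block short and makes it easy to decide whether to retry, log or rethrow.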
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64-bit elements of the protocol on 32-bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
The Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
Please use the GitHub project link above to report problems or contact the authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.