NAME
Kafka::Producer - Perl interface for Kafka producer client.
VERSION
This documentation refers to Kafka::Producer version 1.08.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka::Connection;
    use Kafka::Producer;

    my ( $connection, $producer );
    try {

        #-- Connection
        $connection = Kafka::Connection->new( host => 'localhost' );

        #-- Producer
        $producer = Kafka::Producer->new( Connection => $connection );

        # Sending a single message
        my $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            'Single message'    # message
        );

        # Sending a series of messages
        $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            [                   # messages
                'The first message',
                'The second message',
                'The third message',
            ]
        );

    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $error->code, ') ', $error->message, "\n";
            exit;
        } else {
            die $error;
        }
    };

    # Closes the producer and cleans up
    undef $producer;
    $connection->close;
    undef $connection;
DESCRIPTION
The Kafka producer API is implemented by the Kafka::Producer class.
The main features of the Kafka::Producer class are:
Provides an object-oriented API for producing messages.
Provides Kafka PRODUCE requests.
CONSTRUCTOR
new
Creates a new producer client object.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
Connection => $connection - $connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.
ClientId => $client_id - A user-supplied identifier (string) for the client application. If ClientId is not passed to the constructor, it defaults to the string 'producer'.
RequiredAcks => $acks - $acks should be a signed int16 integer. It indicates how many acknowledgements the servers should receive before responding to the request.
If it is $NOT_SEND_ANY_RESPONSE, the server does not send any response.
If it is $WAIT_WRITTEN_TO_LOCAL_LOG (the default), the server waits until the data is written to the local log before sending a response.
If it is $BLOCK_UNTIL_IS_COMMITTED, the server blocks until the message is committed by all in-sync replicas before sending a response.
$NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG and $BLOCK_UNTIL_IS_COMMITTED can be imported from the Kafka module.
Timeout => $timeout - The maximum time the server can wait to receive the number of acknowledgements given in RequiredAcks. $timeout is in seconds and can be any integer or floating-point value not bigger than a positive int32 integer. Optional; defaults to $REQUEST_TIMEOUT, which can be imported from the Kafka module.
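As an illustration of the constructor arguments listed above, the following is a minimal sketch rather than a definitive recipe. It assumes a broker is reachable on the given host; the helper name build_producer and the client identifier 'my-app' are hypothetical.

```perl
use strict;
use warnings;

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $REQUEST_TIMEOUT
);
use Kafka::Connection;
use Kafka::Producer;

# Builds a producer that waits for all in-sync replicas to commit each message.
# Assumptions: $host names a reachable broker; 'my-app' is a made-up ClientId.
sub build_producer {
    my ( $host ) = @_;

    my $connection = Kafka::Connection->new( host => $host );

    my $producer = Kafka::Producer->new(
        Connection   => $connection,
        ClientId     => 'my-app',
        RequiredAcks => $BLOCK_UNTIL_IS_COMMITTED,  # strongest acknowledgement level
        Timeout      => $REQUEST_TIMEOUT,           # the documented default, spelled out
    );

    # The connection is returned too, so the caller can close it later.
    return ( $connection, $producer );
}
```

A caller would typically write my ( $connection, $producer ) = build_producer( 'localhost' ); inside a try block, since both constructors throw exceptions on invalid arguments.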
METHODS
The following methods are defined for the Kafka::Producer class:
send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps )
Sends messages over a Kafka::Connection object.
Returns a non-blank value (a reference to a hash describing the server response) if the messages are sent successfully.
send() takes the following arguments:
$topic - $topic must be a normal non-false string of non-zero length.
$partition - $partition must be a non-negative integer.
$messages - $messages is an arbitrary amount of data (a simple data string or a reference to an array of data strings).
$keys - Optional message keys used for partitioning; a key is stored with each message so that the consumer knows the partitioning key. This argument should be either a single string (a common key for all messages) or an array of strings whose length matches the messages array.
$compression_codec - Optional. $compression_codec sets the type of compression applied to $messages, if compression is desired. Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY, $COMPRESSION_LZ4. These constants can be imported from the Kafka module.
$timestamps - Optional. The timestamps of the $messages. This argument should be either a single number (a common timestamp for all messages) or an array of integers whose length matches the messages array.
The unit is milliseconds since the beginning of the epoch (midnight, January 1, 1970 UTC).
WARNING: timestamps are supported since Kafka 0.10.0.
To prevent duplicate messages, do not use $Kafka::SEND_MAX_ATTEMPTS in Kafka::Producer->send requests.
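The optional send() arguments can be combined as in the sketch below, which passes one key and one millisecond timestamp per message. It rests on two assumptions: that $timestamps is given as the sixth positional argument, following the order in which the arguments are documented, and that undef may be passed to skip $compression_codec. The helper name send_keyed_batch and the layout of $pairs are hypothetical.

```perl
use strict;
use warnings;

use Time::HiRes qw( time );

# Sends a batch with one key and one timestamp per message.
# $producer is expected to be a Kafka::Producer object; any object with a
# compatible send() method works, which keeps the helper easy to test.
sub send_keyed_batch {
    my ( $producer, $topic, $partition, $pairs ) = @_;

    # $pairs is an array reference of [ key, message ] pairs (hypothetical layout).
    my @keys     = map { $_->[0] } @$pairs;
    my @messages = map { $_->[1] } @$pairs;

    # Timestamps are integer milliseconds since the Unix epoch.
    my $now_ms     = int( time() * 1000 );
    my @timestamps = ( $now_ms ) x @messages;

    return $producer->send(
        $topic,
        $partition,
        \@messages,
        \@keys,         # one key per message, same length as the messages array
        undef,          # assumption: undef leaves compression at its default
        \@timestamps,   # one timestamp per message
    );
}
```

With a real producer this would be called as send_keyed_batch( $producer, 'mytopic', 0, [ [ 'user-1', 'first' ], [ 'user-2', 'second' ] ] ). Because timestamps require Kafka 0.10.0 or later, the last argument should be dropped when talking to older brokers.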
DIAGNOSTICS
When an error is detected, an exception represented by an object of the Kafka::Exception::Producer class is thrown (see Kafka::Exceptions).
The code and a more descriptive message provide information about the thrown exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.
The authors suggest using Try::Tiny's try and catch to handle exceptions while working with the Kafka package.
Invalid argument - Invalid arguments were provided to the new constructor or to another method.
Cannot send - The request cannot be sent.
Cannot receive - The response cannot be received.
Cannot bind - A TCP connection cannot be established on the given host and port.
Cannot get metadata - An IO error occurred, errors were found in the structure of the reply, or the reply contains a non-zero error code.
Description leader not found - Information about the server-leader is missing in the metadata.
Mismatch CorrelationId - The CorrelationId of the response doesn't match the one in the request.
There are no known brokers - Information about brokers in the cluster is missing.
Cannot get metadata - The obtained metadata is incorrect, or the metadata could not be obtained.
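One way to report the diagnostics above from a catch block is sketched here. It relies only on the code and message methods mentioned in this section and on the isa check used in the SYNOPSIS; the helper name describe_error and its output format are hypothetical.

```perl
use strict;
use warnings;

use Scalar::Util qw( blessed );

# Turns a caught error into a one-line description.
# Kafka exceptions expose code() and message(); anything else is stringified.
sub describe_error {
    my ( $error ) = @_;

    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        # Assumption: Kafka::Exception::Producer inherits from Kafka::Exception,
        # as the isa() check in the SYNOPSIS implies.
        my $kind = $error->isa( 'Kafka::Exception::Producer' ) ? 'producer' : 'kafka';
        return sprintf( '%s error (%s): %s', $kind, $error->code, $error->message );
    }

    return "unexpected error: $error";
}
```

Inside a Try::Tiny catch block this could be used as warn describe_error( $_ ), "\n"; which keeps the formatting in one place while leaving the decision to exit or rethrow to the caller.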
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64-bit elements of the protocol on 32-bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
The Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov
Please use the GitHub project link above to report problems or contact the authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.