NAME
Kafka::Producer - interface to the 'producer' client.
VERSION
This documentation refers to Kafka::Producer version 0.800_1 .
SYNOPSIS
use 5.010;
use strict;
use warnings;
#-- Connection
use Kafka::Connection;
my $connect = Kafka::Connection->new( host => 'localhost' );
#-- Producer
use Kafka::Producer;
my $producer = Kafka::Producer->new( Connection => $connect );
# Sending a single message
my $response = $producer->send(
'mytopic', # topic
0, # partition
'Single message' # message
);
die 'Error: ('.$producer->last_errorcode.') '.$producer->last_error."\n"
unless $response;
# Sending a series of messages
$response = $producer->send(
'mytopic', # topic
0, # partition
[ # messages
'The first message',
'The second message',
'The third message',
]
);
# Closes the producer and cleans up
undef $producer;
undef $connect;
DESCRIPTION
Kafka producer API is implemented by Kafka::Producer class.
The main features of the Kafka::Producer class are:
Provides an object oriented model of communication.
Supports Apache Kafka 0.8 PRODUCE Requests (with no compression codec attribute now).
CONSTRUCTOR
new
Creates new producer client object. Returns the created Kafka::Producer object.
An error will cause the program to halt or the constructor will return the Kafka::Producer object without halt, depending on the value of the RaiseError attribute.
You can use the methods of the Kafka::Producer class - "last_errorcode" and "last_error" for information about the error.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
Connection => $connect-
$connectis the Kafka::Connection object that allow you to communicate to the Apache Kafka cluster. RaiseError => $mode-
Optional, default = 0 .
An error will cause the program to halt if "RaiseError" is true:
confessif the argument is not valid ordiein the other error case (this can always be trapped witheval).You should always check for errors, when not establishing the
RaiseErrormode to true. CorrelationId => $correlation_id-
Optional, default =
undef.Correlationis a user-supplied integer. It will be passed back in the response by the server, unmodified. The$correlation_idshould be an integer number.The program will cause an error if a
CorrelationIdin request does not match theCorrelationIdreceived in response.If
CorrelationIdis not passed to constructor, its value will be assigned automatically (random negative integer). ClientId => $client_id-
This is a user supplied identifier (string) for the client application.
If
ClientIdis not passed to constructor, its value will be automatically assigned (to string'producer'). RequiredAcks => $acks-
The
$acksshould be an integer number.RTFM: Indicates how many acknowledgements the servers should receive before responding to the request.
If it is
$NOT_SEND_ANY_RESPONSEthe server does not send any response.If it is
$WAIT_WRITTEN_TO_LOCAL_LOG, the server will wait the data is written to the local log before sending a response.If it is
$BLOCK_UNTIL_IS_COMMITTEDthe server will block until the message is committed by all in sync replicas before sending a response.For any number > 1 the server will block waiting for this number of acknowledgements to occur.
$NOT_SEND_ANY_RESPONSE,$WAIT_WRITTEN_TO_LOCAL_LOG,$BLOCK_UNTIL_IS_COMMITTEDcan be imported from the Kafka module. Timeout => $timeout-
This provides a maximum time the server can await the receipt of the number of acknowledgements in RequiredAcks.
The
$timeoutin secs, could be any integer or floating-point type.Optional, default =
$REQUEST_TIMEOUT.$REQUEST_TIMEOUTis the default timeout that can be imported from the Kafka module.
METHODS
The following methods are defined for the Kafka::Producer class:
send( $topic, $partition, $messages )
Sends a messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash describing the answer) if the message is successfully sent. If there's an error, returns the undefined value if the RaiseError is not true.
send() takes arguments. The following arguments are currently recognized:
$topic-
The
$topicmust be a normal non-false string of non-zero length. $partition-
The
$partitionmust be a non-negative integer. $messages-
The
$messagesis an arbitrary amount of data (a simple data string or a reference to an array of the data strings). $key-
The
$keyis an optional message key, must be a string.$keymay used in the producer for partitioning with each message, so the consumer knows the partitioning key.
RaiseError
This method returns current value showing how errors are handled within Kafka module. If set to true, die() is dispatched when error during communication is detected.
last_errorcode and last_error are diagnostic methods and can be used to get detailed error codes and messages for various cases: when server or the resource is not available, access to the resource was denied, etc.
last_errorcode
Returns code of the last error.
last_error
Returns an error message that contains information about the encountered failure.
DIAGNOSTICS
Consult documentation of the "RaiseError" method for additional information about possible errors.
It's advised to always check "last_errorcode" and more descriptive "last_error" when "RaiseError" is not set.
Invalid argument-
This means that you didn't give the right argument to a
newconstructor or to other method. Can't send-
This means that the message can't be sent on a Kafka::IO object socket.
Can't recv-
This means that the message can't be received on a Kafka::IO object socket.
Can't bind-
This means that the socket TCP connection can't be established on on given host and port.
Can't get metadata-
This means that the IO error present, errors found in the structure of the reply or the reply contains a non-zero error codes.
Description leader not found-
This means that information about the server-leader in the resulting metadata is missing.
Mismatch CorrelationId-
This means that do not match
CorrelationIdrequest and response. There are no known brokers-
This means that information about brokers in the cluster obtained metadata is missing.
Can't get metadata-
This means that metadata obtained incorrect internal structure.
For more error description, always look at the message from "last_error" method from Kafka::Producer object.
If the reply does not contain zero error codes, the error description can also be seen in the information of the method " last_error".
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low level interface for communication with Kafka server.
Kafka::Internals - Internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.