NAME

Kafka::Producer - interface to the 'producer' client.

VERSION

This documentation refers to Kafka::Producer version 0.800_1.

SYNOPSIS

use 5.010;
use strict;
use warnings;

#-- Connection
use Kafka::Connection;

my $connect = Kafka::Connection->new( host => 'localhost' );

#-- Producer
use Kafka::Producer;

my $producer = Kafka::Producer->new( Connection => $connect );

# Sending a single message
my $response = $producer->send(
    'mytopic',          # topic
    0,                  # partition
    'Single message'    # message
);

die 'Error: ('.$producer->last_errorcode.') '.$producer->last_error."\n"
    unless $response;

# Sending a series of messages
$response = $producer->send(
    'mytopic',          # topic
    0,                  # partition
    [                   # messages
        'The first message',
        'The second message',
        'The third message',
    ]
);

# Closes the producer and cleans up
undef $producer;
undef $connect;

DESCRIPTION

The Kafka producer API is implemented by the Kafka::Producer class.

The main features of the Kafka::Producer class are:

  • Provides an object-oriented model of communication.

  • Supports Apache Kafka 0.8 PRODUCE Requests (the compression codec attribute is not currently supported).

CONSTRUCTOR

new

Creates a new producer client object. Returns the created Kafka::Producer object.

Depending on the value of the RaiseError attribute, an error either halts the program or lets the constructor return the Kafka::Producer object without halting.

Use the "last_errorcode" and "last_error" methods of the Kafka::Producer class to get information about the error.

new() takes arguments in key-value pairs. The following arguments are currently recognized:

Connection => $connect

$connect is the Kafka::Connection object that allows you to communicate with the Apache Kafka cluster.

RaiseError => $mode

Optional, default = 0.

If "RaiseError" is true, an error causes the program to halt: confess is called if an argument is not valid, and die is called for any other error (either can be trapped with eval).

Always check for errors when RaiseError is not set to true.
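
As a minimal sketch of trapping errors in RaiseError mode, the helper below wraps a send() call in eval. The name send_trapped is hypothetical and not part of Kafka::Producer. It assumes only that $producer provides the send() method described in this document and was created with RaiseError => 1.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Producer): calls send() inside
# eval so that a die or confess raised in RaiseError mode does not halt
# the program.
# Returns ( $response, undef ) on success or ( undef, $error_text ) on failure.
sub send_trapped {
    my ( $producer, $topic, $partition, $messages ) = @_;

    my $response;
    my $ok = eval {
        $response = $producer->send( $topic, $partition, $messages );
        1;    # reached only if send() did not die
    };

    return ( $response, undef ) if $ok;
    return ( undef, $@ || 'unknown error' );
}
```

A caller would unpack both values, for example my ( $response, $error ) = send_trapped( $producer, 'mytopic', 0, 'Single message' ), and treat a defined $error as a failed send.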

CorrelationId => $correlation_id

Optional, default = undef.

The $correlation_id is a user-supplied integer. The server passes it back in the response, unmodified.

An error is raised if the CorrelationId in the request does not match the CorrelationId received in the response.

If CorrelationId is not passed to the constructor, its value is assigned automatically (a random negative integer).

ClientId => $client_id

This is a user-supplied identifier (string) for the client application.

If ClientId is not passed to the constructor, its value is automatically set to the string 'producer'.

RequiredAcks => $acks

The $acks should be an integer number.

Indicates how many acknowledgements the servers should receive before responding to the request.

If it is $NOT_SEND_ANY_RESPONSE, the server does not send any response.

If it is $WAIT_WRITTEN_TO_LOCAL_LOG, the server will wait until the data is written to the local log before sending a response.

If it is $BLOCK_UNTIL_IS_COMMITTED, the server will block until the message is committed by all in-sync replicas before sending a response.

For any number > 1, the server will block waiting for this number of acknowledgements to occur.

$NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG, $BLOCK_UNTIL_IS_COMMITTED can be imported from the Kafka module.
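
To illustrate the rules above, the sketch below maps a RequiredAcks value to the behaviour described. The function name describe_acks and the ACKS_* constant names are hypothetical. The numeric values 0, 1 and -1 are those the Kafka 0.8 protocol guide gives for these three cases; that the constants exported by the Kafka module hold the same values is an assumption here, so real code should import and use the module's constants instead.

```perl
use strict;
use warnings;

# Assumed numeric values, taken from the Kafka 0.8 protocol guide and NOT
# read from the Kafka module. The names are hypothetical stand-ins for
# $NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG, $BLOCK_UNTIL_IS_COMMITTED.
use constant {
    ACKS_NONE      => 0,
    ACKS_LOCAL_LOG => 1,
    ACKS_COMMITTED => -1,
};

# Hypothetical helper: returns a short description of what the server does
# for a given RequiredAcks value.
sub describe_acks {
    my ($acks) = @_;

    return 'no response is sent'
        if $acks == ACKS_NONE;
    return 'responds after the write to the local log'
        if $acks == ACKS_LOCAL_LOG;
    return 'responds after all in-sync replicas have committed'
        if $acks == ACKS_COMMITTED;
    return "responds after $acks acknowledgements"
        if $acks > 1;

    # Values below -1 are not covered by the description above.
    return 'not described';
}
```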

Timeout => $timeout

The maximum time the server may wait to receive the number of acknowledgements given in RequiredAcks.

The $timeout is given in seconds and may be any integer or floating-point number.

Optional, default = $REQUEST_TIMEOUT.

$REQUEST_TIMEOUT is the default timeout that can be imported from the Kafka module.

METHODS

The following methods are defined for the Kafka::Producer class:

send( $topic, $partition, $messages )

Sends one or more messages over a Kafka::Connection object.

Returns a non-blank value (a reference to a hash describing the answer) if the message is successfully sent. On error, returns the undefined value if RaiseError is not true.

send() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer.

$messages

The $messages is an arbitrary amount of data (a simple data string or a reference to an array of the data strings).

$key

The $key is an optional message key and must be a string. The producer may use $key for partitioning; it is sent with each message so that the consumer knows the partitioning key.
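
The argument rules above can be sketched as a pre-flight check. The function first_invalid_send_arg is hypothetical and is not the validation Kafka::Producer performs internally; it only mirrors the constraints as documented here, and passing $key as a fourth argument is an assumption based on the order of this list.

```perl
use strict;
use warnings;

# Hypothetical pre-flight check mirroring the documented argument rules.
# Returns the name of the first offending argument, or undef if all pass.
sub first_invalid_send_arg {
    my ( $topic, $partition, $messages, $key ) = @_;

    # $topic: a plain, non-false string of non-zero length
    return 'topic'
        unless defined($topic) && !ref($topic) && length($topic) && $topic;

    # $partition: a non-negative integer
    return 'partition'
        unless defined($partition)
        && !ref($partition)
        && $partition =~ /\A\d+\z/;

    # $messages: a data string or a reference to an array of data strings
    my @list = ref($messages) eq 'ARRAY' ? @{$messages} : ($messages);
    return 'messages'
        if grep { !defined($_) || ref($_) } @list;

    # $key: optional, but must be a string when given
    return 'key' if defined($key) && ref($key);

    return;
}
```

For example, first_invalid_send_arg( 'mytopic', -1, 'Single message' ) reports 'partition', because -1 is not a non-negative integer.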

RaiseError

This method returns the current value that shows how errors are handled within the Kafka module. If it is true, die() is called when an error is detected during communication.

last_errorcode and last_error are diagnostic methods and can be used to get detailed error codes and messages for various cases: when server or the resource is not available, access to the resource was denied, etc.

last_errorcode

Returns the code of the last error.

last_error

Returns an error message that contains information about the encountered failure.

DIAGNOSTICS

Consult documentation of the "RaiseError" method for additional information about possible errors.

It's advised to always check "last_errorcode" and more descriptive "last_error" when "RaiseError" is not set.
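
A minimal sketch of that check, for a producer created without RaiseError: the helper name send_checked is hypothetical and not part of Kafka::Producer. It relies only on the send(), last_errorcode and last_error methods described in this document, and formats the report the same way as the SYNOPSIS.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Producer): calls send() and, when
# it returns a false value, builds a report from the diagnostic methods.
# Assumes $producer was created without RaiseError.
# Returns ( $response, undef ) on success or ( undef, $report ) on failure.
sub send_checked {
    my ( $producer, @send_args ) = @_;

    my $response = $producer->send(@send_args);
    return ( $response, undef ) if $response;

    my $report = sprintf 'Error: (%s) %s',
        $producer->last_errorcode, $producer->last_error;
    return ( undef, $report );
}
```

A caller can then log or die with the returned report instead of silently ignoring an undefined response.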

Invalid argument

This means that you didn't give the right argument to the new constructor or to another method.

Can't send

This means that the message can't be sent on a Kafka::IO object socket.

Can't recv

This means that the message can't be received on a Kafka::IO object socket.

Can't bind

This means that the TCP socket connection can't be established on the given host and port.

Can't get metadata

This means that an IO error occurred, errors were found in the structure of the reply, or the reply contains non-zero error codes.

Description leader not found

This means that information about the server-leader in the resulting metadata is missing.

Mismatch CorrelationId

This means that the CorrelationId of the request does not match the CorrelationId of the response.

There are no known brokers

This means that information about the brokers in the cluster is missing from the obtained metadata.

Can't get metadata

This means that the obtained metadata has an incorrect internal structure.

For a more detailed error description, always check the message returned by the "last_error" method of the Kafka::Producer object.

If the reply contains non-zero error codes, the error description is also available from the "last_error" method.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low level interface for communication with Kafka server.

Kafka::Internals - Internal constants and functions used by several package modules.

A wealth of detail about Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.