NAME

Kafka::Mock - object interface to the TCP mock server for testing

VERSION

This documentation refers to Kafka::Mock version 0.10

SYNOPSIS

A typical test starts the mock server, connects to it through Kafka::IO, and shuts it down as follows:

use Kafka qw( DEFAULT_TIMEOUT );
use Kafka::IO;
use Kafka::Mock;

# Mock server
my $mock = Kafka::Mock->new(
    requests    => \%requests,
    responses   => \%responses,
    timeout     => 0.1  # Optional, secs float
    );

#-- IO
my $io = Kafka::IO->new(
    host        => "localhost",
    port        => $mock->port
    );

# ... the application body

# to kill the mock server process
$mock->close;

See the "Sample Data" section for sample %requests and %responses.

Use only one Kafka::Mock object at a time.

DESCRIPTION

The Kafka::Mock mock server provides a local TCP server with a set of behaviours for testing Kafka clients with the Kafka::IO object. It is intended to be used when testing the Kafka::Producer and Kafka::Consumer client interfaces. Such testing helps you determine suitable settings for the Apache Kafka server instance and timeouts for your clients.

The Kafka mock server API is implemented by the Kafka::Mock class.

The main features of the Kafka::Mock class are:

  • Provides an object oriented model of communication.

  • Simple mock server instance for testing without an Apache Kafka server.

  • Allows you to set and clear delays at given byte positions in the reception and transmission of data.

  • Provides diagnostic information on the commands received and fulfilled by the server.

CONSTRUCTOR

new

Creates a local TCP server process. Establishes bidirectional communication with the server using socketpair. The server receives the hash references for the requests and the corresponding responses to be returned. The server also receives the value of the delay between receiving requests.

Returns the created Kafka::Mock object. An invalid argument causes the program to halt (confess).

The Mock server returns to the IO object the response data that corresponds to the request from the %requests (based on the REQUEST_TYPE of the request).

new() takes arguments in key-value pairs. The following arguments are currently recognized:

timeout => $timeout

Optional, default = DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT is the default timeout that can be imported from the Kafka module.

$timeout specifies how much time the mock server is given before it proceeds to receive the next request. $timeout is in seconds and may be an integer or a floating-point number.

requests => \%requests

\%requests is a reference to the hash holding the control byte strings of the requests.

responses => \%responses

\%responses is a reference to the hash holding the control byte strings of the responses.

The keys of %requests and %responses must match the Apache Kafka request types:

  • 0 - PRODUCE

  • 1 - FETCH

  • 2 - MULTIFETCH

  • 3 - MULTIPRODUCE

  • 4 - OFFSETS

The values of %requests and %responses should be hex strings suitable for the pack function with the "H*" template.
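
As a minimal sketch of what the "H*" format means, the hex string of the FETCH request from the "Sample Data" section can be converted to raw bytes with the core pack function and restored with unpack. The variable names are illustrative only and are not part of Kafka::Mock.

```perl
use strict;
use warnings;

# Hex string of the FETCH request, as in the "Sample Data" section
my $fetch_hex =
     '00000018'             # REQUEST_LENGTH
    .'0001'                 # REQUEST_TYPE
    .'0004'                 # TOPIC_LENGTH
    .'74657374'             # TOPIC ("test")
    .'00000000'             # PARTITION
    .'0000000000000000'     # OFFSET
    .'00100000';            # MAX_SIZE

# "H*" turns each pair of hex digits into one byte
my $fetch_bytes = pack( "H*", $fetch_hex );

# unpack with the same template restores the hex string
my $round_trip = unpack( "H*", $fetch_bytes );

printf "%d hex digits -> %d bytes\n", length( $fetch_hex ), length( $fetch_bytes );
```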

METHODS

The following methods are defined for the Kafka::Mock class:

port

Returns the port of the mock server, for use by a Kafka::IO object with "localhost" as its host attribute.

last_request

Invoked with no arguments, returns the string of bytes of the last request received by the server. The result can be compared with the control request that was sent.

# the last request received
my $last_request = $mock->last_request;
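
A minimal sketch of such a comparison, assuming last_request returns the raw bytes of the request (the documentation above does not say whether the result is raw bytes or a hex string). The helper matches_control is hypothetical, and a packed string stands in for the value that $mock->last_request would return.

```perl
use strict;
use warnings;

# Hypothetical helper: compare raw bytes with a control hex string
sub matches_control {
    my ( $bytes, $control_hex ) = @_;
    return unpack( "H*", $bytes ) eq lc( $control_hex ) ? 1 : 0;
}

# Control FETCH request from the "Sample Data" section
my $control = '00000018' . '0001' . '0004' . '74657374'
    . '00000000' . '0000000000000000' . '00100000';

# Stand-in for $mock->last_request (ASSUMED to return raw bytes)
my $last_request = pack( "H*", $control );

print matches_control( $last_request, $control )
    ? "request matches control\n"
    : "request differs from control\n";
```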

The following modes are used only for internal testing of the mock server:

# report on the delays that were set
my $delays = $mock->last_request( "note" );
# report on the delays that were performed
my $sleep = $mock->last_request( "sleep" );

delay( $mode, $position, $delay )

$mock->delay( "request",  10, 0.5 );
$mock->delay( "response", 10, 0.5 );

Sets a delay in the reception or transmission of data by the server.

To set several delays, call delay repeatedly.

$mode should be set to "request" or "response" to delay the reception of a request or the transmission of a response, respectively.

$position specifies the byte position in the message after which a delay of $delay seconds is performed. $position must be a positive integer. That is, it is defined and Perl thinks it's an integer. In "request" mode, $position should be at least 6, because the request header (REQUEST_LENGTH + REQUEST_TYPE) must be received whole.

$delay must be a positive number. That is, it is defined and Perl thinks it's a number.

An error will cause the program to halt (confess) if an argument is not valid.
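
The lower bound of 6 for $position in "request" mode follows from the sizes of the two header fields: REQUEST_LENGTH is 4 bytes and REQUEST_TYPE is 2 bytes, as in the "Sample Data" section. A minimal sketch that decodes those 6 bytes with the core unpack function (the variable names are illustrative only):

```perl
use strict;
use warnings;

# First 6 bytes of the sample FETCH request
my $header = pack( "H*", '00000018' . '0001' );

# "N" = 32-bit big-endian unsigned, "n" = 16-bit big-endian unsigned
my ( $request_length, $request_type ) = unpack( "N n", $header );

print "length=$request_length type=$request_type\n";   # length=24 type=1
```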

clear

Deletes all previously specified delays.

close

Kills the mock server process and cleans up.

Sample Data

my %requests = (
    0   =>                                      # PRODUCE Request
        # Request Header
         '0000005f'                             # REQUEST_LENGTH
        .'0000'                                 # REQUEST_TYPE
        .'0004'                                 # TOPIC_LENGTH
        .'74657374'                             # TOPIC ("test")
        .'00000000'                             # PARTITION
        # PRODUCE Request
        .'0000004f'                             # MESSAGES_LENGTH
        # MESSAGE
        .'00000016'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'d94a22be'                             # CHECKSUM
        # "The first message"
        .'546865206669727374206d657373616765'   # PAYLOAD
        # MESSAGE
        .'00000017'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'a3810845'                             # CHECKSUM
        # "The second message"
        .'546865207365636f6e64206d657373616765' # PAYLOAD
        # MESSAGE
        .'00000016'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'58611780'                             # CHECKSUM
        # "The third message"
        .'546865207468697264206d657373616765',  # PAYLOAD
    1   =>                                      # FETCH Request
        # Request Header
         '00000018'                             # REQUEST_LENGTH
        .'0001'                                 # REQUEST_TYPE
        .'0004'                                 # TOPIC_LENGTH
        .'74657374'                             # TOPIC ("test")
        .'00000000'                             # PARTITION
        # FETCH Request
        .'0000000000000000'                     # OFFSET
        .'00100000',                            # MAX_SIZE (1MB)
    2   => '',                                  # MULTIFETCH Request
    3   => '',                                  # MULTIPRODUCE Reqst
    4   =>                                      # OFFSETS Request
        # Request Header
         '00000018'                             # REQUEST_LENGTH
        .'0004'                                 # REQUEST_TYPE
        .'0004'                                 # TOPIC_LENGTH
        .'74657374'                             # TOPIC ("test")
        .'00000000'                             # PARTITION
        # OFFSETS Request
        .'fffffffffffffffe'                     # TIME -2: earliest
        .'00000064',                            # MAX NUM OFFSTS 100
    );

my %responses = (
    0   => '',                                  # PRODUCE Response
    1   =>                                      # FETCH Response
        # Response Header
         '00000051'                             # RESPONSE_LENGTH
        .'0000'                                 # ERROR_CODE
        # MESSAGE
        .'00000016'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'d94a22be'                             # CHECKSUM
        # "The first message"
        .'546865206669727374206d657373616765'   # PAYLOAD
        # MESSAGE
        .'00000017'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'a3810845'                             # CHECKSUM
        # "The second message"
        .'546865207365636f6e64206d657373616765' # PAYLOAD
        # MESSAGE
        .'00000016'                             # LENGTH
        .'00'                                   # MAGIC
        .''                                     # COMPRESSION
        .'58611780'                             # CHECKSUM
        # "The third message"
        .'546865207468697264206d657373616765',  # PAYLOAD
    2   => '',                                  # MULTIFETCH Respns
    3   => '',                                  # MULTIPROD  Respns
    4   =>                                      # OFFSETS Response
        # Response Header
         '0000000e'                             # RESPONSE_LENGTH
        .'0000'                                 # ERROR_CODE
        # OFFSETS Response
        .'00000001'                             # NUMBER of OFFSETS
        .'0000000000000000'                     # OFFSET
    );
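
In the samples above, each leading length field counts the bytes that follow the 4-byte field itself. A minimal sketch of a sanity check for hand-written samples, shown on the OFFSETS response; the helper length_field_ok is hypothetical and not part of Kafka::Mock.

```perl
use strict;
use warnings;

# Hypothetical helper: true if the leading 4-byte length field equals
# the number of bytes that follow it
sub length_field_ok {
    my ( $hex ) = @_;
    my $bytes = pack( "H*", $hex );
    return 0 if length( $bytes ) < 4;
    my $declared = unpack( "N", $bytes );
    return $declared == length( $bytes ) - 4 ? 1 : 0;
}

my $offsets_response =
     '0000000e'             # RESPONSE_LENGTH
    .'0000'                 # ERROR_CODE
    .'00000001'             # NUMBER of OFFSETS
    .'0000000000000000';    # OFFSET

print length_field_ok( $offsets_response ) ? "ok\n" : "length mismatch\n";
```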

DIAGNOSTICS

Kafka::Mock is not an end user module and any error is FATAL. FATAL errors will cause the program to halt (confess), since the problem is so severe that it would be dangerous to continue. (This can always be trapped with eval. Under the circumstances, dying is the best thing to do).
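
A minimal sketch of trapping such a fatal error with eval. The sub hypothetical_call is a stand-in written for this example; it only imitates a call that halts through confess on an invalid argument and is not part of Kafka::Mock.

```perl
use strict;
use warnings;
use Carp qw( confess );

# Stand-in for a call that halts (confess) on an invalid argument
sub hypothetical_call {
    my ( $position ) = @_;
    confess "Mismatch argument"
        unless defined( $position ) && $position =~ /\A[1-9][0-9]*\z/;
    return $position;
}

# eval traps the fatal error; $@ holds the message and the stack trace
my $result = eval { hypothetical_call( -1 ) };
my $error  = $@;

if ( !defined $result ) {
    my ( $first_line ) = split /\n/, $error;
    print "Trapped: $first_line\n";
}
```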

Mismatch argument

This means that you didn't give the right argument to the new constructor.

Any other errors

Other errors that may occur when working with the Kafka::Mock module are listed in the array @Kafka::Mock::ERROR.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules

Kafka::IO - object interface to socket communications with the Apache Kafka server

Kafka::Producer - object interface to the producer client

Kafka::Consumer - object interface to the consumer client

Kafka::Message - object interface to the Kafka message properties

Kafka::Protocol - functions to process messages in the Apache Kafka's wire format

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems

Kafka::Mock - object interface to the TCP mock server for testing

A wealth of detail about Apache Kafka and its wire format:

Main page at http://incubator.apache.org/kafka/

Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.