NAME

Net::STOMP::Client - STOMP object oriented client module

SYNOPSIS

#
# simple producer
#

use Net::STOMP::Client;

$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
$stomp->send(destination => "/queue/test", body => "hello world!");
$stomp->disconnect();

#
# consumer with client side acknowledgment
#

use Net::STOMP::Client;

$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
# declare a callback to be called for each received message frame
$stomp->message_callback(sub {
    my($self, $frame) = @_;

    $self->ack(frame => $frame);
    printf("received: %s\n", $frame->body());
    return($self);
});
# subscribe to the given queue
$stomp->subscribe(
    destination => "/queue/test",
    id          => "testsub",          # required in STOMP 1.1
    ack         => "client",           # client side acknowledgment
);
# wait for a specified message frame
$stomp->wait_for_frames(callback => sub {
    my($self, $frame) = @_;

    if ($frame->command() eq "MESSAGE") {
        # stop waiting for new frames if body is "quit"
        return(1) if $frame->body() eq "quit";
    }
    # continue to wait for more frames
    return(0);
});
$stomp->unsubscribe(id => "testsub");
$stomp->disconnect();

DESCRIPTION

This module provides an object oriented client interface to interact with servers supporting STOMP (Streaming Text Orientated Messaging Protocol). It supports the major features of modern messaging brokers: SSL, asynchronous I/O, receipts and transactions.

CONSTRUCTOR

The new() method can be used to create a Net::STOMP::Client object that will later be used to interact with a server. The following attributes are supported:

version

the STOMP version to use (string) or versions to use (reference to a list of strings); this defaults to the list of all supported versions

uri

the Uniform Resource Identifier (URI) specifying where the STOMP service is and how to connect to it, this can be for instance tcp://msg01:6163 or something more complex, see the "FAILOVER" section for more information

host

the server name or IP address

port

the port number of the STOMP service

timeout

the maximum time (in seconds) for various operations, see the "TIMEOUTS" section for more information

sockopts

arbitrary socket options (as a hash reference) that will be passed to IO::Socket::INET->new() or IO::Socket::SSL->new()

callbacks

a hash of code references that will be called on each received frame

client_heart_beat

the desired client-side heart-beat setting, see the "HEART-BEATING" section for more information

server_heart_beat

the desired server-side heart-beat setting, see the "HEART-BEATING" section for more information

Upon object creation, a TCP connection is made to the server but no data (i.e. STOMP frame) is exchanged.

FAILOVER

The uri option of the new() method can be given a complex URI indicating some kind of failover, for instance: failover:(tcp://msg01:6163,tcp://msg02:6163).

This must use the ActiveMQ syntax (see http://activemq.apache.org/failover-transport-reference.html) and only some options are supported, namely: backOffMultiplier, initialReconnectDelay, maxReconnectAttempts, maxReconnectDelay, randomize and useExponentialBackOff.

When specified, these failover options will be used only inside the new() method (so at the TCP connection level) and not elsewhere. If the broker later fails during the STOMP interaction, it is up to the program author, knowing the logic of his code, to perform the appropriate recovery actions and eventually reconnect, using again the new() method.

SSL

When creating an object with Net::STOMP::Client->new(), if you supply some socket options (via sockopts) with a name starting with SSL_, or if you supply a URI (via uri) with a scheme containg ssl, IO::Socket::SSL will be used to create the socket instead of IO::Socket::INET and the communication with the server will then go through SSL.

Here are the most commonly used SSL socket options:

SSL_ca_path

path to a directory containing several trusted certificates as separate files as well as an index of the certificates

SSL_key_file

path of your RSA private key

SSL_cert_file

path of your certificate

SSL_passwd_cb

subroutine that should return the password required to decrypt your private key

For more information, see IO::Socket::SSL.

TIMEOUTS

By default, when sending STOMP frames, the module waits until the frame indeed has been sent (from the socket point of view). In case the server is stuck or unusable, the module can therefore hang.

When creating the Net::STOMP::Client object, you can pass a timeout attribute to better control how certain operations handle timeouts.

This attribute should contain a reference to hash with the following keys:

connect

TCP-level timeout that will be given to the underlying IO::Socket::INET or IO::Socket::SSL object (default: none)

connected

timeout used while waiting for the initial CONNECTED frame from the broker (default: 10)

receive

timeout used while trying to receive any frame (default: none)

send

timeout used while trying to send any frame (default: none)

All values are in seconds. No timeout means wait until the operation succeeds.

As a shortcut, the timeout attribute can also be a scalar. In this case, only the connect and connected operations use this value.

STOMP METHODS

With a Net::STOMP::Client object, the following methods can be used to interact with the server. They match one-to-one the different commands that a client frame can hold:

connect()

connect to server

disconnect()

disconnect from server

subscribe()

subscribe to something

unsubscribe()

unsubscribe from something

send()

send a message somewhere

ack()

acknowledge the reception of a message

nack()

acknowledge the rejection of a message (STOMP 1.1 only)

begin()

begin/start a transaction

commit()

commit a transaction

abort()

abort/rollback a transaction

All these methods can receive options that will be passed directly as frame headers. For instance:

$stomp->subscribe(
    destination => "/queue/test",
    id          => "testsub",
    ack         => "client",
);

Some methods also support other options:

send()

body or body_reference: holds the body or body reference of the message to be sent

ack()

frame: holds the MESSAGE frame object to ACK

nack()

frame: holds the MESSAGE frame object to NACK

Finally, all methods support a timeout option that will be given to the send_frame() method called internally to send the crafted frame.

OTHER METHODS

In addition to the STOMP methods, the following ones are also available:

new(OPTIONS)

return a new Net::STOMP::Client object

peer()

return a Net::STOMP::Client::Peer object containing information about the connected STOMP server

socket()

return the file handle of the socket connecting the client and the server

server()

return the server header seen on the CONNECTED frame or the empty string if not seen

session()

return the session identifier if connected or the empty string otherwise

version()

return the STOMP version that has been negotiated between the client and the server or undef if the negotiation did not take place yet

uuid()

return a universal pseudo-unique identifier to be used for instance in receipts and transactions

receipts()

return the list of not-yet-received receipts, see the "RECEIPTS" section for more information

wait_for_frames()

wait for frames coming from the server, see the next section for more information

wait_for_receipts()

wait for all receipts to be received, using wait_for_frames() underneath

noop([timeout => TIMEOUT])

send an empty/noop frame i.e. a single newline byte, using send_frame() underneath

CALLBACKS

Since STOMP is asynchronous (for instance, MESSAGE frames could be sent by the server at any time), Net::STOMP::Client uses callbacks to handle frames. There are in fact two levels of callbacks.

First, there are per-command callbacks that will be called each time a frame is handled (via the internal method dispatch_frame()). Net::STOMP::Client implements default callbacks that should be sufficient for all frames except MESSAGE frames, which should really be handled by the coder. These callbacks should return undef on error, something else on success.

Here is an example with a callback counting the messages received:

$stomp->message_callback(sub {
    my($self, $frame) = @_;

    $MessageCount++;
    return($self);
});

Here are the methods that can be used to get or set these per-command callbacks:

connected_callback([SUBREF])
error_callback([SUBREF])
message_callback([SUBREF])
receipt_callback([SUBREF])

These callbacks are somehow global and it is good practice not to change them during a session. If you do not need a global message callback, you can supply the dummy:

$stomp->message_callback(sub { return(1) });

Then, the wait_for_frames() method takes an optional callback argument holding some code to be called for each received frame, after the per-command callback has been called. This can be seen as a local callback, only valid for the call to wait_for_frames(). This callback must return undef on error, false if more frames are expected or true if wait_for_frames() can now stop waiting for new frames and return.

Here are all the options that can be given to wait_for_frames():

callback

code to be called for each received frame (see above)

timeout

time to wait before giving up, undef means wait forever, this is the default

once

wait only for one frame, within the given timeout

The return value of wait_for_frames() can be: undef in case of error, false if no suitable frame has been received, the received frame if there is no user callback or the user callback return value otherwise.

RECEIPTS

Net::STOMP::Client has built-in support for receipts.

Each time a frame is sent, its receipt header (if supplied) is remembered.

Each time a RECEIPT frame is received from the server, the corresponding receipt is ticked off.

The receipts() method can be used to get the list of outstanding receipts.

The wait_for_receipts() method can be used to wait for all missing receipts.

Here is sample code to send two messages with receipts and then wait for both acknowledgments to come back from the server within ten seconds:

$stomp->send(
    destination => "/queue/test1",
    body        => "message 1",
    receipt     => $stomp->uuid(),
);
$stomp->send(
    destination => "/queue/test2",
    body        => "message 2",
    receipt     => $stomp->uuid(),
);
$stomp->wait_for_receipts(timeout => 10);
die("Not all receipts received!\n") if $stomp->receipts();

TRANSACTIONS

Here is an example using transactions:

# create a unique transaction id
$tid = $stomp->uuid();
# begin the transaction
$stomp->begin(transaction => $tid);
# send two messages as part of this transaction
$stomp->send(
    destination => "/queue/test1",
    body        => "message 1",
    transaction => $tid,
);
$stomp->send(
    destination => "/queue/test2",
    body        => "message 2",
    transaction => $tid,
);
# commit the transaction
$stomp->commit(transaction => $tid);

HEART-BEATING

STOMP 1.1 defines how each end of a STOMP connection can check if the other end is alive. To support heart-beating, this module provides the following methods:

last_received()

return the time at which data was last received, i.e. read from the network socket

last_sent()

return the time at which data was last sent, i.e. written to the network socket

client_heart_beat()

(after having received the CONNECTED frame) return the negotiated client-side heart-beat setting

server_heart_beat()

(after having received the CONNECTED frame) return the negotiated server-side heart-beat setting

beat([timeout => TIMEOUT])

send a noop frame (using the noop() method) unless the last sent time is recent enough with regard to the client heart-beat setting

For consistency with other Perl modules (for instance Time::HiRes), time is always expressed as a fractional number of seconds.

To use heart-beating, the client must specify the desired client_heart_beat and/or server_heart_beat attributes when invoking the new() method. Then, once the CONNECTED frame has been received, it can get the negotiated values with the methods above.

To prove that it is alive, the client just needs to call the beat() method.

To check if the server is alive, the client just needs to compare what is returned by the last_received() and server_heart_beat() methods.

LOW-LEVEL API

It should be enough to use the high-level API and use, for instance, the send() method to create a MESSAGE frame and send it in one go.

If you need lower level interaction, you can manipulate frames with the Net::STOMP::Client::Frame module.

You can also use:

$stomp->dispatch_frame(FRAME)

dispatch one received frame by calling the appropriate callback

$stomp->send_frame(FRAME, TIMEOUT)

try to send the given frame object within the given TIMEOUT

$stomp->queue_frame(FRAME)

add the given frame to the outgoing buffer queue

$stomp->send_data(TIMEOUT)

send all the queued data within the given TIMEOUT

$stomp->receive_frame(TIMEOUT)

try to receive a frame within the given TIMEOUT

$stomp->receive_data(TIMEOUT)

try to receive data within the given TIMEOUT, this data will be appended to the incoming buffer

$stomp->outgoing_buffer_length()

return the length (in bytes) of the outgoing buffer

$stomp->incoming_buffer_reference()

return a reference to the incoming buffer

In these methods, the TIMEOUT argument can either be undef (meaning block until it's done) or 0 (meaning do not block at all) or a positive number (meaning block at most this number of seconds).

COMPATIBILITY

This module implements the versions 1.0 (see http://stomp.github.com/stomp-specification-1.0.html) and 1.1 (see http://stomp.github.com/stomp-specification-1.1.html) of the protocol as well as well known extensions for JMS, ActiveMQ, Apollo and RabbitMQ.

It has been successfully tested against ActiveMQ, Apollo, HornetQ and RabbitMQ brokers.

SEE ALSO

IO::Socket::INET, IO::Socket::SSL, Net::STOMP::Client::Error, Net::STOMP::Client::Frame, Net::STOMP::Client::Peer, Time::HiRes.

AUTHOR

Lionel Cons http://cern.ch/lionel.cons

Copyright CERN 2010-2012