NAME
Net::STOMP::Client - STOMP object oriented client module
SYNOPSIS
#
# simple producer
#
use Net::STOMP::Client;
$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
$stomp->send(destination => "/queue/test", body => "hello world!");
$stomp->disconnect();
#
# consumer with client side acknowledgment
#
use Net::STOMP::Client;
$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
# declare a callback to be called for each received message frame
$stomp->message_callback(sub {
my($self, $frame) = @_;
$self->ack(frame => $frame);
printf("received: %s\n", $frame->body());
return($self);
});
# subscribe to the given queue
$stomp->subscribe(destination => "/queue/test", ack => "client");
# wait for a specified message frame
$stomp->wait_for_frames(callback => sub {
my($self, $frame) = @_;
if ($frame->command() eq "MESSAGE") {
# stop waiting for new frames if body is "quit"
return(1) if $frame->body() eq "quit";
}
# continue to wait for more frames
return(0);
});
$stomp->unsubscribe(destination => "/queue/test");
$stomp->disconnect();
DESCRIPTION
This module provides an object oriented client interface to interact with servers supporting STOMP (Streaming Text Orientated Messaging Protocol). It supports the major features of modern messaging brokers: SSL, asynchronous I/O, receipts and transactions.
CONSTRUCTOR
The new() method can be used to create a Net::STOMP::Client object that will later be used to interact with a server. The following attributes are supported:
uri
-
the Uniform Resource Identifier (URI) specifying where the STOMP service is and how to connect to it, this can be for instance
tcp://msg01:6163
or something more complex such asfailover:(ssl://msg01:6162,tcp://msg01:6163)
host
-
the server name or IP address
port
-
the port number of the STOMP service
timeout
-
the maximum time (in seconds) for various operations, see the "TIMEOUTS" section for more information
sockopts
-
arbitrary socket options (as a hash reference) that will be passed to IO::Socket::INET->new() or IO::Socket::SSL->new()
callbacks
-
a hash of code references that will be called on each received frame
session
-
the session identifier, once connected
Upon object creation, a TCP connection is made to the server but no data (i.e. STOMP frame) is exchanged.
SSL
When creating an object with Net::STOMP::Client->new(), if you supply some socket options (via sockopts
) with a name starting with SSL_
, or if you supply a URI (via uri
) with a scheme containg ssl
, IO::Socket::SSL will be used to create the socket instead of IO::Socket::INET and the communication with the server will then go through SSL.
Here are the most commonly used SSL socket options:
- SSL_ca_path
-
path to a directory containing several trusted certificates as separate files as well as an index of the certificates
- SSL_key_file
-
path of your RSA private key
- SSL_cert_file
-
path of your certificate
- SSL_passwd_cb
-
subroutine that should return the password required to decrypt your private key
For more information, see IO::Socket::SSL.
TIMEOUTS
By default, when sending STOMP frames, the module waits until the frame indeed has been sent (from the socket point of view). In case the server is stuck or unusable, the module can therefore hang.
When creating the Net::STOMP::Client object, you can pass a timeout
attribute to better control how certain operations handle timeouts.
This attribute should contain a reference to hash with the following keys:
- connect
-
TCP-level timeout that will be given to the underlying IO::Socket::INET or IO::Socket::SSL object (default: none)
- connected
-
timeout used while waiting for the initial CONNECTED frame from the broker (default: 10)
- send
-
timeout used while trying to send any frame (default: none)
All values are in seconds. No timeout means wait until the operation succeeds.
As a shortcut, the timeout
attribute can also be a scalar. In this case, only the connect
and connected
operations use this value.
STOMP METHODS
With a Net::STOMP::Client object, the following methods can be used to interact with the server. They match one-to-one the different commands that a client frame can hold:
- connect()
-
connect to server
- disconnect()
-
disconnect from server
- subscribe()
-
subscribe to something
- unsubscribe()
-
unsubscribe from something
- send()
-
send a message somewhere
- ack()
-
acknowledge the reception of a message
- begin()
-
begin/start a transaction
- commit()
-
commit a transaction
- abort()
-
abort/rollback a transaction
All these methods can receive options that will be passed directly as frame headers. For instance:
$stomp->subscribe(
destination => "/queue/test",
ack => "client",
);
Some methods also support other options:
- send()
-
body
: holds the body of the message to be sent - ack()
-
frame
: holds the frame object to acknowledge (its message-id will be used)
Finally, all methods support a timeout
option that will be given to the send_frame() method called internally to send the crafted frame.
OTHER METHODS
In addition to the STOMP methods, the following ones are also available:
- uuid()
-
return a universal pseudo-unique identifier to be used for instance in receipts and transactions
- receipts()
-
return the list of not-yet-received receipts, see the "RECEIPTS" section for more information
- wait_for_frames()
-
wait for frames coming from the server, see the next section for more information
- wait_for_receipts()
-
wait for all receipts to be received, using wait_for_frames() underneath
CALLBACKS
Since STOMP is asynchronous (for instance, MESSAGE
frames could be sent by the server at any time), Net::STOMP::Client uses callbacks to handle frames. There are in fact two levels of callbacks.
First, there are per-command callbacks that will be called each time a frame is handled (via the internal method dispatch_frame()). Net::STOMP::Client implements default callbacks that should be sufficient for all frames except MESSAGE
frames, which should really be handled by the coder. These callbacks should return undef on error, something else on success.
Here is an example with a callback counting the messages received:
$stomp->message_callback(sub {
my($self, $frame) = @_;
$MessageCount++;
return($self);
});
These callbacks are somehow global and it is good practice not to change them during a session. If you do not need a global message callback, you can supply the dummy:
$stomp->message_callback(sub { return(1) });
Then, the wait_for_frames() method takes an optional callback argument holding some code to be called for each received frame, after the per-command callback has been called. This can be seen as a local callback, only valid for the call to wait_for_frames(). This callback must return undef on error, false if more frames are expected or true if wait_for_frames() can now stop waiting for new frames and return.
Here are all the options that can be given to wait_for_frames():
- callback
-
code to be called for each received frame (see above)
- timeout
-
time to wait before giving up, undef means wait forever, this is the default
- once
-
wait only for one frame, within the given timeout
The return value of wait_for_frames() can be: undef in case of error, false if no suitable frame has been received, the received frame if there is no user callback or the user callback return value otherwise.
RECEIPTS
Net::STOMP::Client has built-in support for receipts.
Each time a frame is sent, its receipt
header (if supplied) is remembered.
Each time a RECEIPT
frame is received from the server, the corresponding receipt is ticked off.
The receipts() method can be used to get the list of outstanding receipts.
The wait_for_receipts() method can be used to wait for all missing receipts.
Here is sample code to send two messages with receipts and then wait for both acknowledgments to come back from the server within ten seconds:
$stomp->send(
destination => "/queue/test1",
body => "message 1",
receipt => $stomp->uuid(),
);
$stomp->send(
destination => "/queue/test2",
body => "message 2",
receipt => $stomp->uuid(),
);
$stomp->wait_for_receipts(timeout => 10);
die("Not all receipts received!\n") if $stomp->receipts();
TRANSACTIONS
Here is an example using transactions:
# create a unique transaction id
$tid = $stomp->uuid();
# begin the transaction
$stomp->begin(transaction => $tid);
# send two messages as part of this transaction
$stomp->send(
destination => "/queue/test1",
body => "message 1",
transaction => $tid,
);
$stomp->send(
destination => "/queue/test2",
body => "message 2",
transaction => $tid,
);
# commit the transaction
$stomp->commit(transaction => $tid);
LOW-LEVEL API
It should be enough to use the high-level API and use, for instance, the send() method to create a MESSAGE frame and send it in one go.
If you need lower level interaction, you can manipulate frames with the Net::STOMP::Client::Frame module.
You can also use:
- $stomp->send_frame(FRAME, TIMEOUT)
-
try to send the given frame object within the given TIMEOUT, or forever if the TIMEOUT is undef
- $stomp->receive_frame(TIMEOUT)
-
try to receive a frame within the given TIMEOUT, or forever if the TIMEOUT is undef
COMPATIBILITY
This module implements the version 1.0 of the protocol (see http://stomp.codehaus.org/Protocol) as well as well known extensions for JMS, AMQP, ActiveMQ and RabbitMQ.
It has been successfully tested against both ActiveMQ and RabbitMQ brokers.
SEE ALSO
IO::Socket::INET, IO::Socket::SSL, Net::STOMP::Client::Frame, Net::STOMP::Client::Error.
AUTHOR
Lionel Cons http://cern.ch/lionel.cons