NAME
Net::STOMP::Client::Tutorial - Getting started with STOMP and Net::STOMP::Client
INTRODUCTION
Here is a five-part tutorial to get started with Net::STOMP::Client.
A basic knowledge of STOMP is required. For this, you can read:
http://stomp.github.com/stomp-specification-1.1.html
the STOMP 1.1 protocol specification
http://fusesource.com/docs/broker/5.4/connectivity_guide/BHIJBDJH.html
the Fuse Message Broker STOMP Tutorial
PART 1: CONNECTING TO A BROKER
Net::STOMP::Client, like similar modules under the Net::* hierarchy, provides an object oriented interface to a network protocol.
CREATING AN OBJECT
In order to connect to a broker, you first have to create an object. This object will later be used to interact with the broker. When the module creates the object, it tries to connect to the broker using either plain TCP or SSL. Nothing is done at the STOMP level.
To create the object, you of course need to specify where to connect to. This can be done either with a single uri parameter:
$stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
or with the pair of host and port parameters:
$stomp = Net::STOMP::Client->new(
    host => "mybroker",
    port => 6163,
);
USING SSL
Using SSL is more complex since more parameters have to be given. Note: IO::Socket::SSL is used behind the scenes, so you can refer to its documentation for more information. Here is how this could be done:
$stomp = Net::STOMP::Client->new(
    uri => "stomp+ssl://mybroker:6162",
    sockopts => {
        # path of the directory containing trusted certificates
        SSL_ca_path => "/etc/ssl/ca",
        # client certificate to present
        SSL_cert_file => "/etc/ssl/client-cert.pem",
        # client private key
        SSL_key_file => "/etc/ssl/client-key.pem",
        # passphrase of the client private key
        SSL_passwd_cb => sub { return("secret") },
    },
);
GETTING PEER INFORMATION
Once connected, at TCP or SSL level, you can get information about the broker using the peer method. For instance:
$peer = $stomp->peer();
printf("connected to broker %s (IP %s), port %d\n",
    $peer->host(), $peer->addr(), $peer->port());
CONNECTING
After creating the broker object, you must start a STOMP session by sending a CONNECT frame. This is as simple as:
$stomp->connect();
If authentication is required, you must pass extra information at this stage. For instance with:
$stomp->connect(
    login => "guest",
    passcode => "welcome",
);
At this point, the session has been established and you can now send and/or receive messages.
DISCONNECTING
When you are done with messaging, you should gracefully end the session by sending a DISCONNECT frame with:
$stomp->disconnect();
Note: STOMP does not support reconnection. Once the session has been ended, the broker object cannot be used anymore.
WRAP UP
Putting all this together, here is a complete program that simply connects, starts and ends a session, printing information along the way.
use Net::STOMP::Client;
$stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
$peer = $stomp->peer();
printf("connected to broker %s (IP %s), port %d\n",
    $peer->host(), $peer->addr(), $peer->port());
$stomp->connect();
printf("session %s started\n", $stomp->session());
$stomp->disconnect();
printf("session ended\n");
PART 2: SENDING MESSAGES
SENDING MESSAGES
A message is made of a header (a list of key/value pairs) and a body. Both are optional.
To send a message, you have to issue a SEND frame. For instance:
$stomp->send(
    destination => "/queue/test",
    subject => "this is a test",
    time => time(),
    body => "Hello world!\n",
);
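To make the header/body structure concrete, here is a minimal sketch of how such a frame is laid out on the wire according to the STOMP specification: the command, one key:value line per header, an empty line, the body and a terminating NUL byte. The encode_frame function is a hypothetical helper written for this illustration only; it is not part of Net::STOMP::Client and it deliberately skips header escaping and the content-length header.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): build the wire
# representation of a STOMP frame from a command, a hash of headers and a body.
# Simplification: no header escaping and no content-length header.
sub encode_frame {
    my($command, $headers, $body) = @_;
    my $data = $command . "\n";
    # headers are sorted only to make the output deterministic
    foreach my $key (sort(keys(%{ $headers }))) {
        $data .= $key . ":" . $headers->{$key} . "\n";
    }
    # an empty line separates the header from the body
    $data .= "\n";
    $data .= $body if defined($body);
    # a NUL byte terminates the frame
    $data .= "\0";
    return($data);
}

my $wire = encode_frame(
    "SEND",
    { destination => "/queue/test", subject => "this is a test" },
    "Hello world!\n",
);
# print the frame with the NUL byte made visible as \0
(my $printable = $wire) =~ s/\0/\\0/;
print($printable, "\n");
```

In real code you would let the send method build and encode the frame; the sketch is only meant to show what "a header and a body" means in practice.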
USING RECEIPTS
By default, you do not get any confirmation that the message has indeed been received by the broker. If you want such a confirmation, you have to use receipts. The following code snippet sends two messages with a receipt header containing a pseudo-unique id and waits for matching RECEIPT frames coming from the broker. This is easy because the Net::STOMP::Client module keeps track of which receipts are expected and have not been received yet.
$stomp->send(
    destination => "/queue/test",
    body => "Test of receipts 1...\n",
    receipt => $stomp->uuid(),
);
$stomp->send(
    destination => "/queue/test",
    body => "Test of receipts 2...\n",
    receipt => $stomp->uuid(),
);
# wait at most 10 seconds for all pending receipts
$stomp->wait_for_receipts(timeout => 10);
# complain if some receipts are still pending
die("Receipt not received!\n") if $stomp->receipts();
Note: all STOMP frames can carry a receipt header so this is not restricted to message sending.
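The bookkeeping behind receipts can be pictured with the following sketch: remember each requested receipt id, and forget it when a RECEIPT frame carrying the same value in its receipt-id header comes back. This is a simplified model with hypothetical names (%pending, expect_receipt, receipt_received), not the module's actual implementation.

```perl
use strict;
use warnings;

# Hypothetical bookkeeping, for illustration only (not the module's code).
my %pending;    # receipt id => description of the frame awaiting confirmation

# remember that a receipt has been requested
sub expect_receipt {
    my($id, $what) = @_;
    $pending{$id} = $what;
    return($id);
}

# handle a RECEIPT frame, given the value of its receipt-id header;
# return true if this receipt was indeed expected
sub receipt_received {
    my($id) = @_;
    return(defined(delete($pending{$id})));
}

expect_receipt("r-1", "first message");
expect_receipt("r-2", "second message");
receipt_received("r-1");
printf("still pending: %s\n", join(", ", sort(keys(%pending))));
```

With the real module, the receipts method plays the role of listing what is still pending, as used in the snippet above.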
USING TRANSACTIONS
In addition, you can use transactions to group the sending of several messages so that either none or all of them get handled by the broker.
# create a unique transaction id
$tid = $stomp->uuid();
# begin the transaction
$stomp->begin(transaction => $tid);
# send two messages as part of this transaction
$stomp->send(
    destination => "/queue/test1",
    body => "message 1",
    transaction => $tid,
);
$stomp->send(
    destination => "/queue/test2",
    body => "message 2",
    transaction => $tid,
);
# either abort or commit
if (... something bad happened...) {
    # abort/rollback the transaction
    $stomp->abort(transaction => $tid);
    # no messages have been delivered to the broker
} else {
    # commit the transaction
    $stomp->commit(transaction => $tid);
    # both messages have been delivered to the broker
}
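One possible way to decide between abort and commit is to wrap the sends in an eval block, as sketched below. This assumes that failures surface as exceptions (die). The send_in_transaction helper and the Local::FakeClient class are hypothetical: the fake client only records method names so that the sketch can run without a broker, and the helper calls nothing but the methods shown in this tutorial.

```perl
use strict;
use warnings;

# Hypothetical helper: send all the given messages in one transaction,
# committing on success and aborting if anything dies along the way.
sub send_in_transaction {
    my($stomp, @messages) = @_;
    my $tid = $stomp->uuid();
    $stomp->begin(transaction => $tid);
    my $ok = eval {
        foreach my $message (@messages) {
            $stomp->send(%{ $message }, transaction => $tid);
        }
        1;
    };
    if ($ok) {
        $stomp->commit(transaction => $tid);
    } else {
        $stomp->abort(transaction => $tid);
    }
    return($ok ? 1 : 0);
}

# Minimal stand-in for a connected client (illustration only): it records
# the methods called and its send() dies when no destination is given.
package Local::FakeClient;
sub new    { return(bless({ calls => [] }, shift)) }
sub uuid   { return("tx-1") }
sub begin  { push(@{ $_[0]{calls} }, "begin") }
sub commit { push(@{ $_[0]{calls} }, "commit") }
sub abort  { push(@{ $_[0]{calls} }, "abort") }
sub send   {
    my($self, %option) = @_;
    die("missing destination\n") unless $option{destination};
    push(@{ $self->{calls} }, "send");
}

package main;

my $fake = Local::FakeClient->new();
my $done = send_in_transaction($fake,
    { destination => "/queue/test1", body => "message 1" },
    { body => "message 2" },    # no destination: triggers the abort path
);
print(join(" ", @{ $fake->{calls} }), "\n");
```

With a real client object in place of the fake one, the same helper would group the messages exactly as in the snippet above.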
PART 3: RECEIVING MESSAGES
USING SUBSCRIPTIONS
In order to receive frames, you first have to subscribe to one or more destinations. This is as easy as:
$stomp->subscribe(destination => "/queue/test");
When you are done, you simply unsubscribe with:
$stomp->unsubscribe(destination => "/queue/test");
It is good practice to add an id header to uniquely identify the subscription. All messages that are part of this subscription will have a matching subscription header. This id can also be used to unsubscribe.
In fact, the code above only works with STOMP 1.0. In STOMP 1.1, the id header has been made mandatory so you must use something like:
$stomp->subscribe(
    destination => "/queue/test",
    id => "testsub",
);
# received messages will contain: subscription:testsub
$stomp->unsubscribe(id => "testsub");
RECEIVING FRAMES
While you are subscribed to some destinations, the broker may decide at any time to send you MESSAGE frames. You can process these frames with a simple loop:
while ($frame = $stomp->wait_for_frames()) {
    # ... do something with the received frame ...
}
The code above is blocking and will loop forever. You can add a timeout option to have a non-blocking loop:
while (1) {
    # wait at most one second for a new frame
    $frame = $stomp->wait_for_frames(timeout => 1);
    # do what is appropriate
    if ($frame) {
        # ... do something with the received frame ...
    } else {
        # nothing received
    }
}
Because of the asynchronous nature of STOMP, receiving messages is a bit tricky: you cannot know a priori which types of frames will be sent when. For instance, you may want to send messages (with receipts) while you are subscribed to some destinations and you may receive a MESSAGE frame while you would like to wait for a RECEIPT frame, or vice versa.
The wait_for_frames method described above will wait for any frame, not only message frames. It is up to you to check whether what you receive is a MESSAGE frame. This can be done with something like:
if ($frame->command() eq "MESSAGE") {
    # ... do something with the received message ...
} else {
    # something else than a message frame
}
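When more than two kinds of frames matter, the same check can be written as a dispatch table keyed on the frame command, as in the sketch below. The %handler table, the handle_frame function and the Local::FakeFrame class are hypothetical names used for illustration; the fake frame only offers the command, header and body accessors used in this tutorial so that the sketch runs without a broker.

```perl
use strict;
use warnings;

# Hypothetical dispatch table (illustration only): one handler per command.
my %handler = (
    MESSAGE => sub { return("message: " . $_[0]->body()) },
    RECEIPT => sub { return("receipt: " . $_[0]->header("receipt-id")) },
);

# return a short description of what was done with the frame
sub handle_frame {
    my($frame) = @_;
    my $command = $frame->command();
    my $code = $handler{$command};
    return("ignored: " . $command) unless $code;
    return($code->($frame));
}

# Minimal stand-in for a received frame, offering the accessors used above.
package Local::FakeFrame;
sub new     { my($class, %self) = @_; return(bless({ %self }, $class)) }
sub command { return($_[0]{command}) }
sub body    { return($_[0]{body}) }
sub header  { return($_[0]{headers}{$_[1]}) }

package main;

my @received = (
    Local::FakeFrame->new(command => "RECEIPT", headers => { "receipt-id" => "r-1" }),
    Local::FakeFrame->new(command => "MESSAGE", headers => {}, body => "hello"),
    Local::FakeFrame->new(command => "ERROR", headers => {}),
);
print(handle_frame($_), "\n") foreach @received;
```

Frames with no entry in the table are simply reported as ignored, which keeps the receiving loop free of long if/else chains.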
WRAP UP
Putting all this together, here is a complete program that receives ten messages from /queue/test:
use Net::STOMP::Client;
$stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
# the next line will be explained in the next part of the tutorial ;-)
$stomp->message_callback(sub { return(1) });
$stomp->connect();
$sid = $stomp->uuid();
$stomp->subscribe(
    destination => "/queue/test",
    # we use the generated subscription id
    id => $sid,
    # we want a receipt on our SUBSCRIBE frame
    receipt => $stomp->uuid(),
);
$count = 0;
while ($count < 10) {
    $frame = $stomp->wait_for_frames(timeout => 1);
    if ($frame) {
        if ($frame->command() eq "MESSAGE") {
            $count++;
            printf("received message %d with id %s\n",
                $count, $frame->header("message-id"));
        } else {
            # this will catch the RECEIPT frame
            printf("%s frame received\n", $frame->command());
        }
    } else {
        print("waiting for messages...\n");
    }
}
$stomp->unsubscribe(id => $sid);
$stomp->disconnect();
PART 4: USING CALLBACKS
As seen in part 3, because of the asynchronous nature of STOMP, it is a bit tricky to properly handle all the different types of frames that can be received.
In order to simplify this, Net::STOMP::Client supports the use of callbacks. They are pieces of code called in well defined situations. In fact, there are two levels of callbacks: global and local.
GLOBAL CALLBACKS
Global (per command) callbacks are called each time a frame is received. Net::STOMP::Client has default callbacks that should be sufficient for all types of frames, except for MESSAGE frames. For these, it is really up to the coder to define what to do with the received messages.
Here is an example with a message callback counting the messages received:
$stomp->message_callback(sub {
    my($self, $frame) = @_;
    $count++;
    return($self);
});
These callbacks are in effect global and it is good practice not to change them during a session. If you do not need a global message callback, you can supply the dummy:
$stomp->message_callback(sub { return(1) });
Here is how to re-write a simplified version of the inner part of the receiving program of part 3 with a global callback:
$count = 0;
sub msg_cb ($$) {
    my($self, $frame) = @_;
    $count++;
    printf("received message %d with id %s\n",
        $count, $frame->header("message-id"));
    return($self);
}
$stomp->message_callback(\&msg_cb);
$stomp->wait_for_frames() while $count < 10;
LOCAL CALLBACKS
Local (per invocation) callbacks are called by the wait_for_frames method. Their return value controls what wait_for_frames does:
undef: an error occurred
false: wait_for_frames should wait for more frames
true: wait_for_frames can stop and return this value
Here is how to use wait_for_frames with a local callback to wait until we receive a MESSAGE frame that contains "quit" in the body:
sub msg_cb ($$) {
    my($self, $frame) = @_;
    return(0) unless $frame->command() eq "MESSAGE";
    return(0) unless $frame->body() =~ /quit/;
    return($frame);
}
$frame = $stomp->wait_for_frames(callback => \&msg_cb);
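The three possible return values can be pictured with the following simplified model of the waiting loop. It is a sketch of the contract described above, not the module's implementation: model_wait is a hypothetical function, frames are plain strings for brevity, and the choice to die when the callback returns undef is an assumption made for this illustration.

```perl
use strict;
use warnings;

# Simplified model of the contract (not the module's implementation):
# feed frames to the callback until it reports an error or returns a true value.
sub model_wait {
    my($frames, $callback) = @_;
    foreach my $frame (@{ $frames }) {
        my $result = $callback->($frame);
        # undef: an error occurred (this model chooses to die)
        die("callback reported an error\n") unless defined($result);
        # true: stop and return this value
        return($result) if $result;
        # false but defined: keep waiting for more frames
    }
    # ran out of frames (a real wait would block or time out instead)
    return(0);
}

my @frames = ("hello", "please quit now", "never reached");
my $found = model_wait(\@frames, sub { return($_[0] =~ /quit/ ? $_[0] : 0) });
print("stopped on: $found\n");
```

The callback in the real example above follows the same pattern: it returns 0 to keep waiting and the frame itself to stop.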
As you see, you can put the logic either in the global callbacks or in the local callbacks. The best practice is to have a single global message callback that does not change throughout the execution of the program and to optionally put in local callbacks what may change from one place of the program to another.
WRAP UP
Here is how to re-write the receiving program of part 3 with a global callback only counting the number of messages and a local callback printing information:
use Net::STOMP::Client;
$stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
$stomp->connect();
sub msg_cb ($$) {
    my($self, $frame) = @_;
    my $cmd = $frame->command();
    if ($cmd eq "MESSAGE") {
        printf("received message %d with id %s\n",
            $count, $frame->header("message-id"));
    } else {
        printf("%s frame received\n", $cmd);
    }
    return($frame);
}
$stomp->message_callback(sub { $count++ });
$sid = $stomp->uuid();
$stomp->subscribe(
    destination => "/queue/test",
    id => $sid,
    receipt => $stomp->uuid(),
);
$count = 0;
while ($count < 10) {
    $stomp->wait_for_frames(
        callback => \&msg_cb,
        timeout => 1,
    ) or print("waiting for messages...\n");
}
$stomp->unsubscribe(id => $sid);
$stomp->disconnect();
PART 5: ADVANCED FEATURES
ACKNOWLEDGMENT MODES
Unless specified otherwise, subscriptions are made in auto mode, meaning that a message is considered to be delivered by the broker as soon as it sends the corresponding MESSAGE frame. The client may not receive the frame or it could exit before processing it. This could result in message loss.
In order to avoid message loss, one can change the subscription acknowledgment mode to be client instead of auto. This is an option of the SUBSCRIBE frame:
$stomp->subscribe(
    destination => "/queue/test",
    id => $sid,
    ack => "client",
);
In client mode, the client must explicitly acknowledge the messages it has successfully processed. This is achieved by sending an ACK frame with a message-id header matching the one of the received message:
$stomp->ack("message-id" => $frame->header("message-id"));
In fact, you can directly pass the frame that holds the received message and Net::STOMP::Client will extract the message-id for you:
$stomp->ack(frame => $frame);
Messages that have not been acknowledged by the end of the session will be resent by the broker.
Note that STOMP 1.1 also has a client-individual mode. Please consult the protocol specification for more details.
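To benefit from client mode, the acknowledgment should be sent only after the message has been processed successfully. The sketch below shows one way to arrange this, assuming that processing failures are reported with die. The process_one helper and the Local::Fake* classes are hypothetical: the fakes stand in for a connected client and a received frame so that the sketch runs without a broker.

```perl
use strict;
use warnings;

# Hypothetical helper: wait for one frame and acknowledge it only after the
# handler has succeeded, so that unprocessed messages stay unacknowledged.
sub process_one {
    my($stomp, $handler) = @_;
    my $frame = $stomp->wait_for_frames(timeout => 1);
    return(0) unless $frame and $frame->command() eq "MESSAGE";
    # the handler signals failure by dying
    return(0) unless eval { $handler->($frame); 1 };
    $stomp->ack(frame => $frame);
    return(1);
}

# Minimal stand-ins (illustration only) for a received frame and a client.
package Local::FakeFrame;
sub new     { return(bless({}, shift)) }
sub command { return("MESSAGE") }

package Local::FakeClient;
sub new             { return(bless({ acked => 0 }, shift)) }
sub wait_for_frames { return(Local::FakeFrame->new()) }
sub ack             { $_[0]{acked}++ }

package main;

my $fake = Local::FakeClient->new();
process_one($fake, sub { die("cannot process\n") });    # not acknowledged
process_one($fake, sub { return(1) });                  # acknowledged
printf("acknowledged %d message(s)\n", $fake->{acked});
```

A message whose handler failed is left unacknowledged, so the broker can deliver it again as described above.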
EFFICIENT I/O
The high-level methods handle one frame at a time. This can be inefficient for small frames. For instance, the send method will build a frame object, encode it and send it on the wire with at least one call to syswrite, maybe for very few bytes.
The low-level methods allow you to better control this and queue messages in memory before sending them. This way, you group data and use I/O more efficiently.
Here is how to queue ten messages and send them in one go.
foreach $n (1 .. 10) {
    $frame = Net::STOMP::Client::Frame->new(
        command => "SEND",
        headers => { destination => "/topic/test" },
        body => "message $n",
    );
    # simply add the frame to the outgoing queue
    $stomp->queue_frame($frame);
}
# no timeout given: block until all data has been sent
$stomp->send_data();
SEE ALSO
IO::Socket::SSL, Net::STOMP::Client, Net::STOMP::Client::Debug, Net::STOMP::Client::Error, Net::STOMP::Client::Frame, Net::STOMP::Client::OO, Net::STOMP::Client::Peer.
AUTHOR
Lionel Cons http://cern.ch/lionel.cons
Copyright CERN 2010-2012