NAME

AnyEvent::Stomper - Flexible non-blocking STOMP client

SYNOPSIS

use AnyEvent;
use AnyEvent::Stomper;

my $stomper = AnyEvent::Stomper->new(
  host       => 'localhost',
  port       => '61613',
  login      => 'guest',
  passcode   => 'guest',
);

my $cv = AE::cv;

$stomper->subscribe(
  id          => 'foo',
  destination => '/queue/foo',

  { on_receipt => sub {
      my $err = $_[1];

      if ( defined $err ) {
        warn $err->message . "\n";
        $cv->send;

        return;
      }

      $stomper->send(
        destination => '/queue/foo',
        body        => 'Hello, world!',
      );
    },

    on_message => sub {
      my $msg = shift;

      my $body = $msg->body;
      print "Consumed: $body\n";

      $cv->send;
    },
  }
);

$cv->recv;

DESCRIPTION

AnyEvent::Stomper is a flexible non-blocking STOMP client. It supports the following STOMP versions: 1.0, 1.1, 1.2.

It is recommended to read the STOMP protocol specification before using the client: https://stomp.github.io/index.html

CONSTRUCTOR

new( %params )

my $stomper = AnyEvent::Stomper->new(
  host               => 'localhost',
  port               => '61613',
  login              => 'guest',
  passcode           => 'guest',
  vhost              => '/',
  heart_beat         => [ 5000, 5000 ],
  connection_timeout => 5,
  lazy               => 1,
  reconnect_interval => 5,

  on_connect => sub {
    # handling...
  },

  on_disconnect => sub {
    # handling...
  },

  on_error => sub {
    my $err = shift;

    # error handling...
  },
);

host => $host

Server hostname (default: localhost)

port => $port

Server port (default: 61613)

login => $login

The user identifier used to authenticate against a secured STOMP server.

passcode => $passcode

The password used to authenticate against a secured STOMP server.

vhost => $vhost

The name of a virtual host that the client wishes to connect to.

heart_beat => \@heart_beat

Heart-beating can optionally be used to test the health of the underlying TCP connection and to make sure that the remote end is alive. The first number sets the interval in milliseconds between outgoing heart-beats to the STOMP server; 0 means that the client will not send heart-beats. The second number sets the interval in milliseconds between incoming heart-beats from the STOMP server; 0 means that the client does not want to receive heart-beats.

heart_beat => [ 5000, 5000 ],

Not set by default.

connection_timeout => $connection_timeout

Specifies the connection timeout. If the client cannot connect to the server within the specified timeout, the on_error callback is called with the E_CANT_CONN error. The timeout is specified in seconds and can contain a fractional part.

connection_timeout => 10.5,

By default the client uses the kernel's connection timeout.

lazy => $boolean

If enabled, the connection is established when the first command is sent to the server. Otherwise, the connection is established right after the new method is called.

Disabled by default.

reconnect_interval => $reconnect_interval

If this parameter is specified, the client will try to reconnect only after this interval (in seconds) has elapsed. Commands executed between reconnections will be queued.

reconnect_interval => 5,

Not set by default.

handle_params => \%params

Specifies AnyEvent::Handle parameters.

handle_params => {
  autocork => 1,
  linger   => 60,
}

Enabling the autocork parameter can improve performance. See the documentation on AnyEvent::Handle for more information.

on_connect => $cb->()

The on_connect callback is called when the connection is successfully established.

Not set by default.

on_disconnect => $cb->()

The on_disconnect callback is called when the connection is closed for any reason.

Not set by default.

on_error => $cb->( $err )

The on_error callback is called when an error occurs that affects the entire client (e.g. a connection error or an authentication error). The on_error callback is also called on command errors if the command callback is not specified. If the on_error callback is not specified, the client just prints an error message to STDERR.

COMMAND METHODS

To execute a STOMP command, call the appropriate method. The client automatically adds the content-length header to all outgoing frames. The body of the frame can be specified in the body parameter of the command method.

If you want to receive a RECEIPT frame, you must specify the receipt header. The receipt header can take the special value auto, in which case the client generates the value for the receipt header automatically. The RECEIPT frame is passed to the command callback in the first argument as an object of the class AnyEvent::Stomper::Frame. If the receipt header is not specified, the first argument of the command callback will be undef.

For the SUBSCRIBE, UNSUBSCRIBE and DISCONNECT commands, the client automatically adds the receipt header for internal usage.

The command callback is called after the command has been successfully sent to the server or, if the receipt header is specified, when the RECEIPT frame is received. If any error occurs during the command execution, the error object is passed to the callback in the second argument. The error object is an instance of the class AnyEvent::Stomper::Error.

The command callback is optional. If it is not specified and an error occurs, the on_error callback of the client is called.

The full list of available headers for every command can be found in the STOMP protocol specification and in the documentation of your queue broker. They can differ between versions of the STOMP protocol and between queue brokers.
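
The callback convention described above (receipt in the first argument, error object in the second) can be wrapped in a small helper. The following is only a sketch: make_command_cb is a hypothetical name that is not part of AnyEvent::Stomper, and the two handlers are supplied by the caller.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Stomper): builds a command
# callback that routes the result to one of two caller-supplied handlers.
sub make_command_cb {
  my ( $on_ok, $on_fail ) = @_;

  return sub {
    my ( $receipt, $err ) = @_;

    if ( defined $err ) {
      $on_fail->($err);

      return;
    }

    # $receipt is undef unless the receipt header was specified.
    $on_ok->($receipt);

    return;
  };
}

# Possible usage with a client object created elsewhere:
#
#   $stomper->send(
#     destination => '/queue/foo',
#     receipt     => 'auto',
#     body        => 'Hello, world!',
#
#     make_command_cb(
#       sub { print "Delivered\n" },
#       sub { warn $_[0]->message . "\n" },
#     )
#   );
```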

send( [ %headers ] [, $cb->( $receipt, $err ) ] )

Sends a message to a destination in the messaging system.

$stomper->send(
  destination => '/queue/foo',
  body        => 'Hello, world!',
);

$stomper->send(
  destination => '/queue/foo',
  body        => 'Hello, world!',

  sub {
    my $err = $_[1];

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }
  }
);

$stomper->send(
  destination => '/queue/foo',
  receipt     => 'auto',
  body        => 'Hello, world!',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);

subscribe( [ %headers ] [, ( $cb->( $receipt, $err ) | \%cbs ) ] )

The method is used to register to listen to a given destination. The subscribe method requires the on_message callback, which is called when a MESSAGE frame is received from the server. The MESSAGE frame is passed to the on_message callback in the first argument as an object of the class AnyEvent::Stomper::Frame.

$stomper->subscribe(
  id          => 'foo',
  destination => '/queue/foo',
  ack         => 'client',

  { on_receipt => sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        my $err_msg   = $err->message;
        my $err_code  = $err->code;
        my $err_frame = $err->frame;

        return;
      }

      # receipt handling...
    },

    on_message => sub {
      my $msg = shift;

      my $headers = $msg->headers;
      my $body    = $msg->body;

      # message handling...
    },
  }
);

unsubscribe( [ %headers ] [, $cb->( $receipt, $err ) ] )

The method is used to remove an existing subscription.

$stomper->unsubscribe(
  id          => 'foo',
  destination => '/queue/foo',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      return;
    }

    # receipt handling...
  }
);

ack( [ %headers ] [, $cb->( $receipt, $err ) ] )

The method is used to acknowledge consumption of a message from a subscription using client or client-individual acknowledgment. Any messages received from such a subscription will not be considered to have been consumed until the message has been acknowledged via an ack() method.

$stomper->ack( id => $ack_id );

$stomper->ack(
  id      => $ack_id,
  receipt => 'auto',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);

nack( [ %headers ] [, $cb->( $receipt, $err ) ] )

The nack method is the opposite of the ack method. It is used to tell the server that the client did not consume the message.

$stomper->nack( id => $ack_id );

$stomper->nack(
  id      => $ack_id,
  receipt => 'auto',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);
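
The ack and nack methods are usually called from the on_message callback of a subscription that uses client acknowledgment. The sketch below shows one possible way to do this. It rests on several assumptions: make_acking_handler is a hypothetical helper, the broker speaks STOMP 1.2 (where the MESSAGE frame carries an ack header), the headers method returns a hash reference, and the processing code signals failure by dying.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Stomper): wraps a message
# processor into an on_message callback that acknowledges the message on
# success and rejects it on failure.
sub make_acking_handler {
  my ( $stomper, $process ) = @_;

  return sub {
    my $msg = shift;

    # Assumption: STOMP 1.2, where the MESSAGE frame carries an "ack" header,
    # and headers() returns a hash reference.
    my $ack_id = $msg->headers->{ack};

    # $process is expected to die on failure.
    if ( eval { $process->($msg); 1 } ) {
      $stomper->ack( id => $ack_id );
    }
    else {
      $stomper->nack( id => $ack_id );
    }

    return;
  };
}

# Possible usage with a client object created elsewhere:
#
#   $stomper->subscribe(
#     id          => 'foo',
#     destination => '/queue/foo',
#     ack         => 'client',
#
#     { on_message => make_acking_handler( $stomper, sub { ... } ) }
#   );
```

For older protocol versions the ACK frame takes different headers, so check the STOMP specification for the version your broker negotiates.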

begin( [ %headers ] [, $cb->( $receipt, $err ) ] )

The method begin is used to start a transaction.

commit( [ %headers ] [, $cb->( $receipt, $err ) ] )

The method commit is used to commit a transaction.

abort( [ %headers ] [, $cb->( $receipt, $err ) ] )

The method abort is used to roll back a transaction.
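
As a rough illustration of how begin, send and commit fit together, the sketch below sends several messages inside one transaction. The transaction header is defined by the STOMP specification. The send_in_transaction helper and its arguments are hypothetical. Only the commit command gets a callback here, so errors from the other commands go to the on_error callback of the client.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Stomper): sends all bodies to
# one destination inside a single transaction, then reports the commit result.
sub send_in_transaction {
  my ( $stomper, $tx_id, $destination, $bodies, $done ) = @_;

  $stomper->begin( transaction => $tx_id );

  foreach my $body ( @{$bodies} ) {
    $stomper->send(
      destination => $destination,
      transaction => $tx_id,
      body        => $body,
    );
  }

  $stomper->commit(
    transaction => $tx_id,
    receipt     => 'auto',

    sub {
      my ( $receipt, $err ) = @_;

      # $err is undef if the server confirmed the commit with a RECEIPT frame.
      $done->($err);
    }
  );

  return;
}

# Possible usage with a client object created elsewhere:
#
#   send_in_transaction( $stomper, 'tx1', '/queue/foo', [ 'a', 'b' ],
#     sub {
#       my $err = shift;
#       warn $err->message . "\n" if defined $err;
#     }
#   );
```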

disconnect( [ %headers ] [, $cb->( $receipt, $err ) ] )

A client can disconnect from the server at any time by closing the socket, but there is no guarantee that the previously sent frames have been received by the server. To do a graceful shutdown, where the client is assured that all previous frames have been received by the server, you must call the disconnect method and wait for the RECEIPT frame.
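
A graceful shutdown could look like the following sketch. The graceful_shutdown helper and the $done callback are hypothetical names. Because the client adds the receipt header to DISCONNECT itself, the command callback runs when the RECEIPT frame arrives or when an error occurs.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Stomper): requests a graceful
# disconnect and passes an error message (or undef on success) to $done.
sub graceful_shutdown {
  my ( $stomper, $done ) = @_;

  $stomper->disconnect(
    sub {
      my ( $receipt, $err ) = @_;

      # On success the RECEIPT frame confirms that the server has received
      # the frames sent before DISCONNECT.
      $done->( defined $err ? $err->message : undef );
    }
  );

  return;
}

# Possible usage with a condition variable (assumes "use AnyEvent;" and a
# client object created elsewhere):
#
#   my $cv = AE::cv;
#   graceful_shutdown( $stomper, sub { $cv->send(@_) } );
#   my $err_msg = $cv->recv;
```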

execute( $command, [ %headers ] [, $cb->( $receipt, $err ) ] )

An alternative method to execute commands. In some cases it can be more convenient.

$stomper->execute( 'SEND',
  destination => '/queue/foo',
  receipt     => 'auto',
  body        => 'Hello, world!',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);

ERROR CODES

Every error object passed to a callback contains an error code, which can be used for programmatic handling of errors. AnyEvent::Stomper provides constants for error codes. They can be imported and used in expressions.

use AnyEvent::Stomper qw( :err_codes );

E_CANT_CONN

Can't connect to the server. All operations were aborted.

E_IO

Input/Output operation error. The connection to the STOMP server was closed and all operations were aborted.

E_CONN_CLOSED_BY_REMOTE_HOST

The connection was closed by the remote host. All operations were aborted.

E_CONN_CLOSED_BY_CLIENT

The connection was closed by the client prematurely. Uncompleted operations were aborted.

E_OPRN_ERROR

Operation error. For example, missing required header.

E_UNEXPECTED_DATA

The client received unexpected data from the server. The connection to the STOMP server was closed and all operations were aborted.

E_READ_TIMEDOUT

Read timed out. The connection to the STOMP server was closed and all operations were aborted.
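
One way to use these constants is to separate errors that concern the connection as a whole from errors of a single operation. The sketch below does this with a lookup table. The is_connection_error helper is a hypothetical name, and the grouping simply follows the descriptions above, with E_OPRN_ERROR being the only operation-level code.

```perl
use strict;
use warnings;
use AnyEvent::Stomper qw( :err_codes );

# Codes described above as closing the connection or aborting operations.
my %CONNECTION_LEVEL = map { $_ => 1 } (
  E_CANT_CONN(),
  E_IO(),
  E_CONN_CLOSED_BY_REMOTE_HOST(),
  E_CONN_CLOSED_BY_CLIENT(),
  E_UNEXPECTED_DATA(),
  E_READ_TIMEDOUT(),
);

# Hypothetical helper (not part of AnyEvent::Stomper): returns 1 if the error
# concerns the connection as a whole, 0 for a single-operation error.
sub is_connection_error {
  my $err = shift;

  return exists $CONNECTION_LEVEL{ $err->code } ? 1 : 0;
}

# Possible usage inside a command callback or the on_error callback:
#
#   if ( is_connection_error($err) ) {
#     # the client-wide state is affected, e.g. stop producing messages
#   }
```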

OTHER METHODS

host()

Gets current host of the client.

port()

Gets current port of the client.

connection_timeout( [ $fractional_seconds ] )

Gets or sets the connection_timeout of the client. The undef value resets the connection_timeout to default value.

reconnect_interval( [ $fractional_seconds ] )

Gets or sets reconnect_interval of the client.

on_connect( [ $callback ] )

Gets or sets the on_connect callback.

on_disconnect( [ $callback ] )

Gets or sets the on_disconnect callback.

on_error( [ $callback ] )

Gets or sets the on_error callback.

force_disconnect()

The method for forced disconnection. All uncompleted operations will be aborted.
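
The accessors above can be combined as in the following sketch. It assumes that creating the client with lazy => 1 does not contact a broker, as described for the lazy parameter. The host, port and timeout values are placeholders only.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Stomper;

# With lazy => 1 the connection is postponed until the first command,
# so this example does not need a running broker.
my $stomper = AnyEvent::Stomper->new(
  host => 'localhost',
  port => 61613,
  lazy => 1,
);

printf "Broker: %s:%s\n", $stomper->host, $stomper->port;

# Adjust timing settings after construction.
$stomper->connection_timeout(10.5);
$stomper->reconnect_interval(5);

# Passing undef resets the connection timeout to its default value.
$stomper->connection_timeout(undef);

# Replace the client-wide error handler.
$stomper->on_error(
  sub {
    my $err = shift;

    warn $err->message . "\n";
  }
);
```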

SEE ALSO

AnyEvent::Stomper::Pool

AUTHOR

Eugene Ponizovsky, <ponizovsky@gmail.com>

Sponsored by SMS Online, <dev.opensource@sms-online.com>

COPYRIGHT AND LICENSE

Copyright (c) 2016, Eugene Ponizovsky, SMS Online. All rights reserved.

This module is free software; you can redistribute it and/or modify it under the same terms as Perl itself.