NAME

POEx::ZMQ::Socket - A POE-enabled ZeroMQ socket

SYNOPSIS

# An example ZMQ_ROUTER socket ->
use POE;
use POEx::ZMQ;

POE::Session->create(
  inline_states => +{
    _start => sub {
      # Set up a Context and save it for creating sockets later:
      $_[HEAP]->{ctx} = POEx::ZMQ->context;

      # Create a ZMQ_ROUTER socket associated with our Context:
      $_[HEAP]->{rtr} = POEx::ZMQ::Socket->new(
        context => $_[HEAP]->{ctx},
        type    => ZMQ_ROUTER,
      );

      # Set up the backend socket and start accepting/emitting events:
      $_[HEAP]->{rtr}->start;

      # Bind to a local TCP endpoint:
      $_[HEAP]->{rtr}->bind( 'tcp://127.0.0.1:1234' );
    },

    zmq_recv_multipart => sub {
      # ROUTER got message from REQ;
      # parts are available as a List::Objects::WithUtils::Array ->
      my $parts = $_[ARG0];

      # ROUTER receives [ IDENTITY, NULL, MSG .. ]:
      my ($id, undef, $content) = $parts->all;

      my $response;
      # ... do work ...
      # Send a response back:
      $_[KERNEL]->post( $_[SENDER], send_multipart =>
        $id, '', $response
      );
    },
  },
);

POE::Kernel->run;

DESCRIPTION

An asynchronous POE-powered http://www.zeromq.org|ZeroMQ socket.

These objects are event emitters powered by MooX::Role::POE::Emitter. That means they come with flexible event processing / dispatch / multiplexing options. See the MooX::Role::Pluggable and MooX::Role::POE::Emitter documentation for details.

ATTRIBUTES

type

Required; the socket type, as a constant.

See zmq_socket(3) for details on socket types.

See POEx::ZMQ::Constants for a ZeroMQ constant exporter.

context

The POEx::ZMQ::FFI::Context backend context object.

zsock

The POEx::ZMQ::FFI::Socket backend socket object.

METHODS

start

Start the emitter and set up the associated socket.

This method must be called to create the backend ZeroMQ socket and start the emitter's POE::Session.

Returns the object.

stop

Stop the emitter; a zmq_close(3) will be issued for the socket and "zsock" will be cleared.

Buffered items are not removed; "get_buffered_items" can be used to retrieve them for feeding to a new socket object's "send" method. See POEx::ZMQ::Buffered.

get_buffered_items

Returns (a shallow copy of) the List::Objects::WithUtils::Array containing messages currently buffered on the POE component (due to a backend ZeroMQ socket's blocking behavior; see zmq_socket(3)).

This will not return messages queued on the ZeroMQ side.

Each item is a POEx::ZMQ::Buffered object; look there for attribute documentation. These can also be fed back to "send" after retrieval from a dead socket, for example:

$old_socket->stop;  # Shut down this socket
my $pending = $old_socket->get_buffered_items;
$new_socket->send($_) for $pending->all;

get_context_opt

Retrieve context option values.

See "get_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_get(3)

set_context_opt

Set context option values.

See "set_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_set(3)

get_socket_opt

Get socket option values.

See "get_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_getsockopt(3).

set_socket_opt

Set socket option values.

See "set_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_setsockopt(3).

bind

$sock->bind( @endpoints );

Call a zmq_bind(3) for one or more specified endpoints.

A "bind_added" event is emitted for each added endpoint.

unbind

$sock->unbind( @endpoints );

Call a zmq_unbind(3) for one or more specified endpoints.

A "bind_removed" event is emitted for each removed endpoint.

connect

$sock->connect( @endpoints );

Call a zmq_bind(3) for one or more specified endpoints.

A "connect_added" event is emitted for each added endpoint.

disconnect

$sock->disconnect( @endpoints );

Call a zmq_disconnect(3) for one or more specified endpoints.

A "disconnect_issued" event is emitted for each removed endpoint.

send

$sock->send( $msg, $flags );

Send a single-part message (without blocking).

send_multipart

$sock->send_multipart( [ @parts ], $flags );
# For example, a ROUTER sending to $id ->
$rtr->send_multipart( [ $id, '', $msg ], $flags );

Send a multi-part message.

ACCEPTED EVENTS

These POE events take the same arguments as their object-oriented counterparts documented in "METHODS":

bind
unbind
connect
disconnect
send
send_multipart

EMITTED EVENTS

Emitted events are prefixed with the value of the "event_prefix" in MooX::Role::POE::Emitter attribute; by default, zmq_.

bind_added

Emitted when a "bind" is issued for an endpoint; $_[ARG0] is the bound endpoint.

bind_removed

Emitted when a "unbind" is issued for an endpoint; $_[ARG0] is the unbound endpoint.

connect_added

Emitted when a "connect" is issued for an endpoint; $_[ARG0] is the target endpoint.

disconnect_issued

Emitted when a "disconnect" is issued for an endpoint; $_[ARG0] is the disconnecting endpoint.

recv

sub zmq_recv {
  my $msg = $_[ARG0];
  $_[KERNEL]->post( $_[SENDER], send => 'bar' ) if $msg eq 'foo';
}

Emitted when a single-part message is received; $_[ARG0] is the message item.

recv_multipart

# A ROUTER receiving from REQ, for example:
sub zmq_recv_multipart {
  my $parts = $_[ARG0];
  my ($id, undef, $content) = @$parts;

  my $response = 'bar' if $content eq 'foo';

  $_[KERNEL]->post( $_[SENDER], send_multipart =>
    [ $id, '', $response ]
  );
}

Emitted when a multipart message is received; $_[ARG0] is a List::Objects::WithUtils::Array array-type object containing the message parts.

CONSUMES

MooX::Role::POE::Emitter, which in turn consumes MooX::Role::Pluggable.

SEE ALSO

zmq(7)

zmq_socket(3)

POEx::ZMQ::FFI::Context for details on the ZeroMQ context backend.

POEx::ZMQ::FFI::Socket for details on the ZeroMQ socket backend.

ZMQ::FFI for a loop-agnostic ZeroMQ implementation.

AUTHOR

Jon Portnoy <avenj@cobaltirc.org>

Licensed under the same terms as Perl.