NAME
POEx::ZMQ::Socket - A POE-enabled ZeroMQ socket
SYNOPSIS
use POE;
# Imports POEx::ZMQ::Socket and POEx::ZMQ::Constants ->
use POEx::ZMQ;
POE::Session->create(
  inline_states => +{
    _start => sub {
      # Set up a Context and save it for creating sockets later:
      $_[HEAP]->{ctx} = POEx::ZMQ->context;
      # Create a ZMQ_ROUTER socket associated with our Context:
      $_[HEAP]->{rtr} = POEx::ZMQ::Socket->new(
        context => $_[HEAP]->{ctx},
        type    => ZMQ_ROUTER,
      );
      # Set up the backend socket and start accepting/emitting events:
      $_[HEAP]->{rtr}->start;
      # Bind to a local TCP endpoint:
      $_[HEAP]->{rtr}->bind( 'tcp://127.0.0.1:1234' );
    },
    zmq_recv_multipart => sub {
      # ROUTER got message from REQ / DEALER
      # parts are available as a List::Objects::WithUtils::Array ->
      my $parts = $_[ARG0];
      # ROUTER receives [ IDENTITY, NULL, MSG .. ]:
      my $route = $parts->items_before(sub { $_ eq '' });
      my $body  = $parts->items_after(sub { $_ eq '' });
      my $response;
      # ... do work ...
      # Send a response back:
      $_[KERNEL]->post( $_[SENDER], send_multipart =>
        [ $route->all, '', $response ]
      );
    },
  },
);
POE::Kernel->run;
DESCRIPTION
An asynchronous POE-powered ZeroMQ socket.
These objects are event emitters powered by MooX::Role::POE::Emitter. That means they come with flexible event processing / dispatch / multiplexing options. See the MooX::Role::Pluggable and MooX::Role::POE::Emitter documentation for details.
ATTRIBUTES
type
Required; the socket type, as a constant.
See zmq_socket(3) for details on socket types.
See POEx::ZMQ::Constants for a ZeroMQ constant exporter.
ipv6
If set to true, IPv6 support is enabled via the appropriate socket option (ZMQ_IPV4ONLY or ZMQ_IPV6 depending on your ZeroMQ version) when the emitter is started.
Defaults to false.
max_queue_size
Socket types that would normally block or return EFSM (for example, out-of-order REP/REQ communication) will queue messages instead to avoid blocking the event loop; max_queue_size is the maximum number of messages queued application-side before "max_queue_action" is invoked.
This is not related to messages queued on the ZeroMQ side; see zmq_socket(3) for details on socket behavior.
Defaults to 0 (unlimited).
max_queue_action
The action to take during "send" invocation when the application-side outgoing message queue reaches "max_queue_size".
If set to drop, new messages will be dropped.
If set to warn, a warning will be issued and new messages will be dropped.
If set to die, an exception is thrown with a stack trace.
If set to a coderef:
max_queue_action => sub {
  my ($buf_item, $queue) = @_;
  # Drop old and try again, for example:
  $queue->shift;
  1
},
... the subroutine is invoked and passed the POEx::ZMQ::Buffered object for the message and the current application-side outgoing message queue as a List::Objects::WithUtils::Array (respectively). This can be used to manually munge your outgoing queue yourself or perform some other action; if the given subroutine returns a boolean true value, another socket write will be attempted after the subroutine returns.
Defaults to die.
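The overflow rules above can be modelled in a few lines of core Perl. This is only a sketch of the documented policy, not the module's implementation: enqueue_with_policy is a hypothetical helper, a plain array reference stands in for the List::Objects::WithUtils::Array queue, and the sketch assumes the action fires once the queue already holds max_queue_size items. Where the real socket would attempt another write after the coderef returns true, the sketch simply retries the enqueue.

```perl
use strict;
use warnings;

# Hypothetical helper modelling the documented overflow policy.
# $queue is an array ref standing in for the application-side queue.
sub enqueue_with_policy {
    my ($queue, $max, $action, $item) = @_;

    # A limit of 0 means unlimited; otherwise there is room below the limit.
    if ($max == 0 || @$queue < $max) {
        push @$queue, $item;
        return 'queued';
    }

    if (ref($action) eq 'CODE') {
        # The callback may make room; a true return asks for one retry.
        if ($action->($item, $queue) && @$queue < $max) {
            push @$queue, $item;
            return 'queued';
        }
        return 'dropped';
    }

    return 'dropped' if $action eq 'drop';

    if ($action eq 'warn') {
        warn "queue full; dropping message\n";
        return 'dropped';
    }

    # Anything else is treated as 'die', the documented default.
    die "queue full\n";
}

my @queue;
enqueue_with_policy(\@queue, 2, 'drop', $_) for qw(a b c);   # 'c' is dropped
print scalar(@queue), "\n";   # prints 2
```

A coderef such as sub { shift @{ $_[1] }; 1 } mirrors the "drop old and try again" callback shown earlier: it discards the oldest queued item and asks for a retry.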
context
The POEx::ZMQ::FFI::Context backend context object.
zsock
The POEx::ZMQ::FFI::Socket backend socket object.
METHODS
start
Start the emitter and set up the associated socket.
This method must be called to create the backend ZeroMQ socket and start the emitter's POE::Session.
Returns the object.
stop
Stop the emitter; a zmq_close(3) will be issued for the socket and "zsock" will be cleared.
Buffered items are not removed; "get_buffered_items" can be used to retrieve them for feeding to a new socket object's "send" method. See POEx::ZMQ::Buffered.
zmq_version
Returns the ZeroMQ version as a struct-like object; see "get_version" in POEx::ZMQ::FFI.
get_buffered_items
Returns (a shallow copy of) the List::Objects::WithUtils::Array containing messages currently buffered on the POE component (due to a backend ZeroMQ socket's blocking behavior; see zmq_socket(3)).
This will not return messages queued on the ZeroMQ side.
Each item is a POEx::ZMQ::Buffered object; look there for attribute documentation. These can also be fed back to "send" after retrieval from a dead socket, for example:
$old_socket->stop; # Shut down this socket
my $pending = $old_socket->get_buffered_items;
$new_socket->send($_) for $pending->all;
get_context_opt
Retrieve context option values.
See "get_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_get(3).
set_context_opt
Set context option values.
See "set_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_set(3).
Returns the invocant.
get_socket_opt
my $last_endpt = $sock->get_socket_opt( ZMQ_LAST_ENDPOINT );
Get socket option values.
See "get_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_getsockopt(3).
set_socket_opt
$sock->set_socket_opt( ZMQ_LINGER, 0 );
Set socket option values.
See "set_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_setsockopt(3).
Returns the invocant.
bind
$sock->bind( @endpoints );
Call a zmq_bind(3) for one or more specified endpoints.
A "bind_added" event is emitted for each added endpoint.
Returns the invocant.
unbind
$sock->unbind( @endpoints );
Call a zmq_unbind(3) for one or more specified endpoints.
A "bind_removed" event is emitted for each removed endpoint.
Returns the invocant.
connect
$sock->connect( @endpoints );
Call a zmq_connect(3) for one or more specified endpoints.
A "connect_added" event is emitted for each added endpoint.
Returns the invocant.
disconnect
$sock->disconnect( @endpoints );
Call a zmq_disconnect(3) for one or more specified endpoints.
A "disconnect_issued" event is emitted for each removed endpoint.
Returns the invocant.
send
$sock->send( $msg, $flags );
Send a single-part message.
Sending never blocks, regardless of the typical behavior of the ZeroMQ socket type. See "max_queue_size" for details on queuing behavior.
Returns the invocant.
send_multipart
$sock->send_multipart( [ @parts ], $flags );
# A ROUTER sending to $id ->
$rtr->send_multipart( [ $id, '', $msg ], $flags );
Send a multi-part message.
Applies the same application-side queuing behavior as "send"; see "max_queue_size".
Returns the invocant.
ACCEPTED EVENTS
These POE events take the same arguments as their object-oriented counterparts documented in "METHODS":
- bind
- unbind
- connect
- disconnect
- send
- send_multipart
EMITTED EVENTS
Emitted events are prefixed with the value of the "event_prefix" in MooX::Role::POE::Emitter attribute; by default, zmq_.
bind_added
Emitted when a "bind" is issued for an endpoint; $_[ARG0] is the bound endpoint.
bind_removed
Emitted when an "unbind" is issued for an endpoint; $_[ARG0] is the unbound endpoint.
connect_added
Emitted when a "connect" is issued for an endpoint; $_[ARG0] is the target endpoint.
disconnect_issued
Emitted when a "disconnect" is issued for an endpoint; $_[ARG0] is the disconnecting endpoint.
recv
sub zmq_recv {
  my $msg = $_[ARG0];
  $_[KERNEL]->post( $_[SENDER], send => 'bar' ) if $msg eq 'foo';
}
Emitted when a single-part message is received; $_[ARG0] is the message item.
recv_multipart
# A ROUTER receiving from REQ, for example:
sub zmq_recv_multipart {
  my $parts = $_[ARG0];
  my ($id, undef, $content) = @$parts;
  # Declare first, then assign conditionally; 'my ... if ...' is unreliable:
  my $response;
  $response = 'bar' if $content eq 'foo';
  $_[KERNEL]->post( $_[SENDER], send_multipart =>
    [ $id, '', $response ]
  );
}
# ... or with more complex routing envelopes:
sub zmq_recv_multipart {
  my $parts = $_[ARG0];
  # pop() the application-relevant body:
  my $body = $parts->pop;
  my $response;
  # ... do work with $body and set $response ...
  # Then include the envelope (including empty delimiter msg) later:
  $_[KERNEL]->post( $_[SENDER], send_multipart =>
    [ $parts->all, $response ]
  );
}
Emitted when a multipart message is received.
$_[ARG0] is a List::Objects::WithUtils::Array array-type object containing the message parts. This makes basic handling tasks easy, such as splitting multipart bodies and the routing envelope on an empty part delimiter:
my $envelope = $parts->items_before(sub { $_ eq '' });
my $content  = $parts->items_after(sub { $_ eq '' });
# ... returning a reply later (@reply_parts holds the reply message parts):
$zsock->send_multipart(
  [ $envelope->all, '', @reply_parts ]
);
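To make the delimiter split concrete, here is a core-Perl sketch of the same idea on a plain list. split_envelope is a hypothetical helper, not part of POEx::ZMQ or List::Objects::WithUtils, and treating a message without a delimiter as all body is a choice made for this sketch rather than documented behavior.

```perl
use strict;
use warnings;
use List::Util 'first';

# Hypothetical helper: split message parts at the first empty delimiter.
# Returns (\@envelope, \@body); the delimiter itself lands in neither.
sub split_envelope {
    my @parts = @_;

    # Index of the first empty part, or undef when there is none.
    my $idx = first { $parts[$_] eq '' } 0 .. $#parts;

    # No delimiter found: empty envelope, everything is body.
    return ( [], [@parts] ) unless defined $idx;

    return (
        [ @parts[ 0 .. $idx - 1 ] ],
        [ @parts[ $idx + 1 .. $#parts ] ],
    );
}

my ($envelope, $body) = split_envelope( 'client-1', '', 'hello', 'world' );

# Rebuild a reply: original envelope, empty delimiter, then the reply body.
my @reply = ( @$envelope, '', 'ack' );
print join( '|', @reply ), "\n";   # prints client-1||ack
```

Only the first empty part is treated as the delimiter, so empty parts inside the body are preserved.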
CONSUMES
MooX::Role::POE::Emitter, which in turn consumes MooX::Role::Pluggable.
SEE ALSO
POEx::ZMQ::FFI::Context for details on the ZeroMQ context backend.
POEx::ZMQ::FFI::Socket for details on the ZeroMQ socket backend.
ZMQ::FFI for a loop-agnostic ZeroMQ implementation.
AUTHOR
Jon Portnoy <avenj@cobaltirc.org>
Licensed under the same terms as Perl.