### ZMQ COMMON API ###
# Connect this socket to $endpoint.
#
# Croaks with a usage message when $endpoint is missing or false. Otherwise
# the zmq_connect() return code is handed to check_error(), and whatever
# check_error() returns is returned to the caller.
sub connect {
    my ($self, $endpoint) = @_;

    croak 'usage: $socket->connect($endpoint)' if !$endpoint;

    my $rc = zmq_connect($self->_socket, $endpoint);
    return $self->check_error('zmq_connect', $rc);
}
# Bind this socket to $endpoint.
#
# Croaks with a usage message when $endpoint is missing or false. Otherwise
# the zmq_bind() return code is handed to check_error(), and whatever
# check_error() returns is returned to the caller.
sub bind {
    my ($self, $endpoint) = @_;

    croak 'usage: $socket->bind($endpoint)' if !$endpoint;

    my $rc = zmq_bind($self->_socket, $endpoint);
    return $self->check_error('zmq_bind', $rc);
}
#
# send/recv are hot spots, so sacrificing some readability for performance
#
# Send a list of frames as one multipart message.
#
# Arguments are read straight from @_ (no unpacking on the hot path):
#   $_[0]  socket object
#   $_[1]  array ref holding the frames to send
#   $_[2]  optional send flags, defaulting to 0
#
# Every frame except the last is sent with ZMQ_SNDMORE or-ed into the flags;
# the last frame is sent with the plain flags. Croaks with a usage message
# when the frame list is missing or empty.
sub send_multipart {
    my @frames = @{ $_[1] // [] };
    croak 'usage: send_multipart($parts, $flags)' if !@frames;

    my $flags = $_[2] // 0;
    my $final = pop @frames;

    for my $frame (@frames) {
        $_[0]->send($frame, $flags | ZMQ_SNDMORE);
    }

    return $_[0]->send($final, $flags);
}
# Receive every frame of a multipart message and return the frames as a list.
#
# Arguments are read straight from @_ (no unpacking on the hot path):
#   $_[0]  socket object
#   $_[1]  optional receive flags, defaulting to 0
#
# The first frame is always received; further frames are received for as
# long as the ZMQ_RCVMORE socket option reads as true.
sub recv_multipart {
    # Resolve the default once so the first recv() sees exactly the same
    # flags as every later one (the first call used to receive a bare,
    # possibly undefined, $_[1]).
    my $flags = $_[1] // 0;

    my @parts = ( $_[0]->recv($flags) );

    # ZMQ_RCVMORE is read as int64_t when the major version is 2, and as a
    # plain int otherwise.
    my $type = ($_[0]->version)[0] == 2 ? 'int64_t' : 'int';

    while ( $_[0]->get(ZMQ_RCVMORE, $type) ) {
        push @parts, $_[0]->recv($flags);
    }

    return @parts;
}
# Return the socket's ZMQ_FD option, fetched through get() as an 'int'.
sub get_fd {
    my ($self) = @_;
    return $self->get(ZMQ_FD, 'int');
}
# Set the socket's ZMQ_LINGER option to $linger, passed to set() as an 'int'.
sub set_linger {
    my $self   = shift;
    my $linger = shift;

    return $self->set(ZMQ_LINGER, 'int', $linger);
}
# Return the socket's ZMQ_LINGER option, fetched through get() as an 'int'.
sub get_linger {
    my ($self) = @_;
    return $self->get(ZMQ_LINGER, 'int');
}
# Set the socket's ZMQ_IDENTITY option to $id, passed to set() as 'binary'.
sub set_identity {
    my $self = shift;
    my $id   = shift;

    return $self->set(ZMQ_IDENTITY, 'binary', $id);
}
# Return the socket's ZMQ_IDENTITY option, fetched through get() as 'binary'.
sub get_identity {
    my ($self) = @_;
    return $self->get(ZMQ_IDENTITY, 'binary');
}
# Set the ZMQ_SUBSCRIBE option with $topic as its 'binary' value.
sub subscribe {
    my $self  = shift;
    my $topic = shift;

    return $self->set(ZMQ_SUBSCRIBE, 'binary', $topic);
}
# Set the ZMQ_UNSUBSCRIBE option with $topic as its 'binary' value.
sub unsubscribe {
    my $self  = shift;
    my $topic = shift;

    return $self->set(ZMQ_UNSUBSCRIBE, 'binary', $topic);
}
# Return the ZMQ_POLLIN bit of the socket's ZMQ_EVENTS option: non-zero
# (true) when the bit is set, zero (false) otherwise.
sub has_pollin {
    my ($self) = @_;

    my $events = $self->get(ZMQ_EVENTS, 'int');
    return $events & ZMQ_POLLIN;
}
# Return the ZMQ_POLLOUT bit of the socket's ZMQ_EVENTS option: non-zero
# (true) when the bit is set, zero (false) otherwise.
sub has_pollout {
    my ($self) = @_;

    my $events = $self->get(ZMQ_EVENTS, 'int');
    return $events & ZMQ_POLLOUT;
}
# Read socket option $opt and return its value.
#
# $opt_type selects how the value is fetched and decoded:
#   'binary'   - raw bytes, as many as zmq_getsockopt reports
#   'string'   - like 'binary', minus the trailing NUL written by zeromq
#   'int'      - fetched through an int* binding
#   'int64_t'  - fetched through a sint64* binding
#   'uint64_t' - fetched through a uint64* binding
#
# Returns the option value, or nothing (empty list / undef) when the value
# is undefined or the empty string.
#
# Croaks on an unknown $opt_type or when the temporary buffer cannot be
# allocated. The zmq_getsockopt return code is handed to check_error(); any
# exception it throws is propagated after the temporary buffer is released.
sub get {
    my ($self, $opt, $opt_type) = @_;

    my $optval;
    my $optval_len;

    if ($opt_type eq 'binary' || $opt_type eq 'string') {
        # ZMQ_IDENTITY uses binary type and can be at most 255 bytes long
        #
        # ZMQ_LAST_ENDPOINT uses string type and expects a buffer large
        # enough to hold an endpoint string
        #
        # So for these cases 256 should be sufficient (including \0).
        # Other binary/string opts are being added all the time, and
        # hopefully this value scales, but we can always increase it if
        # necessary
        my $buffer_size = 256;

        my $optval_ptr = malloc($buffer_size);
        croak "unable to allocate $buffer_size bytes for socket option"
            unless defined $optval_ptr;

        $optval_len = $buffer_size;

        # check_error() may throw; trap that so the buffer is always freed
        # before the error continues on its way to the caller.
        my $checked = eval {
            $self->check_error(
                'zmq_getsockopt',
                zmq_getsockopt_binary(
                    $self->_socket,
                    $opt,
                    $optval_ptr,
                    \$optval_len
                )
            );
            1;
        };

        unless ($checked) {
            my $error = $@;
            free($optval_ptr);
            die $error;
        }

        # FFI::Platypus already appends a null terminating byte for
        # strings, so strip the one included by zeromq (otherwise test
        # comparisons fail due to the extra NUL)
        my $copy_len = $opt_type eq 'string' ? $optval_len - 1 : $optval_len;

        # A reported length of 0 must not turn into a negative copy length.
        $copy_len = 0 if $copy_len < 0;

        $optval = buffer_to_scalar($optval_ptr, $copy_len);
        free($optval_ptr);
    }
    elsif ($opt_type eq 'int') {
        $optval_len = $self->sockopt_sizes->{'int'};

        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($opt_type eq 'int64_t') {
        $optval_len = $self->sockopt_sizes->{'sint64'};

        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int64(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($opt_type eq 'uint64_t') {
        $optval_len = $self->sockopt_sizes->{'uint64'};

        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_uint64(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    else {
        croak "unknown type $opt_type";
    }

    # An unset or empty option is reported as "nothing" rather than ''.
    return if !defined $optval || $optval eq '';
    return $optval;
}
# Set socket option $opt to $optval.
#
# $opt_type selects how the value is handed to zmq_setsockopt:
#   'binary' / 'string' - as a buffer pointer and length obtained from
#                         scalar_to_buffer()
#   'int'               - through an int* binding
#   'int64_t'           - through a sint64* binding
#   'uint64_t'          - through a uint64* binding
#
# For the integer types the option length comes from sockopt_sizes().
# The zmq_setsockopt return code is handed to check_error(). Croaks on an
# unknown $opt_type. Always returns nothing.
sub set {
    my ($self, $opt, $opt_type, $optval) = @_;

    if ($opt_type eq 'binary' || $opt_type eq 'string') {
        my ($optval_ptr, $optval_len) = scalar_to_buffer($optval);

        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_binary(
                $self->_socket,
                $opt,
                $optval_ptr,
                $optval_len
            )
        );
    }
    elsif ($opt_type eq 'int') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'int'}
            )
        );
    }
    elsif ($opt_type eq 'int64_t') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int64(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'sint64'}
            )
        );
    }
    elsif ($opt_type eq 'uint64_t') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_uint64(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'uint64'}
            )
        );
    }
    else {
        croak "unknown type $opt_type";
    }

    return;
}
# Close the underlying socket and mark it closed by storing -1 in _socket.
#
# Does nothing when:
#   - the object was created in a different process (its _pid differs from
#     the current pid), or
#   - the socket is already marked closed (_socket is -1), so calling close
#     more than once never hands the -1 sentinel to zmq_close.
#
# The zmq_close return code is handed to check_error().
sub close {
    my ($self) = @_;

    # don't try to cleanup socket cloned from another process (fork)
    return unless $self->_pid == $$;

    # already closed
    return if $self->_socket == -1;

    $self->check_error(
        'zmq_close',
        zmq_close($self->_socket)
    );

    return $self->_socket(-1);
}
# Destructor hook: if the socket has not been marked closed (_socket is not
# -1), set its linger value to 1 and then close it.
sub DEMOLISH {
    my ($self) = @_;

    if ($self->_socket != -1) {
        $self->set_linger(1);
        $self->close();
    }
}
# Attach the libzmq functions shared by every socket implementation.
#
# $soname is handed to FFI::Platypus->new as the library to load. Each
# function is installed into the package that called this sub, under the
# Perl name given in the table below.
#
# Returns the result of the last attach() call.
sub _load_common_ffi {
    my ($soname) = @_;

    my $ffi   = FFI::Platypus->new( lib => $soname );
    my $class = caller;

    # [ C symbol => Perl name, argument types => return type ]
    my @signatures = (
        # void *zmq_socket(void *context, int type)
        [ zmq_socket => 'zmq_socket', ['pointer', 'int'] => 'pointer' ],

        # for get/set sockopt create ffi functions for each possible opt type

        # int zmq_getsockopt(void *sock, int opt, void *val, size_t *len)
        [ zmq_getsockopt => 'zmq_getsockopt_binary',
            ['pointer', 'int', 'pointer', 'size_t*'] => 'int' ],
        [ zmq_getsockopt => 'zmq_getsockopt_int',
            ['pointer', 'int', 'int*', 'size_t*'] => 'int' ],
        [ zmq_getsockopt => 'zmq_getsockopt_int64',
            ['pointer', 'int', 'sint64*', 'size_t*'] => 'int' ],
        [ zmq_getsockopt => 'zmq_getsockopt_uint64',
            ['pointer', 'int', 'uint64*', 'size_t*'] => 'int' ],

        # int zmq_setsockopt(void *sock, int opt, const void *val, size_t len)
        [ zmq_setsockopt => 'zmq_setsockopt_binary',
            ['pointer', 'int', 'pointer', 'size_t'] => 'int' ],
        [ zmq_setsockopt => 'zmq_setsockopt_int',
            ['pointer', 'int', 'int*', 'size_t'] => 'int' ],
        [ zmq_setsockopt => 'zmq_setsockopt_int64',
            ['pointer', 'int', 'sint64*', 'size_t'] => 'int' ],
        [ zmq_setsockopt => 'zmq_setsockopt_uint64',
            ['pointer', 'int', 'uint64*', 'size_t'] => 'int' ],

        # int zmq_connect(void *socket, const char *endpoint)
        [ zmq_connect => 'zmq_connect', ['pointer', 'string'] => 'int' ],

        # int zmq_bind(void *socket, const char *endpoint)
        [ zmq_bind => 'zmq_bind', ['pointer', 'string'] => 'int' ],

        # int zmq_msg_init(zmq_msg_t *msg)
        [ zmq_msg_init => 'zmq_msg_init', ['pointer'] => 'int' ],

        # int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
        # (size is a size_t in C, so it must not be declared as int here)
        [ zmq_msg_init_size => 'zmq_msg_init_size',
            ['pointer', 'size_t'] => 'int' ],

        # size_t zmq_msg_size(zmq_msg_t *msg)
        # (the result is a size_t in C, so it must not be declared as int)
        [ zmq_msg_size => 'zmq_msg_size', ['pointer'] => 'size_t' ],

        # void *zmq_msg_data(zmq_msg_t *msg)
        [ zmq_msg_data => 'zmq_msg_data', ['pointer'] => 'pointer' ],

        # int zmq_msg_close(zmq_msg_t *msg)
        [ zmq_msg_close => 'zmq_msg_close', ['pointer'] => 'int' ],

        # int zmq_close(void *socket)
        [ zmq_close => 'zmq_close', ['pointer'] => 'int' ],
    );

    # Keep the value of the final attach() so the return value is the same
    # as when the attach calls were written out one by one.
    my $attached;

    for my $signature (@signatures) {
        my ($c_name, $perl_name, $arg_types, $return_type) = @{$signature};

        $attached = $ffi->attach(
            [ $c_name => "${class}::${perl_name}" ],
            $arg_types,
            $return_type
        );
    }

    return $attached;
}