### ZMQ COMMON API ###

# Connect this socket to an endpoint.
#
# $endpoint - endpoint string handed to zmq_connect (required; any false
#             value is rejected)
#
# Croaks with a usage message when no endpoint is given. Otherwise the return
# code of zmq_connect is passed to check_error and that call's result is
# returned.
sub connect {
    my $self     = shift;
    my $endpoint = shift;

    croak 'usage: $socket->connect($endpoint)' if !$endpoint;

    my $rc = zmq_connect($self->_socket, $endpoint);

    return $self->check_error('zmq_connect', $rc);
}

# Bind this socket to an endpoint.
#
# $endpoint - endpoint string handed to zmq_bind (required; any false value
#             is rejected)
#
# Croaks with a usage message when no endpoint is given. Otherwise the return
# code of zmq_bind is passed to check_error and that call's result is
# returned.
sub bind {
    my ($self, $endpoint) = @_;

    $endpoint
        or croak 'usage: $socket->bind($endpoint)';

    my $rc = zmq_bind($self->_socket, $endpoint);

    return $self->check_error('zmq_bind', $rc);
}

#
# send/recv are hot spots, so sacrificing some readability for performance
#

# Send every element of an array reference as one multipart message.
#
# Arguments are read straight from @_ (this is a hot path):
#   $_[0] - socket object
#   $_[1] - array reference holding the message parts
#   $_[2] - optional send flags, treated as 0 when undefined
#
# Croaks with a usage message when the parts list is missing or empty.
# Returns whatever the final send call returns.
sub send_multipart {
    my @frames = @{ $_[1] // [] };

    croak 'usage: send_multipart($parts, $flags)' if !@frames;

    # Every frame except the last is sent with ZMQ_SNDMORE or'ed into the
    # caller's flags; the last one goes out with the caller's flags alone.
    my $final_frame = pop @frames;

    for my $frame (@frames) {
        $_[0]->send($frame, ($_[2] // 0) | ZMQ_SNDMORE);
    }

    return $_[0]->send($final_frame, $_[2] // 0);
}

# Receive all parts of a multipart message.
#
# Arguments are read straight from @_ (this is a hot path):
#   $_[0] - socket object
#   $_[1] - optional receive flags; passed unchanged to the first recv and
#           defaulted to 0 for every following one
#
# Returns the list of received parts.
sub recv_multipart {
    my @frames = $_[0]->recv($_[1]);

    # ZMQ_RCVMORE is read as int64_t when the major version is 2 and as a
    # plain int otherwise.
    my ($major)   = $_[0]->version;
    my $more_type = $major == 2 ? 'int64_t' : 'int';

    push @frames, $_[0]->recv($_[1] // 0)
        while $_[0]->get(ZMQ_RCVMORE, $more_type);

    return @frames;
}

# Return the value of the ZMQ_FD socket option, read as an 'int'.
sub get_fd {
    my ($self) = @_;

    return $self->get(ZMQ_FD, 'int');
}

# Set the ZMQ_LINGER socket option.
#
# $linger - new option value, written as an 'int'
#
# Returns whatever set() returns.
sub set_linger {
    my $self   = shift;
    my $linger = shift;

    return $self->set(ZMQ_LINGER, 'int', $linger);
}

# Return the value of the ZMQ_LINGER socket option, read as an 'int'.
sub get_linger {
    my ($self) = @_;

    return $self->get(ZMQ_LINGER, 'int');
}

# Set the ZMQ_IDENTITY socket option.
#
# $id - identity value, written as 'binary' data
#
# Returns whatever set() returns.
sub set_identity {
    my $self = shift;
    my $id   = shift;

    return $self->set(ZMQ_IDENTITY, 'binary', $id);
}

# Return the value of the ZMQ_IDENTITY socket option, read as 'binary' data.
sub get_identity {
    my ($self) = @_;

    return $self->get(ZMQ_IDENTITY, 'binary');
}

# Add a subscription by setting the ZMQ_SUBSCRIBE socket option.
#
# $topic - subscription value, written as 'binary' data
#
# Returns whatever set() returns.
sub subscribe {
    my $self  = shift;
    my $topic = shift;

    return $self->set(ZMQ_SUBSCRIBE, 'binary', $topic);
}

# Remove a subscription by setting the ZMQ_UNSUBSCRIBE socket option.
#
# $topic - subscription value, written as 'binary' data
#
# Returns whatever set() returns.
sub unsubscribe {
    my $self  = shift;
    my $topic = shift;

    return $self->set(ZMQ_UNSUBSCRIBE, 'binary', $topic);
}

# Test the ZMQ_POLLIN bit of the ZMQ_EVENTS socket option.
#
# Returns the bitwise AND of the option value and ZMQ_POLLIN, i.e. a true
# value when that bit is set.
sub has_pollin {
    my ($self) = @_;

    my $events = $self->get(ZMQ_EVENTS, 'int');

    return $events & ZMQ_POLLIN;
}

# Test the ZMQ_POLLOUT bit of the ZMQ_EVENTS socket option.
#
# Returns the bitwise AND of the option value and ZMQ_POLLOUT, i.e. a true
# value when that bit is set.
sub has_pollout {
    my ($self) = @_;

    my $events = $self->get(ZMQ_EVENTS, 'int');

    return $events & ZMQ_POLLOUT;
}

# Read a socket option through zmq_getsockopt.
#
# $opt      - option constant, passed straight to zmq_getsockopt
# $opt_type - how the value is stored: 'binary', 'string', 'int', 'int64_t'
#             or 'uint64_t'
#
# Returns the option value, or nothing (undef in scalar context, the empty
# list in list context) when the value is undefined or the empty string.
# Croaks with "unknown type ..." for an unrecognised $opt_type. The return
# code of zmq_getsockopt is always handed to check_error.
sub get {
    my ($self, $opt, $opt_type) = @_;

    my $optval;
    my $optval_len;

    if ($opt_type =~ /^(?:binary|string)$/) {
        # ZMQ_IDENTITY uses binary type and can be at most 255 bytes long
        #
        # ZMQ_LAST_ENDPOINT uses string type and expects a buffer large
        # enough to hold an endpoint string
        #
        # So for these cases 256 should be sufficient (including \0).
        # Other binary/string opts are being added all the time, and
        # hopefully this value scales, but we can always increase it if
        # necessary
        my $optval_ptr = malloc(256);
        $optval_len    = 256;

        # If check_error throws, the malloc'd buffer must still be released
        # before the error propagates, otherwise every failed lookup leaks it.
        my $error;
        {
            local $@;
            eval {
                $self->check_error(
                    'zmq_getsockopt',
                    zmq_getsockopt_binary(
                        $self->_socket,
                        $opt,
                        $optval_ptr,
                        \$optval_len
                    )
                );
                1;
            } or $error = $@ || 'zmq_getsockopt failed';
        }

        if (defined $error) {
            free($optval_ptr);
            die $error;
        }

        # FFI::Platypus already appends a null terminating byte for strings,
        # so for the string type strip the one included by zeromq (otherwise
        # test comparisons fail due to the extra NUL)
        my $copy_len = $opt_type eq 'binary' ? $optval_len : $optval_len - 1;

        $optval = buffer_to_scalar($optval_ptr, $copy_len);
        free($optval_ptr);
    }
    elsif ($opt_type =~ /^int$/) {
        $optval_len = $self->sockopt_sizes->{'int'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($opt_type =~ /^int64_t$/) {
        $optval_len = $self->sockopt_sizes->{'sint64'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int64(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($opt_type =~ /^uint64_t$/) {
        $optval_len = $self->sockopt_sizes->{'uint64'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_uint64(
                $self->_socket,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    else {
        croak "unknown type $opt_type";
    }

    # An unset or empty value is reported as "nothing" rather than ''.
    return if !defined $optval || $optval eq '';
    return $optval;
}

# Write a socket option through zmq_setsockopt.
#
# $opt      - option constant, passed straight to zmq_setsockopt
# $opt_type - how the value is stored: 'binary', 'string', 'int', 'int64_t'
#             or 'uint64_t'
# $optval   - the value to set
#
# Returns nothing. Croaks with "unknown type ..." for an unrecognised
# $opt_type. The return code of zmq_setsockopt is always handed to
# check_error.
sub set {
    my ($self, $opt, $opt_type, $optval) = @_;

    if ($opt_type =~ /^(?:binary|string)$/) {
        # binary and string values are passed as a raw pointer/length pair
        my ($optval_ptr, $optval_len) = scalar_to_buffer($optval);
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_binary(
                $self->_socket,
                $opt,
                $optval_ptr,
                $optval_len
            )
        );
    }
    elsif ($opt_type =~ /^int$/) {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'int'}
            )
        );
    }
    elsif ($opt_type =~ /^int64_t$/) {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int64(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'sint64'}
            )
        );
    }
    elsif ($opt_type =~ /^uint64_t$/) {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_uint64(
                $self->_socket,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'uint64'}
            )
        );
    }
    else {
        croak "unknown type $opt_type";
    }

    return;
}

# Close the underlying zmq socket.
#
# Does nothing when the socket belongs to another process (the object was
# cloned by fork) or when it has already been closed. Otherwise calls
# zmq_close, hands its return code to check_error, and marks the socket as
# closed by resetting _socket to -1.
sub close {
    my ($self) = @_;

    # don't try to cleanup socket cloned from another process (fork)
    return unless $self->_pid == $$;

    # -1 is the "already closed" sentinel set below; passing it to zmq_close
    # on a repeated close would hand libzmq an invalid pointer
    return if $self->_socket == -1;

    $self->check_error(
        'zmq_close',
        zmq_close($self->_socket)
    );

    $self->_socket(-1);
}

# Destructor hook: release the socket if it is still open.
#
# Nothing is done when the socket is already closed (_socket is -1) or when
# the object was cloned from another process by fork; close() refuses to
# clean up such sockets, so their linger option is left untouched as well.
# Otherwise linger is set to 1 and the socket is closed.
sub DEMOLISH {
    my ($self) = @_;

    return if $self->_socket == -1;

    # don't touch a socket cloned from another process (fork)
    return unless $self->_pid == $$;

    $self->set_linger(1);
    $self->close();
}

# Attach the libzmq functions used by this API to the calling package.
#
# $soname - library name/path given to FFI::Platypus as its 'lib'
#
# Each function is installed as "<caller package>::<perl name>".
# zmq_getsockopt and zmq_setsockopt are attached several times under
# different names, once per option value type, because the value argument is
# a void pointer on the C side.
sub _load_common_ffi {
    my ($soname) = @_;

    my $ffi   = FFI::Platypus->new( lib => $soname );
    my $class = caller;

    # Each entry: [ C symbol, perl sub name, argument types, return type ]
    my @bindings = (
        # void *zmq_socket(void *context, int type)
        [ 'zmq_socket', 'zmq_socket', ['pointer', 'int'], 'pointer' ],

        # for get/set sockopt create ffi functions for each possible opt type

        # int zmq_getsockopt(void *sock, int opt, void *val, size_t *len)
        [ 'zmq_getsockopt', 'zmq_getsockopt_binary',
            ['pointer', 'int', 'pointer', 'size_t*'], 'int' ],
        [ 'zmq_getsockopt', 'zmq_getsockopt_int',
            ['pointer', 'int', 'int*', 'size_t*'], 'int' ],
        [ 'zmq_getsockopt', 'zmq_getsockopt_int64',
            ['pointer', 'int', 'sint64*', 'size_t*'], 'int' ],
        [ 'zmq_getsockopt', 'zmq_getsockopt_uint64',
            ['pointer', 'int', 'uint64*', 'size_t*'], 'int' ],

        # int zmq_setsockopt(void *sock, int opt, const void *val, size_t len)
        [ 'zmq_setsockopt', 'zmq_setsockopt_binary',
            ['pointer', 'int', 'pointer', 'size_t'], 'int' ],
        [ 'zmq_setsockopt', 'zmq_setsockopt_int',
            ['pointer', 'int', 'int*', 'size_t'], 'int' ],
        [ 'zmq_setsockopt', 'zmq_setsockopt_int64',
            ['pointer', 'int', 'sint64*', 'size_t'], 'int' ],
        [ 'zmq_setsockopt', 'zmq_setsockopt_uint64',
            ['pointer', 'int', 'uint64*', 'size_t'], 'int' ],

        # int zmq_connect(void *socket, const char *endpoint)
        [ 'zmq_connect', 'zmq_connect', ['pointer', 'string'], 'int' ],

        # int zmq_bind(void *socket, const char *endpoint)
        [ 'zmq_bind', 'zmq_bind', ['pointer', 'string'], 'int' ],

        # int zmq_msg_init(zmq_msg_t *msg)
        [ 'zmq_msg_init', 'zmq_msg_init', ['pointer'], 'int' ],

        # int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
        # (size is a size_t, not an int, per the C prototype)
        [ 'zmq_msg_init_size', 'zmq_msg_init_size',
            ['pointer', 'size_t'], 'int' ],

        # size_t zmq_msg_size(zmq_msg_t *msg)
        # (declared as size_t so large sizes are not truncated to an int)
        [ 'zmq_msg_size', 'zmq_msg_size', ['pointer'], 'size_t' ],

        # void *zmq_msg_data(zmq_msg_t *msg)
        [ 'zmq_msg_data', 'zmq_msg_data', ['pointer'], 'pointer' ],

        # int zmq_msg_close(zmq_msg_t *msg)
        [ 'zmq_msg_close', 'zmq_msg_close', ['pointer'], 'int' ],

        # int zmq_close(void *socket)
        [ 'zmq_close', 'zmq_close', ['pointer'], 'int' ],
    );

    for my $binding (@bindings) {
        my ($c_name, $perl_name, $arg_types, $ret_type) = @{$binding};

        $ffi->attach(
            [ $c_name => "${class}::${perl_name}" ],
            $arg_types => $ret_type
        );
    }

    return;
}