MODULE = ZMQ::Raw               PACKAGE = ZMQ::Raw::Socket

INCLUDE: const-xs-socket_options.inc

SV *
new (class, context, type)
	SV *class
	SV *context
	int type

	PREINIT:
		zmq_raw_socket *sock;
		zmq_raw_context *ctx;

	CODE:
		ctx = ZMQ_SV_TO_PTR (Context, context);
		Newxz (sock, 1, zmq_raw_socket);
		sock->type = type;
		sock->socket = zmq_socket (ctx->context, type);
		sock->context = ctx->context;
		if (sock->socket == NULL)
		{
			Safefree (sock);
			zmq_raw_check_error (-1);
		}

		ZMQ_NEW_OBJ_WITH_MAGIC (RETVAL, SvPVbyte_nolen (class), sock,
			SvRV (context));

	OUTPUT: RETVAL

void
bind (self, endpoint)
	SV *self
	const char *endpoint

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_bind (sock->socket, endpoint);
		zmq_raw_check_error (rc);

void
unbind (self, endpoint)
	SV *self
	const char *endpoint

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_unbind (sock->socket, endpoint);
		zmq_raw_check_error (rc);

void
connect (self, endpoint)
	SV *self
	const char *endpoint

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_connect (sock->socket, endpoint);
		zmq_raw_check_error (rc);

void
disconnect (self, endpoint)
	SV *self
	const char *endpoint

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_disconnect (sock->socket, endpoint);
		zmq_raw_check_error (rc);

void
send (self, buffer, flags=0)
	SV *self
	SV *buffer
	int flags

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	PPCODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_send (sock->socket,
			SvPVX (buffer), SvCUR (buffer), flags);
		if (rc < 0 && zmq_errno() == EAGAIN && (flags & ZMQ_DONTWAIT))
			XSRETURN_UNDEF;

		zmq_raw_check_error (rc);
		XSRETURN_YES;

void
sendmsg (self, msg, flags=0)
	SV *self
	SV *msg
	int flags

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	PPCODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		rc = zmq_sendmsg (sock->socket,
			ZMQ_SV_TO_PTR (Message, msg), flags);
		if (rc < 0 && zmq_errno() == EAGAIN && (flags & ZMQ_DONTWAIT))
			XSRETURN_UNDEF;

		zmq_raw_check_error (rc);
		XSRETURN_YES;

SV *
recv (self, flags=0)
	SV *self
	int flags

	PREINIT:
		int rc;
		zmq_msg_t *msg;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);

		Newx (msg, 1, zmq_msg_t);
		rc = zmq_msg_init (msg);
		zmq_raw_check_error (rc);

		SV *buffer = sv_2mortal (newSV (16));
		SvPOK_on (buffer);
		SvCUR_set (buffer, 0);

		do
		{
			rc = zmq_recvmsg (sock->socket, msg,
				flags);

			if (rc < 0)
			{
				zmq_msg_close (msg);
				Safefree (msg);

				if (zmq_errno() == EAGAIN && (flags & ZMQ_DONTWAIT))
					XSRETURN_UNDEF;

				zmq_raw_check_error (rc);
			}

			sv_catpvn (buffer, zmq_msg_data (msg), zmq_msg_size (msg));

			rc = zmq_msg_get (msg, ZMQ_MORE);
			if (rc < 0)
			{
				zmq_msg_close (msg);
				Safefree (msg);
				zmq_raw_check_error (rc);
			}

		}
		while (rc);

		zmq_msg_close (msg);
		Safefree (msg);

		SvREFCNT_inc (buffer);
		RETVAL = buffer;

	OUTPUT: RETVAL

SV *
recvmsg (self, flags=0)
	SV *self
	int flags

	PREINIT:
		int rc;
		zmq_msg_t *msg;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);

		Newx (msg, 1, zmq_msg_t);
		rc = zmq_msg_init (msg);
		zmq_raw_check_error (rc);

		rc = zmq_recvmsg (sock->socket, msg,
			flags);
		if (rc < 0)
		{
			zmq_msg_close (msg);
			Safefree (msg);

			if (zmq_errno() == EAGAIN && (flags & ZMQ_DONTWAIT))
				XSRETURN_UNDEF;
		}
		zmq_raw_check_error (rc);

		ZMQ_NEW_OBJ (RETVAL, "ZMQ::Raw::Message", msg);

	OUTPUT: RETVAL

void
setsockopt (self, option, value)
	SV *self
	int option
	SV *value

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);

		switch (option)
		{
			// int
			case ZMQ_BACKLOG:
			case ZMQ_CONFLATE:
			case ZMQ_CONNECT_TIMEOUT:
			case ZMQ_HANDSHAKE_IVL:
			case ZMQ_HEARTBEAT_IVL:
			case ZMQ_HEARTBEAT_TIMEOUT:
			case ZMQ_HEARTBEAT_TTL:
			case ZMQ_IMMEDIATE:
			case ZMQ_INVERT_MATCHING:
			case ZMQ_IPV6:
			case ZMQ_LINGER:
			case ZMQ_MULTICAST_HOPS:
			case ZMQ_MULTICAST_MAXTPDU:
			case ZMQ_PLAIN_SERVER:
			case ZMQ_USE_FD:
			case ZMQ_PROBE_ROUTER:
			case ZMQ_RATE:
			case ZMQ_RCVBUF:
			case ZMQ_RCVHWM:
			case ZMQ_RCVTIMEO:
			case ZMQ_RECONNECT_IVL:
			case ZMQ_RECONNECT_IVL_MAX:
			case ZMQ_RECOVERY_IVL:
			case ZMQ_REQ_CORRELATE:
			case ZMQ_REQ_RELAXED:
			case ZMQ_ROUTER_HANDOVER:
			case ZMQ_ROUTER_MANDATORY:
			case ZMQ_ROUTER_RAW:
			case ZMQ_SNDBUF:
			case ZMQ_SNDHWM:
			case ZMQ_SNDTIMEO:
			case ZMQ_STREAM_NOTIFY:
			case ZMQ_TCP_KEEPALIVE:
			case ZMQ_TCP_KEEPALIVE_CNT:
			case ZMQ_TCP_KEEPALIVE_IDLE:
			case ZMQ_TCP_KEEPALIVE_INTVL:
			case ZMQ_TCP_MAXRT:
			case ZMQ_TOS:
			case ZMQ_XPUB_VERBOSE:
			case ZMQ_XPUB_VERBOSER:
			case ZMQ_XPUB_MANUAL:
			case ZMQ_XPUB_NODROP:
			case ZMQ_CURVE_SERVER:
				{
					int v;
					if (!SvIOK (value))
						croak_usage ("Value is not an int");

					v = SvIV (value);
					rc = zmq_setsockopt (sock->socket, option,
						&v, sizeof (v));
					zmq_raw_check_error (rc);
				}
				break;

			// int64
			case ZMQ_MAXMSGSIZE:
				{
					int64_t v;
					if (!SvIOK (value))
						croak_usage ("Value is not an int");

					v = SvIV (value);
					rc = zmq_setsockopt (sock->socket, option,
						&v, sizeof (v));
					zmq_raw_check_error (rc);
				}
				break;

			// binary
			case ZMQ_CONNECT_RID:
			case ZMQ_IDENTITY:
			case ZMQ_PLAIN_PASSWORD:
			case ZMQ_PLAIN_USERNAME:
			case ZMQ_SOCKS_PROXY:
			case ZMQ_SUBSCRIBE:
			case ZMQ_UNSUBSCRIBE:
			case ZMQ_XPUB_WELCOME_MSG:
			case ZMQ_ZAP_DOMAIN:
			case ZMQ_TCP_ACCEPT_FILTER:
			case ZMQ_CURVE_SECRETKEY:
			case ZMQ_CURVE_PUBLICKEY:
			case ZMQ_CURVE_SERVERKEY:
				{
					STRLEN len;
					char *buf;

					if (!SvPOK (value))
						croak_usage ("Value is not a string");

					buf = SvPV (value, len);
					rc = zmq_setsockopt (sock->socket, option,
						buf, len);
					zmq_raw_check_error (rc);
				}
				break;

			default:
				croak_usage ("Unsupported option");
		}

void
close (self)
	SV *self

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);

		rc = zmq_close (sock->socket);
		zmq_raw_check_error (rc);

		sock->socket = zmq_socket (sock->context, sock->type);
		if (sock->socket == NULL)
			zmq_raw_check_error (-1);

void
monitor (self, endpoint, events)
	SV *self
	const char *endpoint
	int events

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);

		rc = zmq_socket_monitor (sock->socket, endpoint, events);
		zmq_raw_check_error (rc);

void
DESTROY(self)
	SV *self

	PREINIT:
		int rc;
		zmq_raw_socket *sock;

	CODE:
		sock = ZMQ_SV_TO_PTR (Socket, self);
		if (sock->socket)
			zmq_close (sock->socket);
		Safefree (sock);
		SvREFCNT_dec (ZMQ_SV_TO_MAGIC (self));