#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "ppport.h"
#include "refcount.h"
#include "values.h"
#include "notification.h"
#include "channel.h"
/*
* Message channels
*/
enum state { HAS_NOTHING, HAS_READER, HAS_WRITER, HAS_MESSAGE, CLOSED };
static const char* state_names[] = { "nothing", "has-reader", "has-writer", "has-message" };
struct channel {
perl_mutex data_mutex;
perl_mutex reader_mutex;
perl_mutex writer_mutex;
perl_cond data_condvar;
Refcount refcount;
enum state state;
SV* message;
Notification read_notification;
Notification write_notification;
};
Channel* channel_alloc(UV refcount) {
Channel* ret = calloc(1, sizeof(Channel));
MUTEX_INIT(&ret->data_mutex);
MUTEX_INIT(&ret->reader_mutex);
MUTEX_INIT(&ret->writer_mutex);
COND_INIT(&ret->data_condvar);
refcount_init(&ret->refcount, refcount);
notification_init(&ret->read_notification);
notification_init(&ret->write_notification);
return ret;
}
void channel_send(Channel* channel, SV* message) {
MUTEX_LOCK(&channel->writer_mutex);
MUTEX_LOCK(&channel->data_mutex);
channel->message = message;
notification_trigger(&channel->read_notification);
if (channel->state == HAS_READER) {
channel->state = HAS_MESSAGE;
COND_SIGNAL(&channel->data_condvar);
}
else {
assert(channel->state == HAS_NOTHING);
channel->state = HAS_WRITER;
}
do COND_WAIT(&channel->data_condvar, &channel->data_mutex);
while (channel->state != HAS_NOTHING && channel->state != HAS_READER && channel->state != CLOSED);
MUTEX_UNLOCK(&channel->data_mutex);
MUTEX_UNLOCK(&channel->writer_mutex);
}
SV* S_channel_receive(pTHX_ Channel* channel) {
MUTEX_LOCK(&channel->reader_mutex);
MUTEX_LOCK(&channel->data_mutex);
notification_trigger(&channel->write_notification);
if (channel->state == HAS_NOTHING) {
channel->state = HAS_READER;
do COND_WAIT(&channel->data_condvar, &channel->data_mutex);
while (channel->state != HAS_MESSAGE && channel->state != CLOSED);
}
else
assert(channel->state == HAS_WRITER || channel->state == CLOSED);
SV* result;
if (channel->state != CLOSED) {
result = clone_value(channel->message);
channel->state = HAS_NOTHING;
}
else
result = &PL_sv_undef;
channel->message = NULL;
COND_SIGNAL(&channel->data_condvar);
MUTEX_UNLOCK(&channel->data_mutex);
MUTEX_UNLOCK(&channel->reader_mutex);
return result;
}
SV* S_channel_receive_ready_fh(pTHX_ Channel* channel) {
MUTEX_LOCK(&channel->data_mutex);
SV* result = notification_create(&channel->write_notification);
if (channel->state == HAS_READER)
notification_trigger(&channel->read_notification);
MUTEX_UNLOCK(&channel->data_mutex);
return result;
}
SV* S_channel_send_ready_fh(pTHX_ Channel* channel) {
MUTEX_LOCK(&channel->data_mutex);
SV* result = notification_create(&channel->write_notification);
if (channel->state == HAS_WRITER)
notification_trigger(&channel->write_notification);
MUTEX_UNLOCK(&channel->data_mutex);
return result;
}
void channel_close(Channel* channel) {
MUTEX_LOCK(&channel->data_mutex);
notification_unset(&channel->read_notification);
channel->state = CLOSED;
COND_SIGNAL(&channel->data_condvar);
MUTEX_UNLOCK(&channel->data_mutex);
}
void channel_refcount_dec(Channel* channel) {
if (refcount_dec(&channel->refcount) == 1) {
notification_unset(&channel->read_notification);
notification_unset(&channel->write_notification);
COND_DESTROY(&channel->data_condvar);
MUTEX_DESTROY(&channel->writer_mutex);
MUTEX_DESTROY(&channel->reader_mutex);
MUTEX_DESTROY(&channel->data_mutex);
free(channel);
}
}
static int channel_magic_destroy(pTHX_ SV* sv, MAGIC* magic) {
channel_refcount_dec((Channel*)magic->mg_ptr);
return 0;
}
static int channel_magic_dup(pTHX_ MAGIC* magic, CLONE_PARAMS* param) {
Channel* channel = (Channel*)magic->mg_ptr;
refcount_inc(&channel->refcount);
return 0;
}
static const MGVTBL channel_magic = { 0, 0, 0, 0, channel_magic_destroy, 0, channel_magic_dup };
SV* S_channel_to_sv(pTHX_ Channel* channel, SV* stash_name) {
object_to_sv(channel, gv_stashsv(stash_name, 0), &channel_magic, MGf_DUP);
}
Channel* S_sv_to_channel(pTHX_ SV* sv) {
return sv_to_object(sv, "Thread::Csp::Channel", &channel_magic);
}