#include "pollfd_rbhash.c"
// Returns false when it is time to exit
static bool do_watch();
void watch_thread_log(void* buffer, int len) {
int unused= write(2, len <= 0? "error\n":buffer, len <= 0? 6 : len);
(void) unused;
}
#ifdef WATCHTHREAD_DEBUGGING
#define WATCHTHREAD_DEBUG(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ ))
#define WATCHTHREAD_WARN(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ ))
#else
#define WATCHTHREAD_DEBUG(fmt, ...) ((void)0)
#define WATCHTHREAD_WARN(fmt, ...) ((void)0)
#endif
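// Both macros format into a local msgbuf[]; any function that uses them must
// declare a char msgbuf[...] in scope, e.g.:
//   char msgbuf[128];
//   WATCHTHREAD_DEBUG("fd=%d\n", fd);
// WARN is currently identical to DEBUG, and both compile away to no-ops
// unless WATCHTHREAD_DEBUGGING is defined.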
void* watch_thread_main(void* unused) {
while (do_watch()) {}
return NULL;
}
// Kept separate from watch_thread_main because it does a dynamically-sized
// alloca() each iteration; returning releases the stack allocation between iterations.
static bool do_watch() {
struct pollfd *pollset;
struct timespec wake_time= { 0, -1 };
int capacity, buckets, sz, n_poll, i, n, delay= 10000;
char msgbuf[128];
if (pthread_mutex_lock(&watch_list_mutex))
abort(); // should never fail
// allocate to the size of watch_list_count, but cap it at 1024 for sanity
// since this is coming off the stack. If any user actually wants to watch
// more than 1024 sockets, this needs to be redesigned, but I'm not sure if
// malloc is thread-safe when the main perl binary was compiled without
// thread support.
capacity= watch_list_count > 1024? 1024 : watch_list_count+1;
buckets= capacity < 16? 16 : capacity < 128? 32 : 64;
sz= sizeof(struct pollfd) * capacity + POLLFD_RBHASH_SIZEOF(capacity, buckets);
pollset= (struct pollfd *) alloca(sz);
memset(pollset, 0, sz);
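// Layout of the single alloca'd block (a sketch; POLLFD_RBHASH_SIZEOF is
// assumed to give the byte count of the hash index for this many nodes/buckets):
//   pollset[0] .. pollset[capacity-1]        the pollfd array handed to poll()
//   (pollset+capacity) .. (char*)pollset+sz  rbhash index mapping fd -> pollset slot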
// first fd is always our control socket
pollset[0].fd= control_pipe[0];
pollset[0].events= POLLIN;
n_poll= 1;
WATCHTHREAD_DEBUG("watch_thread loop iter, watch_list_count=%d capacity=%d buckets=%d\n", watch_list_count, capacity, buckets);
for (i= 0, n= watch_list_count; i < n && n_poll < capacity; i++) {
struct socketalarm *alarm= watch_list[i];
int fd= alarm->watch_fd;
// Ignore watches that have finished; the main thread needs to clean these up.
if (alarm->cur_action >= alarm->action_count)
continue;
// If the socketalarm is in the process of being executed and stopped at
// a 'sleep' command, factor that into the wake time.
if (alarm->wake_ts.tv_nsec != -1) {
if (wake_time.tv_nsec == -1
|| wake_time.tv_sec > alarm->wake_ts.tv_sec
|| (wake_time.tv_sec == alarm->wake_ts.tv_sec && wake_time.tv_nsec > alarm->wake_ts.tv_nsec)
) {
wake_time.tv_sec= alarm->wake_ts.tv_sec;
wake_time.tv_nsec= alarm->wake_ts.tv_nsec;
}
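// (The branch above keeps wake_time at the earliest wake_ts seen so far;
// tv_nsec == -1 doubles as the "unset" sentinel.)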
}
else {
// Find a pollset slot for this file descriptor, collapsing duplicates.
// The next unused one is at [n_poll], which has NodeID n_poll+1
int poll_i= -1 + (int)pollfd_rbhash_insert(pollset+capacity, capacity, n_poll+1, fd & (buckets-1), fd);
if (poll_i < 0) { // corrupt datastruct, should never happen
WATCHTHREAD_WARN("BUG: corrupt pollfd_rbhash");
break;
}
if (poll_i == n_poll) { // using the new uninitialized one?
pollset[poll_i].fd= fd;
pollset[poll_i].events= 0;
++n_poll;
}
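// Example: if two alarms both watch fd 7, the first insert claims the slot
// at [n_poll] (NodeID n_poll+1) and the second insert returns that same
// NodeID, so both alarms merge their event flags into one pollfd entry.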
// OR the poll flags of this socketalarm into the (possibly shared) pollfd slot
int events= alarm->event_mask;
#ifdef POLLRDHUP
if (events & EVENT_SHUT)
pollset[poll_i].events |= POLLRDHUP;
#endif
if (events & EVENT_EOF) {
// If an fd has data in its queue, there is no way to wait exclusively
// for the EOF event; we have to wake up periodically to check the socket.
if (alarm->unwaitable) // set when data was found queued in the buffer
delay= 500;
else
pollset[poll_i].events |= POLLIN;
}
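// (Unwaitable example: the peer sends a byte and then shuts down writing.
// The queued byte keeps POLLIN permanently ready, so the EOF behind it can
// only be noticed by re-checking with MSG_PEEK on a short timer.)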
if (events & EVENT_IN)
pollset[poll_i].events |= POLLIN;
if (events & EVENT_PRI)
pollset[poll_i].events |= POLLPRI;
if (events & EVENT_CLOSE) {
// According to poll docs, it is a bug to assume closing a socket in one thread
// will wake a 'poll' in another thread, so if the user wants to know about this
// condition, we have to loop more quickly.
delay= 500;
}
}
}
pthread_mutex_unlock(&watch_list_mutex);
// If there is a defined wake-time, truncate the delay if the wake-time comes first
if (wake_time.tv_nsec != -1) {
struct timespec now_ts= { 0, -1 };
if (lazy_build_now_ts(&now_ts)) {
// subtract to find the delay; poll only has millisecond precision anyway.
int wake_delay= ((long)wake_time.tv_sec - (long)now_ts.tv_sec) * 1000
+ (wake_time.tv_nsec - now_ts.tv_nsec)/1000000;
if (wake_delay < delay)
delay= wake_delay;
}
}
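// e.g. now= {100, 200000000} and wake_time= {100, 700000000}
// => wake_delay= 0*1000 + 500000000/1000000 = 500ms,
// which shortens the default 10-second delay.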
WATCHTHREAD_DEBUG("poll(n=%d delay=%d)\n", n_poll, delay);
if (poll(pollset, n_poll, delay < 0? 0 : delay) < 0) {
perror("poll");
return false;
}
for (i= 0; i < n_poll; i++) {
int e= pollset[i].revents;
WATCHTHREAD_DEBUG(" fd=%3d revents=%02X (%s%s%s%s%s%s%s)\n", pollset[i].fd, e,
e&POLLIN? " IN":"", e&POLLPRI? " PRI":"", e&POLLOUT? " OUT":"",
#ifdef POLLRDHUP
e&POLLRDHUP? " RDHUP":"",
#else
"",
#endif
e&POLLERR? " ERR":"", e&POLLHUP? " HUP":"", e&POLLNVAL? " NVAL":"");
}
// First, did we get new control messages?
if (pollset[0].revents & POLLIN) {
char msg;
int ret= read(pollset[0].fd, &msg, 1);
if (ret != 1) { // should never fail
WATCHTHREAD_DEBUG("read(control_pipe): %d, errno %m, terminating watch_thread\n", ret);
return false;
}
if (msg == CONTROL_TERMINATE) { // intentional exit
WATCHTHREAD_DEBUG("terminate received\n");
return false;
}
// else it's CONTROL_REWATCH, which means we should start over with new alarms to watch
WATCHTHREAD_DEBUG("got REWATCH, starting over\n");
return true;
}
// Now, process all of the socketalarms using the statuses from the pollfd
if (pthread_mutex_lock(&watch_list_mutex))
abort(); // should never fail
for (i= 0, n= watch_list_count; i < n; i++) {
struct socketalarm *alarm= watch_list[i];
// If it has not been triggered yet, see if it is now
if (alarm->cur_action == -1) {
bool trigger= false;
int fd= alarm->watch_fd, revents;
struct stat statbuf;
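// (st_dev, st_ino) identifies the file behind a descriptor; if the fd number
// was closed and recycled for an unrelated file, fstat() reports a different
// pair than the one saved in alarm->watch_fd_dev / watch_fd_ino.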
// Is it still the same socket that we intended to watch?
if (fstat(fd, &statbuf) != 0
|| statbuf.st_dev != alarm->watch_fd_dev
|| statbuf.st_ino != alarm->watch_fd_ino
) {
// The fd was closed or reused. If the user is watching for the CLOSE event,
// trigger the actions; otherwise assume the host program took care of the
// socket and no longer wants the alarm.
if (alarm->event_mask & EVENT_CLOSE)
trigger= true;
else
alarm->cur_action= alarm->action_count;
}
else {
int poll_i= -1 + (int) pollfd_rbhash_find(pollset+capacity, capacity, fd & (buckets-1), fd);
// Did we poll this fd?
if (poll_i < 0)
// can only happen if watch_list changed while we let go of the mutex (or a bug in rbhash)
continue;
revents= pollset[poll_i].revents;
trigger= ((alarm->event_mask & EVENT_SHUT) && (revents &
#ifdef POLLRDHUP
(POLLHUP|POLLRDHUP|POLLERR)
#else
(POLLHUP|POLLERR)
#endif
))
|| ((alarm->event_mask & EVENT_IN) && (revents & POLLIN))
|| ((alarm->event_mask & EVENT_PRI) && (revents & POLLPRI));
// Now the tricky one, EVENT_EOF...
if (!trigger && (alarm->event_mask & EVENT_EOF) && (alarm->unwaitable || (revents & POLLIN))) {
int avail= recv(fd, msgbuf, sizeof(msgbuf), MSG_DONTWAIT|MSG_PEEK);
if (avail == 0)
// This is the zero-length read that means EOF
trigger= true;
else
// If there is data on the socket, we are in the "unwaitable" condition;
// error results (avail < 0) are not EOF and can still be waited on with POLLIN.
alarm->unwaitable= (avail > 0);
}
// We're playing with race conditions, so make sure one more time that we're
// triggering on the socket we expected.
if (trigger) {
if ((fstat(fd, &statbuf) != 0
|| statbuf.st_dev != alarm->watch_fd_dev
|| statbuf.st_ino != alarm->watch_fd_ino
) && !(alarm->event_mask & EVENT_CLOSE)
) {
alarm->cur_action= alarm->action_count;
trigger= false;
}
}
}
if (!trigger)
continue; // don't exec_actions
}
socketalarm_exec_actions(alarm);
}
pthread_mutex_unlock(&watch_list_mutex);
return true;
}
// May only be called by Perl's thread
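// Returns true if the alarm was newly added, false if it was already listed.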
static bool watch_list_add(struct socketalarm *alarm) {
int i;
const char *error= NULL;
if (pthread_mutex_lock(&watch_list_mutex))
croak("mutex_lock failed");
if (!watch_list) {
Newxz(watch_list, 16, struct socketalarm * volatile);
watch_list_alloc= 16;
}
else {
// Clean up completed watches
for (i= watch_list_count-1; i >= 0; --i) {
if (watch_list[i]->cur_action >= watch_list[i]->action_count) {
watch_list[i]->list_ofs= -1;
if (--watch_list_count > i) {
watch_list[i]= watch_list[watch_list_count];
watch_list[i]->list_ofs= i;
}
watch_list[watch_list_count]= NULL;
}
}
// allocate more if needed
if (watch_list_count >= watch_list_alloc) {
Renew(watch_list, watch_list_alloc*2, struct socketalarm * volatile);
watch_list_alloc= watch_list_alloc*2;
}
}
i= alarm->list_ofs;
if (i < 0) { // only add if not already added
alarm->list_ofs= watch_list_count;
watch_list[watch_list_count++]= alarm;
// Initialize fields that watcher uses to track status
alarm->cur_action= -1;
alarm->wake_ts.tv_nsec= -1;
alarm->unwaitable= false;
}
// If the thread is not running, start it. Also create pipe if needed.
if (control_pipe[1] < 0) {
int ret= 0;
sigset_t mask, orig;
sigfillset(&mask);
if (pipe(control_pipe) != 0)
error= "pipe() failed";
// Block all signals before creating the thread so that the new thread
// inherits the all-blocked mask, then restore the original mask in this thread.
else if (pthread_sigmask(SIG_SETMASK, &mask, &orig) != 0)
error= "pthread_sigmask(BLOCK) failed";
else if (pthread_create(&watch_thread, NULL, watch_thread_main, NULL) != 0)
error= "pthread_create failed";
else if (pthread_sigmask(SIG_SETMASK, &orig, NULL) != 0)
error= "pthread_sigmask(UNBLOCK) failed";
} else {
char msg= CONTROL_REWATCH;
if (write(control_pipe[1], &msg, 1) != 1)
error= "failed to notify watch_thread";
}
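// (REWATCH simply makes do_watch() return true early, so the next iteration
// rebuilds its pollset from the updated watch_list.)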
pthread_mutex_unlock(&watch_list_mutex);
if (error)
croak("%s", error);
return i < 0;
}
// need to lock mutex before accessing concurrent alarm fields
static void watch_list_item_get_status(struct socketalarm *alarm, int *cur_action_out) {
if (pthread_mutex_lock(&watch_list_mutex))
croak("mutex_lock failed");
if (cur_action_out) *cur_action_out= alarm->cur_action;
pthread_mutex_unlock(&watch_list_mutex);
}
// May only be called by Perl's thread
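// Returns true if the alarm was still an active watch (and has now been removed), false otherwise.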
static bool watch_list_remove(struct socketalarm *alarm) {
int i;
if (pthread_mutex_lock(&watch_list_mutex))
croak("mutex_lock failed");
// Clean up completed watches
for (i= watch_list_count-1; i >= 0; --i) {
if (watch_list[i]->cur_action >= watch_list[i]->action_count) {
watch_list[i]->list_ofs= -1;
if (--watch_list_count > i) {
watch_list[i]= watch_list[watch_list_count];
watch_list[i]->list_ofs= i;
}
watch_list[watch_list_count]= NULL;
}
}
i= alarm->list_ofs;
if (i >= 0) {
// fill the hole in the list by moving the final item
if (i < watch_list_count-1) {
watch_list[i]= watch_list[watch_list_count-1];
watch_list[i]->list_ofs= i;
}
--watch_list_count;
alarm->list_ofs= -1;
// This one was still an active watch, so we need to notify the thread
// not to listen for it anymore
if (control_pipe[1] >= 0) {
char msg= CONTROL_REWATCH;
if (write(control_pipe[1], &msg, 1) != 1) {
pthread_mutex_unlock(&watch_list_mutex);
croak("failed to notify watch_thread");
}
}
}
pthread_mutex_unlock(&watch_list_mutex);
return i >= 0;
}
// only called during Perl's END phase. Just need to let
// things end gracefully and not have the thread go nuts
// as sockets get closed.
static void shutdown_watch_thread() {
int i;
// Wipe the alarm list
if (pthread_mutex_lock(&watch_list_mutex))
croak("mutex_lock failed");
for (i= 0; i < watch_list_count; i++) {
watch_list[i]->list_ofs= -1;
watch_list[i]= NULL;
}
watch_list_count= 0;
// Notify the thread to stop
if (control_pipe[1] >= 0) {
char msg= CONTROL_TERMINATE;
if (write(control_pipe[1], &msg, 1) != 1)
warn("write(control_pipe) failed");
}
pthread_mutex_unlock(&watch_list_mutex);
// Don't bother freeing watch_list or closing the pipe,
// because we're exiting anyway.
}