/*
* libetp implementation
*
* Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modifica-
* tion, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
* CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
* CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
* ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Alternatively, the contents of this file may be used under the terms of
* the GNU General Public License ("GPL") version 2 or any later version,
* in which case the provisions of the GPL are applicable instead of
* the above. If you wish to allow the use of your version of this file
* only under the terms of the GPL and not to allow others to use your
* version of this file under the BSD license, indicate your decision
* by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL. If you do not delete the
* provisions above, a recipient may use your version of this file under
* either the BSD or the GPL.
*/
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#ifdef EIO_STACKSIZE
# define X_STACKSIZE EIO_STACKSIZE
#endif
#include "xthread.h"
/* linkage of the public etp_* functions; file-local unless overridden */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range; by default there is only a single priority level */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type used internally to make a worker thread exit */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type of group requests; etp_poll delays groups whose size is non-zero */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* invoked with pool->reslock held when the result queue becomes non-empty.
   Arguments are parenthesised so expressions such as &pool work. */
#ifndef ETP_WANT_POLL
# define ETP_WANT_POLL(pool) ((pool)->want_poll_cb ((pool)->userdata))
#endif

/* invoked with pool->reslock held when etp_poll empties the result queue */
#ifndef ETP_DONE_POLL
# define ETP_DONE_POLL(pool) ((pool)->done_poll_cb ((pool)->userdata))
#endif

/* number of distinct priority levels (queue slots per etp_reqq) */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second for poll time limits: one tick is 1024 microseconds
   (usec >> 10), i.e. 1000000 / 1024 rounded up = 977 ticks per second */
#define ETP_TICKS ((1000000 + 1023) >> 10)

enum {
  ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 0x08, /* group request has been delayed */
};
/*
 * Elapsed time from tv1 to tv2, measured in ticks of 1024 microseconds
 * (roughly 1/ETP_TICKS of a second).  Whole seconds are scaled by
 * ETP_TICKS; the microsecond difference is shifted down by 10 bits.
 */
ecb_inline int
etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  return ((tv2->tv_usec - tv1->tv_usec) >> 10)
         + (tv2->tv_sec - tv1->tv_sec) * ETP_TICKS;
}
/*
 * A per-worker scratch buffer that grows on demand.
 * ptr is the heap block (or NULL); len is its usable size in bytes.
 */
struct etp_tmpbuf
{
  void *ptr;
  int len;
};

/*
 * Return a buffer of at least len bytes.  When the current block is too
 * small it is replaced by a fresh allocation; the old contents are NOT
 * preserved.  Returns NULL if the allocation fails.  On failure the buffer
 * is left empty (ptr NULL, len 0), so the next call retries the
 * allocation instead of treating the failed size as already available.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      /* only record the size once we actually own that much memory */
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
/*
* a somewhat faster data structure might be nice, but
* with 8 priorities this actually needs <20 insns
* per shift, the most expensive operation.
*
* Request queue: one singly-linked FIFO per priority level, chained
* through req->next. qs[pri] / qe[pri] are the head / tail of level
* pri (both 0 when that level is empty); size counts the requests
* across all levels.
*/
typedef struct
{
ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
int size;
} etp_reqq;
/* pools are handled through a pointer typedef */
typedef struct etp_pool *etp_pool;
/*
* Per-thread worker state. Workers form a circular doubly-linked list
* whose sentinel is pool->wrk_first (see etp_init / etp_start_thread).
*/
typedef struct etp_worker
{
etp_pool pool;
/* scratch buffer, released by etp_worker_free */
struct etp_tmpbuf tmpbuf;
/* locked by pool->wrklock */
struct etp_worker *prev, *next;
xthread_t tid;
/* hook for the embedding code to add its own per-worker fields */
#ifdef ETP_WORKER_COMMON
ETP_WORKER_COMMON
#endif
} etp_worker;
struct etp_pool
{
/* opaque pointer passed to want_poll_cb / done_poll_cb */
void *userdata;
/* requests waiting for a worker thread, pool->reqlock */
etp_reqq req_queue;
/* executed requests waiting for etp_poll, pool->reslock */
etp_reqq res_queue;
/* started: worker threads counted as running (modified under pool->wrklock);
   idle: workers currently waiting for a request (pool->reqlock);
   wanted: thread count above which no new workers are started */
unsigned int started, idle, wanted;
unsigned int max_poll_time; /* pool->reslock, in ETP_TICKS, 0 = no limit */
unsigned int max_poll_reqs; /* pool->reslock, 0 = no limit */
unsigned int nreqs; /* pool->reqlock, submitted but not yet returned by etp_poll */
unsigned int nready; /* pool->reqlock, queued but not yet taken by a worker */
unsigned int npending; /* pool->reqlock, results waiting in res_queue (modified under pool->reslock) */
unsigned int max_idle; /* maximum number of threads that can idle indefinitely */
unsigned int idle_timeout; /* number of seconds after which an idle thread exits */
void (*want_poll_cb) (void *userdata);
void (*done_poll_cb) (void *userdata);
xmutex_t wrklock;
xmutex_t reslock;
xmutex_t reqlock;
/* signalled whenever a request is pushed onto req_queue */
xcond_t reqwait;
/* sentinel head of the circular worker list */
etp_worker wrk_first;
};
/* Lock / unlock the worker list.
   NOTE(review): both macros ignore their wrk argument and expand to
   pool->wrklock, so they only work where a variable named `pool` is in
   scope; (wrk)->pool->wrklock may have been intended - confirm callers. */
#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
/* worker threads management */

/*
 * Hook run after each executed request (with pool->reslock held).
 * There is currently no per-request worker state to reset.
 */
static void
etp_worker_clear (etp_worker *wrk)
{
  (void)wrk; /* intentionally unused */
}
/*
 * Release a worker's scratch buffer, unlink it from the circular worker
 * list and free it.  The list links are protected by pool->wrklock, which
 * the visible caller (etp_proc) holds around this call.
 */
static void ecb_cold
etp_worker_free (etp_worker *wrk)
{
  etp_worker *before = wrk->prev;
  etp_worker *after  = wrk->next;

  free (wrk->tmpbuf.ptr);

  after->prev  = before;
  before->next = after;

  free (wrk);
}
/*
 * Number of requests submitted to the pool that have not yet been returned
 * by etp_poll.  pool->reqlock is only taken when WORDACCESS_UNSAFE is true
 * (presumably platforms where an unlocked word read is unsafe).
 *
 * The temporary is unsigned, matching the field and return type (it used
 * to round-trip through int).
 */
ETP_API_DECL unsigned int
etp_nreqs (etp_pool pool)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
  retval = pool->nreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);

  return retval;
}
/*
 * Number of requests sitting in the request queue that no worker thread
 * has picked up yet.  Locks pool->reqlock only when WORDACCESS_UNSAFE.
 */
ETP_API_DECL unsigned int
etp_nready (etp_pool pool)
{
  unsigned int ready_count;

  if (WORDACCESS_UNSAFE)
    X_LOCK (pool->reqlock);

  ready_count = pool->nready;

  if (WORDACCESS_UNSAFE)
    X_UNLOCK (pool->reqlock);

  return ready_count;
}
/*
 * Number of executed requests waiting in the result queue for etp_poll.
 * Locks pool->reqlock only when WORDACCESS_UNSAFE.
 */
ETP_API_DECL unsigned int
etp_npending (etp_pool pool)
{
  unsigned int pending_count;

  if (WORDACCESS_UNSAFE)
    X_LOCK (pool->reqlock);

  pending_count = pool->npending;

  if (WORDACCESS_UNSAFE)
    X_UNLOCK (pool->reqlock);

  return pending_count;
}
/*
 * Number of worker threads currently counted as running.
 *
 * pool->started is modified only under pool->wrklock (thread start, idle
 * timeout exit, etp_end_thread), so that is the lock to take when an
 * unlocked word read is not safe (WORDACCESS_UNSAFE).  Taking reqlock
 * would not synchronise with any of those writers.  No caller holds
 * wrklock while calling this, so there is no self-deadlock.
 */
ETP_API_DECL unsigned int
etp_nthreads (etp_pool pool)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK (pool->wrklock);
  retval = pool->started;
  if (WORDACCESS_UNSAFE) X_UNLOCK (pool->wrklock);

  return retval;
}
/* Reset a request queue: every per-priority FIFO empty, element count 0. */
static void ecb_noinline ecb_cold
reqq_init (etp_reqq *q)
{
  int level = ETP_NUM_PRI;

  while (level--)
    {
      q->qe[level] = 0;
      q->qs[level] = 0;
    }

  q->size = 0;
}
/*
 * Append req to the tail of the FIFO selected by req->pri (expected to be
 * in 0..ETP_NUM_PRI-1).  Returns the number of queued requests *before*
 * the push, so a return value of 0 means the queue was empty.
 */
static int ecb_noinline
reqq_push (etp_reqq *q, ETP_REQ *req)
{
  int pri = req->pri;
  ETP_REQ *tail = q->qe[pri];

  req->next = 0;

  if (tail)
    tail->next = req;
  else
    q->qs[pri] = req;

  q->qe[pri] = req;

  return q->size++;
}
/*
 * Remove and return the oldest request of the highest non-empty priority
 * level, or 0 if the queue is empty.  Calls abort() if size is non-zero
 * but every FIFO is empty, which would mean inconsistent bookkeeping.
 */
static ETP_REQ * ecb_noinline
reqq_shift (etp_reqq *q)
{
  int pri;

  if (!q->size)
    return 0;

  --q->size;

  for (pri = ETP_NUM_PRI - 1; pri >= 0; --pri)
    {
      ETP_REQ *head = q->qs[pri];

      if (!head)
        continue;

      q->qs[pri] = (ETP_REQ *)head->next;

      /* level drained: clear the tail pointer as well */
      if (!q->qs[pri])
        q->qe[pri] = 0;

      return head;
    }

  abort ();
}
/*
 * Initialise a pool.  Creates its mutexes and condition variable, empties
 * both queues and the worker list, zeroes the counters and sets the default
 * tunables: 4 wanted threads, 4 threads that may idle indefinitely, a
 * 10-second idle timeout and no etp_poll limits.
 *
 * userdata is stored and passed to want_poll / done_poll, which the default
 * ETP_WANT_POLL / ETP_DONE_POLL macros invoke.  Always returns 0.
 */
ETP_API_DECL int ecb_cold
etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
{
  X_MUTEX_CREATE (pool->wrklock);
  X_MUTEX_CREATE (pool->reslock);
  X_MUTEX_CREATE (pool->reqlock);
  X_COND_CREATE  (pool->reqwait);

  reqq_init (&pool->req_queue);
  reqq_init (&pool->res_queue);

  /* empty circular worker list: the sentinel points at itself */
  pool->wrk_first.next =
  pool->wrk_first.prev = &pool->wrk_first;

  pool->started  = 0;
  pool->idle     = 0;
  pool->nreqs    = 0;
  pool->nready   = 0;
  pool->npending = 0;
  pool->wanted   = 4;

  pool->max_idle     = 4;  /* maximum number of threads that can idle indefinitely */
  pool->idle_timeout = 10; /* number of seconds after which an idle thread exits */

  /* etp_poll reads these on every call; 0 disables the respective limit */
  pool->max_poll_time = 0;
  pool->max_poll_reqs = 0;

  pool->userdata     = userdata;
  pool->want_poll_cb = want_poll;
  pool->done_poll_cb = done_poll;

  return 0;
}
/*
 * Where supported, append "/eio" to the calling thread's name so that
 * worker threads can be recognised.  PR_SET_NAME keeps at most 16 bytes
 * including the terminating NUL (15 characters).  The base name is
 * therefore cut so that the full "/eio" suffix fits in those 15
 * characters, instead of being truncated to "/ei".
 */
static void ecb_noinline ecb_cold
etp_proc_init (void)
{
#if HAVE_PRCTL_SET_NAME
  /* provide a more sensible "thread name" */
  char name[16 + 1];
  const int namelen = 16 - 1; /* longest name the kernel keeps */
  int len;

  prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
  name [sizeof (name) - 1] = 0;
  len = strlen (name);
  strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
  prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
#endif
}
/*
 * Worker thread main loop.
 *
 * Each iteration takes the highest-priority request from req_queue and runs
 * it via ETP_EXECUTE.  It then appends the request to res_queue, calling
 * ETP_WANT_POLL when the result queue was empty.
 *
 * While no request is available the thread waits on reqwait:
 * - without a timeout, as long as at most max_idle threads are idle;
 * - otherwise for up to idle_timeout seconds, after which it decrements
 *   pool->started itself and exits.
 *
 * An ETP_TYPE_QUIT request (queued by etp_end_thread, which never counts it
 * in nready and already decrements started) also makes the thread exit.
 * nready is therefore only decremented for real requests, which keeps the
 * unsigned counter from underflowing.
 */
X_THREAD_PROC (etp_proc)
{
  ETP_REQ *req;
  struct timespec ts;
  etp_worker *self = (etp_worker *)thr_arg;
  etp_pool pool = self->pool;

  etp_proc_init ();

  /* try to distribute timeouts somewhat evenly */
  ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);

  for (;;)
    {
      ts.tv_sec = 0;

      X_LOCK (pool->reqlock);

      for (;;)
        {
          req = reqq_shift (&pool->req_queue);

          if (ecb_expect_true (req))
            break;

          if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
            {
              X_UNLOCK (pool->reqlock);
              X_LOCK (pool->wrklock);
              --pool->started;
              X_UNLOCK (pool->wrklock);
              goto quit;
            }

          ++pool->idle;

          if (pool->idle <= pool->max_idle)
            /* we are allowed to idle, so do so without any timeout */
            X_COND_WAIT (pool->reqwait, pool->reqlock);
          else
            {
              /* initialise timeout once */
              if (!ts.tv_sec)
                ts.tv_sec = time (0) + pool->idle_timeout;

              if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
                ts.tv_sec = 1; /* assuming this is not a value computed above... */
            }

          --pool->idle;
        }

      /* quit requests were never added to nready, so leave it alone */
      if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
        {
          X_UNLOCK (pool->reqlock);
          goto quit;
        }

      --pool->nready;

      X_UNLOCK (pool->reqlock);

      ETP_EXECUTE (self, req);

      X_LOCK (pool->reslock);

      ++pool->npending;

      if (!reqq_push (&pool->res_queue, req))
        ETP_WANT_POLL (pool);

      etp_worker_clear (self);

      X_UNLOCK (pool->reslock);
    }

quit:
  /* req is either the quit request (owned by us) or NULL on idle timeout */
  free (req);

  X_LOCK (pool->wrklock);
  etp_worker_free (self);
  X_UNLOCK (pool->wrklock);

  return 0;
}
/*
 * Try to start one more worker thread.
 *
 * On success the worker is linked into the pool's worker list and counted
 * in pool->started.  If the worker structure cannot be allocated, or the
 * thread cannot be created, nothing changes.  A later
 * etp_maybe_start_thread call (etp_submit, etp_poll) may try again.
 */
static void ecb_cold
etp_start_thread (etp_pool pool)
{
  etp_worker *wrk = calloc (1, sizeof (*wrk));

  /* an assert here would turn into a NULL dereference under NDEBUG;
     give up quietly instead, just like when xthread_create fails below */
  if (!wrk)
    return;

  wrk->pool = pool;

  X_LOCK (pool->wrklock);

  /* xthread_create reports success with a non-zero return value */
  if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
    {
      /* link in right after the sentinel; the new thread needs wrklock
         before it can unlink itself, so it cannot race with this */
      wrk->prev = &pool->wrk_first;
      wrk->next = pool->wrk_first.next;
      pool->wrk_first.next->prev = wrk;
      pool->wrk_first.next = wrk;
      ++pool->started;
    }
  else
    free (wrk);

  X_UNLOCK (pool->wrklock);
}
/*
 * Start one more worker thread when both of these hold:
 * - fewer than pool->wanted threads are running;
 * - the outstanding requests (nreqs) outnumber the running threads plus
 *   the results already waiting in the result queue (npending).
 */
static void
etp_maybe_start_thread (etp_pool pool)
{
  if (ecb_expect_false (etp_nthreads (pool) < pool->wanted))
    {
      /* todo: maybe use pool->idle here, but might be less exact */
      int surplus = (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool);

      if (ecb_expect_false (surplus < 0))
        etp_start_thread (pool);
    }
}
/*
 * Ask one worker thread to exit by queueing an ETP_TYPE_QUIT request at the
 * highest priority.  The thread is removed from pool->started right away
 * rather than when a worker picks the request up.  The worker that
 * receives the request frees it.
 */
static void ecb_cold
etp_end_thread (etp_pool pool)
{
  ETP_REQ *quit_req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */

  /* NOTE(review): the calloc result is not checked; an allocation failure
     dereferences NULL here */
  quit_req->pri  = ETP_PRI_MAX - ETP_PRI_MIN;
  quit_req->type = ETP_TYPE_QUIT;

  X_LOCK (pool->reqlock);
  reqq_push (&pool->req_queue, quit_req);
  X_COND_SIGNAL (pool->reqwait);
  X_UNLOCK (pool->reqlock);

  X_LOCK (pool->wrklock);
  --pool->started;
  X_UNLOCK (pool->wrklock);
}
/*
* Finish requests from the result queue, in priority order, via ETP_FINISH.
*
* Returns:
* 0 when the result queue has been drained;
* the non-zero value returned by ETP_FINISH, which stops processing
* immediately;
* -1 with errno = EAGAIN when the loop stopped because max_poll_reqs or
* max_poll_time (both 0 = unlimited) was reached; results may or may not
* remain at that point.
*/
ETP_API_DECL int
etp_poll (etp_pool pool)
{
unsigned int maxreqs;
unsigned int maxtime;
struct timeval tv_start, tv_now;
/* snapshot the limits once per call */
X_LOCK (pool->reslock);
maxreqs = pool->max_poll_reqs;
maxtime = pool->max_poll_time;
X_UNLOCK (pool->reslock);
if (maxtime)
gettimeofday (&tv_start, 0);
for (;;)
{
ETP_REQ *req;
/* may start one more worker thread on every iteration */
etp_maybe_start_thread (pool);
X_LOCK (pool->reslock);
req = reqq_shift (&pool->res_queue);
if (ecb_expect_true (req))
{
--pool->npending;
/* result queue just became empty: notify the embedder (reslock held) */
if (!pool->res_queue.size)
ETP_DONE_POLL (pool);
}
X_UNLOCK (pool->reslock);
if (ecb_expect_false (!req))
return 0;
X_LOCK (pool->reqlock);
--pool->nreqs;
X_UNLOCK (pool->reqlock);
/* a group with non-zero size (presumably outstanding members) is only
   flagged, not finished; this path does not count against
   max_poll_reqs and skips the time check */
if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
{
req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
continue;
}
else
{
int res = ETP_FINISH (req);
if (ecb_expect_false (res))
return res;
}
if (ecb_expect_false (maxreqs && !--maxreqs))
break;
if (maxtime)
{
gettimeofday (&tv_now, 0);
/* NOTE(review): etp_tvdiff returns int and is compared with unsigned
   maxtime, so a negative difference (clock stepped backwards) converts
   to a large value and ends the loop early */
if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
break;
}
}
errno = EAGAIN;
return -1;
}
ETP_API_DECL void
etp_cancel (etp_pool pool, ETP_REQ *req);

/* Cancel every request on grp's member list (grp_first / grp_next chain). */
ETP_API_DECL void
etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
{
  ETP_REQ *member = grp->grp_first;

  while (member)
    {
      etp_cancel (pool, member);
      member = member->grp_next;
    }
}

/* Set req->cancelled, then cancel its group members recursively. */
ETP_API_DECL void
etp_cancel (etp_pool pool, ETP_REQ *req)
{
  req->cancelled = 1;
  etp_grp_cancel (pool, req);
}
/*
 * Queue a request.
 *
 * req->pri is rebased from ETP_PRI_MIN..ETP_PRI_MAX to 0..ETP_NUM_PRI-1
 * and clamped to that range.
 *
 * - Ordinary requests go onto the request queue, wake one waiting worker,
 *   and may cause a new worker thread to be started.
 * - Group requests are not executed by a worker; they are counted and put
 *   directly onto the result queue.
 */
ETP_API_DECL void
etp_submit (etp_pool pool, ETP_REQ *req)
{
  const int top = ETP_PRI_MAX - ETP_PRI_MIN;

  req->pri -= ETP_PRI_MIN;

  if (ecb_expect_false (req->pri < 0))
    req->pri = 0;

  if (ecb_expect_false (req->pri > top))
    req->pri = top;

  if (ecb_expect_true (req->type != ETP_TYPE_GROUP))
    {
      X_LOCK (pool->reqlock);
      ++pool->nreqs;
      ++pool->nready;
      reqq_push (&pool->req_queue, req);
      X_COND_SIGNAL (pool->reqwait);
      X_UNLOCK (pool->reqlock);

      etp_maybe_start_thread (pool);
      return;
    }

  /* I hope this is worth it :/ */
  X_LOCK (pool->reqlock);
  ++pool->nreqs;
  X_UNLOCK (pool->reqlock);

  X_LOCK (pool->reslock);
  ++pool->npending;

  if (!reqq_push (&pool->res_queue, req))
    ETP_WANT_POLL (pool);

  X_UNLOCK (pool->reslock);
}
/*
 * Limit how long a single etp_poll call may spend finishing requests.
 * seconds is converted to ETP_TICKS units.
 * - Zero, negative and NaN values disable the limit (0).
 * - Values too large for an unsigned int are clamped to the largest
 *   representable limit.
 */
ETP_API_DECL void ecb_cold
etp_set_max_poll_time (etp_pool pool, double seconds)
{
  const double ticks = seconds * ETP_TICKS;
  unsigned int limit;

  /* converting an out-of-range double to unsigned int is undefined
     behaviour, so clamp explicitly; !(ticks > 0.) also catches NaN */
  if (!(ticks > 0.))
    limit = 0;
  else if (ticks >= (double)~0u)
    limit = ~0u;
  else
    limit = (unsigned int)ticks;

  if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
  pool->max_poll_time = limit;
  if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
}
/* Limit how many requests one etp_poll call finishes; 0 means no limit. */
ETP_API_DECL void ecb_cold
etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
{
  if (WORDACCESS_UNSAFE)
    X_LOCK (pool->reslock);

  pool->max_poll_reqs = maxreqs;

  if (WORDACCESS_UNSAFE)
    X_UNLOCK (pool->reslock);
}
/*
 * Set how many worker threads may wait for requests without a timeout;
 * idle threads beyond this count exit after idle_timeout seconds.
 */
ETP_API_DECL void ecb_cold
etp_set_max_idle (etp_pool pool, unsigned int threads)
{
  if (WORDACCESS_UNSAFE)
    X_LOCK (pool->reqlock);

  pool->max_idle = threads;

  if (WORDACCESS_UNSAFE)
    X_UNLOCK (pool->reqlock);
}
/* Seconds a surplus idle worker waits for a request before exiting. */
ETP_API_DECL void ecb_cold
etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
{
  if (WORDACCESS_UNSAFE)
    X_LOCK (pool->reqlock);

  pool->idle_timeout = seconds;

  if (WORDACCESS_UNSAFE)
    X_UNLOCK (pool->reqlock);
}
/* Raise the target number of worker threads to at least `threads`. */
ETP_API_DECL void ecb_cold
etp_set_min_parallel (etp_pool pool, unsigned int threads)
{
  if (threads <= pool->wanted)
    return;

  pool->wanted = threads;
}
/*
 * Lower the target number of worker threads to at most `threads`, and ask
 * surplus running threads to exit.
 */
ETP_API_DECL void ecb_cold
etp_set_max_parallel (etp_pool pool, unsigned int threads)
{
  if (threads < pool->wanted)
    pool->wanted = threads;

  /* every etp_end_thread call queues one quit request and decrements
     pool->started, so this loop terminates */
  for (; pool->started > pool->wanted; )
    etp_end_thread (pool);
}