#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#define NO_XSLOCKS
#include "XSUB.h"
#include "message.h"
#include "queue.h"
#include "mthread.h"
#include "resources.h"
bool
inited = 0;
typedef
struct
{
perl_mutex lock;
UV current;
UV allocated;
void
** objects;
} resource;
static
S_resource_init(resource* res, UV preallocate) {
MUTEX_INIT(&res->lock);
res->objects = PerlMemShared_calloc(preallocate,
sizeof
(
void
*));
res->allocated = preallocate;
}
#define resource_init(res, pre) S_resource_init(res, pre)
static
UV resource_addobject(resource* res,
void
* object) {
UV ret;
MUTEX_LOCK(&res->lock);
ret = res->current;
if
(res->current == res->allocated)
res->objects = PerlMemShared_realloc(res->objects,
sizeof
(
void
*) * (res->allocated *=2));
res->objects[res->current++] = object;
MUTEX_UNLOCK(&res->lock);
return
ret;
}
static
resource threads;
static
resource queues;
static
struct
{
perl_mutex mutex;
perl_cond condvar;
int
count;
} counter;
static
void
wait_for_all_other_threads() {
MUTEX_LOCK(&counter.mutex);
while
(counter.count > 1)
COND_WAIT(&counter.condvar, &counter.mutex);
MUTEX_UNLOCK(&counter.mutex);
MUTEX_DESTROY(&counter.mutex);
COND_DESTROY(&counter.condvar);
}
static
XS(end_locker) {
dVAR; dXSARGS;
perl_mutex* mutex;
wait_for_all_other_threads();
mutex = get_shutdown_mutex();
MUTEX_LOCK(mutex);
XSRETURN_EMPTY;
}
static
void
end_unlocker() {
perl_mutex* mutex = get_shutdown_mutex();
MUTEX_UNLOCK(mutex);
}
void
global_init(pTHX) {
if
(!inited) {
mthread* ret;
inited = TRUE;
MUTEX_INIT(&counter.mutex);
COND_INIT(&counter.condvar);
counter.count = 0;
resource_init(&threads, 8);
resource_init(&queues, 8);
ret = mthread_alloc(aTHX);
store_self(aTHX, ret);
newXS(
"END"
, end_locker, __FILE__);
atexit
(end_unlocker);
}
}
mthread* mthread_alloc(PerlInterpreter* my_perl) {
mthread* ret;
MUTEX_LOCK(&counter.mutex);
counter.count++;
MUTEX_UNLOCK(&counter.mutex);
ret = PerlMemShared_calloc(1,
sizeof
*ret);
queue_init(&ret->queue);
ret->id = resource_addobject(&threads, ret);
ret->interp = my_perl;
MUTEX_INIT(&ret->lock);
return
ret;
}
void
mthread_destroy(mthread*
thread
) {
MUTEX_LOCK(&threads.lock);
threads.objects[
thread
->id] = NULL;
queue_destroy(&
thread
->queue);
MUTEX_UNLOCK(&threads.lock);
MUTEX_DESTROY(&
thread
->lock);
MUTEX_LOCK(&counter.mutex);
counter.count--;
COND_SIGNAL(&counter.condvar);
MUTEX_UNLOCK(&counter.mutex);
}
static
mthread* S_get_thread(pTHX_ UV thread_id) {
if
(thread_id >= threads.current || threads.objects[thread_id] == NULL)
Perl_croak(aTHX_
"Thread %"
UVuf
" doesn't exist"
, thread_id);
return
threads.objects[thread_id];
}
#define get_thread(id) S_get_thread(aTHX_ id)
UV queue_alloc(IV linked_to) {
message_queue* queue;
queue = PerlMemShared_calloc(1,
sizeof
*queue);
queue_init(queue);
return
resource_addobject(&queues, queue);
}
static
message_queue* S_get_queue(pTHX_ UV queue_id) {
if
(queue_id >= queues.current || queues.objects[queue_id] == NULL)
Perl_croak(aTHX_
"queue %"
UVuf
" doesn't exist"
, queue_id);
return
queues.objects[queue_id];
}
#define get_queue(id) S_get_queue(aTHX_ id)
#define THREAD_TRY \
XCPT_TRY_START
#define THREAD_CATCH_FINALLY(catch, finally) \
XCPT_TRY_END;\
finally;\
XCPT_CATCH {
catch
; XCPT_RETHROW; }
#define THREAD_CATCH(undo) THREAD_CATCH_FINALLY(undo, 0)
#define THREAD_FINALLY(undo) THREAD_CATCH_FINALLY(0, undo)
void
S_thread_send(pTHX_ UV thread_id, message* message) {
dXCPT;
MUTEX_LOCK(&threads.lock);
THREAD_TRY {
mthread*
thread
= get_thread(thread_id);
queue_enqueue(&
thread
->queue, message, &threads.lock);
} THREAD_CATCH( MUTEX_UNLOCK(&threads.lock) );
}
void
S_queue_send(pTHX_ UV queue_id, message* message) {
dXCPT;
MUTEX_LOCK(&queues.lock);
THREAD_TRY {
message_queue* queue = get_queue(queue_id);
queue_enqueue(queue, message, &queues.lock);
} THREAD_CATCH( MUTEX_UNLOCK(&queues.lock) );
}
void
S_queue_receive(pTHX_ UV queue_id, message* message) {
dXCPT;
MUTEX_LOCK(&queues.lock);
THREAD_TRY {
message_queue* queue = get_queue(queue_id);
queue_dequeue(queue, message, &queues.lock);
} THREAD_CATCH( MUTEX_UNLOCK(&queues.lock) );
}
void
S_queue_receive_nb(pTHX_ UV queue_id, message* message) {
dXCPT;
MUTEX_LOCK(&queues.lock);
THREAD_TRY {
message_queue* queue = get_queue(queue_id);
queue_dequeue_nb(queue, message, &queues.lock);
} THREAD_CATCH( MUTEX_UNLOCK(&queues.lock) );
}
void
S_send_listeners(pTHX_ mthread*
thread
, message* mess) {
int
i;
dXCPT;
MUTEX_LOCK(&
thread
->lock);
for
(i = 0; i <
thread
->listeners.head; ++i) {
message clone;
UV thread_id;
MUTEX_LOCK(&threads.lock);
thread_id =
thread
->listeners.list[i];
if
(thread_id >= threads.current || threads.objects[thread_id] == NULL)
continue
;
message_clone(mess, &clone);
queue_enqueue(&((mthread*)threads.objects[thread_id])->queue, &clone, &threads.lock);
}
MUTEX_UNLOCK(&
thread
->lock);
}
#define send_listeners(thread, message) S_send_listeners(aTHX_ thread, message)
void
thread_add_listener(pTHX, UV talker, UV listener) {
dXCPT;
MUTEX_LOCK(&threads.lock);
THREAD_TRY {
mthread*
thread
= get_thread(talker);
if
(
thread
->listeners.alloc ==
thread
->listeners.head) {
thread
->listeners.alloc =
thread
->listeners.alloc ?
thread
->listeners.alloc * 2 : 1;
thread
->listeners.list = PerlMemShared_realloc(
thread
->listeners.list,
sizeof
(IV) *
thread
->listeners.alloc);
}
thread
->listeners.list[
thread
->listeners.head++] = listener;
} THREAD_FINALLY( MUTEX_UNLOCK(&threads.lock) );
}