Skip to content

Commit

Permalink
Merge pull request #683 from larskanis/remove-old-async-calls
Browse files Browse the repository at this point in the history
Remove old code for 'blocking: true' on Ruby before 2.0
  • Loading branch information
larskanis committed Apr 12, 2019
2 parents 59934bb + a87a7f9 commit b3d623f
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 382 deletions.
27 changes: 3 additions & 24 deletions ext/ffi_c/Call.c
Expand Up @@ -43,10 +43,8 @@
#endif
#include <errno.h>
#include <ruby.h>
#if defined(HAVE_RUBY_THREAD_H)
#include <ruby/thread.h>
#endif
#if defined(HAVE_NATIVETHREAD) && (defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)) && !defined(_WIN32)
#if defined(HAVE_NATIVETHREAD) && !defined(_WIN32)
# include <signal.h>
# include <pthread.h>
#endif
Expand Down Expand Up @@ -359,7 +357,7 @@ call_blocking_function(void* data)
VALUE
rbffi_do_blocking_call(void *data)
{
rbffi_thread_blocking_region(call_blocking_function, data, (void *) -1, NULL);
rb_thread_call_without_gvl(call_blocking_function, data, (void *) -1, NULL);

return Qnil;
}
Expand All @@ -386,22 +384,11 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)
if (unlikely(fnInfo->blocking)) {
rbffi_blocking_call_t* bc;

/*
* due to the way thread switching works on older ruby variants, we
* cannot allocate anything passed to the blocking function on the stack
*/
#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
/* allocate information passed to the blocking function on the stack */
ffiValues = ALLOCA_N(void *, fnInfo->parameterCount);
params = ALLOCA_N(FFIStorage, fnInfo->parameterCount);
bc = ALLOCA_N(rbffi_blocking_call_t, 1);
bc->retval = retval;
#else
ffiValues = ALLOC_N(void *, fnInfo->parameterCount);
params = ALLOC_N(FFIStorage, fnInfo->parameterCount);
bc = ALLOC_N(rbffi_blocking_call_t, 1);
bc->retval = xmalloc(MAX(fnInfo->ffi_cif.rtype->size, FFI_SIZEOF_ARG));
bc->stkretval = retval;
#endif
bc->cif = fnInfo->ffi_cif;
bc->function = function;
bc->ffiValues = ffiValues;
Expand All @@ -416,14 +403,6 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)
rb_rescue2(rbffi_do_blocking_call, (VALUE) bc, rbffi_save_frame_exception, (VALUE) &frame, rb_eException, (VALUE) 0);
rbffi_frame_pop(&frame);

#if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
memcpy(bc->stkretval, bc->retval, MAX(bc->cif.rtype->size, FFI_SIZEOF_ARG));
xfree(bc->params);
xfree(bc->ffiValues);
xfree(bc->retval);
xfree(bc);
#endif

} else {

ffiValues = ALLOCA_N(void *, fnInfo->parameterCount);
Expand Down
3 changes: 0 additions & 3 deletions ext/ffi_c/Call.h
Expand Up @@ -94,9 +94,6 @@ typedef struct rbffi_blocking_call {
void **ffiValues;
void* retval;
void* params;
#if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
void* stkretval;
#endif
} rbffi_blocking_call_t;

VALUE rbffi_do_blocking_call(void* data);
Expand Down
105 changes: 0 additions & 105 deletions ext/ffi_c/Function.c
Expand Up @@ -47,9 +47,7 @@
# endif
#endif
#include <ruby.h>
#if defined(HAVE_RUBY_THREAD_H)
#include <ruby/thread.h>
#endif

#include <ffi.h>
#if defined(HAVE_NATIVETHREAD) && !defined(_WIN32)
Expand Down Expand Up @@ -101,10 +99,6 @@ static VALUE async_cb_event(void *);
static VALUE async_cb_call(void *);
#endif

#ifdef HAVE_RB_THREAD_CALL_WITH_GVL
extern void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
#endif

#ifdef HAVE_RUBY_THREAD_HAS_GVL_P
extern int ruby_thread_has_gvl_p(void);
#define rbffi_thread_has_gvl_p(frame) ruby_thread_has_gvl_p()
Expand Down Expand Up @@ -157,15 +151,9 @@ static struct gvl_callback* async_cb_list = NULL;
# ifndef _WIN32
static pthread_mutex_t async_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t async_cb_cond = PTHREAD_COND_INITIALIZER;
# if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
static int async_cb_pipe[2];
# endif
# else
static HANDLE async_cb_cond;
static CRITICAL_SECTION async_cb_lock;
# if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
static int async_cb_pipe[2];
# endif
# endif
#endif

Expand Down Expand Up @@ -344,16 +332,8 @@ function_init(VALUE self, VALUE rbFunctionInfo, VALUE rbProc)

#if defined(DEFER_ASYNC_CALLBACK)
if (async_cb_thread == Qnil) {
#if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)) && defined(_WIN32)
_pipe(async_cb_pipe, 1024, O_BINARY);
#elif !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
pipe(async_cb_pipe);
fcntl(async_cb_pipe[0], F_SETFL, fcntl(async_cb_pipe[0], F_GETFL) | O_NONBLOCK);
fcntl(async_cb_pipe[1], F_SETFL, fcntl(async_cb_pipe[1], F_GETFL) | O_NONBLOCK);
#endif
async_cb_thread = rb_thread_create(async_cb_event, NULL);
}

#endif

fn->closure = rbffi_Closure_Alloc(fn->info->closurePool);
Expand Down Expand Up @@ -515,17 +495,8 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
cb.next = async_cb_list;
async_cb_list = &cb;

#if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
pthread_mutex_unlock(&async_cb_mutex);
/* Only signal if the list was empty */
if (empty) {
char c;
write(async_cb_pipe[1], &c, 1);
}
#else
pthread_cond_signal(&async_cb_cond);
pthread_mutex_unlock(&async_cb_mutex);
#endif

/* Wait for the thread executing the ruby callback to signal it is done */
pthread_mutex_lock(&cb.async_mutex);
Expand All @@ -549,15 +520,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
async_cb_list = &cb;
LeaveCriticalSection(&async_cb_lock);

#if !(defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL))
/* Only signal if the list was empty */
if (empty) {
char c;
write(async_cb_pipe[1], &c, 1);
}
#else
SetEvent(async_cb_cond);
#endif

/* Wait for the thread executing the ruby callback to signal it is done */
WaitForSingleObject(cb.async_event, INFINITE);
Expand All @@ -575,19 +538,14 @@ struct async_wait {
static void * async_cb_wait(void *);
static void async_cb_stop(void *);

#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
static VALUE
async_cb_event(void* unused)
{
struct async_wait w = { 0 };

w.stop = false;
while (!w.stop) {
#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
rb_thread_call_without_gvl(async_cb_wait, &w, async_cb_stop, &w);
#else
rb_thread_blocking_region(async_cb_wait, &w, async_cb_stop, &w);
#endif
if (w.cb != NULL) {
/* Start up a new ruby thread to run the ruby callback */
rb_thread_create(async_cb_call, w.cb);
Expand All @@ -597,69 +555,6 @@ async_cb_event(void* unused)
return Qnil;
}

#elif defined(_WIN32)
static VALUE
async_cb_event(void* unused)
{
while (true) {
struct gvl_callback* cb;
char buf[64];
fd_set rfds;

FD_ZERO(&rfds);
FD_SET(async_cb_pipe[0], &rfds);
rb_thread_select(async_cb_pipe[0] + 1, &rfds, NULL, NULL, NULL);
read(async_cb_pipe[0], buf, sizeof(buf));

EnterCriticalSection(&async_cb_lock);
cb = async_cb_list;
async_cb_list = NULL;
LeaveCriticalSection(&async_cb_lock);

while (cb != NULL) {
struct gvl_callback* next = cb->next;
/* Start up a new ruby thread to run the ruby callback */
rb_thread_create(async_cb_call, cb);
cb = next;
}
}

return Qnil;
}
#else
static VALUE
async_cb_event(void* unused)
{
while (true) {
struct gvl_callback* cb;
char buf[64];

if (read(async_cb_pipe[0], buf, sizeof(buf)) < 0) {
rb_thread_wait_fd(async_cb_pipe[0]);
while (read(async_cb_pipe[0], buf, sizeof (buf)) < 0) {
if (rb_io_wait_readable(async_cb_pipe[0]) != Qtrue) {
return Qfalse;
}
}
}

pthread_mutex_lock(&async_cb_mutex);
cb = async_cb_list;
async_cb_list = NULL;
pthread_mutex_unlock(&async_cb_mutex);

while (cb != NULL) {
struct gvl_callback* next = cb->next;
/* Start up a new ruby thread to run the ruby callback */
rb_thread_create(async_cb_call, cb);
cb = next;
}
}

return Qnil;
}
#endif

#ifdef _WIN32
static void *
async_cb_wait(void *data)
Expand Down

0 comments on commit b3d623f

Please sign in to comment.