Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query ruby thread and GVL states instead of relying on our call frame for callbacks #584

Merged
merged 4 commits into from Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 14 additions & 10 deletions ext/ffi_c/Call.c
Expand Up @@ -116,7 +116,7 @@ rbffi_SetupCallParams(int argc, VALUE* argv, int paramCount, Type** paramTypes,
Type* paramType = paramTypes[i];
int type;


if (unlikely(paramType->nativeType == NATIVE_MAPPED)) {
VALUE values[] = { argv[argidx], Qnil };
argv[argidx] = rb_funcall2(((MappedType *) paramType)->rbConverter, id_to_native, 2, values);
Expand Down Expand Up @@ -297,8 +297,8 @@ rbffi_SetupCallParams(int argc, VALUE* argv, int paramCount, Type** paramTypes,

case NATIVE_STRING:
if (type == T_NIL) {
param->ptr = NULL;
param->ptr = NULL;

} else {
if (rb_safe_level() >= 1 && OBJ_TAINTED(argv[argidx])) {
rb_raise(rb_eSecurityError, "Unsafe string parameter");
Expand Down Expand Up @@ -345,9 +345,13 @@ static void *
call_blocking_function(void* data)
{
rbffi_blocking_call_t* b = (rbffi_blocking_call_t *) data;
#ifndef HAVE_RUBY_THREAD_HAS_GVL_P
b->frame->has_gvl = false;
#endif
ffi_call(&b->cif, FFI_FN(b->function), b->retval, b->ffiValues);
#ifndef HAVE_RUBY_THREAD_HAS_GVL_P
b->frame->has_gvl = true;
#endif

return NULL;
}
Expand Down Expand Up @@ -376,9 +380,9 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)
FFIStorage* params;
VALUE rbReturnValue;
rbffi_frame_t frame = { 0 };

retval = alloca(MAX(fnInfo->ffi_cif.rtype->size, FFI_SIZEOF_ARG));

if (unlikely(fnInfo->blocking)) {
rbffi_blocking_call_t* bc;

Expand Down Expand Up @@ -408,7 +412,7 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)
fnInfo->parameterCount, fnInfo->parameterTypes, params, ffiValues,
fnInfo->callbackParameters, fnInfo->callbackCount, fnInfo->rbEnums);

rbffi_frame_push(&frame);
rbffi_frame_push(&frame);
rb_rescue2(rbffi_do_blocking_call, (VALUE) bc, rbffi_save_frame_exception, (VALUE) &frame, rb_eException, (VALUE) 0);
rbffi_frame_pop(&frame);

Expand All @@ -419,7 +423,7 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)
xfree(bc->retval);
xfree(bc);
#endif

} else {

ffiValues = ALLOCA_N(void *, fnInfo->parameterCount);
Expand All @@ -436,15 +440,15 @@ rbffi_CallFunction(int argc, VALUE* argv, void* function, FunctionType* fnInfo)

if (unlikely(!fnInfo->ignoreErrno)) {
rbffi_save_errno();
}
}

if (RTEST(frame.exc) && frame.exc != Qnil) {
rb_exc_raise(frame.exc);
}

RB_GC_GUARD(rbReturnValue) = rbffi_NativeValue_ToRuby(fnInfo->returnType, fnInfo->rbReturnType, retval);
RB_GC_GUARD(fnInfo->rbReturnType);

return rbReturnValue;
}

Expand All @@ -461,7 +465,7 @@ getPointer(VALUE value, int type)
return memory != NULL ? memory->address : NULL;

} else if (type == T_STRING) {

return StringValuePtr(value);

} else if (type == T_NIL) {
Expand Down
4 changes: 2 additions & 2 deletions ext/ffi_c/Call.h
Expand Up @@ -48,7 +48,7 @@ extern "C" {
#if (defined(__i386__) || defined(__x86_64__)) && !(defined(_WIN32) || defined(__WIN32__))
# define BYPASS_FFI 1
#endif

typedef union {
#ifdef USE_RAW
signed int s8, s16, s32;
Expand All @@ -70,7 +70,7 @@ typedef union {
double f64;
long double ld;
} FFIStorage;

extern void rbffi_Call_Init(VALUE moduleFFI);

extern void rbffi_SetupCallParams(int argc, VALUE* argv, int paramCount, Type** paramTypes,
Expand Down
31 changes: 26 additions & 5 deletions ext/ffi_c/Function.c
Expand Up @@ -105,6 +105,27 @@ static VALUE async_cb_call(void *);
extern void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
#endif

#ifdef HAVE_RUBY_THREAD_HAS_GVL_P
extern int ruby_thread_has_gvl_p(void);
#define rbffi_thread_has_gvl_p(frame) ruby_thread_has_gvl_p()
#else
static int rbffi_thread_has_gvl_p(rbffi_frame_t *frame)
{
return frame != NULL && frame->has_gvl;
}
#endif

#ifdef HAVE_RUBY_NATIVE_THREAD_P
extern int ruby_native_thread_p(void);
#define rbffi_native_thread_p(frame) ruby_native_thread_p()
#else
static int rbffi_native_thread_p(rbffi_frame_t *frame)
{
return frame != NULL;
}
#endif


VALUE rbffi_FunctionClass = Qnil;

#if defined(DEFER_ASYNC_CALLBACK)
Expand Down Expand Up @@ -474,13 +495,13 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
cb.frame = rbffi_frame_current();

if (cb.frame != NULL) cb.frame->exc = Qnil;
if (cb.frame != NULL && cb.frame->has_gvl) {
callback_with_gvl(&cb);

#if defined(HAVE_RB_THREAD_CALL_WITH_GVL)
} else if (cb.frame != NULL) {
if (rbffi_native_thread_p(cb.frame)) {
if(rbffi_thread_has_gvl_p(cb.frame)) {
callback_with_gvl(&cb);
} else {
rb_thread_call_with_gvl(callback_with_gvl, &cb);
#endif
}
#if defined(DEFER_ASYNC_CALLBACK) && !defined(_WIN32)
} else {
bool empty = false;
Expand Down
22 changes: 12 additions & 10 deletions ext/ffi_c/Thread.c
Expand Up @@ -70,13 +70,15 @@ rbffi_frame_current(void)
#endif
}

void
void
rbffi_frame_push(rbffi_frame_t* frame)
{
memset(frame, 0, sizeof(*frame));
#ifndef HAVE_RUBY_THREAD_HAS_GVL_P
frame->has_gvl = true;
#endif
frame->exc = Qnil;

#ifdef _WIN32
frame->prev = TlsGetValue(frame_thread_key);
TlsSetValue(frame_thread_key, frame);
Expand All @@ -87,7 +89,7 @@ rbffi_frame_push(rbffi_frame_t* frame)
#endif
}

void
void
rbffi_frame_pop(rbffi_frame_t* frame)
{
#ifdef _WIN32
Expand Down Expand Up @@ -118,13 +120,13 @@ rbffi_blocking_thread(void* args)
struct BlockingThread* thr = (struct BlockingThread *) args;
char c = 1;
VALUE retval;

retval = (*thr->fn)(thr->data);

pthread_testcancel();

thr->retval = retval;

write(thr->wrfd, &c, sizeof(c));

return NULL;
Expand All @@ -135,7 +137,7 @@ wait_for_thread(void *data)
{
struct BlockingThread* thr = (struct BlockingThread *) data;
char c;

if (read(thr->rdfd, &c, 1) < 1) {
rb_thread_wait_fd(thr->rdfd);
while (read(thr->rdfd, &c, 1) < 1 && rb_io_wait_readable(thr->rdfd) == Qtrue) {
Expand Down Expand Up @@ -166,7 +168,7 @@ rbffi_thread_blocking_region(VALUE (*func)(void *), void *data1, void (*ubf)(voi
struct BlockingThread* thr;
int fd[2];
VALUE exc;

if (pipe(fd) < 0) {
rb_raise(rb_eSystemCallError, "pipe(2) failed");
return Qnil;
Expand All @@ -192,7 +194,7 @@ rbffi_thread_blocking_region(VALUE (*func)(void *), void *data1, void (*ubf)(voi

exc = rb_rescue2(wait_for_thread, (VALUE) thr, cleanup_blocking_thread, (VALUE) thr,
rb_eException);

pthread_join(thr->tid, NULL);
close(fd[1]);
close(fd[0]);
Expand Down Expand Up @@ -348,6 +350,6 @@ rbffi_Thread_Init(VALUE moduleFFI)
#ifdef _WIN32
frame_thread_key = TlsAlloc();
#else
pthread_key_create(&thread_data_key, thread_data_free);
pthread_key_create(&thread_data_key, thread_data_free);
#endif
}
2 changes: 2 additions & 0 deletions ext/ffi_c/Thread.h
Expand Up @@ -66,7 +66,9 @@ typedef struct rbffi_frame {
struct thread_data* td;
#endif
struct rbffi_frame* prev;
#ifndef HAVE_RUBY_THREAD_HAS_GVL_P
bool has_gvl;
#endif
VALUE exc;
} rbffi_frame_t;

Expand Down
9 changes: 7 additions & 2 deletions ext/ffi_c/extconf.rb
Expand Up @@ -30,6 +30,11 @@
have_func('rb_thread_blocking_region')
have_func('rb_thread_call_with_gvl')
have_func('rb_thread_call_without_gvl')
have_func('ruby_native_thread_p')
if RbConfig::CONFIG['host_os'].downcase !~ /darwin/ || RUBY_VERSION >= "2.3.0"
# On OSX ruby_thread_has_gvl_p is detected but fails at runtime for ruby < 2.3.0
have_func('ruby_thread_has_gvl_p')
end

if libffi_ok
have_func('ffi_prep_cif_var')
Expand All @@ -43,7 +48,7 @@
$defs << "-DFFI_BUILDING" if RbConfig::CONFIG['host_os'] =~ /mswin/ # for compatibility with newer libffi

create_header

$LOCAL_LIBS << " ./libffi/.libs/libffi_convenience.lib" if !libffi_ok && RbConfig::CONFIG['host_os'] =~ /mswin/

create_makefile("ffi_c")
Expand All @@ -63,7 +68,7 @@
end
end
end

else
File.open("Makefile", "w") do |mf|
mf.puts "# Dummy makefile for non-mri rubies"
Expand Down
95 changes: 95 additions & 0 deletions spec/ffi/callback_spec.rb
Expand Up @@ -772,3 +772,98 @@ module LibTestStdcall
end
end
end

describe "Callback interop" do
require 'fiddle'
require 'fiddle/import'
require 'timeout'

module LibTestFFI
extend FFI::Library
ffi_lib TestLibrary::PATH
attach_function :testCallbackVrV, :testClosureVrV, [ :pointer ], :void
attach_function :testCallbackVrV_blocking, :testClosureVrV, [ :pointer ], :void, blocking: true
end

module LibTestFiddle
extend Fiddle::Importer
dlload TestLibrary::PATH
extern 'void testClosureVrV(void *fp)'
end

def assert_callback_in_same_thread_called_once
called = 0
thread = nil
yield proc {
called += 1
thread = Thread.current
}
expect(called).to eq(1)
expect(thread).to eq(Thread.current)
end

it "from ffi to ffi" do
assert_callback_in_same_thread_called_once do |block|
func = FFI::Function.new(:void, [:pointer], &block)
LibTestFFI.testCallbackVrV(FFI::Pointer.new(func.to_i))
end
end

it "from ffi to ffi with blocking:true" do
assert_callback_in_same_thread_called_once do |block|
func = FFI::Function.new(:void, [:pointer], &block)
LibTestFFI.testCallbackVrV_blocking(FFI::Pointer.new(func.to_i))
end
end

# https://github.com/ffi/ffi/issues/527
if RUBY_VERSION.split('.').map(&:to_i).pack("C*") >= [2,3,0].pack("C*") || RUBY_PLATFORM =~ /java/
it "from fiddle to ffi" do
assert_callback_in_same_thread_called_once do |block|
func = FFI::Function.new(:void, [:pointer], &block)
LibTestFiddle.testClosureVrV(Fiddle::Pointer[func.to_i])
end
end
end

it "from ffi to fiddle" do
assert_callback_in_same_thread_called_once do |block|
func = LibTestFiddle.bind_function(:cbVrV, Fiddle::TYPE_VOID, [], &block)
LibTestFFI.testCallbackVrV(FFI::Pointer.new(func.to_i))
end
end

it "from ffi to fiddle with blocking:true" do
assert_callback_in_same_thread_called_once do |block|
func = LibTestFiddle.bind_function(:cbVrV, Fiddle::TYPE_VOID, [], &block)
LibTestFFI.testCallbackVrV_blocking(FFI::Pointer.new(func.to_i))
end
end

it "from fiddle to fiddle" do
assert_callback_in_same_thread_called_once do |block|
func = LibTestFiddle.bind_function(:cbVrV, Fiddle::TYPE_VOID, [], &block)
LibTestFiddle.testClosureVrV(Fiddle::Pointer[func.to_i])
end
end

# https://github.com/ffi/ffi/issues/527
if RUBY_ENGINE == 'ruby' && RUBY_VERSION.split('.').map(&:to_i).pack("C*") >= [2,3,0].pack("C*")
Copy link
Member

@tduehr tduehr Jul 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can just compare the strings here as strings.

RUBY_VERSION >= "2.3.0"

Does rbx not support this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have seen ruby-2.1.10 and I know you as a very exact character, I thought it's best to compare exactly...

rbx doesn't support fiddle, so that these tests fail. The embed_test seems to work in rbx after some tweeking. May I adjust the tests for compatibility with rbx?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough...

changing the tests for rbx would depend on the nature of the changes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand your last comment (as a non English speaker). Shall I adjust the tests so that they pass on rbx (although it is no longer tested on travis-ci)?

Anyway I already tested this with JRuby (see 047371b) , so that the update of the testsuite could be merged there someday.

it "C outside ffi call stack does not deadlock [#527]" do
path = File.join(File.dirname(__FILE__), "embed-test/embed-test.rb")
pid = spawn(RbConfig.ruby, "-Ilib", path, { [:out, :err] => "embed-test.log" })
begin
Timeout.timeout(10){ Process.wait(pid) }
rescue Timeout::Error
Process.kill(9, pid)
raise
else
if $?.exitstatus != 0
raise "external process failed:\n#{ File.read("embed-test.log") }"
end
end

expect(File.read("embed-test.log")).to match(/callback called with \["hello", 5, 0\]/)
end
end
end