Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support JRuby and fix travis failures #108

Merged
merged 15 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@ rvm:
- 2.4
- 2.5
- 2.6
- jruby-9.2.9.0

before_install:
- gem update --system
- |
r_eng="$(ruby -e 'STDOUT.write RUBY_ENGINE')";
if [ "$r_eng" == "jruby" ]; then
sudo apt-get update && \
sudo apt-get install -y git && \
sudo apt-get install -y libpthread-stubs0-dev && \
sudo apt-get install -y build-essential && \
sudo apt-get install -y zlib1g-dev && \
sudo apt-get install -y libssl-dev && \
sudo apt-get install -y libsasl2-dev
fi

before_script:
- docker-compose up -d
Expand Down
26 changes: 15 additions & 11 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@

version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: wurstmeister/kafka:1.0.1
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_CREATE_TOPICS: "consume_test_topic:3:1,empty_test_topic:3:1,load_test_topic:3:1,produce_test_topic:3:1,rake_test_topic:3:1,watermarks_test_topic:3:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
2 changes: 1 addition & 1 deletion ext/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ task :default => :clean do

# Use default homebrew openssl if we're on mac and the directory exists
# and each of flags is not empty
if recipe.host.include?("darwin") && Dir.exists?("/usr/local/opt/openssl")
if recipe.host&.include?("darwin") && Dir.exist?("/usr/local/opt/openssl")
ENV["CPPFLAGS"] = "-I/usr/local/opt/openssl/include" unless ENV["CPPFLAGS"]
ENV["LDFLAGS"] = "-L/usr/local/opt/openssl/lib" unless ENV["LDFLAGS"]
end
Expand Down
2 changes: 1 addition & 1 deletion lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Bindings
extend FFI::Library

def self.lib_extension
if Gem::Platform.local.os.include?("darwin")
if RbConfig::CONFIG['host_os'] =~ /darwin/
'dylib'
else
'so'
Expand Down
6 changes: 2 additions & 4 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,8 @@ def native_kafka(config, type)
Rdkafka::Bindings.rd_kafka_queue_get_main(handle)
)

FFI::AutoPointer.new(
handle,
Rdkafka::Bindings.method(:rd_kafka_destroy)
)
# Return handle which should be closed using rd_kafka_destroy after usage.
handle
end
end

Expand Down
125 changes: 83 additions & 42 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ def initialize(native_kafka)
# Close this consumer
# @return [nil]
def close
return if @closed

@closing = true
Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
@closed = true
end

# Subscribe to one or more topics letting Kafka handle partition assignments.
Expand All @@ -33,20 +37,19 @@ def close
# @return [nil]
def subscribe(*topics)
# Create topic partition list with topics and no partition set
tpl = TopicPartitionList.new_native_tpl(topics.length)
tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)

topics.each do |topic|
Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
tpl,
topic,
-1
)
Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, topic, -1)
end

# Subscribe to topic partition list and check this was successful
response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end

# Unsubscribe from all subscribed topics.
Expand All @@ -72,12 +75,18 @@ def pause(list)
unless list.is_a?(TopicPartitionList)
raise TypeError.new("list has to be a TopicPartitionList")
end

tpl = list.to_native_tpl
response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)

if response != 0
list = TopicPartitionList.from_native_tpl(tpl)
raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
begin
response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)

if response != 0
list = TopicPartitionList.from_native_tpl(tpl)
raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
end

Expand All @@ -92,10 +101,16 @@ def resume(list)
unless list.is_a?(TopicPartitionList)
raise TypeError.new("list has to be a TopicPartitionList")
end

tpl = list.to_native_tpl
response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")

begin
response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
end

Expand All @@ -105,17 +120,19 @@ def resume(list)
#
# @return [TopicPartitionList]
def subscription
tpl = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl)
ptr = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr)

if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
tpl = tpl.read(:pointer).tap { |it| it.autorelease = false }

native = ptr.read_pointer

begin
Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
Rdkafka::Consumer::TopicPartitionList.from_native_tpl(native)
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native)
end
end

Expand All @@ -128,10 +145,16 @@ def assign(list)
unless list.is_a?(TopicPartitionList)
raise TypeError.new("list has to be a TopicPartitionList")
end

tpl = list.to_native_tpl
response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")

begin
response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
end

Expand All @@ -141,19 +164,23 @@ def assign(list)
#
# @return [TopicPartitionList]
def assignment
tpl = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, tpl)
ptr = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end

tpl = tpl.read(:pointer).tap { |it| it.autorelease = false }
tpl = ptr.read_pointer

begin
Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
if !tpl.null?
begin
Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
end
end
ensure
ptr.free
end

# Return the current committed offset per partition for this consumer group.
Expand All @@ -171,12 +198,18 @@ def committed(list=nil, timeout_ms=1200)
elsif !list.is_a?(TopicPartitionList)
raise TypeError.new("list has to be nil or a TopicPartitionList")
end

tpl = list.to_native_tpl
response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
if response != 0
raise Rdkafka::RdkafkaError.new(response)

begin
response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
TopicPartitionList.from_native_tpl(tpl)
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
TopicPartitionList.from_native_tpl(tpl)
end

# Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.
Expand All @@ -198,13 +231,16 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
partition,
low,
high,
timeout_ms
timeout_ms,
)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
end

return low.read_int64, high.read_int64
return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first
ensure
low.free
high.free
end

# Calculate the consumer lag per partition for the provided topic partition list.
Expand All @@ -220,6 +256,7 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag per partition
def lag(topic_partition_list, watermark_timeout_ms=100)
out = {}

topic_partition_list.to_h.each do |topic, partitions|
# Query high watermarks for this topic's partitions
# and compare to the offset in the list.
Expand Down Expand Up @@ -335,14 +372,16 @@ def commit(list=nil, async=false)
if !list.nil? && !list.is_a?(TopicPartitionList)
raise TypeError.new("list has to be nil or a TopicPartitionList")
end
tpl = if list
list.to_native_tpl
else
nil
end
response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
if response != 0
raise Rdkafka::RdkafkaError.new(response)

tpl = list ? list.to_native_tpl : nil

begin
response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end
end

Expand All @@ -354,6 +393,8 @@ def commit(list=nil, async=false)
#
# @return [Message, nil] A message or nil if there was no new message within the timeout
def poll(timeout_ms)
return if @closed

message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
if message_ptr.null?
nil
Expand Down
12 changes: 7 additions & 5 deletions lib/rdkafka/consumer/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def self.from_native(native_message)
raise Rdkafka::RdkafkaError.new(err, "Error reading message headers")
end

headers_ptr = headers_ptrptr.read(:pointer).tap { |it| it.autorelease = false }
headers_ptr = headers_ptrptr.read_pointer

name_ptrptr = FFI::MemoryPointer.new(:pointer)
value_ptrptr = FFI::MemoryPointer.new(:pointer)
Expand All @@ -42,12 +42,14 @@ def self.from_native(native_message)
raise Rdkafka::RdkafkaError.new(err, "Error reading a message header at index #{idx}")
end

name = name_ptrptr.read(:pointer).tap { |it| it.autorelease = false }
name = name.read_string_to_null
name_ptr = name_ptrptr.read_pointer
name = name_ptr.respond_to?(:read_string_to_null) ? name_ptr.read_string_to_null : name_ptr.read_string

size = size_ptr[:value]
value = value_ptrptr.read(:pointer).tap { |it| it.autorelease = false }
value = value.read_string(size)

value_ptr = value_ptrptr.read_pointer

value = value_ptr.read_string(size)

headers[name.to_sym] = value

Expand Down
16 changes: 3 additions & 13 deletions lib/rdkafka/consumer/topic_partition_list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ def self.from_native_tpl(pointer)
#
# The pointer will be cleaned by `rd_kafka_topic_partition_list_destroy` when GC releases it.
#
# @return [FFI::AutoPointer]
# @return [FFI::Pointer]
# @private
def to_native_tpl
tpl = TopicPartitionList.new_native_tpl(count)
tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(count)

@data.each do |topic, partitions|
if partitions
Expand All @@ -138,6 +138,7 @@ def to_native_tpl
topic,
p.partition
)

if p.offset
Rdkafka::Bindings.rd_kafka_topic_partition_list_set_offset(
tpl,
Expand All @@ -158,17 +159,6 @@ def to_native_tpl

tpl
end

# Creates a new native tpl and wraps it into FFI::AutoPointer which in turn calls
# `rd_kafka_topic_partition_list_destroy` when a pointer will be cleaned by GC
#
# @param count [Integer] an initial capacity of partitions list
# @return [FFI::AutoPointer]
# @private
def self.new_native_tpl(count)
tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(count)
FFI::AutoPointer.new(tpl, Rdkafka::Bindings.method(:rd_kafka_topic_partition_list_destroy))
end
end
end
end