Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: support custom codecs for transport #250

Merged
merged 6 commits into from May 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile
Expand Up @@ -14,10 +14,12 @@ group :test do
gem 'rack-test', require: 'rack/test'
gem 'jasmine'
gem 'puma'
gem 'm'
end

group :test, :development do
gem 'byebug'
gem 'oj'
end

group :development do
Expand Down
131 changes: 131 additions & 0 deletions bench/codecs.rb
@@ -0,0 +1,131 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
source 'https://rubygems.org'
gem 'message_bus', path: '../'
gem 'benchmark-ips'
gem 'oj'
end

require 'benchmark/ips'
require 'message_bus'

class StringHack
class FastIdList
def self.from_array(array)
new(",#{array.join(",")},")
end

def self.from_string(string)
new(string)
end

def initialize(packed)
@packed = packed
end

def include?(id)
@packed.include?(",#{id},")
end

def to_s
@packed
end
end

class OjString
def encode(data:, user_ids:, group_ids:, client_ids:)

if user_ids
user_ids = FastIdList.from_array(user_ids).to_s
end

::Oj.dump({
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
},
mode: :compat)
end

def decode(payload)
result = ::Oj.load(payload, mode: :compat)

if str = result["user_ids"]
result["user_ids"] = FastIdList.from_string(str)
end

result
end
end
end

json_codec = MessageBus::Codec::Json.new
oj_codec = MessageBus::Codec::Oj.new
oj_fast_codec = MessageBus::Codec::OjFast.new
oj_fast_string_hack = StringHack::OjString.new

json = json_codec.encode(
data: "hello world",
user_ids: (1..10000).to_a,
group_ids: nil,
client_ids: nil
)

json_fast = oj_fast_codec.encode(
data: "hello world",
user_ids: (1..10000).to_a,
group_ids: nil,
client_ids: nil
)

json_fast2 = oj_fast_string_hack.encode(
data: "hello world",
user_ids: (1..10000).to_a,
group_ids: nil,
client_ids: nil
)

Benchmark.ips do |x|
x.report("json") do |n|
while n > 0
decoded = json_codec.decode(json)
decoded["user_ids"].include?(5000)
n -= 1
end
end

x.report("oj") do |n|
while n > 0
decoded = oj_codec.decode(json)
decoded["user_ids"].include?(5000)
n -= 1
end
end

x.report("oj_fast") do |n|
while n > 0
decoded = oj_fast_codec.decode(json_fast)
decoded["user_ids"].include?(5000)
n -= 1
end
end

x.report("oj_fast string hack") do |n|
while n > 0
decoded = oj_fast_string_hack.decode(json_fast2)
decoded["user_ids"].include?(5000)
n -= 1
end
end
x.compare!
end

#Comparison:
# oj_fast: 129350.0 i/s
# oj_fast string hack: 26255.2 i/s - 4.93x (± 0.00) slower
# oj: 3073.5 i/s - 42.09x (± 0.00) slower
# json: 2221.9 i/s - 58.22x (± 0.00) slower
37 changes: 26 additions & 11 deletions lib/message_bus.rb
Expand Up @@ -2,18 +2,22 @@

require "monitor"
require "set"
require "message_bus/version"
require "message_bus/message"
require "message_bus/client"
require "message_bus/connection_manager"
require "message_bus/diagnostics"
require "message_bus/rack/middleware"
require "message_bus/rack/diagnostics"
require "message_bus/timer_thread"

require_relative "message_bus/version"
require_relative "message_bus/message"
require_relative "message_bus/client"
require_relative "message_bus/connection_manager"
require_relative "message_bus/diagnostics"
require_relative "message_bus/rack/middleware"
require_relative "message_bus/rack/diagnostics"
require_relative "message_bus/timer_thread"
require_relative "message_bus/codec/base"
require_relative "message_bus/backends"
require_relative "message_bus/backends/base"

# we still need to take care of the logger
if defined?(::Rails)
require 'message_bus/rails/railtie'
require_relative 'message_bus/rails/railtie'
end

# @see MessageBus::Implementation
Expand Down Expand Up @@ -278,6 +282,17 @@ def allow_broadcast?
end
end

# @param [MessageBus::Codec::Base] codec used to encode and decode Message payloads
# @return [void]
def transport_codec=(codec)
configure(trasport_codec: codec)
end

# @return [MessageBus::Codec::Base] codec used to encode and decode Message payloads
def transport_codec
@config[:transport_codec] ||= MessageBus::Codec::Json.new
end

# @param [MessageBus::Backend::Base] pub_sub a configured backend
# @return [void]
def reliable_pub_sub=(pub_sub)
Expand Down Expand Up @@ -358,7 +373,7 @@ def publish(channel, data, opts = nil)
raise ::MessageBus::InvalidMessageTarget
end

encoded_data = JSON.dump(
encoded_data = transport_codec.encode(
data: data,
user_ids: user_ids,
group_ids: group_ids,
Expand Down Expand Up @@ -626,7 +641,7 @@ def decode_message!(msg)
channel, site_id = decode_channel_name(msg.channel)
msg.channel = channel
msg.site_id = site_id
parsed = JSON.parse(msg.data)
parsed = transport_codec.decode(msg.data)
msg.data = parsed["data"]
msg.user_ids = parsed["user_ids"]
msg.group_ids = parsed["group_ids"]
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/base.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends"

module MessageBus
module Backends
# Backends provide a consistent API over a variety of options for persisting
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/memory.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends/base"

module MessageBus
module Backends
# The memory backend stores published messages in a simple array per
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/postgres.rb
Expand Up @@ -2,8 +2,6 @@

require 'pg'

require "message_bus/backends/base"

module MessageBus
module Backends
# The Postgres backend stores published messages in a single Postgres table
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/redis.rb
Expand Up @@ -3,8 +3,6 @@
require 'redis'
require 'digest'

require "message_bus/backends/base"

module MessageBus
module Backends
# The Redis backend stores published messages in Redis sorted sets (using
Expand Down
19 changes: 19 additions & 0 deletions lib/message_bus/codec/base.rb
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module MessageBus
module Codec
class Base
def encode(data:, user_ids:, group_ids:, client_ids:)
raise ConcreteClassMustImplementError
end

def decode(payload)
raise ConcreteClassMustImplementError
end
end

autoload :Json, File.expand_path("json", __dir__)
autoload :Oj, File.expand_path("oj", __dir__)
autoload :OjFast, File.expand_path("oj_fast", __dir__)
end
end
20 changes: 20 additions & 0 deletions lib/message_bus/codec/json.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module MessageBus
module Codec
class Json < Base
def encode(data:, user_ids:, group_ids:, client_ids:)
JSON.dump(
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
)
end

def decode(payload)
JSON.parse(payload)
end
end
end
end
23 changes: 23 additions & 0 deletions lib/message_bus/codec/oj.rb
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require 'oj' unless defined? ::Oj

module MessageBus
module Codec
class Oj < Base
def encode(data:, user_ids:, group_ids:, client_ids:)
::Oj.dump({
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
},
mode: :compat)
end

def decode(payload)
::Oj.load(payload, mode: :compat)
end
end
end
end
82 changes: 82 additions & 0 deletions lib/message_bus/codec/oj_fast.rb
@@ -0,0 +1,82 @@
# frozen_string_literal: true

require 'oj' unless defined? ::Oj
require 'base64'

module MessageBus
module Codec
class FastIdList
def self.from_array(array)
new(array.sort.pack("V*"))
end

def self.from_string(string)
new(string)
end

def initialize(packed)
@packed = packed
end

def include?(id)
found = (0...length).bsearch do |index|
@packed.byteslice(index * 4, 4).unpack1("V") >= id
end

if found
found && @packed.byteslice(found * 4, 4).unpack1("V") == id
end
Copy link

@rafbgarcia rafbgarcia May 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: you mentioned the problem with 4 bytes, but why is it not possible to use "Q" to pack 8 bytes?

I made a test and it seems to work:

numbers = [1,999999999999999].pack("Q*")
(0...numbers.length).step(8).map { |index| numbers.byteslice(index, 8).unpack1("Q") }
=> [1, 999999999999999]

Sorry if that doesn't make sense :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure made Q vs V optional in the demo codec in the bench directory.

end

def length
@length ||= @packed.bytesize / 4
end

def to_a
@packed.unpack("V*")
end

def to_s
@packed
end
end

class OjFast < Base
def encode(data:, user_ids:, group_ids:, client_ids:)

if user_ids
user_ids = FastIdList.from_array(user_ids).to_s
end

#if group_ids
# group_ids = FastIdList.from_array(group_ids).to_s
#end
data = ::Oj.dump(data, mode: :compat)

Marshal.dump(
"data" => data,
"user_ids" => user_ids,
"group_ids" => group_ids,
"client_ids" => client_ids
)
end

def decode(payload)
result = Marshal.load(payload)
result["data"] = ::Oj.load(result["data"], mode: :compat)

if str = result["user_ids"]
result["user_ids"] = FastIdList.from_string(str)
end

# groups need to implement (-)
# if str = result["group_ids"]
# result["group_ids"] = FastIdList.from_string(str)
# end
#

result
end
end
end
end