Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: support custom codecs for transport #250

Merged
merged 6 commits into from May 10, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,5 +1,9 @@
28-04-2021

- Version TBD

- FEATURE: Introduce support for transport codecs

- Version 3.3.5

- PERF: Optimised CORS preflight handling
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Expand Up @@ -14,10 +14,12 @@ group :test do
gem 'rack-test', require: 'rack/test'
gem 'jasmine'
gem 'puma'
gem 'm'
end

group :test, :development do
gem 'byebug'
gem 'oj'
end

group :development do
Expand Down
25 changes: 25 additions & 0 deletions README.md
Expand Up @@ -435,6 +435,31 @@ MessageBus.configure(backend: :memory)

The `:clear_every` option supported by the PostgreSQL backend is also supported by the in-memory backend.


### Transport codecs

By default MessageBus serializes messages to the backend using JSON. Under most situation this performs extremely well.

In some exceptional cases you may consider a different transport codec. To configure a custom codec use:

```ruby
MessageBus.configure(transport_codec: codec)
```

A codec class must implement MessageBus::Codec::Base. Specifically an `encode` and `decode` method.

See the `bench` directory for examples where the default JSON codec can perform poorly. A specific examples may be
attempting to distribute a message to a restricted list of thousands of users. In cases like this you may consider
using a packed string encoder.

Keep in mind, much of MessageBus internals and supporting tools expect data to be converted to JSON and back, if you use a naive (and fast) `Marshal` based codec you may need to limit the features you use. Specifically the Postgresql backend expects the codec never to return a string with `\u0000`, additionally some classes like DistributedCache expect keys to be converted to Strings.

Another example may be very large and complicated messages where Oj in compatability mode outperforms JSON. To opt for the Oj codec use:

```
MessageBus.configure(transport_codec: MessageBus::Codec::Oj.new)
```

### Forking/threading app servers

If you're using a forking or threading app server and you're not getting immediate delivery of published messages, you might need to configure your web server to re-connect to the message_bus backend
Expand Down
39 changes: 39 additions & 0 deletions bench/codecs/all_codecs.rb
@@ -0,0 +1,39 @@
# frozen_string_literal: true

require_relative './packed_string'
require_relative './string_hack'
require_relative './marshal'

def all_codecs
{
json: MessageBus::Codec::Json.new,
oj: MessageBus::Codec::Oj.new,
marshal: MarshalCodec.new,
packed_string_4_bytes: PackedString.new("V"),
packed_string_8_bytes: PackedString.new("Q"),
string_hack: StringHack.new
}
end

def bench_decode(hash, user_needle)
encoded_data = all_codecs.map do |name, codec|
[
name, codec, codec.encode(hash.dup)
]
end

Benchmark.ips do |x|

encoded_data.each do |name, codec, encoded|
x.report(name) do |n|
while n > 0
decoded = codec.decode(encoded)
decoded["user_ids"].include?(user_needle)
n -= 1
end
end
end

x.compare!
end
end
11 changes: 11 additions & 0 deletions bench/codecs/marshal.rb
@@ -0,0 +1,11 @@
# frozen_string_literal: true

class MarshalCodec
def encode(hash)
::Marshal.dump(hash)
end

def decode(payload)
::Marshal.load(payload)
end
end
67 changes: 67 additions & 0 deletions bench/codecs/packed_string.rb
@@ -0,0 +1,67 @@
# frozen_string_literal: true

class PackedString
class FastIdList
def self.from_array(array, pack_with)
new(array.sort.pack("#{pack_with}*"), pack_with)
end

def self.from_string(string, pack_with)
new(string, pack_with)
end

def initialize(packed, pack_with)
raise "unknown pack format, expecting Q or V" if pack_with != "V" && pack_with != "Q"
@packed = packed
@pack_with = pack_with
@slot_size = pack_with == "V" ? 4 : 8
end

def include?(id)
found = (0...length).bsearch do |index|
@packed.byteslice(index * @slot_size, @slot_size).unpack1(@pack_with) >= id
end

found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id
end

def length
@length ||= @packed.bytesize / @slot_size
end

def to_a
@packed.unpack("#{@pack_with}*")
end

def to_s
@packed
end
end

def initialize(pack_with = "V")
@pack_with = pack_with
@oj_options = { mode: :compat }
end

def encode(hash)

if user_ids = hash["user_ids"]
hash["user_ids"] = FastIdList.from_array(hash["user_ids"], @pack_with).to_s
end

hash["data"] = ::Oj.dump(hash["data"], @oj_options)

Marshal.dump(hash)
end

def decode(payload)
result = Marshal.load(payload)
result["data"] = ::Oj.load(result["data"], @oj_options)

if str = result["user_ids"]
result["user_ids"] = FastIdList.from_string(str, @pack_with)
end

result
end
end
47 changes: 47 additions & 0 deletions bench/codecs/string_hack.rb
@@ -0,0 +1,47 @@
# frozen_string_literal: true

class StringHack
class FastIdList
def self.from_array(array)
new(",#{array.join(",")},")
end

def self.from_string(string)
new(string)
end

def initialize(packed)
@packed = packed
end

def include?(id)
@packed.include?(",#{id},")
end

def to_s
@packed
end
end

def initialize
@oj_options = { mode: :compat }
end

def encode(hash)
if user_ids = hash["user_ids"]
hash["user_ids"] = FastIdList.from_array(user_ids).to_s
end

::Oj.dump(hash, @oj_options)
end

def decode(payload)
result = ::Oj.load(payload, @oj_options)

if str = result["user_ids"]
result["user_ids"] = FastIdList.from_string(str)
end

result
end
end
29 changes: 29 additions & 0 deletions bench/codecs_large_user_list.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
source 'https://rubygems.org'
gem 'message_bus', path: '../'
gem 'benchmark-ips'
gem 'oj'
end

require 'benchmark/ips'
require 'message_bus'
require_relative 'codecs/all_codecs'

bench_decode({
"data" => "hello world",
"user_ids" => (1..10000).to_a,
"group_ids" => nil,
"client_ids" => nil
}, 5000
)

# packed_string_4_bytes: 127176.1 i/s
# packed_string_8_bytes: 94494.6 i/s - 1.35x (± 0.00) slower
# string_hack: 26403.4 i/s - 4.82x (± 0.00) slower
# marshal: 4985.5 i/s - 25.51x (± 0.00) slower
# oj: 3072.9 i/s - 41.39x (± 0.00) slower
# json: 2222.7 i/s - 57.22x (± 0.00) slower
29 changes: 29 additions & 0 deletions bench/codecs_standard_message.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
source 'https://rubygems.org'
gem 'message_bus', path: '../'
gem 'benchmark-ips'
gem 'oj'
end

require 'benchmark/ips'
require 'message_bus'
require_relative 'codecs/all_codecs'

bench_decode({
"data" => { amazing: "hello world this is an amazing message hello there!!!", another_key: [2, 3, 4] },
"user_ids" => [1, 2, 3],
"group_ids" => [1],
"client_ids" => nil
}, 2
)

# marshal: 504885.6 i/s
# json: 401050.9 i/s - 1.26x (± 0.00) slower
# oj: 340847.4 i/s - 1.48x (± 0.00) slower
# string_hack: 296741.6 i/s - 1.70x (± 0.00) slower
# packed_string_4_bytes: 207942.6 i/s - 2.43x (± 0.00) slower
# packed_string_8_bytes: 206093.0 i/s - 2.45x (± 0.00) slower
49 changes: 32 additions & 17 deletions lib/message_bus.rb
Expand Up @@ -2,18 +2,22 @@

require "monitor"
require "set"
require "message_bus/version"
require "message_bus/message"
require "message_bus/client"
require "message_bus/connection_manager"
require "message_bus/diagnostics"
require "message_bus/rack/middleware"
require "message_bus/rack/diagnostics"
require "message_bus/timer_thread"

require_relative "message_bus/version"
require_relative "message_bus/message"
require_relative "message_bus/client"
require_relative "message_bus/connection_manager"
require_relative "message_bus/diagnostics"
require_relative "message_bus/rack/middleware"
require_relative "message_bus/rack/diagnostics"
require_relative "message_bus/timer_thread"
require_relative "message_bus/codec/base"
require_relative "message_bus/backends"
require_relative "message_bus/backends/base"

# we still need to take care of the logger
if defined?(::Rails)
require 'message_bus/rails/railtie'
if defined?(::Rails::Engine)
require_relative 'message_bus/rails/railtie'
end

# @see MessageBus::Implementation
Expand Down Expand Up @@ -278,6 +282,17 @@ def allow_broadcast?
end
end

# @param [MessageBus::Codec::Base] codec used to encode and decode Message payloads
# @return [void]
def transport_codec=(codec)
configure(trasport_codec: codec)
end

# @return [MessageBus::Codec::Base] codec used to encode and decode Message payloads
def transport_codec
@config[:transport_codec] ||= MessageBus::Codec::Json.new
end

# @param [MessageBus::Backend::Base] pub_sub a configured backend
# @return [void]
def reliable_pub_sub=(pub_sub)
Expand Down Expand Up @@ -358,12 +373,12 @@ def publish(channel, data, opts = nil)
raise ::MessageBus::InvalidMessageTarget
end

encoded_data = JSON.dump(
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
)
encoded_data = transport_codec.encode({
"data" => data,
"user_ids" => user_ids,
"group_ids" => group_ids,
"client_ids" => client_ids
})

channel_opts = {}

Expand Down Expand Up @@ -626,7 +641,7 @@ def decode_message!(msg)
channel, site_id = decode_channel_name(msg.channel)
msg.channel = channel
msg.site_id = site_id
parsed = JSON.parse(msg.data)
parsed = transport_codec.decode(msg.data)
msg.data = parsed["data"]
msg.user_ids = parsed["user_ids"]
msg.group_ids = parsed["group_ids"]
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/base.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends"

module MessageBus
module Backends
# Backends provide a consistent API over a variety of options for persisting
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/memory.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends/base"

module MessageBus
module Backends
# The memory backend stores published messages in a simple array per
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/postgres.rb
Expand Up @@ -2,8 +2,6 @@

require 'pg'

require "message_bus/backends/base"

module MessageBus
module Backends
# The Postgres backend stores published messages in a single Postgres table
Expand Down