From 0d79789b94cc8baabac218ac2ec637e7ade0487d Mon Sep 17 00:00:00 2001 From: Sam Date: Tue, 11 May 2021 00:22:29 +1000 Subject: [PATCH] FEATURE: support custom codecs for transport (#250) Adds support for MessageBus.transport_codec which can be used to optimise performance of transport - Oj can be used as a serializer vs JSON which is a bit slower - Users can implement custom codecs --- CHANGELOG | 4 ++ Gemfile | 2 + README.md | 25 +++++++++++ bench/codecs/all_codecs.rb | 39 ++++++++++++++++ bench/codecs/marshal.rb | 11 +++++ bench/codecs/packed_string.rb | 67 ++++++++++++++++++++++++++++ bench/codecs/string_hack.rb | 47 +++++++++++++++++++ bench/codecs_large_user_list.rb | 29 ++++++++++++ bench/codecs_standard_message.rb | 29 ++++++++++++ lib/message_bus.rb | 49 +++++++++++++------- lib/message_bus/backends/base.rb | 2 - lib/message_bus/backends/memory.rb | 2 - lib/message_bus/backends/postgres.rb | 2 - lib/message_bus/backends/redis.rb | 6 +-- lib/message_bus/codec/base.rb | 18 ++++++++ lib/message_bus/codec/json.rb | 15 +++++++ lib/message_bus/codec/oj.rb | 21 +++++++++ spec/lib/message_bus_spec.rb | 2 +- 18 files changed, 342 insertions(+), 28 deletions(-) create mode 100644 bench/codecs/all_codecs.rb create mode 100644 bench/codecs/marshal.rb create mode 100644 bench/codecs/packed_string.rb create mode 100644 bench/codecs/string_hack.rb create mode 100644 bench/codecs_large_user_list.rb create mode 100644 bench/codecs_standard_message.rb create mode 100644 lib/message_bus/codec/base.rb create mode 100644 lib/message_bus/codec/json.rb create mode 100644 lib/message_bus/codec/oj.rb diff --git a/CHANGELOG b/CHANGELOG index 42c54a66..20295486 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +UNRELEASED + + - FEATURE: Introduce support for transport codecs + 28-04-2021 - Version 3.3.5 diff --git a/Gemfile b/Gemfile index d2924dc9..ea22e39d 100644 --- a/Gemfile +++ b/Gemfile @@ -14,10 +14,12 @@ group :test do gem 'rack-test', require: 'rack/test' gem 'jasmine' 
gem 'puma' + gem 'm' end group :test, :development do gem 'byebug' + gem 'oj' end group :development do diff --git a/README.md b/README.md index 4468a487..384b277b 100644 --- a/README.md +++ b/README.md @@ -435,6 +435,31 @@ MessageBus.configure(backend: :memory) The `:clear_every` option supported by the PostgreSQL backend is also supported by the in-memory backend. + +### Transport codecs + +By default MessageBus serializes messages to the backend using JSON. In most situations this performs extremely well. + +In some exceptional cases you may consider a different transport codec. To configure a custom codec use: + +```ruby +MessageBus.configure(transport_codec: codec) +``` + +A codec class must implement MessageBus::Codec::Base. Specifically, it must provide an `encode` and a `decode` method. + +See the `bench` directory for examples where the default JSON codec can perform poorly. A specific example may be +attempting to distribute a message to a restricted list of thousands of users. In cases like this you may consider +using a packed string encoder. + +Keep in mind, much of MessageBus internals and supporting tools expect data to be converted to JSON and back. If you use a naive (and fast) `Marshal` based codec you may need to limit the features you use. Specifically the PostgreSQL backend expects the codec never to return a string with `\u0000`; additionally some classes like DistributedCache expect keys to be converted to Strings. + +Another example may be very large and complicated messages where Oj in compatibility mode outperforms JSON. 
To opt for the Oj codec use: + +``` +MessageBus.configure(transport_codec: MessageBus::Codec::Oj.new) +``` + ### Forking/threading app servers If you're using a forking or threading app server and you're not getting immediate delivery of published messages, you might need to configure your web server to re-connect to the message_bus backend diff --git a/bench/codecs/all_codecs.rb b/bench/codecs/all_codecs.rb new file mode 100644 index 00000000..4583c0f3 --- /dev/null +++ b/bench/codecs/all_codecs.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require_relative './packed_string' +require_relative './string_hack' +require_relative './marshal' + +def all_codecs + { + json: MessageBus::Codec::Json.new, + oj: MessageBus::Codec::Oj.new, + marshal: MarshalCodec.new, + packed_string_4_bytes: PackedString.new("V"), + packed_string_8_bytes: PackedString.new("Q"), + string_hack: StringHack.new + } +end + +def bench_decode(hash, user_needle) + encoded_data = all_codecs.map do |name, codec| + [ + name, codec, codec.encode(hash.dup) + ] + end + + Benchmark.ips do |x| + + encoded_data.each do |name, codec, encoded| + x.report(name) do |n| + while n > 0 + decoded = codec.decode(encoded) + decoded["user_ids"].include?(user_needle) + n -= 1 + end + end + end + + x.compare! 
+ end +end diff --git a/bench/codecs/marshal.rb b/bench/codecs/marshal.rb new file mode 100644 index 00000000..6223549d --- /dev/null +++ b/bench/codecs/marshal.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class MarshalCodec + def encode(hash) + ::Marshal.dump(hash) + end + + def decode(payload) + ::Marshal.load(payload) + end +end diff --git a/bench/codecs/packed_string.rb b/bench/codecs/packed_string.rb new file mode 100644 index 00000000..c9c9db51 --- /dev/null +++ b/bench/codecs/packed_string.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +class PackedString + class FastIdList + def self.from_array(array, pack_with) + new(array.sort.pack("#{pack_with}*"), pack_with) + end + + def self.from_string(string, pack_with) + new(string, pack_with) + end + + def initialize(packed, pack_with) + raise "unknown pack format, expecting Q or V" if pack_with != "V" && pack_with != "Q" + @packed = packed + @pack_with = pack_with + @slot_size = pack_with == "V" ? 4 : 8 + end + + def include?(id) + found = (0...length).bsearch do |index| + @packed.byteslice(index * @slot_size, @slot_size).unpack1(@pack_with) >= id + end + + found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id + end + + def length + @length ||= @packed.bytesize / @slot_size + end + + def to_a + @packed.unpack("#{@pack_with}*") + end + + def to_s + @packed + end + end + + def initialize(pack_with = "V") + @pack_with = pack_with + @oj_options = { mode: :compat } + end + + def encode(hash) + + if user_ids = hash["user_ids"] + hash["user_ids"] = FastIdList.from_array(hash["user_ids"], @pack_with).to_s + end + + hash["data"] = ::Oj.dump(hash["data"], @oj_options) + + Marshal.dump(hash) + end + + def decode(payload) + result = Marshal.load(payload) + result["data"] = ::Oj.load(result["data"], @oj_options) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str, @pack_with) + end + + result + end +end diff --git a/bench/codecs/string_hack.rb 
b/bench/codecs/string_hack.rb new file mode 100644 index 00000000..1ab190c5 --- /dev/null +++ b/bench/codecs/string_hack.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +class StringHack + class FastIdList + def self.from_array(array) + new(",#{array.join(",")},") + end + + def self.from_string(string) + new(string) + end + + def initialize(packed) + @packed = packed + end + + def include?(id) + @packed.include?(",#{id},") + end + + def to_s + @packed + end + end + + def initialize + @oj_options = { mode: :compat } + end + + def encode(hash) + if user_ids = hash["user_ids"] + hash["user_ids"] = FastIdList.from_array(user_ids).to_s + end + + ::Oj.dump(hash, @oj_options) + end + + def decode(payload) + result = ::Oj.load(payload, @oj_options) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str) + end + + result + end +end diff --git a/bench/codecs_large_user_list.rb b/bench/codecs_large_user_list.rb new file mode 100644 index 00000000..238e0a6e --- /dev/null +++ b/bench/codecs_large_user_list.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'bundler/inline' + +gemfile do + source 'https://rubygems.org' + gem 'message_bus', path: '../' + gem 'benchmark-ips' + gem 'oj' +end + +require 'benchmark/ips' +require 'message_bus' +require_relative 'codecs/all_codecs' + +bench_decode({ + "data" => "hello world", + "user_ids" => (1..10000).to_a, + "group_ids" => nil, + "client_ids" => nil + }, 5000 +) + +# packed_string_4_bytes: 127176.1 i/s +# packed_string_8_bytes: 94494.6 i/s - 1.35x (± 0.00) slower +# string_hack: 26403.4 i/s - 4.82x (± 0.00) slower +# marshal: 4985.5 i/s - 25.51x (± 0.00) slower +# oj: 3072.9 i/s - 41.39x (± 0.00) slower +# json: 2222.7 i/s - 57.22x (± 0.00) slower diff --git a/bench/codecs_standard_message.rb b/bench/codecs_standard_message.rb new file mode 100644 index 00000000..b8401fed --- /dev/null +++ b/bench/codecs_standard_message.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 
'bundler/inline' + +gemfile do + source 'https://rubygems.org' + gem 'message_bus', path: '../' + gem 'benchmark-ips' + gem 'oj' +end + +require 'benchmark/ips' +require 'message_bus' +require_relative 'codecs/all_codecs' + +bench_decode({ + "data" => { amazing: "hello world this is an amazing message hello there!!!", another_key: [2, 3, 4] }, + "user_ids" => [1, 2, 3], + "group_ids" => [1], + "client_ids" => nil + }, 2 +) + +# marshal: 504885.6 i/s +# json: 401050.9 i/s - 1.26x (± 0.00) slower +# oj: 340847.4 i/s - 1.48x (± 0.00) slower +# string_hack: 296741.6 i/s - 1.70x (± 0.00) slower +# packed_string_4_bytes: 207942.6 i/s - 2.43x (± 0.00) slower +# packed_string_8_bytes: 206093.0 i/s - 2.45x (± 0.00) slower diff --git a/lib/message_bus.rb b/lib/message_bus.rb index 364a0b7c..94563c41 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -2,18 +2,22 @@ require "monitor" require "set" -require "message_bus/version" -require "message_bus/message" -require "message_bus/client" -require "message_bus/connection_manager" -require "message_bus/diagnostics" -require "message_bus/rack/middleware" -require "message_bus/rack/diagnostics" -require "message_bus/timer_thread" + +require_relative "message_bus/version" +require_relative "message_bus/message" +require_relative "message_bus/client" +require_relative "message_bus/connection_manager" +require_relative "message_bus/diagnostics" +require_relative "message_bus/rack/middleware" +require_relative "message_bus/rack/diagnostics" +require_relative "message_bus/timer_thread" +require_relative "message_bus/codec/base" +require_relative "message_bus/backends" +require_relative "message_bus/backends/base" # we still need to take care of the logger -if defined?(::Rails) - require 'message_bus/rails/railtie' +if defined?(::Rails::Engine) + require_relative 'message_bus/rails/railtie' end # @see MessageBus::Implementation @@ -278,6 +282,17 @@ def allow_broadcast? 
end end + # @param [MessageBus::Codec::Base] codec used to encode and decode Message payloads + # @return [void] + def transport_codec=(codec) + configure(transport_codec: codec) + end + + # @return [MessageBus::Codec::Base] codec used to encode and decode Message payloads + def transport_codec + @config[:transport_codec] ||= MessageBus::Codec::Json.new + end + # @param [MessageBus::Backend::Base] pub_sub a configured backend # @return [void] def reliable_pub_sub=(pub_sub) @@ -358,12 +373,12 @@ def publish(channel, data, opts = nil) raise ::MessageBus::InvalidMessageTarget end - encoded_data = JSON.dump( - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - ) + encoded_data = transport_codec.encode({ + "data" => data, + "user_ids" => user_ids, + "group_ids" => group_ids, + "client_ids" => client_ids + }) channel_opts = {} @@ -626,7 +641,7 @@ def decode_message!(msg) channel, site_id = decode_channel_name(msg.channel) msg.channel = channel msg.site_id = site_id - parsed = JSON.parse(msg.data) + parsed = transport_codec.decode(msg.data) msg.data = parsed["data"] msg.user_ids = parsed["user_ids"] msg.group_ids = parsed["group_ids"] diff --git a/lib/message_bus/backends/base.rb b/lib/message_bus/backends/base.rb index a66e7769..19326bca 100644 --- a/lib/message_bus/backends/base.rb +++ b/lib/message_bus/backends/base.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require "message_bus/backends" - module MessageBus module Backends # Backends provide a consistent API over a variety of options for persisting diff --git a/lib/message_bus/backends/memory.rb b/lib/message_bus/backends/memory.rb index 4ff77d77..f65bd983 100644 --- a/lib/message_bus/backends/memory.rb +++ b/lib/message_bus/backends/memory.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require "message_bus/backends/base" - module MessageBus module Backends # The memory backend stores published messages in a simple array per diff --git a/lib/message_bus/backends/postgres.rb 
b/lib/message_bus/backends/postgres.rb index 5ca74ec7..50d8c198 100644 --- a/lib/message_bus/backends/postgres.rb +++ b/lib/message_bus/backends/postgres.rb @@ -2,8 +2,6 @@ require 'pg' -require "message_bus/backends/base" - module MessageBus module Backends # The Postgres backend stores published messages in a single Postgres table diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index 4790ca93..b77f4377 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -3,8 +3,6 @@ require 'redis' require 'digest' -require "message_bus/backends/base" - module MessageBus module Backends # The Redis backend stores published messages in Redis sorted sets (using @@ -104,8 +102,8 @@ def expire_all_backlogs! local global_id = redis.call("INCR", global_id_key) local backlog_id = redis.call("INCR", backlog_id_key) - local payload = string.format("%i|%i|%s", global_id, backlog_id, start_payload) - local global_backlog_message = string.format("%i|%s", backlog_id, channel) + local payload = table.concat({ global_id, backlog_id, start_payload }, "|") + local global_backlog_message = table.concat({ backlog_id, channel }, "|") redis.call("ZADD", backlog_key, backlog_id, payload) redis.call("EXPIRE", backlog_key, max_backlog_age) diff --git a/lib/message_bus/codec/base.rb b/lib/message_bus/codec/base.rb new file mode 100644 index 00000000..4b0e58a7 --- /dev/null +++ b/lib/message_bus/codec/base.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module MessageBus + module Codec + class Base + def encode(hash) + raise ConcreteClassMustImplementError + end + + def decode(payload) + raise ConcreteClassMustImplementError + end + end + + autoload :Json, File.expand_path("json", __dir__) + autoload :Oj, File.expand_path("oj", __dir__) + end +end diff --git a/lib/message_bus/codec/json.rb b/lib/message_bus/codec/json.rb new file mode 100644 index 00000000..4294f1e8 --- /dev/null +++ b/lib/message_bus/codec/json.rb @@ -0,0 
+1,15 @@ +# frozen_string_literal: true + +module MessageBus + module Codec + class Json < Base + def encode(hash) + JSON.dump(hash) + end + + def decode(payload) + JSON.parse(payload) + end + end + end +end diff --git a/lib/message_bus/codec/oj.rb b/lib/message_bus/codec/oj.rb new file mode 100644 index 00000000..27ce6ede --- /dev/null +++ b/lib/message_bus/codec/oj.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require 'oj' unless defined? ::Oj + +module MessageBus + module Codec + class Oj < Base + def initialize(options = { mode: :compat }) + @options = options + end + + def encode(hash) + ::Oj.dump(hash, @options) + end + + def decode(payload) + ::Oj.load(payload, @options) + end + end + end +end diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index df923d9d..92064671 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -162,7 +162,7 @@ data.must_equal 'norris' site_id.must_equal 'magic' channel.must_equal '/chuck' - user_ids.must_equal [1, 2, 3] + user_ids.to_a.must_equal [1, 2, 3] end it "should get global messages if it subscribes to them" do