From fe49a3d1ffe43e9643f60c8200022459ba943245 Mon Sep 17 00:00:00 2001 From: Sam Saffron Date: Thu, 6 May 2021 15:29:31 +1000 Subject: [PATCH 1/6] FEATURE: support custom codecs for transport Adds support for MessageBus.transport_codec which can be used to optimise performance of transport - Oj can be used as a serializer vs JSON which is a bit slower - Users can implement custom codecs - OjFast can be used as a serializer for efficient user_id lookups Backwards compatible change, switching codecs to OjFast will invalidate all existing storage. Work in progress --- Gemfile | 1 + lib/message_bus.rb | 37 +++++++++---- lib/message_bus/backends/base.rb | 2 - lib/message_bus/backends/memory.rb | 2 - lib/message_bus/backends/postgres.rb | 2 - lib/message_bus/backends/redis.rb | 2 - lib/message_bus/codec/base.rb | 19 +++++++ lib/message_bus/codec/json.rb | 20 +++++++ lib/message_bus/codec/oj.rb | 23 ++++++++ lib/message_bus/codec/oj_fast.rb | 81 ++++++++++++++++++++++++++++ spec/lib/message_bus_spec.rb | 2 +- 11 files changed, 171 insertions(+), 20 deletions(-) create mode 100644 lib/message_bus/codec/base.rb create mode 100644 lib/message_bus/codec/json.rb create mode 100644 lib/message_bus/codec/oj.rb create mode 100644 lib/message_bus/codec/oj_fast.rb diff --git a/Gemfile b/Gemfile index d2924dc9..66bb4bf3 100644 --- a/Gemfile +++ b/Gemfile @@ -18,6 +18,7 @@ end group :test, :development do gem 'byebug' + gem 'oj' end group :development do diff --git a/lib/message_bus.rb b/lib/message_bus.rb index 364a0b7c..96e89d1f 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -2,18 +2,22 @@ require "monitor" require "set" -require "message_bus/version" -require "message_bus/message" -require "message_bus/client" -require "message_bus/connection_manager" -require "message_bus/diagnostics" -require "message_bus/rack/middleware" -require "message_bus/rack/diagnostics" -require "message_bus/timer_thread" + +require_relative "message_bus/version" +require_relative 
"message_bus/message" +require_relative "message_bus/client" +require_relative "message_bus/connection_manager" +require_relative "message_bus/diagnostics" +require_relative "message_bus/rack/middleware" +require_relative "message_bus/rack/diagnostics" +require_relative "message_bus/timer_thread" +require_relative "message_bus/codec/base" +require_relative "message_bus/backends" +require_relative "message_bus/backends/base" # we still need to take care of the logger if defined?(::Rails) - require 'message_bus/rails/railtie' + require_relative 'message_bus/rails/railtie' end # @see MessageBus::Implementation @@ -278,6 +282,17 @@ def allow_broadcast? end end + # @param [MessageBus::Codec::Base] codec used to encode and decode Message payloads + # @return [void] + def transport_codec=(codec) + configure(transport_codec: codec) + end + + # @return [MessageBus::Codec::Base] codec used to encode and decode Message payloads + def transport_codec + @config[:transport_codec] ||= MessageBus::Codec::Json.new + end + # @param [MessageBus::Backend::Base] pub_sub a configured backend # @return [void] def reliable_pub_sub=(pub_sub) @@ -358,7 +373,7 @@ def publish(channel, data, opts = nil) raise ::MessageBus::InvalidMessageTarget end - encoded_data = JSON.dump( + encoded_data = transport_codec.encode( data: data, user_ids: user_ids, group_ids: group_ids, @@ -626,7 +641,7 @@ def decode_message!(msg) channel, site_id = decode_channel_name(msg.channel) msg.channel = channel msg.site_id = site_id - parsed = JSON.parse(msg.data) + parsed = transport_codec.decode(msg.data) msg.data = parsed["data"] msg.user_ids = parsed["user_ids"] msg.group_ids = parsed["group_ids"] diff --git a/lib/message_bus/backends/base.rb b/lib/message_bus/backends/base.rb index a66e7769..19326bca 100644 --- a/lib/message_bus/backends/base.rb +++ b/lib/message_bus/backends/base.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require "message_bus/backends" - module MessageBus module Backends # Backends provide a 
consistent API over a variety of options for persisting diff --git a/lib/message_bus/backends/memory.rb b/lib/message_bus/backends/memory.rb index 4ff77d77..f65bd983 100644 --- a/lib/message_bus/backends/memory.rb +++ b/lib/message_bus/backends/memory.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require "message_bus/backends/base" - module MessageBus module Backends # The memory backend stores published messages in a simple array per diff --git a/lib/message_bus/backends/postgres.rb b/lib/message_bus/backends/postgres.rb index 5ca74ec7..50d8c198 100644 --- a/lib/message_bus/backends/postgres.rb +++ b/lib/message_bus/backends/postgres.rb @@ -2,8 +2,6 @@ require 'pg' -require "message_bus/backends/base" - module MessageBus module Backends # The Postgres backend stores published messages in a single Postgres table diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index 4790ca93..e3a1b092 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -3,8 +3,6 @@ require 'redis' require 'digest' -require "message_bus/backends/base" - module MessageBus module Backends # The Redis backend stores published messages in Redis sorted sets (using diff --git a/lib/message_bus/codec/base.rb b/lib/message_bus/codec/base.rb new file mode 100644 index 00000000..48b59c56 --- /dev/null +++ b/lib/message_bus/codec/base.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module MessageBus + module Codec + class Base + def encode(data:, user_ids:, group_ids:, client_ids:) + raise ConcreteClassMustImplementError + end + + def decode(payload) + raise ConcreteClassMustImplementError + end + end + + autoload :Json, File.expand_path("json", __dir__) + autoload :Oj, File.expand_path("oj", __dir__) + autoload :OjFast, File.expand_path("oj_fast", __dir__) + end +end diff --git a/lib/message_bus/codec/json.rb b/lib/message_bus/codec/json.rb new file mode 100644 index 00000000..49af348a --- /dev/null +++ 
b/lib/message_bus/codec/json.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module MessageBus + module Codec + class Json < Base + def encode(data:, user_ids:, group_ids:, client_ids:) + JSON.dump( + data: data, + user_ids: user_ids, + group_ids: group_ids, + client_ids: client_ids + ) + end + + def decode(payload) + JSON.parse(payload) + end + end + end +end diff --git a/lib/message_bus/codec/oj.rb b/lib/message_bus/codec/oj.rb new file mode 100644 index 00000000..8bf2187e --- /dev/null +++ b/lib/message_bus/codec/oj.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require 'oj' + +module MessageBus + module Codec + class Oj < Base + def encode(data:, user_ids:, group_ids:, client_ids:) + ::Oj.dump({ + data: data, + user_ids: user_ids, + group_ids: group_ids, + client_ids: client_ids + }, + mode: :compat) + end + + def decode(payload) + ::Oj.load(payload, mode: :compat) + end + end + end +end diff --git a/lib/message_bus/codec/oj_fast.rb b/lib/message_bus/codec/oj_fast.rb new file mode 100644 index 00000000..a85555b2 --- /dev/null +++ b/lib/message_bus/codec/oj_fast.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +require 'oj' + +module MessageBus + module Codec + class FastIdList + def self.from_array(array) + new(array.sort.pack("V*")) + end + + def self.from_string(string) + new(string) + end + + def initialize(packed) + @packed = packed + end + + def include?(id) + found = (0...length).bsearch do |index| + start = index * 4 + @packed[start, start + 4].unpack1("V") >= id + end + + if found + start = found * 4 + found && @packed[start, start + 4].unpack1("V") == id + end + end + + def length + @length ||= @packed.bytesize / 4 + end + + def to_a + @packed.unpack("V*") + end + + def to_s + @packed + end + end + + class OjFast < Base + def encode(data:, user_ids:, group_ids:, client_ids:) + + if user_ids + user_ids = FastIdList.from_array(user_ids).to_s + end + + #if group_ids + # group_ids = FastIdList.from_array(group_ids).to_s + #end + + 
::Oj.dump({ + data: data, + user_ids: user_ids, + group_ids: group_ids, + client_ids: client_ids + }, + mode: :compat) + end + + def decode(payload) + result = ::Oj.load(payload, mode: :compat) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str) + end + + # groups need to implement (-) + # if str = result["group_ids"] + # result["group_ids"] = FastIdList.from_string(str) + # end + + result + end + end + end +end diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index df923d9d..92064671 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -162,7 +162,7 @@ data.must_equal 'norris' site_id.must_equal 'magic' channel.must_equal '/chuck' - user_ids.must_equal [1, 2, 3] + user_ids.to_a.must_equal [1, 2, 3] end it "should get global messages if it subscribes to them" do From 49c7200148f45481b5a0092f2207d3725a763867 Mon Sep 17 00:00:00 2001 From: Sam Saffron Date: Thu, 6 May 2021 17:26:07 +1000 Subject: [PATCH 2/6] Corrected implementation Sadly spec suite is not quite passing cause encoded data explodes if \x00 is in the stream, need to figure out why Bench though is spectacularly good, so we are making progress --- Gemfile | 1 + bench/codecs.rb | 131 +++++++++++++++++++++++++++++++ lib/message_bus/codec/oj.rb | 2 +- lib/message_bus/codec/oj_fast.rb | 29 +++---- 4 files changed, 148 insertions(+), 15 deletions(-) create mode 100644 bench/codecs.rb diff --git a/Gemfile b/Gemfile index 66bb4bf3..ea22e39d 100644 --- a/Gemfile +++ b/Gemfile @@ -14,6 +14,7 @@ group :test do gem 'rack-test', require: 'rack/test' gem 'jasmine' gem 'puma' + gem 'm' end group :test, :development do diff --git a/bench/codecs.rb b/bench/codecs.rb new file mode 100644 index 00000000..4e71e219 --- /dev/null +++ b/bench/codecs.rb @@ -0,0 +1,131 @@ +# frozen_string_literal: true + +require 'bundler/inline' + +gemfile do + source 'https://rubygems.org' + gem 'message_bus', path: '../' + gem 'benchmark-ips' + gem 'oj' +end + 
+require 'benchmark/ips' +require 'message_bus' + +class StringHack + class FastIdList + def self.from_array(array) + new(",#{array.join(",")},") + end + + def self.from_string(string) + new(string) + end + + def initialize(packed) + @packed = packed + end + + def include?(id) + @packed.include?(",#{id},") + end + + def to_s + @packed + end + end + + class OjString + def encode(data:, user_ids:, group_ids:, client_ids:) + + if user_ids + user_ids = FastIdList.from_array(user_ids).to_s + end + + ::Oj.dump({ + data: data, + user_ids: user_ids, + group_ids: group_ids, + client_ids: client_ids + }, + mode: :compat) + end + + def decode(payload) + result = ::Oj.load(payload, mode: :compat) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str) + end + + result + end + end +end + +json_codec = MessageBus::Codec::Json.new +oj_codec = MessageBus::Codec::Oj.new +oj_fast_codec = MessageBus::Codec::OjFast.new +oj_fast_string_hack = StringHack::OjString.new + +json = json_codec.encode( + data: "hello world", + user_ids: (1..10000).to_a, + group_ids: nil, + client_ids: nil +) + +json_fast = oj_fast_codec.encode( + data: "hello world", + user_ids: (1..10000).to_a, + group_ids: nil, + client_ids: nil +) + +json_fast2 = oj_fast_string_hack.encode( + data: "hello world", + user_ids: (1..10000).to_a, + group_ids: nil, + client_ids: nil +) + +Benchmark.ips do |x| + x.report("json") do |n| + while n > 0 + decoded = json_codec.decode(json) + decoded["user_ids"].include?(5000) + n -= 1 + end + end + + x.report("oj") do |n| + while n > 0 + decoded = oj_codec.decode(json) + decoded["user_ids"].include?(5000) + n -= 1 + end + end + + x.report("oj_fast") do |n| + while n > 0 + decoded = oj_fast_codec.decode(json_fast) + decoded["user_ids"].include?(5000) + n -= 1 + end + end + + x.report("oj_fast string hack") do |n| + while n > 0 + decoded = oj_fast_string_hack.decode(json_fast2) + decoded["user_ids"].include?(5000) + n -= 1 + end + end + x.compare! 
+end + +#Comparison: +# oj_fast: 129350.0 i/s +# oj_fast string hack: 26255.2 i/s - 4.93x (± 0.00) slower +# oj: 3073.5 i/s - 42.09x (± 0.00) slower +# json: 2221.9 i/s - 58.22x (± 0.00) slower diff --git a/lib/message_bus/codec/oj.rb b/lib/message_bus/codec/oj.rb index 8bf2187e..9630e09d 100644 --- a/lib/message_bus/codec/oj.rb +++ b/lib/message_bus/codec/oj.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'oj' +require 'oj' unless defined? ::Oj module MessageBus module Codec diff --git a/lib/message_bus/codec/oj_fast.rb b/lib/message_bus/codec/oj_fast.rb index a85555b2..2b38c9c9 100644 --- a/lib/message_bus/codec/oj_fast.rb +++ b/lib/message_bus/codec/oj_fast.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true -require 'oj' +require 'oj' unless defined? ::Oj +require 'base64' module MessageBus module Codec @@ -19,13 +20,11 @@ def initialize(packed) def include?(id) found = (0...length).bsearch do |index| - start = index * 4 - @packed[start, start + 4].unpack1("V") >= id + @packed.byteslice(index * 4, 4).unpack1("V") >= id end if found - start = found * 4 - found && @packed[start, start + 4].unpack1("V") == id + found && @packed.byteslice(found * 4, 4).unpack1("V") == id end end @@ -52,18 +51,19 @@ def encode(data:, user_ids:, group_ids:, client_ids:) #if group_ids # group_ids = FastIdList.from_array(group_ids).to_s #end - - ::Oj.dump({ - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - }, - mode: :compat) + data = ::Oj.dump(data, mode: :compat) + + Marshal.dump( + "data" => data, + "user_ids" => user_ids, + "group_ids" => group_ids, + "client_ids" => client_ids + ) end def decode(payload) - result = ::Oj.load(payload, mode: :compat) + result = Marshal.load(payload) + result["data"] = ::Oj.load(result["data"], mode: :compat) if str = result["user_ids"] result["user_ids"] = FastIdList.from_string(str) @@ -73,6 +73,7 @@ def decode(payload) # if str = result["group_ids"] # result["group_ids"] = FastIdList.from_string(str) # 
end + # result end From b4b4e8d21401435fccfadf90e4c3a2052bb976a7 Mon Sep 17 00:00:00 2001 From: Sam Saffron Date: Fri, 7 May 2021 10:26:04 +1000 Subject: [PATCH 3/6] Improve implementation - Allow redis backend to handle \u0000 in exceptional cases like the unsupported Marshal codec - Ship only Oj and JSON codecs out of the box - Add more benchmarks to show when packing user ids helps vs hinders - Update readme --- CHANGELOG | 4 + README.md | 25 ++++++ bench/codecs.rb | 131 ------------------------------ bench/codecs/all_codecs.rb | 39 +++++++++ bench/codecs/marshal.rb | 11 +++ bench/codecs/packed_string.rb | 69 ++++++++++++++++ bench/codecs/string_hack.rb | 47 +++++++++++ bench/codecs_large_user_list.rb | 29 +++++++ bench/codecs_standard_message.rb | 29 +++++++ lib/message_bus.rb | 14 ++-- lib/message_bus/backends/redis.rb | 4 +- lib/message_bus/codec/base.rb | 3 +- lib/message_bus/codec/json.rb | 9 +- lib/message_bus/codec/oj.rb | 15 ++-- lib/message_bus/codec/oj_fast.rb | 82 ------------------- 15 files changed, 271 insertions(+), 240 deletions(-) delete mode 100644 bench/codecs.rb create mode 100644 bench/codecs/all_codecs.rb create mode 100644 bench/codecs/marshal.rb create mode 100644 bench/codecs/packed_string.rb create mode 100644 bench/codecs/string_hack.rb create mode 100644 bench/codecs_large_user_list.rb create mode 100644 bench/codecs_standard_message.rb delete mode 100644 lib/message_bus/codec/oj_fast.rb diff --git a/CHANGELOG b/CHANGELOG index 42c54a66..7fdb95ce 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,9 @@ 28-04-2021 +- Version TBD + + - FEATURE: Introduce support for transport codecs + - Version 3.3.5 - PERF: Optimised CORS preflight handling diff --git a/README.md b/README.md index 4468a487..384b277b 100644 --- a/README.md +++ b/README.md @@ -435,6 +435,31 @@ MessageBus.configure(backend: :memory) The `:clear_every` option supported by the PostgreSQL backend is also supported by the in-memory backend. 
+ +### Transport codecs + +By default MessageBus serializes messages to the backend using JSON. In most situations this performs extremely well. + +In some exceptional cases you may consider a different transport codec. To configure a custom codec use: + +```ruby +MessageBus.configure(transport_codec: codec) +``` + +A codec class must implement MessageBus::Codec::Base. Specifically, it must provide an `encode` and a `decode` method. + +See the `bench` directory for examples where the default JSON codec can perform poorly. A specific example may be +attempting to distribute a message to a restricted list of thousands of users. In cases like this you may consider +using a packed string encoder. + +Keep in mind, much of MessageBus internals and supporting tools expect data to be converted to JSON and back. If you use a naive (and fast) `Marshal` based codec you may need to limit the features you use. Specifically, the PostgreSQL backend expects the codec never to return a string with `\u0000`. Additionally, some classes like DistributedCache expect keys to be converted to Strings. + +Another example may be very large and complicated messages where Oj in compatibility mode outperforms JSON. 
To opt for the Oj codec use: + +``` +MessageBus.configure(transport_codec: MessageBus::Codec::Oj.new) +``` + ### Forking/threading app servers If you're using a forking or threading app server and you're not getting immediate delivery of published messages, you might need to configure your web server to re-connect to the message_bus backend diff --git a/bench/codecs.rb b/bench/codecs.rb deleted file mode 100644 index 4e71e219..00000000 --- a/bench/codecs.rb +++ /dev/null @@ -1,131 +0,0 @@ -# frozen_string_literal: true - -require 'bundler/inline' - -gemfile do - source 'https://rubygems.org' - gem 'message_bus', path: '../' - gem 'benchmark-ips' - gem 'oj' -end - -require 'benchmark/ips' -require 'message_bus' - -class StringHack - class FastIdList - def self.from_array(array) - new(",#{array.join(",")},") - end - - def self.from_string(string) - new(string) - end - - def initialize(packed) - @packed = packed - end - - def include?(id) - @packed.include?(",#{id},") - end - - def to_s - @packed - end - end - - class OjString - def encode(data:, user_ids:, group_ids:, client_ids:) - - if user_ids - user_ids = FastIdList.from_array(user_ids).to_s - end - - ::Oj.dump({ - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - }, - mode: :compat) - end - - def decode(payload) - result = ::Oj.load(payload, mode: :compat) - - if str = result["user_ids"] - result["user_ids"] = FastIdList.from_string(str) - end - - result - end - end -end - -json_codec = MessageBus::Codec::Json.new -oj_codec = MessageBus::Codec::Oj.new -oj_fast_codec = MessageBus::Codec::OjFast.new -oj_fast_string_hack = StringHack::OjString.new - -json = json_codec.encode( - data: "hello world", - user_ids: (1..10000).to_a, - group_ids: nil, - client_ids: nil -) - -json_fast = oj_fast_codec.encode( - data: "hello world", - user_ids: (1..10000).to_a, - group_ids: nil, - client_ids: nil -) - -json_fast2 = oj_fast_string_hack.encode( - data: "hello world", - user_ids: 
(1..10000).to_a, - group_ids: nil, - client_ids: nil -) - -Benchmark.ips do |x| - x.report("json") do |n| - while n > 0 - decoded = json_codec.decode(json) - decoded["user_ids"].include?(5000) - n -= 1 - end - end - - x.report("oj") do |n| - while n > 0 - decoded = oj_codec.decode(json) - decoded["user_ids"].include?(5000) - n -= 1 - end - end - - x.report("oj_fast") do |n| - while n > 0 - decoded = oj_fast_codec.decode(json_fast) - decoded["user_ids"].include?(5000) - n -= 1 - end - end - - x.report("oj_fast string hack") do |n| - while n > 0 - decoded = oj_fast_string_hack.decode(json_fast2) - decoded["user_ids"].include?(5000) - n -= 1 - end - end - x.compare! -end - -#Comparison: -# oj_fast: 129350.0 i/s -# oj_fast string hack: 26255.2 i/s - 4.93x (± 0.00) slower -# oj: 3073.5 i/s - 42.09x (± 0.00) slower -# json: 2221.9 i/s - 58.22x (± 0.00) slower diff --git a/bench/codecs/all_codecs.rb b/bench/codecs/all_codecs.rb new file mode 100644 index 00000000..4583c0f3 --- /dev/null +++ b/bench/codecs/all_codecs.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require_relative './packed_string' +require_relative './string_hack' +require_relative './marshal' + +def all_codecs + { + json: MessageBus::Codec::Json.new, + oj: MessageBus::Codec::Oj.new, + marshal: MarshalCodec.new, + packed_string_4_bytes: PackedString.new("V"), + packed_string_8_bytes: PackedString.new("Q"), + string_hack: StringHack.new + } +end + +def bench_decode(hash, user_needle) + encoded_data = all_codecs.map do |name, codec| + [ + name, codec, codec.encode(hash.dup) + ] + end + + Benchmark.ips do |x| + + encoded_data.each do |name, codec, encoded| + x.report(name) do |n| + while n > 0 + decoded = codec.decode(encoded) + decoded["user_ids"].include?(user_needle) + n -= 1 + end + end + end + + x.compare! 
+ end +end diff --git a/bench/codecs/marshal.rb b/bench/codecs/marshal.rb new file mode 100644 index 00000000..6223549d --- /dev/null +++ b/bench/codecs/marshal.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class MarshalCodec + def encode(hash) + ::Marshal.dump(hash) + end + + def decode(payload) + ::Marshal.load(payload) + end +end diff --git a/bench/codecs/packed_string.rb b/bench/codecs/packed_string.rb new file mode 100644 index 00000000..69f590d7 --- /dev/null +++ b/bench/codecs/packed_string.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +class PackedString + class FastIdList + def self.from_array(array, pack_with) + new(array.sort.pack("#{pack_with}*"), pack_with) + end + + def self.from_string(string, pack_with) + new(string, pack_with) + end + + def initialize(packed, pack_with) + raise "unknown pack format, expecting Q or V" if pack_with != "V" && pack_with != "Q" + @packed = packed + @pack_with = pack_with + @slot_size = pack_with == "V" ? 4 : 8 + end + + def include?(id) + found = (0...length).bsearch do |index| + @packed.byteslice(index * @slot_size, @slot_size).unpack1(@pack_with) >= id + end + + if found + found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id + end + end + + def length + @length ||= @packed.bytesize / @slot_size + end + + def to_a + @packed.unpack("#{@pack_with}*") + end + + def to_s + @packed + end + end + + def initialize(pack_with = "V") + @pack_with = pack_with + @oj_options = { mode: :compat } + end + + def encode(hash) + + if user_ids = hash["user_ids"] + hash["user_ids"] = FastIdList.from_array(hash["user_ids"], @pack_with).to_s + end + + hash["data"] = ::Oj.dump(hash["data"], @oj_options) + + Marshal.dump(hash) + end + + def decode(payload) + result = Marshal.load(payload) + result["data"] = ::Oj.load(result["data"], @oj_options) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str, @pack_with) + end + + result + end +end diff --git 
a/bench/codecs/string_hack.rb b/bench/codecs/string_hack.rb new file mode 100644 index 00000000..1ab190c5 --- /dev/null +++ b/bench/codecs/string_hack.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +class StringHack + class FastIdList + def self.from_array(array) + new(",#{array.join(",")},") + end + + def self.from_string(string) + new(string) + end + + def initialize(packed) + @packed = packed + end + + def include?(id) + @packed.include?(",#{id},") + end + + def to_s + @packed + end + end + + def initialize + @oj_options = { mode: :compat } + end + + def encode(hash) + if user_ids = hash["user_ids"] + hash["user_ids"] = FastIdList.from_array(user_ids).to_s + end + + ::Oj.dump(hash, @oj_options) + end + + def decode(payload) + result = ::Oj.load(payload, @oj_options) + + if str = result["user_ids"] + result["user_ids"] = FastIdList.from_string(str) + end + + result + end +end diff --git a/bench/codecs_large_user_list.rb b/bench/codecs_large_user_list.rb new file mode 100644 index 00000000..238e0a6e --- /dev/null +++ b/bench/codecs_large_user_list.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'bundler/inline' + +gemfile do + source 'https://rubygems.org' + gem 'message_bus', path: '../' + gem 'benchmark-ips' + gem 'oj' +end + +require 'benchmark/ips' +require 'message_bus' +require_relative 'codecs/all_codecs' + +bench_decode({ + "data" => "hello world", + "user_ids" => (1..10000).to_a, + "group_ids" => nil, + "client_ids" => nil + }, 5000 +) + +# packed_string_4_bytes: 127176.1 i/s +# packed_string_8_bytes: 94494.6 i/s - 1.35x (± 0.00) slower +# string_hack: 26403.4 i/s - 4.82x (± 0.00) slower +# marshal: 4985.5 i/s - 25.51x (± 0.00) slower +# oj: 3072.9 i/s - 41.39x (± 0.00) slower +# json: 2222.7 i/s - 57.22x (± 0.00) slower diff --git a/bench/codecs_standard_message.rb b/bench/codecs_standard_message.rb new file mode 100644 index 00000000..b8401fed --- /dev/null +++ b/bench/codecs_standard_message.rb @@ -0,0 +1,29 @@ +# 
frozen_string_literal: true + +require 'bundler/inline' + +gemfile do + source 'https://rubygems.org' + gem 'message_bus', path: '../' + gem 'benchmark-ips' + gem 'oj' +end + +require 'benchmark/ips' +require 'message_bus' +require_relative 'codecs/all_codecs' + +bench_decode({ + "data" => { amazing: "hello world this is an amazing message hello there!!!", another_key: [2, 3, 4] }, + "user_ids" => [1, 2, 3], + "group_ids" => [1], + "client_ids" => nil + }, 2 +) + +# marshal: 504885.6 i/s +# json: 401050.9 i/s - 1.26x (± 0.00) slower +# oj: 340847.4 i/s - 1.48x (± 0.00) slower +# string_hack: 296741.6 i/s - 1.70x (± 0.00) slower +# packed_string_4_bytes: 207942.6 i/s - 2.43x (± 0.00) slower +# packed_string_8_bytes: 206093.0 i/s - 2.45x (± 0.00) slower diff --git a/lib/message_bus.rb b/lib/message_bus.rb index 96e89d1f..94563c41 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -16,7 +16,7 @@ require_relative "message_bus/backends/base" # we still need to take care of the logger -if defined?(::Rails) +if defined?(::Rails::Engine) require_relative 'message_bus/rails/railtie' end @@ -373,12 +373,12 @@ def publish(channel, data, opts = nil) raise ::MessageBus::InvalidMessageTarget end - encoded_data = transport_codec.encode( - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - ) + encoded_data = transport_codec.encode({ + "data" => data, + "user_ids" => user_ids, + "group_ids" => group_ids, + "client_ids" => client_ids + }) channel_opts = {} diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index e3a1b092..b77f4377 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -102,8 +102,8 @@ def expire_all_backlogs! 
local global_id = redis.call("INCR", global_id_key) local backlog_id = redis.call("INCR", backlog_id_key) - local payload = string.format("%i|%i|%s", global_id, backlog_id, start_payload) - local global_backlog_message = string.format("%i|%s", backlog_id, channel) + local payload = table.concat({ global_id, backlog_id, start_payload }, "|") + local global_backlog_message = table.concat({ backlog_id, channel }, "|") redis.call("ZADD", backlog_key, backlog_id, payload) redis.call("EXPIRE", backlog_key, max_backlog_age) diff --git a/lib/message_bus/codec/base.rb b/lib/message_bus/codec/base.rb index 48b59c56..4b0e58a7 100644 --- a/lib/message_bus/codec/base.rb +++ b/lib/message_bus/codec/base.rb @@ -3,7 +3,7 @@ module MessageBus module Codec class Base - def encode(data:, user_ids:, group_ids:, client_ids:) + def encode(hash) raise ConcreteClassMustImplementError end @@ -14,6 +14,5 @@ def decode(payload) autoload :Json, File.expand_path("json", __dir__) autoload :Oj, File.expand_path("oj", __dir__) - autoload :OjFast, File.expand_path("oj_fast", __dir__) end end diff --git a/lib/message_bus/codec/json.rb b/lib/message_bus/codec/json.rb index 49af348a..4294f1e8 100644 --- a/lib/message_bus/codec/json.rb +++ b/lib/message_bus/codec/json.rb @@ -3,13 +3,8 @@ module MessageBus module Codec class Json < Base - def encode(data:, user_ids:, group_ids:, client_ids:) - JSON.dump( - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - ) + def encode(hash) + JSON.dump(hash) end def decode(payload) diff --git a/lib/message_bus/codec/oj.rb b/lib/message_bus/codec/oj.rb index 9630e09d..85423c88 100644 --- a/lib/message_bus/codec/oj.rb +++ b/lib/message_bus/codec/oj.rb @@ -5,18 +5,15 @@ module MessageBus module Codec class Oj < Base - def encode(data:, user_ids:, group_ids:, client_ids:) - ::Oj.dump({ - data: data, - user_ids: user_ids, - group_ids: group_ids, - client_ids: client_ids - }, - mode: :compat) + def initialize(options = { mode: :compat }) 
+ @options = options + end + def encode(hash) + ::Oj.dump(hash, @options) end def decode(payload) - ::Oj.load(payload, mode: :compat) + ::Oj.load(payload, @options) end end end diff --git a/lib/message_bus/codec/oj_fast.rb b/lib/message_bus/codec/oj_fast.rb deleted file mode 100644 index 2b38c9c9..00000000 --- a/lib/message_bus/codec/oj_fast.rb +++ /dev/null @@ -1,82 +0,0 @@ -# frozen_string_literal: true - -require 'oj' unless defined? ::Oj -require 'base64' - -module MessageBus - module Codec - class FastIdList - def self.from_array(array) - new(array.sort.pack("V*")) - end - - def self.from_string(string) - new(string) - end - - def initialize(packed) - @packed = packed - end - - def include?(id) - found = (0...length).bsearch do |index| - @packed.byteslice(index * 4, 4).unpack1("V") >= id - end - - if found - found && @packed.byteslice(found * 4, 4).unpack1("V") == id - end - end - - def length - @length ||= @packed.bytesize / 4 - end - - def to_a - @packed.unpack("V*") - end - - def to_s - @packed - end - end - - class OjFast < Base - def encode(data:, user_ids:, group_ids:, client_ids:) - - if user_ids - user_ids = FastIdList.from_array(user_ids).to_s - end - - #if group_ids - # group_ids = FastIdList.from_array(group_ids).to_s - #end - data = ::Oj.dump(data, mode: :compat) - - Marshal.dump( - "data" => data, - "user_ids" => user_ids, - "group_ids" => group_ids, - "client_ids" => client_ids - ) - end - - def decode(payload) - result = Marshal.load(payload) - result["data"] = ::Oj.load(result["data"], mode: :compat) - - if str = result["user_ids"] - result["user_ids"] = FastIdList.from_string(str) - end - - # groups need to implement (-) - # if str = result["group_ids"] - # result["group_ids"] = FastIdList.from_string(str) - # end - # - - result - end - end - end -end From 06d17783b80be864db6e09cfc906ebf7eb0f06eb Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 7 May 2021 12:11:27 +1000 Subject: [PATCH 4/6] Update bench/codecs/packed_string.rb Co-authored-by: 
Rafael Garcia --- bench/codecs/packed_string.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bench/codecs/packed_string.rb b/bench/codecs/packed_string.rb index 69f590d7..c9c9db51 100644 --- a/bench/codecs/packed_string.rb +++ b/bench/codecs/packed_string.rb @@ -22,9 +22,7 @@ def include?(id) @packed.byteslice(index * @slot_size, @slot_size).unpack1(@pack_with) >= id end - if found - found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id - end + found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id end def length From 24c4a693c483e67c7316c01c453ecff2fb9c28a9 Mon Sep 17 00:00:00 2001 From: Ben Langfeld Date: Mon, 10 May 2021 11:20:29 -0300 Subject: [PATCH 5/6] Lint Co-authored-by: Rafael Garcia --- lib/message_bus/codec/oj.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/message_bus/codec/oj.rb b/lib/message_bus/codec/oj.rb index 85423c88..27ce6ede 100644 --- a/lib/message_bus/codec/oj.rb +++ b/lib/message_bus/codec/oj.rb @@ -8,6 +8,7 @@ class Oj < Base def initialize(options = { mode: :compat }) @options = options end + def encode(hash) ::Oj.dump(hash, @options) end From 501d52e9d874e57e3987bd91f8fb0e7e3df5ebe2 Mon Sep 17 00:00:00 2001 From: Ben Langfeld Date: Mon, 10 May 2021 11:21:13 -0300 Subject: [PATCH 6/6] More accurate list of unreleased features --- CHANGELOG | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 7fdb95ce..20295486 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,9 +1,9 @@ -28-04-2021 - -- Version TBD +UNRELEASED - FEATURE: Introduce support for transport codecs +28-04-2021 + - Version 3.3.5 - PERF: Optimised CORS preflight handling