Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: support custom codecs for transport #250

Merged
merged 6 commits into from May 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -18,6 +18,7 @@ end

group :test, :development do
gem 'byebug'
gem 'oj'
end

group :development do
Expand Down
37 changes: 26 additions & 11 deletions lib/message_bus.rb
Expand Up @@ -2,18 +2,22 @@

require "monitor"
require "set"
require "message_bus/version"
require "message_bus/message"
require "message_bus/client"
require "message_bus/connection_manager"
require "message_bus/diagnostics"
require "message_bus/rack/middleware"
require "message_bus/rack/diagnostics"
require "message_bus/timer_thread"

require_relative "message_bus/version"
require_relative "message_bus/message"
require_relative "message_bus/client"
require_relative "message_bus/connection_manager"
require_relative "message_bus/diagnostics"
require_relative "message_bus/rack/middleware"
require_relative "message_bus/rack/diagnostics"
require_relative "message_bus/timer_thread"
require_relative "message_bus/codec/base"
require_relative "message_bus/backends"
require_relative "message_bus/backends/base"

# we still need to take care of the logger
if defined?(::Rails)
require 'message_bus/rails/railtie'
require_relative 'message_bus/rails/railtie'
end

# @see MessageBus::Implementation
Expand Down Expand Up @@ -278,6 +282,17 @@ def allow_broadcast?
end
end

# @param [MessageBus::Codec::Base] codec used to encode and decode Message payloads
# @return [void]
def transport_codec=(codec)
configure(trasport_codec: codec)
end

# @return [MessageBus::Codec::Base] codec used to encode and decode Message payloads
def transport_codec
@config[:transport_codec] ||= MessageBus::Codec::Json.new
end

# @param [MessageBus::Backend::Base] pub_sub a configured backend
# @return [void]
def reliable_pub_sub=(pub_sub)
Expand Down Expand Up @@ -358,7 +373,7 @@ def publish(channel, data, opts = nil)
raise ::MessageBus::InvalidMessageTarget
end

encoded_data = JSON.dump(
encoded_data = transport_codec.encode(
data: data,
user_ids: user_ids,
group_ids: group_ids,
Expand Down Expand Up @@ -626,7 +641,7 @@ def decode_message!(msg)
channel, site_id = decode_channel_name(msg.channel)
msg.channel = channel
msg.site_id = site_id
parsed = JSON.parse(msg.data)
parsed = transport_codec.decode(msg.data)
msg.data = parsed["data"]
msg.user_ids = parsed["user_ids"]
msg.group_ids = parsed["group_ids"]
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/base.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends"

module MessageBus
module Backends
# Backends provide a consistent API over a variety of options for persisting
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/memory.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "message_bus/backends/base"

module MessageBus
module Backends
# The memory backend stores published messages in a simple array per
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/postgres.rb
Expand Up @@ -2,8 +2,6 @@

require 'pg'

require "message_bus/backends/base"

module MessageBus
module Backends
# The Postgres backend stores published messages in a single Postgres table
Expand Down
2 changes: 0 additions & 2 deletions lib/message_bus/backends/redis.rb
Expand Up @@ -3,8 +3,6 @@
require 'redis'
require 'digest'

require "message_bus/backends/base"

module MessageBus
module Backends
# The Redis backend stores published messages in Redis sorted sets (using
Expand Down
19 changes: 19 additions & 0 deletions lib/message_bus/codec/base.rb
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module MessageBus
module Codec
class Base
def encode(data:, user_ids:, group_ids:, client_ids:)
raise ConcreteClassMustImplementError
end

def decode(payload)
raise ConcreteClassMustImplementError
end
end

autoload :Json, File.expand_path("json", __dir__)
autoload :Oj, File.expand_path("oj", __dir__)
autoload :OjFast, File.expand_path("oj_fast", __dir__)
end
end
20 changes: 20 additions & 0 deletions lib/message_bus/codec/json.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module MessageBus
module Codec
class Json < Base
def encode(data:, user_ids:, group_ids:, client_ids:)
JSON.dump(
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
)
end

def decode(payload)
JSON.parse(payload)
end
end
end
end
23 changes: 23 additions & 0 deletions lib/message_bus/codec/oj.rb
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require 'oj'

module MessageBus
module Codec
class Oj < Base
def encode(data:, user_ids:, group_ids:, client_ids:)
::Oj.dump({
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
},
mode: :compat)
end

def decode(payload)
::Oj.load(payload, mode: :compat)
end
end
end
end
81 changes: 81 additions & 0 deletions lib/message_bus/codec/oj_fast.rb
@@ -0,0 +1,81 @@
# frozen_string_literal: true

require 'oj'

module MessageBus
module Codec
class FastIdList
def self.from_array(array)
new(array.sort.pack("V*"))
end

def self.from_string(string)
new(string)
end

def initialize(packed)
@packed = packed
end

def include?(id)
found = (0...length).bsearch do |index|
start = index * 4
@packed[start, start + 4].unpack1("V") >= id
SamSaffron marked this conversation as resolved.
Show resolved Hide resolved
end

if found
start = found * 4
found && @packed[start, start + 4].unpack1("V") == id
end
end

def length
@length ||= @packed.bytesize / 4
end

def to_a
@packed.unpack("V*")
end

def to_s
@packed
end
end

class OjFast < Base
def encode(data:, user_ids:, group_ids:, client_ids:)

if user_ids
user_ids = FastIdList.from_array(user_ids).to_s
end

#if group_ids
# group_ids = FastIdList.from_array(group_ids).to_s
#end

::Oj.dump({
data: data,
user_ids: user_ids,
group_ids: group_ids,
client_ids: client_ids
},
mode: :compat)
end

def decode(payload)
result = ::Oj.load(payload, mode: :compat)

if str = result["user_ids"]
result["user_ids"] = FastIdList.from_string(str)
end

# groups need to implement (-)
# if str = result["group_ids"]
# result["group_ids"] = FastIdList.from_string(str)
# end

result
end
end
end
end
2 changes: 1 addition & 1 deletion spec/lib/message_bus_spec.rb
Expand Up @@ -162,7 +162,7 @@
data.must_equal 'norris'
site_id.must_equal 'magic'
channel.must_equal '/chuck'
user_ids.must_equal [1, 2, 3]
user_ids.to_a.must_equal [1, 2, 3]
end

it "should get global messages if it subscribes to them" do
Expand Down