Skip to content

Commit

Permalink
account for field type checking
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrwebdesign committed Mar 15, 2021
1 parent b0f2419 commit 8328e31
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,7 +1,7 @@
source 'https://rubygems.org'

gem 'byebug'
gem 'google-cloud-pubsub', '>= 1.6.0'
# gem 'google-cloud-pubsub', '>= 1.6.0'

# Specify your gem's dependencies in bunny_tie.gemspec
gemspec
64 changes: 40 additions & 24 deletions Gemfile.lock
Expand Up @@ -2,25 +2,35 @@ PATH
remote: .
specs:
pubsub_tie (0.0.2)
activesupport
google-cloud-pubsub (~> 1.6)

GEM
remote: https://rubygems.org/
specs:
activesupport (6.1.3)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 1.6, < 2)
minitest (>= 5.1)
tzinfo (~> 2.0)
zeitwerk (~> 2.3)
addressable (2.7.0)
public_suffix (>= 2.0.2, < 5.0)
byebug (11.1.3)
concurrent-ruby (1.1.6)
concurrent-ruby (1.1.8)
diff-lcs (1.3)
faraday (1.0.1)
faraday (1.3.0)
faraday-net_http (~> 1.0)
multipart-post (>= 1.2, < 3)
google-cloud-core (1.5.0)
ruby2_keywords
faraday-net_http (1.0.1)
google-cloud-core (1.6.0)
google-cloud-env (~> 1.0)
google-cloud-errors (~> 1.0)
google-cloud-env (1.3.1)
google-cloud-env (1.5.0)
faraday (>= 0.17.3, < 2.0)
google-cloud-errors (1.0.0)
google-cloud-pubsub (1.6.1)
google-cloud-errors (1.1.0)
google-cloud-pubsub (1.10.0)
concurrent-ruby (~> 1.1)
google-cloud-core (~> 1.2)
google-gax (~> 1.8)
Expand All @@ -33,33 +43,36 @@ GEM
googleauth (~> 0.9)
grpc (~> 1.24)
rly (~> 0.2.3)
google-protobuf (3.11.4)
googleapis-common-protos (1.3.10)
google-protobuf (~> 3.11)
googleapis-common-protos-types (>= 1.0.5, < 2.0)
google-protobuf (3.15.6)
googleapis-common-protos (1.3.11)
google-protobuf (~> 3.14)
googleapis-common-protos-types (>= 1.0.6, < 2.0)
grpc (~> 1.27)
googleapis-common-protos-types (1.0.5)
google-protobuf (~> 3.11)
googleauth (0.12.0)
googleapis-common-protos-types (1.0.6)
google-protobuf (~> 3.14)
googleauth (0.16.0)
faraday (>= 0.17.3, < 2.0)
jwt (>= 1.4, < 3.0)
memoist (~> 0.16)
multi_json (~> 1.11)
os (>= 0.9, < 2.0)
signet (~> 0.14)
grpc (1.28.0)
google-protobuf (~> 3.11)
grpc (1.36.0)
google-protobuf (~> 3.14)
googleapis-common-protos-types (~> 1.0)
grpc-google-iam-v1 (0.6.10)
google-protobuf (~> 3.11)
googleapis-common-protos (>= 1.3.10, < 2.0)
grpc-google-iam-v1 (0.6.11)
google-protobuf (~> 3.14)
googleapis-common-protos (>= 1.3.11, < 2.0)
grpc (~> 1.27)
jwt (2.2.1)
i18n (1.8.9)
concurrent-ruby (~> 1.0)
jwt (2.2.2)
memoist (0.16.2)
multi_json (1.14.1)
minitest (5.14.4)
multi_json (1.15.0)
multipart-post (2.1.1)
os (1.1.0)
public_suffix (4.0.5)
os (1.1.1)
public_suffix (4.0.6)
rake (10.5.0)
rly (0.2.3)
rspec (3.9.0)
Expand All @@ -75,19 +88,22 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.9.0)
rspec-support (3.9.3)
signet (0.14.0)
ruby2_keywords (0.0.4)
signet (0.15.0)
addressable (~> 2.3)
faraday (>= 0.17.3, < 2.0)
jwt (>= 1.5, < 3.0)
multi_json (~> 1.10)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
zeitwerk (2.4.2)

PLATFORMS
ruby

DEPENDENCIES
bundler (~> 1.16)
byebug
google-cloud-pubsub (>= 1.6.0)
pubsub_tie!
rake (~> 10.0)
rspec (~> 3.0)
Expand Down
22 changes: 19 additions & 3 deletions lib/pubsub_tie/events.rb
Expand Up @@ -7,25 +7,41 @@ def configure(config)

evs = config['events'].map{|e| e['name']}
@events = Hash[evs.map(&:to_sym).zip(config['events'])]
@events.each do |k, evt|
fields = (evt['required'] || []) + (evt['optional'] || [])
evt['fields'] = Hash[ fields.map {|f| [f['name'], f['type']]} ]
end
end

# Full event name from symbol protecting from typos
# Raises KeyError if bad symbol
def full_name(sym)
"#{@prefix}-#{name(sym)}"
end

def name(sym)
"#{@prefix}-#{value(sym, 'name')}"
value(sym, 'name')
end

def required(sym)
(value(sym, 'required') || []).map(&:to_sym)
field_names(sym, 'required')
end

def optional(sym)
(value(sym, 'optional') || []).map(&:to_sym)
field_names(sym, 'optional')
end

def types(sym)
value(sym, 'fields')
end

private
def value(sym, key)
@events.fetch(sym)[key]
end

def field_names(sym, mode)
(value(sym, mode) || []).map {|field| field['name'].to_sym}
end
end
end
63 changes: 52 additions & 11 deletions lib/pubsub_tie/publisher.rb
Expand Up @@ -16,36 +16,44 @@ def google_pubsub(config)
credentials: creds)
end

def publish(topic_sym, data, resource)
#
# Publishes event data asynchronously to topic inferred from event_sym.
# Data is augmented with event_name and event_time and validated against
# loaded configuration
#
def publish(event_sym, data, resource)
message = augmented(data, event_sym)

@pubsub.
topic(Events.name topic_sym).
# publish(message(data, resource), publish_time: Time.now.utc)
publish_async(message(validate_data(topic_sym, data), resource),
topic(Events.full_name event_sym).
# publish(message(payload, resource), publish_time: Time.now.utc)
publish_async(payload(validate_data(event_sym, message), resource),
publish_time: Time.now.utc) do |result|
unless result.succeeded?
Rails.logger.error(
"Failed to publish #{data} to #{topic_sym} on #{resource} due to #{result.error}")
"Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
end
end
end

def batch(topic_sym, messages, resource)
def batch(event_sym, messages, resource)
topic = @pubsub.
topic(Events.name topic_sym)
topic(Events.full_name event_sym)
messages.each do |data|
topic.publish_async(message(validate_data(topic_sym, data), resource),
message = augmented(data, event_sym)
topic.publish_async(payload(validate_data(event_sym, message), resource),
publish_time: Time.now.utc) do |result|
unless result.succeeded?
Rails.logger.error(
"Failed to publish #{data} to #{topic_sym} on #{data} due to #{result.error}")
"Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
end
end
end
topic.async_publisher.stop.wait!
end

private
def message(data, resource)
def payload(data, resource)
# TODO: embed resource in message
data.to_json
end
Expand All @@ -57,11 +65,44 @@ def validate_data(sym, data)
"Missing event required args for #{sym}: #{missing}")
end

data.slice(*(Events.required(sym) + Events.optional(sym)))
validate_types(sym,
data.slice(*(Events.required(sym) + Events.optional(sym))))
end

def missing_required(sym, data)
Events.required(sym) - data.keys
end

def augmented(data, event_sym)
{event_name: Events.name(event_sym),
event_time: Time.current.utc}.merge(data.to_hash.to_options)
end

def validate_types(sym, data)
types = Events.types(sym)

data.each do |field, val|
case val
when String
bad_type(field, data) unless types[field.to_s] == "STRING"
when Integer
bad_type(field, data) unless ["INT", "FLOAT"].include? types[field.to_s]
when Numeric
bad_type(field, data) unless types[field.to_s] == "FLOAT"
when Time
bad_type(field, data) unless types[field.to_s] == "TIMESTAMP"
when DateTime
bad_type(field, data) unless types[field.to_s] == "DATETIME"
else
bad_type(field, data)
end
end

data
end

def bad_type(field, data)
raise ArgumentError.new("Bad type for field #{field} in event #{data}")
end
end
end
1 change: 1 addition & 0 deletions pubsub_tie.gemspec
Expand Up @@ -29,6 +29,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency 'google-cloud-pubsub', '~> 1.6'
spec.add_dependency "activesupport"

spec.add_development_dependency "rake", '~> 10.0'
spec.add_development_dependency "bundler", '~> 1.16'
Expand Down
6 changes: 3 additions & 3 deletions spec/pubsub_tie/events_spec.rb
Expand Up @@ -10,16 +10,16 @@ module PubSubTie
'required' =>['req1', 'req2'],
'optional' =>['opt1']}] } }

describe ".name" do
describe ".full_name" do
context "when the event for the symbol is defined" do
it "preprends the application prefix" do
expect(Events.name(:event_zero)).to eq('test-event_zero')
expect(Events.full_name(:event_zero)).to eq('test-event_zero')
end
end

context "when the event for the symbol is not defined" do
it "raises a KeyError" do
expect {Events.name(:bad_name)}.to raise_error(KeyError)
expect {Events.full_name(:bad_name)}.to raise_error(KeyError)
end
end
end
Expand Down
41 changes: 35 additions & 6 deletions spec/pubsub_tie/publisher_spec.rb
Expand Up @@ -6,27 +6,37 @@ module PubSubTie
let(:config) { {'app_prefix' => 'test',
'events' =>[{
'name' =>'event_zero',
'required' =>['req1'],
'optional' =>['opt1']}] } }
'required' => [{'name' => 'req1', 'type' => 'INT'},
{'name' => 'event_name', 'type' => 'STRING'},
{'name' => 'event_time', 'type' => 'TIMESTAMP'}],
'optional' => [{'name' => 'opt1', 'type' => 'INT'}] }] } }

before(:each) do
Events.configure(config)
Publisher.configure(pubconf)
# Freeze time
travel_to Time.current
end

describe ".publish" do
subject { Publisher.publish(:event_zero, data, nil) }
let(:data) { {req1: 'alpha'} }
let(:data) { {req1: 1} }

it 'produces a topic named after the event name' do
expect(PubSubTie::Google::PubSub::Mock).
to receive(:topic).
with(Events.name(:event_zero)).
with(Events.full_name(:event_zero)).
and_call_original
subject
end

describe 'message' do
let(:augmented) { {req1: 1,
event_name: Events.name(:event_zero),
event_time: Time.current.utc} }
let(:req1) { 1 }


context "with missing required attributes" do
let(:data) { {} }

Expand All @@ -36,13 +46,32 @@ module PubSubTie
end

context "with required attributes" do
context 'with only listed attributes' do
let(:data) { {req1: req1} }

it "augments them to include name and time" do
expect(PubSubTie::Google::PubSub::Topic).
to receive(:publish_async).
with(augmented.to_json, anything)
subject
end

context 'with a bad type' do
let(:req1) { '1.1' }

it "raises an ArgumentError" do
expect { subject }.to raise_error(ArgumentError)
end
end
end

context 'with non-listed attributes' do
let(:data) { {req1: 'alpha', bogus: 'bravo'} }
let(:data) { {req1: req1, bogus: 'bravo'} }

it "ignores them" do
expect(PubSubTie::Google::PubSub::Topic).
to receive(:publish_async).
with({req1: 'alpha'}.to_json, anything)
with(augmented.to_json, anything)
subject
end
end
Expand Down
5 changes: 5 additions & 0 deletions spec/spec_helper.rb
@@ -1,6 +1,9 @@
require "bundler/setup"
require "pubsub_tie"
require 'pathname'
require 'active_support'
require 'active_support/testing/time_helpers'
require 'byebug'

RSpec.configure do |config|
# Enable flags like --only-failures and --next-failure
Expand All @@ -9,6 +12,8 @@
# Disable RSpec exposing methods globally on `Module` and `main`
config.disable_monkey_patching!

config.include ActiveSupport::Testing::TimeHelpers

config.expect_with :rspec do |c|
c.syntax = :expect
end
Expand Down

0 comments on commit 8328e31

Please sign in to comment.