From 09fa602616734d100da458f9ec9ffbf224286f70 Mon Sep 17 00:00:00 2001 From: Stan Lo Date: Thu, 2 May 2024 19:27:24 +0800 Subject: [PATCH] Extract thread control logic into a new ThreadedPeriodicWorker class (#2304) Instead of letting BackpressureMonitor, SessionFlusher, and Metrics::Aggregator handle their own threading logic, we can extract that into a new class that can be reused by all of them. --- sentry-ruby/lib/sentry-ruby.rb | 1 + .../lib/sentry/backpressure_monitor.rb | 34 +--------------- sentry-ruby/lib/sentry/metrics/aggregator.rb | 34 ++-------------- sentry-ruby/lib/sentry/session_flusher.rb | 37 +++--------------- .../lib/sentry/threaded_periodic_worker.rb | 39 +++++++++++++++++++ .../spec/sentry/backpressure_monitor_spec.rb | 4 +- .../spec/sentry/metrics/aggregator_spec.rb | 4 +- .../spec/sentry/session_flusher_spec.rb | 4 +- 8 files changed, 56 insertions(+), 101 deletions(-) create mode 100644 sentry-ruby/lib/sentry/threaded_periodic_worker.rb diff --git a/sentry-ruby/lib/sentry-ruby.rb b/sentry-ruby/lib/sentry-ruby.rb index 439d83ce5..7e75e9759 100644 --- a/sentry-ruby/lib/sentry-ruby.rb +++ b/sentry-ruby/lib/sentry-ruby.rb @@ -20,6 +20,7 @@ require "sentry/transaction" require "sentry/hub" require "sentry/background_worker" +require "sentry/threaded_periodic_worker" require "sentry/session_flusher" require "sentry/backpressure_monitor" require "sentry/cron/monitor_check_ins" diff --git a/sentry-ruby/lib/sentry/backpressure_monitor.rb b/sentry-ruby/lib/sentry/backpressure_monitor.rb index 4b8695a5a..b3f5652c7 100644 --- a/sentry-ruby/lib/sentry/backpressure_monitor.rb +++ b/sentry-ruby/lib/sentry/backpressure_monitor.rb @@ -1,19 +1,13 @@ # frozen_string_literal: true module Sentry - class BackpressureMonitor - include LoggingHelper - + class BackpressureMonitor < ThreadedPeriodicWorker DEFAULT_INTERVAL = 10 MAX_DOWNSAMPLE_FACTOR = 10 def initialize(configuration, client, interval: DEFAULT_INTERVAL) - @interval = interval + super(configuration.logger, interval) @client = client - @logger = configuration.logger - - @thread = nil - @exited = false @healthy = true @downsample_factor = 0 @@ -47,29 +41,5 @@ def set_downsample_factor log_debug("[BackpressureMonitor] health check negative, downsampling with a factor of #{@downsample_factor}") end end - - def kill - log_debug("[BackpressureMonitor] killing monitor") - - @exited = true - @thread&.kill - end - - private - - def ensure_thread - return if @exited - return if @thread&.alive? - - @thread = Thread.new do - loop do - sleep(@interval) - run - end - end - rescue ThreadError - log_debug("[BackpressureMonitor] Thread creation failed") - @exited = true - end end end diff --git a/sentry-ruby/lib/sentry/metrics/aggregator.rb b/sentry-ruby/lib/sentry/metrics/aggregator.rb index e02def2c0..45ffbd687 100644 --- a/sentry-ruby/lib/sentry/metrics/aggregator.rb +++ b/sentry-ruby/lib/sentry/metrics/aggregator.rb @@ -2,9 +2,7 @@ module Sentry module Metrics - class Aggregator - include LoggingHelper - + class Aggregator < ThreadedPeriodicWorker FLUSH_INTERVAL = 5 ROLLUP_IN_SECONDS = 10 @@ -36,8 +34,8 @@ class Aggregator attr_reader :client, :thread, :buckets, :flush_shift, :code_locations def initialize(configuration, client) + super(configuration.logger, FLUSH_INTERVAL) @client = client - @logger = configuration.logger @before_emit = configuration.metrics.before_emit @enable_code_locations = configuration.metrics.enable_code_locations @stacktrace_builder = configuration.stacktrace_builder @@ -46,8 +44,6 @@ def initialize(configuration, client) @default_tags['release'] = configuration.release if configuration.release @default_tags['environment'] = configuration.environment if configuration.environment - @thread = nil - @exited = false @mutex = Mutex.new # a nested hash of timestamp -> bucket keys -> Metric instance @@ -120,34 +116,10 @@ def flush(force: false) @client.capture_envelope(envelope) end - def kill - log_debug('[Metrics::Aggregator] killing thread') - - @exited = true - @thread&.kill - end + alias_method :run, :flush private - def ensure_thread - return false if @exited - return true if @thread&.alive? - - @thread = Thread.new do - loop do - # TODO-neel-metrics use event for force flush later - sleep(FLUSH_INTERVAL) - flush - end - end - - true - rescue ThreadError - log_debug('[Metrics::Aggregator] thread creation failed') - @exited = true - false - end - # important to sort for key consistency def serialize_tags(tags) tags.flat_map do |k, v| diff --git a/sentry-ruby/lib/sentry/session_flusher.rb b/sentry-ruby/lib/sentry/session_flusher.rb index 256320ca7..5971cfc59 100644 --- a/sentry-ruby/lib/sentry/session_flusher.rb +++ b/sentry-ruby/lib/sentry/session_flusher.rb @@ -1,19 +1,15 @@ # frozen_string_literal: true module Sentry - class SessionFlusher - include LoggingHelper - + class SessionFlusher < ThreadedPeriodicWorker FLUSH_INTERVAL = 60 def initialize(configuration, client) - @thread = nil - @exited = false + super(configuration.logger, FLUSH_INTERVAL) @client = client @pending_aggregates = {} @release = configuration.release @environment = configuration.environment - @logger = configuration.logger log_debug("[Sessions] Sessions won't be captured without a valid release") unless @release end @@ -25,30 +21,18 @@ def flush @pending_aggregates = {} end + alias_method :run, :flush + def add_session(session) - return if @exited return unless @release - begin - ensure_thread - rescue ThreadError - log_debug("Session flusher thread creation failed") - @exited = true - return - end + return unless ensure_thread return unless Session::AGGREGATE_STATUSES.include?(session.status) @pending_aggregates[session.aggregation_key] ||= init_aggregates(session.aggregation_key) @pending_aggregates[session.aggregation_key][session.status] += 1 end - def kill - log_debug("Killing session flusher") - - @exited = true - @thread&.kill - end - private def init_aggregates(aggregation_key) @@ -70,16 +54,5 @@ def pending_envelope def attrs { release: @release, environment: @environment } end - - def ensure_thread - return if @thread&.alive? - - @thread = Thread.new do - loop do - sleep(FLUSH_INTERVAL) - flush - end - end - end end end diff --git a/sentry-ruby/lib/sentry/threaded_periodic_worker.rb b/sentry-ruby/lib/sentry/threaded_periodic_worker.rb new file mode 100644 index 000000000..cf1272083 --- /dev/null +++ b/sentry-ruby/lib/sentry/threaded_periodic_worker.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +module Sentry + class ThreadedPeriodicWorker + include LoggingHelper + + def initialize(logger, internal) + @thread = nil + @exited = false + @interval = internal + @logger = logger + end + + def ensure_thread + return false if @exited + return true if @thread&.alive? + + @thread = Thread.new do + loop do + sleep(@interval) + run + end + end + + true + rescue ThreadError + log_debug("[#{self.class.name}] thread creation failed") + @exited = true + false + end + + def kill + log_debug("[#{self.class.name}] thread killed") + + @exited = true + @thread&.kill + end + end +end diff --git a/sentry-ruby/spec/sentry/backpressure_monitor_spec.rb b/sentry-ruby/spec/sentry/backpressure_monitor_spec.rb index 6f18d605f..1419cffdb 100644 --- a/sentry-ruby/spec/sentry/backpressure_monitor_spec.rb +++ b/sentry-ruby/spec/sentry/backpressure_monitor_spec.rb @@ -55,7 +55,7 @@ it 'logs error' do subject.healthy? - expect(string_io.string).to match(/\[BackpressureMonitor\] Thread creation failed/) + expect(string_io.string).to include("[#{described_class.name}] thread creation failed") end end @@ -111,7 +111,7 @@ subject.healthy? expect(subject.instance_variable_get(:@thread)).to receive(:kill) subject.kill - expect(string_io.string).to match(/\[BackpressureMonitor\] killing monitor/) + expect(string_io.string).to include("[#{described_class.name}] thread killed") end end end diff --git a/sentry-ruby/spec/sentry/metrics/aggregator_spec.rb b/sentry-ruby/spec/sentry/metrics/aggregator_spec.rb index f397d07d5..dcd78ab86 100644 --- a/sentry-ruby/spec/sentry/metrics/aggregator_spec.rb +++ b/sentry-ruby/spec/sentry/metrics/aggregator_spec.rb @@ -47,7 +47,7 @@ it 'logs error' do subject.add(:c, 'incr', 1) - expect(string_io.string).to match(/\[Metrics::Aggregator\] thread creation failed/) + expect(string_io.string).to include("[#{described_class.name}] thread creation failed") end end @@ -479,7 +479,7 @@ it 'logs message when killing the thread' do expect(subject.thread).to receive(:kill) subject.kill - expect(string_io.string).to match(/\[Metrics::Aggregator\] killing thread/) + expect(string_io.string).to include("[#{described_class.name}] thread killed") end end end diff --git a/sentry-ruby/spec/sentry/session_flusher_spec.rb b/sentry-ruby/spec/sentry/session_flusher_spec.rb index 5f9f53cdf..0c0fbb4eb 100644 --- a/sentry-ruby/spec/sentry/session_flusher_spec.rb +++ b/sentry-ruby/spec/sentry/session_flusher_spec.rb @@ -146,7 +146,7 @@ it "logs error" do subject.add_session(session) - expect(string_io.string).to match(/Session flusher thread creation failed/) + expect(string_io.string).to include("[#{described_class.name}] thread creation failed") end end @@ -173,7 +173,7 @@ describe "#kill" do it "logs message when killing the thread" do subject.kill - expect(string_io.string).to match(/Killing session flusher/) + expect(string_io.string).to include("[#{described_class.name}] thread killed") end end end