Skip to content

Commit

Permalink
Extract thread control logic into a new ThreadedPeriodicWorker class (#…
Browse files Browse the repository at this point in the history
…2304)

Instead of letting BackpressureMonitor, SessionFlusher, and Metrics::Aggregator
handle their own threading logic, we can extract that into a new class
that can be reused by all of them.
  • Loading branch information
st0012 committed May 2, 2024
1 parent 6cdb1fc commit 09fa602
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 101 deletions.
1 change: 1 addition & 0 deletions sentry-ruby/lib/sentry-ruby.rb
Expand Up @@ -20,6 +20,7 @@
require "sentry/transaction"
require "sentry/hub"
require "sentry/background_worker"
require "sentry/threaded_periodic_worker"
require "sentry/session_flusher"
require "sentry/backpressure_monitor"
require "sentry/cron/monitor_check_ins"
Expand Down
34 changes: 2 additions & 32 deletions sentry-ruby/lib/sentry/backpressure_monitor.rb
@@ -1,19 +1,13 @@
# frozen_string_literal: true

module Sentry
class BackpressureMonitor
include LoggingHelper

class BackpressureMonitor < ThreadedPeriodicWorker
DEFAULT_INTERVAL = 10
MAX_DOWNSAMPLE_FACTOR = 10

def initialize(configuration, client, interval: DEFAULT_INTERVAL)
@interval = interval
super(configuration.logger, interval)
@client = client
@logger = configuration.logger

@thread = nil
@exited = false

@healthy = true
@downsample_factor = 0
Expand Down Expand Up @@ -47,29 +41,5 @@ def set_downsample_factor
log_debug("[BackpressureMonitor] health check negative, downsampling with a factor of #{@downsample_factor}")
end
end

def kill
log_debug("[BackpressureMonitor] killing monitor")

@exited = true
@thread&.kill
end

private

def ensure_thread
return if @exited
return if @thread&.alive?

@thread = Thread.new do
loop do
sleep(@interval)
run
end
end
rescue ThreadError
log_debug("[BackpressureMonitor] Thread creation failed")
@exited = true
end
end
end
34 changes: 3 additions & 31 deletions sentry-ruby/lib/sentry/metrics/aggregator.rb
Expand Up @@ -2,9 +2,7 @@

module Sentry
module Metrics
class Aggregator
include LoggingHelper

class Aggregator < ThreadedPeriodicWorker
FLUSH_INTERVAL = 5
ROLLUP_IN_SECONDS = 10

Expand Down Expand Up @@ -36,8 +34,8 @@ class Aggregator
attr_reader :client, :thread, :buckets, :flush_shift, :code_locations

def initialize(configuration, client)
super(configuration.logger, FLUSH_INTERVAL)
@client = client
@logger = configuration.logger
@before_emit = configuration.metrics.before_emit
@enable_code_locations = configuration.metrics.enable_code_locations
@stacktrace_builder = configuration.stacktrace_builder
Expand All @@ -46,8 +44,6 @@ def initialize(configuration, client)
@default_tags['release'] = configuration.release if configuration.release
@default_tags['environment'] = configuration.environment if configuration.environment

@thread = nil
@exited = false
@mutex = Mutex.new

# a nested hash of timestamp -> bucket keys -> Metric instance
Expand Down Expand Up @@ -120,34 +116,10 @@ def flush(force: false)
@client.capture_envelope(envelope)
end

def kill
log_debug('[Metrics::Aggregator] killing thread')

@exited = true
@thread&.kill
end
alias_method :run, :flush

private

def ensure_thread
return false if @exited
return true if @thread&.alive?

@thread = Thread.new do
loop do
# TODO-neel-metrics use event for force flush later
sleep(FLUSH_INTERVAL)
flush
end
end

true
rescue ThreadError
log_debug('[Metrics::Aggregator] thread creation failed')
@exited = true
false
end

# important to sort for key consistency
def serialize_tags(tags)
tags.flat_map do |k, v|
Expand Down
37 changes: 5 additions & 32 deletions sentry-ruby/lib/sentry/session_flusher.rb
@@ -1,19 +1,15 @@
# frozen_string_literal: true

module Sentry
class SessionFlusher
include LoggingHelper

class SessionFlusher < ThreadedPeriodicWorker
FLUSH_INTERVAL = 60

def initialize(configuration, client)
@thread = nil
@exited = false
super(configuration.logger, FLUSH_INTERVAL)
@client = client
@pending_aggregates = {}
@release = configuration.release
@environment = configuration.environment
@logger = configuration.logger

log_debug("[Sessions] Sessions won't be captured without a valid release") unless @release
end
Expand All @@ -25,30 +21,18 @@ def flush
@pending_aggregates = {}
end

alias_method :run, :flush

def add_session(session)
return if @exited
return unless @release

begin
ensure_thread
rescue ThreadError
log_debug("Session flusher thread creation failed")
@exited = true
return
end
return unless ensure_thread

return unless Session::AGGREGATE_STATUSES.include?(session.status)
@pending_aggregates[session.aggregation_key] ||= init_aggregates(session.aggregation_key)
@pending_aggregates[session.aggregation_key][session.status] += 1
end

def kill
log_debug("Killing session flusher")

@exited = true
@thread&.kill
end

private

def init_aggregates(aggregation_key)
Expand All @@ -70,16 +54,5 @@ def pending_envelope
def attrs
{ release: @release, environment: @environment }
end

def ensure_thread
return if @thread&.alive?

@thread = Thread.new do
loop do
sleep(FLUSH_INTERVAL)
flush
end
end
end
end
end
39 changes: 39 additions & 0 deletions sentry-ruby/lib/sentry/threaded_periodic_worker.rb
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module Sentry
class ThreadedPeriodicWorker
include LoggingHelper

def initialize(logger, internal)
@thread = nil
@exited = false
@interval = internal
@logger = logger
end

def ensure_thread
return false if @exited
return true if @thread&.alive?

@thread = Thread.new do
loop do
sleep(@interval)
run
end
end

true
rescue ThreadError
log_debug("[#{self.class.name}] thread creation failed")
@exited = true
false
end

def kill
log_debug("[#{self.class.name}] thread killed")

@exited = true
@thread&.kill
end
end
end
4 changes: 2 additions & 2 deletions sentry-ruby/spec/sentry/backpressure_monitor_spec.rb
Expand Up @@ -55,7 +55,7 @@

it 'logs error' do
subject.healthy?
expect(string_io.string).to match(/\[BackpressureMonitor\] Thread creation failed/)
expect(string_io.string).to include("[#{described_class.name}] thread creation failed")
end
end

Expand Down Expand Up @@ -111,7 +111,7 @@
subject.healthy?
expect(subject.instance_variable_get(:@thread)).to receive(:kill)
subject.kill
expect(string_io.string).to match(/\[BackpressureMonitor\] killing monitor/)
expect(string_io.string).to include("[#{described_class.name}] thread killed")
end
end
end
4 changes: 2 additions & 2 deletions sentry-ruby/spec/sentry/metrics/aggregator_spec.rb
Expand Up @@ -47,7 +47,7 @@

it 'logs error' do
subject.add(:c, 'incr', 1)
expect(string_io.string).to match(/\[Metrics::Aggregator\] thread creation failed/)
expect(string_io.string).to include("[#{described_class.name}] thread creation failed")
end
end

Expand Down Expand Up @@ -479,7 +479,7 @@
it 'logs message when killing the thread' do
expect(subject.thread).to receive(:kill)
subject.kill
expect(string_io.string).to match(/\[Metrics::Aggregator\] killing thread/)
expect(string_io.string).to include("[#{described_class.name}] thread killed")
end
end
end
4 changes: 2 additions & 2 deletions sentry-ruby/spec/sentry/session_flusher_spec.rb
Expand Up @@ -146,7 +146,7 @@

it "logs error" do
subject.add_session(session)
expect(string_io.string).to match(/Session flusher thread creation failed/)
expect(string_io.string).to include("[#{described_class.name}] thread creation failed")
end
end

Expand All @@ -173,7 +173,7 @@
describe "#kill" do
it "logs message when killing the thread" do
subject.kill
expect(string_io.string).to match(/Killing session flusher/)
expect(string_io.string).to include("[#{described_class.name}] thread killed")
end
end
end

0 comments on commit 09fa602

Please sign in to comment.