Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Adds an exported function to flush internal reader (#1248)
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-cha committed Jan 22, 2021
1 parent e736602 commit 13369a4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 2 deletions.
15 changes: 14 additions & 1 deletion metric/metricexport/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (ir *IntervalReader) Start() error {
reportingInterval = ir.ReportingInterval
}

if ir.done != nil {
if ir.quit != nil {
return errAlreadyStarted
}
ir.timer = time.NewTicker(reportingInterval)
Expand Down Expand Up @@ -172,6 +172,19 @@ func (ir *IntervalReader) Stop() {
ir.quit = nil
}

// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op.
func (ir *IntervalReader) Flush() {
ir.mu.Lock()
defer ir.mu.Unlock()

// No-op if IntervalReader is not stopped
if ir.quit != nil {
return
}

ir.reader.ReadAndExport(ir.exporter)
}

// ReadAndExport reads metrics from all producer registered with
// producer manager and then exports them using provided exporter.
func (r *Reader) ReadAndExport(exporter Exporter) {
Expand Down
68 changes: 67 additions & 1 deletion metric/metricexport/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,69 @@ func TestManualReadForIntervalReader(t *testing.T) {
resetExporter(exporter1)
}

func TestFlushNoOpForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)

gaugeEntry.Set(1)

// since IR is not stopped, flush does nothing
ir1.Flush()

// expect no data points
checkExportedCount(exporter1, 0, t)
checkExportedMetricDesc(exporter1, "active_request", t)
ir1.Stop()
resetExporter(exporter1)
}

func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)

gaugeEntry.Set(1)

ir1.Stop()
ir1.Flush()

// metric is still coming in
gaugeEntry.Add(1)

// one more flush after IR stopped
ir1.Flush()

// expect 2 data point, one from each flush
checkExportedCount(exporter1, 2, t)
checkExportedValues(exporter1, []int64{1, 2}, t)
checkExportedMetricDesc(exporter1, "active_request", t)

resetExporter(exporter1)
}

func TestFlushRestartForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)

gaugeEntry.Set(1)
ir1.Stop()
ir1.Flush()

// restart the IR
err := ir1.Start()
if err != nil {
t.Fatalf("error starting reader %v\n", err)
}

gaugeEntry.Add(1)

ir1.Stop()
ir1.Flush()

// expect 2 data point, one from each flush
checkExportedCount(exporter1, 2, t)
checkExportedValues(exporter1, []int64{1, 2}, t)
checkExportedMetricDesc(exporter1, "active_request", t)

resetExporter(exporter1)
}

func TestProducerWithIntervalReaderStop(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
ir1.Stop()
Expand Down Expand Up @@ -166,7 +229,10 @@ func TestIntervalReaderMultipleStop(t *testing.T) {

func TestIntervalReaderMultipleStart(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
ir1.Start()
err := ir1.Start()
if err == nil {
t.Fatalf("expected error but got nil\n")
}

gaugeEntry.Add(1)

Expand Down

0 comments on commit 13369a4

Please sign in to comment.