Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype batch api #4024

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.api.metrics;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;

public interface BatchRecorder {

/** Record the value to the instruments when record is called. */
default BatchRecorder addMeasurements(long value, LongInstrument... instruments) {
return this;
}

/** Record the value to the instruments when record is called. */
default BatchRecorder addMeasurements(double value, DoubleInstrument... instruments) {
return this;
}

/** Record the measurements. */
default void record(Attributes attributes) {
record(attributes, Context.current());
}

/** Record the measurements. */
default void record(Attributes attributes, Context context) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,29 @@ public DoubleGaugeBuilder gaugeBuilder(String name) {
return new NoopDoubleObservableInstrumentBuilder();
}

@Override
public BatchRecorder batch() {
return new NoopBatchRecord();
}

private DefaultMeter() {}

private static class NoopBatchRecord implements BatchRecorder {

@Override
public BatchRecorder addMeasurements(long value, LongInstrument... instruments) {
return this;
}

@Override
public BatchRecorder addMeasurements(double value, DoubleInstrument... instruments) {
return this;
}

@Override
public void record(Attributes attributes, Context context) {}
}

private static class NoopLongCounter implements LongCounter {
@Override
public void add(long value, Attributes attributes, Context context) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/** A counter instrument that records {@code double} values. */
@ThreadSafe
public interface DoubleCounter {
public interface DoubleCounter extends DoubleInstrument {
/**
* Records a value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/** A histogram instrument that records {@code long} values. */
@ThreadSafe
public interface DoubleHistogram {
public interface DoubleHistogram extends DoubleInstrument {

/**
* Records a value.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.api.metrics;

public interface DoubleInstrument {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/** An up-down-counter instrument that records {@code double} values. */
@ThreadSafe
public interface DoubleUpDownCounter {
public interface DoubleUpDownCounter extends DoubleInstrument {
/**
* Records a value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* always-increasing monotonic sums.
*/
@ThreadSafe
public interface LongCounter {
public interface LongCounter extends LongInstrument {

/**
* Records a value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/** A histogram instrument that records {@code long} values. */
@ThreadSafe
public interface LongHistogram {
public interface LongHistogram extends LongInstrument {

/**
* Records a value.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.api.metrics;

public interface LongInstrument {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/** An up-down-counter instrument that records {@code long} values. */
@ThreadSafe
public interface LongUpDownCounter {
public interface LongUpDownCounter extends LongInstrument {
/**
* Records a value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ public interface Meter {
* @return a builder used for configuring how to report gauge measurements on demand.
*/
DoubleGaugeBuilder gaugeBuilder(String name);

/** Returns a batch recorder. */
default BatchRecorder batch() {
return DefaultMeter.getInstance().batch();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleUpDownCounter;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -67,6 +69,63 @@ public void perform(Attributes labels) {
metric.record(ThreadLocalRandom.current().nextLong(0, 20_000L), labels);
}
};
}),
MultiRecordNoBatch(
meter -> {
return new Operation() {
final LongHistogram longHistogram =
meter.histogramBuilder("long_histogram").ofLongs().build();
final DoubleHistogram doubleHistogram =
meter.histogramBuilder("double_histogram").build();
final LongCounter longCounter = meter.counterBuilder("long_counter").build();
final DoubleCounter doubleCounter =
meter.counterBuilder("double_counter").ofDoubles().build();
final LongUpDownCounter longUpDownCounter =
meter.upDownCounterBuilder("long_up_down_counter").build();
final DoubleUpDownCounter doubleUpDownCounter =
meter.upDownCounterBuilder("double_up_down_counter").ofDoubles().build();

@Override
public void perform(Attributes labels) {
double doubleValue = ThreadLocalRandom.current().nextDouble(1000);
long longValue = ThreadLocalRandom.current().nextLong(1000);

longHistogram.record(longValue, labels);
doubleHistogram.record(doubleValue, labels);
longCounter.add(longValue, labels);
doubleCounter.add(doubleValue, labels);
longUpDownCounter.add(longValue, labels);
doubleUpDownCounter.add(doubleValue, labels);
}
};
}),
MultiRecordWithBatch(
meter -> {
return new Operation() {
final LongHistogram longHistogram =
meter.histogramBuilder("long_histogram").ofLongs().build();
final DoubleHistogram doubleHistogram =
meter.histogramBuilder("double_histogram").build();
final LongCounter longCounter = meter.counterBuilder("long_counter").build();
final DoubleCounter doubleCounter =
meter.counterBuilder("double_counter").ofDoubles().build();
final LongUpDownCounter longUpDownCounter =
meter.upDownCounterBuilder("long_up_down_counter").build();
final DoubleUpDownCounter doubleUpDownCounter =
meter.upDownCounterBuilder("double_up_down_counter").ofDoubles().build();

@Override
public void perform(Attributes labels) {
double doubleValue = ThreadLocalRandom.current().nextDouble(1000);
long longValue = ThreadLocalRandom.current().nextLong(1000);

meter
.batch()
.addMeasurements(doubleValue, doubleHistogram, doubleCounter, doubleUpDownCounter)
.addMeasurements(longValue, longHistogram, longCounter, longUpDownCounter)
.record(labels);
}
};
});

private final OperationBuilder builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchRecorder;
import io.opentelemetry.api.metrics.DoubleInstrument;
import io.opentelemetry.api.metrics.LongInstrument;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** {@link SdkBatchRecorder} is SDK implementation of {@link BatchRecorder}. */
final class SdkBatchRecorder implements BatchRecorder {

private static final Logger logger = Logger.getLogger(SdkBatchRecorder.class.getName());

private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
private final List<Object> measurements = new ArrayList<>();
private final SdkMeter.BatchLatch batchLatch;
private volatile boolean recorded = false;

SdkBatchRecorder(SdkMeter.BatchLatch batchLatch) {
this.batchLatch = batchLatch;
}

@Override
public BatchRecorder addMeasurements(long value, LongInstrument... instruments) {
if (!isRecorded()) {
measurements.add(new LongMeasurement(value, instruments));
}
return this;
}

@Override
public BatchRecorder addMeasurements(double value, DoubleInstrument... instruments) {
if (!isRecorded()) {
measurements.add(new DoubleMeasurement(value, instruments));
}
return this;
}

@Override
public void record(Attributes attributes, Context context) {
if (isRecorded()) {
return;
}
recorded = true;

batchLatch.startBatchRecord();
try {
for (Object measurement : measurements) {
if (measurement instanceof DoubleMeasurement) {
DoubleMeasurement doubleMeasurement = (DoubleMeasurement) measurement;
double value = doubleMeasurement.value;
for (DoubleInstrument instrument : doubleMeasurement.instruments) {
if (instrument instanceof SdkDoubleHistogram) {
((SdkDoubleHistogram) instrument).record(value, attributes, context);
} else if (instrument instanceof SdkDoubleCounter) {
((SdkDoubleCounter) instrument).add(value, attributes, context);
} else if (instrument instanceof SdkDoubleUpDownCounter) {
((SdkDoubleUpDownCounter) instrument).add(value, attributes, context);
} else {
unrecognizedInstrument(instrument);
}
}
}
if (measurement instanceof LongMeasurement) {
LongMeasurement longMeasurement = (LongMeasurement) measurement;
long value = longMeasurement.value;
for (LongInstrument instrument : longMeasurement.instruments) {
if (instrument instanceof SdkLongHistogram) {
((SdkLongHistogram) instrument).record(value, attributes, context);
} else if (instrument instanceof SdkLongCounter) {
((SdkLongCounter) instrument).add(value, attributes, context);
} else if (instrument instanceof SdkLongUpDownCounter) {
((SdkLongUpDownCounter) instrument).add(value, attributes, context);
} else {
unrecognizedInstrument(instrument);
}
}
}
}
} finally {
batchLatch.finishBatchRecord();
}
}

private void unrecognizedInstrument(Object instrument) {
throttlingLogger.log(
Level.WARNING,
"Unrecognized instrument in batch recording: " + instrument.getClass().getName());
}

private boolean isRecorded() {
if (recorded) {
throttlingLogger.log(Level.WARNING, "Batch has already recorded.");
return true;
}
return false;
}

private static class DoubleMeasurement {
private final double value;
private final List<DoubleInstrument> instruments;

private DoubleMeasurement(double value, DoubleInstrument... instruments) {
this.value = value;
this.instruments = Arrays.asList(instruments);
}
}

private static class LongMeasurement {
private final long value;
private final List<LongInstrument> instruments;

private LongMeasurement(long value, LongInstrument... instruments) {
this.value = value;
this.instruments = Arrays.asList(instruments);
}
}
}