diff --git a/.chloggen/bnchmk-zstd.yaml b/.chloggen/bnchmk-zstd.yaml new file mode 100644 index 00000000000..5a974afd274 --- /dev/null +++ b/.chloggen/bnchmk-zstd.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Disable concurrency in zstd compression + +# One or more tracking issues or pull requests related to the change +issues: [8216] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 3e085bead0d..660fa83ce51 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -28,9 +28,10 @@ var ( _ writeCloserReset = (*snappy.Writer)(nil) snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}} _ writeCloserReset = (*zstd.Encoder)(nil) - zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil); return zw }}} - _ writeCloserReset = (*zlib.Writer)(nil) - zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}} + // Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool. + zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}} + _ writeCloserReset = (*zlib.Writer)(nil) + zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}} ) type compressor struct { diff --git a/config/confighttp/compressor_test.go b/config/confighttp/compressor_test.go new file mode 100644 index 00000000000..29efe4190d0 --- /dev/null +++ b/config/confighttp/compressor_test.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This file contains helper functions regarding compression/decompression for confighttp. + +package confighttp // import "go.opentelemetry.io/collector/config/confighttp" + +import ( + "bytes" + "fmt" + "io" + "strings" + "testing" + + "github.com/klauspost/compress/zstd" + + "go.opentelemetry.io/collector/config/configcompression" +) + +func BenchmarkCompression(b *testing.B) { + benchmarks := []struct { + codec configcompression.Type + name string + function func(*testing.B, configcompression.Type, *bytes.Buffer, []byte) + }{ + { + codec: configcompression.TypeZstd, + name: "zstdWithConcurrency", + function: benchmarkCompression, + }, + { + codec: configcompression.TypeZstd, + name: "zstdNoConcurrency", + function: benchmarkCompressionNoConcurrency, + }, + } + payload := make([]byte, 10<<20) + buffer := bytes.Buffer{} + buffer.Grow(len(payload)) + + ts := &bytes.Buffer{} + defer func() { + fmt.Printf("input => %.2f MB\n", float64(len(payload))/(1024*1024)) + fmt.Println(ts) + }() + + for i := range benchmarks { + benchmark := &benchmarks[i] + b.Run(fmt.Sprint(benchmark.name), func(b *testing.B) { + benchmark.function(b, benchmark.codec, &buffer, payload) + }) + + } +} + +func benchmarkCompression(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) { + // Concurrency Enabled + + b.Run("compress", func(b *testing.B) { + stringReader := strings.NewReader(string(payload)) + stringReadCloser := io.NopCloser(stringReader) + var enc io.Writer + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(payload))) + for i := 0; i < b.N; i++ { + enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(5)) + enc.(writeCloserReset).Reset(buf) + _, copyErr := io.Copy(enc, stringReadCloser) + if copyErr != nil { + b.Fatal(copyErr) + } + } + }) +} + +func benchmarkCompressionNoConcurrency(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) { + // Concurrency Disabled + + b.Run("compress", func(b *testing.B) { + stringReader := strings.NewReader(string(payload)) + stringReadCloser := io.NopCloser(stringReader) + var enc io.Writer + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(payload))) + for i := 0; i < b.N; i++ { + enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + enc.(writeCloserReset).Reset(buf) + _, copyErr := io.Copy(enc, stringReadCloser) + if copyErr != nil { + b.Fatal(copyErr) + } + } + }) +}