Skip to content

Commit

Permalink
Disable concurrency in zstd and add Benchmark tests for it
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Mar 13, 2024
1 parent cc485e0 commit 962db5e
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 1 deletion.
25 changes: 25 additions & 0 deletions .chloggen/bnchmk-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disable concurrency in zstd compression

# One or more tracking issues or pull requests related to the change
issues: [8216]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
_ writeCloserReset = (*snappy.Writer)(nil)
snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}
_ writeCloserReset = (*zstd.Encoder)(nil)
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil); return zw }}}
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
)
Expand Down
91 changes: 91 additions & 0 deletions config/confighttp/compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

"github.com/klauspost/compress/zstd"

"go.opentelemetry.io/collector/config/configcompression"
)

func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
codec configcompression.Type
name string
function func(*testing.B, configcompression.Type, *bytes.Buffer, []byte)
}{
{
codec: configcompression.TypeZstd,
name: "zstdWithConcurrency",
function: benchmarkCompression,
},
{
codec: configcompression.TypeZstd,
name: "zstdNoConcurrency",
function: benchmarkCompressionNoConcurrency,
},
}
payload := make([]byte, 10<<20)
buffer := bytes.Buffer{}
buffer.Grow(len(payload))

ts := &bytes.Buffer{}
defer func() {
fmt.Printf("input => %.2f MB\n", float64(len(payload))/(1024*1024))
fmt.Println(ts)
}()

for i := range benchmarks {
benchmark := &benchmarks[i]
b.Run(fmt.Sprint(benchmark.name), func(b *testing.B) {
benchmark.function(b, benchmark.codec, &buffer, payload)
})

}
}

func benchmarkCompression(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) {
// Enable concurrency

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(5))
enc.(writeCloserReset).Reset(buf)
_, copyErr := io.Copy(enc, stringReadCloser)
if copyErr != nil {
b.Fatal(copyErr)
}
}
})
}

func benchmarkCompressionNoConcurrency(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) {
// Disable concurrency

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
enc.(writeCloserReset).Reset(buf)
_, copyErr := io.Copy(enc, stringReadCloser)
if copyErr != nil {
b.Fatal(copyErr)
}
}
})
}

0 comments on commit 962db5e

Please sign in to comment.