0.4 (#438)
* add protocol package

* add some documentation

* fix

* make ByteSequence more generic + add more benchmarks

* WIP: add support for record batches

* finish support for record batches

* add support for record set compression

* backward-compatible compression codec imports

* fix compress tests

* make it possible for the transport to connect to multiple clusters + enhance kafka.Client to expose methods for creating and deleting topics
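
  A minimal sketch of the new client API (the broker address and topic layout below are placeholders):

  ```go
  package main

  import (
  	"context"
  	"log"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// The client targets the cluster reachable at this address.
  	client := &kafka.Client{
  		Addr: kafka.TCP("localhost:9092"),
  	}

  	// Create a topic through the client.
  	resp, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
  		Topics: []kafka.TopicConfig{{
  			Topic:             "example-topic",
  			NumPartitions:     3,
  			ReplicationFactor: 1,
  		}},
  	})
  	if err != nil {
  		log.Fatal(err)
  	}
  	// Per-topic errors are reported in the response.
  	for topic, topicErr := range resp.Errors {
  		if topicErr != nil {
  			log.Printf("%s: %v", topic, topicErr)
  		}
  	}
  }
  ```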

* support responding to metadata requests with cached response

* manage proper shutdown of client transport in tests

* WIP: test Produce API

* WIP: massive cleanup + track down CRC32 validation issue

* functional Produce and Fetch implementations

* add metadata request/response

* add listoffsets API

* expose cluster id and controller in metadata response

* remove bufio.Writer from the protocol API

* remove bufio.Reader from the protocol API

* add back deprecated Client methods

* fixes for kafka 0.10

* cleanup comment in protocol/record.go

* add more comments

* reduce size of bufio.Reader buffer on kafka connections

* refactor transport internals to support splitting requests and dispatching them across multiple brokers

* avoid contention on connection pool mutex in most cases

* cleanup

* add kafka.(*Client).MultiFetch API

* close records in produce request

* refactor record batch APIs to fully support streaming

* remove io.Closer from protocol.RecordBatch

* never return nil record batches

* record batch fixes

* remove unused variable

* fix reading of multiple topic partitions in produce and fetch messages

* alias compress.Compression in the kafka package

* expose compression constants in the kafka package
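
  For illustration, the codec constants can now be referenced directly from the kafka package without importing the compress sub-package (a small sketch):

  ```go
  package main

  import (
  	"fmt"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// kafka.Compression aliases compress.Compression, so the codecs are
  	// usable as plain constants in the kafka package.
  	codecs := []kafka.Compression{
  		kafka.Gzip,
  		kafka.Snappy,
  		kafka.Lz4,
  		kafka.Zstd,
  	}
  	for _, c := range codecs {
  		fmt.Println(c)
  	}
  }
  ```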

* expose kafka.Request and kafka.Response interfaces

* simplify the protocol.Bytes interface

* simplify error management in protocol package

* wait for topic creation to propagate + fix request dispatching in multi-broker clusters

* simplify kafka.(*Client).CreateTopics API

* improve error handling + wait for metadata propagation after topic creation

* revisit connection pool implementation to remove multiplexing

* fix panic when referencing truncated page buffer

* fix unexpected EOF errors reading kafka messages

* revisit record reader API

* fix panic type asserting nil response into *metadata.Response

* optimize allocation of broker ids in cluster metadata

* unify sync.Pool usage

* reduce memory footprint of protocol.(*RecordSet).readFromVersion2

* fix panic accessing optimized record reader with a nil headers slice

* add APIs for marshaling and unmarshaling kafka values
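
  A sketch of a round trip through the new APIs, assuming the conventional `Marshal(v) ([]byte, error)` / `Unmarshal(b, &v) error` signatures:

  ```go
  package main

  import (
  	"log"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// Marshal a value into Kafka's wire representation...
  	b, err := kafka.Marshal(int32(42))
  	if err != nil {
  		log.Fatal(err)
  	}

  	// ...and unmarshal it back into a Go value.
  	var v int32
  	if err := kafka.Unmarshal(b, &v); err != nil {
  		log.Fatal(err)
  	}
  	log.Printf("v = %d", v)
  }
  ```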

* [skip ci] fix README example

* investigate multi-fetch issues

* remove MultiFetch API

* simplify protocol tests

* add benchmarks for kafka.Marshal and kafka.Unmarshal

* fix crash on cluster layout changes

* add more error codes

* remove partial support for flexible message format

* downgrade metadata test from v9 to v8

* test against kafka 2.5.0

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* fix typos

* fix more typos

* set pprof labels on transport goroutines (#458)

* change tests to run against 2.4.1 instead of 2.5.0

* support up to 2.3.1 (TestConn/nettest/PingPong fails with 2.4 and above)

* Update README.md

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* Update client.go

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* comment on why we divide the timeout by 2

* protocol.Reducer => protocol.Merger

* cleanup docker-compose.yml

* protocol.Mapper => protocol.Splitter

* propagate the caller's context to the dial function (#460)
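
  A sketch of a custom dialer that benefits from the propagated context; the timeout value is arbitrary:

  ```go
  package main

  import (
  	"net"
  	"time"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// DialContext matches the transport's Dial signature, so cancelling the
  	// caller's context now cancels the dial as well.
  	transport := &kafka.Transport{
  		Dial: (&net.Dialer{Timeout: 5 * time.Second}).DialContext,
  	}
  	_ = transport
  }
  ```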

* fix backward compatibility with kafka-go v0.3.x

* fix record offsets when fetching messages with version 1

* default record timestamps to current timestamp

* revert changes to docker-compose.yml

* fix tests

* fix tests (2)

* 0.4: kafka.Writer (#461)

* 0.4: kafka.Writer
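
  A minimal sketch of the new struct-based writer (address, topic, and payload are placeholders):

  ```go
  package main

  import (
  	"context"
  	"log"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// The writer is configured directly by its fields rather than through a
  	// config object.
  	w := &kafka.Writer{
  		Addr:         kafka.TCP("localhost:9092"),
  		Topic:        "example-topic",
  		Balancer:     &kafka.LeastBytes{},
  		RequiredAcks: kafka.RequireAll,
  	}
  	defer w.Close()

  	err := w.WriteMessages(context.Background(),
  		kafka.Message{Key: []byte("key"), Value: []byte("value")},
  	)
  	if err != nil {
  		log.Fatal(err)
  	}
  }
  ```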

* update README

* disable some parallel tests

* disable global parallelism in tests

* fix typo

* disable parallelism in sub-packages tests

* properly seed random sources + delete test topics

* cleanup build

* run all tests

* fix tests

* enable more SASL mechanisms on CI

* try to fix the CI config

* try testing the sasl package with 2.3.1 only

* inline configuration for kafka 2.3.1 in CI

* fix zookeeper hostname in CI

* cleanup CI config

* keep the kafka 0.10 configuration separate + test against more kafka versions

* fix kafka 0.11 image tag

* try caching dependencies

* support multiple broker addresses
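
  For example, `kafka.TCP` accepts any number of broker addresses (the addresses below are placeholders):

  ```go
  package main

  import "github.com/segmentio/kafka-go"

  func main() {
  	// The client can be pointed at several brokers of the same cluster.
  	client := &kafka.Client{
  		Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  	}
  	_ = client
  }
  ```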

* uncomment max attempt test

* fix typos

* guard against empty kafka.MultiAddr in kafka.Transport

* don't export new APIs for network addresses + adapt to any multi-addr implementation

* add comment about the transport caching the metadata responses
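
  A sketch of tuning that cache, assuming the `MetadataTTL` field controls how long cached metadata responses are reused (the 6s value is arbitrary):

  ```go
  package main

  import (
  	"time"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// Metadata responses served from cache are refreshed once they are older
  	// than this TTL.
  	transport := &kafka.Transport{
  		MetadataTTL: 6 * time.Second,
  	}
  	_ = transport
  }
  ```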

* 0.4 fix tls address panic (#478)

* 0.4: fix panic when TLS is enabled

* 0.4: fix panic when establishing TLS connections

* cleanup

* Update transport_test.go

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* validate that an error is returned

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* 0.4: fix short writes (#479)

* 0.4: modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence (#485)

* modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence

* add test for pageRef.ReadByte + fix pageRef.scan

* reuse contiguousPages.scan

* fix(writer): set correct balancer (#489)

Sets the correct balancer on the writer, as passed through in the config.

Co-authored-by: Steve van Loben Sels <steve@segment.com>
Co-authored-by: Artur <artur.kronenberg@askattest.com>

* Fix for panic when RequiredAcks is set to RequireNone (#504)

* Fix panic in async wait() method when RequiredAcks is None

When RequiredAcks is None, the producer does not wait for a
response from the broker, therefore the response is nil.
The async wait() method was not handling this case, leading
to a panic.
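
A sketch of the configuration that used to trigger the panic (address and topic are placeholders):

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// With RequireNone the broker sends no produce response, so the writer
	// must treat the response as nil instead of dereferencing it.
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"),
		Topic:        "example-topic",
		RequiredAcks: kafka.RequireNone,
	}
	defer w.Close()

	if err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("fire-and-forget")}); err != nil {
		log.Fatal(err)
	}
}
```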

* Add regression test for RequiredAcks == RequireNone

This new test is required because all the other Writer tests use
NewWriter() to create Writers, which sets RequiredAcks to
RequireAll when 0 (None) is specified.

* fix: writer test for RequiredAcks=None

* fix: writer tests for RequiredAcks=None (2)

* 0.4 broker resolver (#526)

* 0.4: kafka.BrokerResolver

* add kafka.Transport.Context

* inline network and address fields in conn type
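
  A sketch of plugging in a custom resolver, assuming the `BrokerResolver` interface exposes a `LookupBrokerIPAddr` method and installs via the transport's `Resolver` field; the static IP is purely illustrative:

  ```go
  package main

  import (
  	"context"
  	"net"

  	"github.com/segmentio/kafka-go"
  )

  // staticResolver is a hypothetical resolver that pins every broker host name
  // to a fixed IP address.
  type staticResolver struct{ ip net.IPAddr }

  func (r staticResolver) LookupBrokerIPAddr(ctx context.Context, broker kafka.Broker) ([]net.IPAddr, error) {
  	return []net.IPAddr{r.ip}, nil
  }

  func main() {
  	transport := &kafka.Transport{
  		Resolver: staticResolver{ip: net.IPAddr{IP: net.ParseIP("127.0.0.1")}},
  	}
  	_ = transport
  }
  ```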

* Fix sasl authentication on writer (#541)

authenticateSASL was called before the API versions were fetched.
This resulted in an incorrect API version (0 instead of 1) being
used when issuing the saslHandshakeRoundTrip request.
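
A sketch of SASL/PLAIN on the writer's transport (the credentials and port mirror the CI test setup and are placeholders):

```go
package main

import (
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	// The SASL mechanism is carried by the transport, so the writer
	// authenticates before any produce request is sent.
	mechanism := plain.Mechanism{
		Username: "adminplain",
		Password: "admin-secret",
	}
	w := &kafka.Writer{
		Addr:      kafka.TCP("localhost:9093"),
		Topic:     "example-topic",
		Transport: &kafka.Transport{SASL: mechanism},
	}
	defer w.Close()
}
```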

* Remove deprecated function (NewWriter) usages (#528)

* fix zstd decoder leak (#543)

* fix zstd decoder leak

* fix tests

* fix panic

* fix tests (2)

* fix tests (3)

* fix tests (4)

* move ConnWaitGroup to testing package

* fix zstd codec

* Update compress/zstd/zstd.go

Co-authored-by: Nicholas Sun <olassun2@gmail.com>

* PR feedback

Co-authored-by: Nicholas Sun <olassun2@gmail.com>

* improve custom resolver support by allowing port to be overridden (#545)

* 0.4: reduce memory footprint (#547)

* Bring over flexible message changes

* Add docker-compose config for kafka 2.4.1

* Misc. cleanups

* Add protocol tests and fix issues

* Misc. fixes; run circleci on v2.4.1

* Skip conntest for v2.4.1

* Disable nettests for kafka 2.4.1

* Revert formatting changes

* Misc. fixes

* Update comments

* Make create topics test more interesting

* feat(writer): add support for writing messages to multiple topics (#561)
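
  A sketch of the multi-topic usage (topics and payloads are placeholders):

  ```go
  package main

  import (
  	"context"
  	"log"

  	"github.com/segmentio/kafka-go"
  )

  func main() {
  	// Leave Writer.Topic unset and name the topic on each message instead.
  	w := &kafka.Writer{Addr: kafka.TCP("localhost:9092")}
  	defer w.Close()

  	err := w.WriteMessages(context.Background(),
  		kafka.Message{Topic: "topic-A", Value: []byte("a")},
  		kafka.Message{Topic: "topic-B", Value: []byte("b")},
  	)
  	if err != nil {
  		log.Fatal(err)
  	}
  }
  ```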

* Add comments on failing nettests

* Fix spacing

* Update var int sizing

* Simplify writeVarInt implementation

* Revert encoding change

* Simplify varint encoding functions and expand tests

* Also test sizeOf functions in protocol test

* chore: merge master and resolve conflicts (#570)

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>
Co-authored-by: Steve van Loben Sels <steve@segment.com>
Co-authored-by: Artur <artur.kronenberg@askattest.com>
Co-authored-by: Neil Cook <neil.cook@noware.co.uk>
Co-authored-by: Ahmy Yulrizka <yulrizka@users.noreply.github.com>
Co-authored-by: Turfa Auliarachman <t.auliarachman@gmail.com>
Co-authored-by: Nicholas Sun <olassun2@gmail.com>
Co-authored-by: Dominic Barnes <dominic@segment.com>
Co-authored-by: Benjamin Yolken <benjamin.yolken@segment.com>
Co-authored-by: Benjamin Yolken <54862872+yolken-segment@users.noreply.github.com>
11 people committed Dec 4, 2020
1 parent c66d8ca commit 0b1c833
Showing 112 changed files with 13,404 additions and 1,522 deletions.
298 changes: 189 additions & 109 deletions .circleci/config.yml
```diff
@@ -1,133 +1,213 @@
 version: 2
 jobs:
+  # The kafka 0.10 tests are maintained as a separate configuration because
+  # kafka only supported plain text SASL in this version.
   kafka-010:
-    working_directory: /go/src/github.com/segmentio/kafka-go
+    working_directory: &working_directory /go/src/github.com/segmentio/kafka-go
     environment:
       KAFKA_VERSION: "0.10.1"
     docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports: ['2181:2181']
-    - image: wurstmeister/kafka:0.10.1.1
-      ports: ['9092:9092']
-      environment:
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-        KAFKA_DELETE_TOPIC_ENABLE: 'true'
-        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-        KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
-        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
-    steps:
-    - checkout
-    - setup_remote_docker: { reusable: true, docker_layer_caching: true }
-    - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy
-    - run: go test -v -race -cover -timeout 150s . ./gzip ./lz4 ./sasl ./snappy
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:0.10.1.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment:
+        KAFKA_BROKER_ID: '1'
+        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
+        KAFKA_DELETE_TOPIC_ENABLE: 'true'
+        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
+        KAFKA_ADVERTISED_PORT: '9092'
+        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
+        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_MESSAGE_MAX_BYTES: '200000000'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+    steps: &steps
+    - checkout
+    - restore_cache:
+        key: kafka-go-mod-{{ checksum "go.sum" }}-1
+    - run: go mod download
+    - save_cache:
+        key: kafka-go-mod-{{ checksum "go.sum" }}-1
+        paths:
+        - /go/pkg/mod
+    - run: go test -race -cover ./...
 
+  # Starting at version 0.11, the kafka features and configuration remained
+  # mostly stable, so we can use this CI job configuration as template for other
+  # versions as well.
   kafka-011:
-    working_directory: /go/src/github.com/segmentio/kafka-go
+    working_directory: *working_directory
     environment:
       KAFKA_VERSION: "0.11.0"
     docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports: ['2181:2181']
-    - image: wurstmeister/kafka:2.11-0.11.0.3
-      ports: ['9092:9092','9093:9093']
-      environment:
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-        KAFKA_DELETE_TOPIC_ENABLE: 'true'
-        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-        KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
-        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
-          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-    steps:
-    - checkout
-    - setup_remote_docker: { reusable: true, docker_layer_caching: true }
-    - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy
-    - run: go test -v -race -cover -timeout 150s . ./gzip ./lz4 ./sasl ./snappy
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.11-0.11.0.3
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: &environment
+        KAFKA_BROKER_ID: '1'
+        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
+        KAFKA_DELETE_TOPIC_ENABLE: 'true'
+        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
+        KAFKA_ADVERTISED_PORT: '9092'
+        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
+        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_MESSAGE_MAX_BYTES: '200000000'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
+    steps: *steps
+
+  kafka-101:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "1.0.1"
+    docker:
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.11-1.0.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
 
   kafka-111:
-    working_directory: /go/src/github.com/segmentio/kafka-go
+    working_directory: *working_directory
     environment:
       KAFKA_VERSION: "1.1.1"
     docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports: ['2181:2181']
-    - image: wurstmeister/kafka:2.11-1.1.1
-      ports: ['9092:9092','9093:9093']
-      environment:
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-        KAFKA_DELETE_TOPIC_ENABLE: 'true'
-        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-        KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
-        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
-          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-    steps:
-    - checkout
-    - setup_remote_docker: { reusable: true, docker_layer_caching: true }
-    - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy
-    - run: go test -v -race -cover -timeout 150s . ./gzip ./lz4 ./sasl ./snappy
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.11-1.1.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
+
+  kafka-201:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "2.0.1"
+    docker:
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.12-2.0.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
+
+  kafka-211:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "2.1.1"
+    docker:
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.12-2.1.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
+
+  kafka-222:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "2.2.2"
+    docker:
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.12-2.2.2
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
 
-  kafka-210:
-    working_directory: /go/src/github.com/segmentio/kafka-go
+  kafka-231:
+    working_directory: *working_directory
     environment:
-      KAFKA_VERSION: "2.1.0"
+      KAFKA_VERSION: "2.3.1"
     docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports: ['2181:2181']
-    - image: wurstmeister/kafka:2.12-2.1.0
-      ports: ['9092:9092','9093:9093']
-      environment:
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-        KAFKA_DELETE_TOPIC_ENABLE: 'true'
-        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-        KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
-        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
-          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-    steps:
-    - checkout
-    - setup_remote_docker: { reusable: true, docker_layer_caching: true }
-    - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy
-    - run: go test -v -race -cover -timeout 150s $(go list ./... | grep -v examples)
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.12-2.3.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
+
+  kafka-241:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "2.4.1"
+
+      # Need to skip nettest to avoid these kinds of errors:
+      #   --- FAIL: TestConn/nettest (17.56s)
+      #       --- FAIL: TestConn/nettest/PingPong (7.40s)
+      #           conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
+      #           conntest.go:118: mismatching value: got 77, want 78
+      #           conntest.go:118: mismatching value: got 78, want 79
+      #           ...
+      #
+      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
+      KAFKA_SKIP_NETTEST: "1"
+    docker:
+    - image: circleci/golang
+    - image: wurstmeister/zookeeper
+      ports:
+      - 2181:2181
+    - image: wurstmeister/kafka:2.12-2.4.1
+      ports:
+      - 9092:9092
+      - 9093:9093
+      environment: *environment
+    steps: *steps
 
 workflows:
   version: 2
   run:
     jobs:
     - kafka-010
     - kafka-011
+    - kafka-101
     - kafka-111
-    - kafka-210
+    - kafka-201
+    - kafka-211
+    - kafka-222
+    - kafka-231
+    - kafka-241
```