Skip to content

Commit

Permalink
Upgrade and fix kafka consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
huuhait committed Aug 7, 2023
1 parent 01fdbd8 commit c5a3e18
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 471 deletions.
11 changes: 6 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18.1-alpine AS builder
FROM golang:1.20.5-alpine AS builder

RUN apk add --no-cache curl git

Expand All @@ -8,9 +8,10 @@ RUN curl -Lo /usr/bin/kaigara https://github.com/openware/kaigara/releases/downl
&& chmod +x /usr/bin/kaigara

WORKDIR /build
ENV CGO_ENABLED=1 \
GOOS=linux \
GOARCH=amd64
ENV GO111MODULE=on \
CGO_ENABLED=0 \
GOARCH="amd64" \
GOOS=linux

COPY go.mod go.sum ./
RUN go mod download
Expand All @@ -19,7 +20,7 @@ COPY . .
RUN go build ./cmd/rango


FROM alpine:3.9
FROM alpine

RUN apk add ca-certificates
WORKDIR app
Expand Down
62 changes: 27 additions & 35 deletions cmd/rango/rango.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"crypto/rsa"
"flag"
"fmt"
Expand All @@ -10,12 +11,10 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/zsmartex/pkg/services"
"github.com/zsmartex/pkg/v2/log"
"github.com/zsmartex/pkg/v2/utils"
"github.com/zsmartex/rango/config"
"github.com/zsmartex/rango/pkg/auth"
"github.com/zsmartex/rango/pkg/metrics"
"github.com/zsmartex/rango/pkg/routing"
Expand Down Expand Up @@ -57,23 +56,7 @@ func authHandler(h httpHanlder, key *rsa.PublicKey, mustAuth bool) httpHanlder {
r.Header.Del("JwtRole")
}
h(w, r)
return
}
}

func setupLogger() {
logLevel, ok := os.LookupEnv("LOG_LEVEL")
if ok {
level, err := zerolog.ParseLevel(strings.ToLower(logLevel))
if err != nil {
panic(err)
}

zerolog.SetGlobalLevel(level)
return
}

zerolog.SetGlobalLevel(zerolog.DebugLevel)
}

func getPublicKey() (pub *rsa.PublicKey, err error) {
Expand Down Expand Up @@ -146,46 +129,55 @@ func filterPrefixed(prefix string, arr []string) []string {
}

func main() {
log.New(os.Getenv("APP_NAME"))
flag.Parse()

setupLogger()

metrics.Enable()

ctx := context.Background()
rbac := getRBACConfig()
hub := routing.NewHub(rbac)
pub, err := getPublicKey()
if err != nil {
log.Error().Msgf("Loading public key failed: %s", err.Error())
log.Errorf("Loading public key failed: %s", err.Error())
time.Sleep(2 * time.Second)
return
}

kafka_brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
consumer, err := services.NewKafkaConsumer(kafka_brokers, fmt.Sprintf("rango-%s", utils.RandomString(10)), []string{*exName})
kafkaBrokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
seeds := kgo.SeedBrokers(kafkaBrokers...)
client, err := kgo.NewClient(
seeds,
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
kgo.AllowAutoTopicCreation(),
kgo.ConsumerGroup(fmt.Sprintf("rango-%s", utils.RandomString(10))),
kgo.ConsumeTopics(*exName),
kgo.DisableAutoCommit(),
)
if err != nil {
log.Error().Msgf("Failed to create consumer: %s", err.Error())
log.Errorf("Failed to create consumer: %s", err.Error())
return
}

log.Info().Msg("Starting rango...")
log.Info("Starting rango...")

go func() {
for {
records, err := consumer.Poll()
if err != nil {
config.Logger.Fatalf("Failed to poll consumer %v", err)
fetches := client.PollRecords(ctx, -1)
if err := fetches.Err(); err != nil {
log.Fatalf("Failed to poll consumer %v", err)
continue
}

records := fetches.Records()
for _, r := range records {
hub.ReceiveMsg(r)

consumer.CommitRecords(*r)
client.CommitRecords(ctx, r)
}
}
}()

defer consumer.Client.Close()
defer client.Close()

go hub.ListenWebsocketEvents()

Expand All @@ -199,9 +191,9 @@ func main() {

go http.ListenAndServe(":4242", promhttp.Handler())

log.Printf("Listenning on %s", getServerAddress())
log.Infof("Listenning on %s", getServerAddress())
err = http.ListenAndServe(getServerAddress(), nil)
if err != nil {
log.Fatal().Msg("ListenAndServe failed: " + err.Error())
log.Fatalf("ListenAndServe failed: " + err.Error())
}
}
12 changes: 0 additions & 12 deletions config/config.go

This file was deleted.

92 changes: 23 additions & 69 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,94 +1,48 @@
module github.com/zsmartex/rango

go 1.18
go 1.20

require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
github.com/prometheus/client_golang v1.6.0
github.com/rs/zerolog v1.18.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/twmb/franz-go v1.6.0
github.com/zsmartex/pkg v1.3.59
github.com/zsmartex/pkg/v2 v2.1.19
github.com/stretchr/testify v1.8.0
github.com/twmb/franz-go v1.8.0
github.com/zsmartex/pkg/v2 v2.1.20-0.20230703235735-738e75768163
)

require (
github.com/armon/go-metrics v0.3.9 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.7.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v0.16.2 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-retryablehttp v0.6.6 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/api v1.4.1 // indirect
github.com/hashicorp/vault/sdk v0.4.1 // indirect
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.1 // indirect
github.com/jackc/pgconn v1.13.0 // indirect
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.9.1 // indirect
github.com/jackc/pgx/v4 v4.14.1 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/klauspost/compress v1.15.4 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/oklog/run v1.0.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/procfs v0.0.11 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/twmb/franz-go/pkg/kadm v1.0.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.1.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gorm.io/driver/postgres v1.3.1 // indirect
gorm.io/gorm v1.23.3 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/twmb/franz-go/pkg/kadm v1.2.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.2.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/gorm v1.25.2 // indirect
)

0 comments on commit c5a3e18

Please sign in to comment.