Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump github.com/golang/protobuf from 1.4.3 to 1.5.1 #53

Closed
7 changes: 5 additions & 2 deletions Makefile
Expand Up @@ -61,11 +61,14 @@ manifests:
GOROOT=$(shell $(GO) env GOROOT) controller-gen $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases


module_opt = module=github.com/cobalt77/kubecc

# Protobuf code generators
.PHONY: proto
proto:
protoc proto/types.proto --go_out=. --go-grpc_out=.
protoc proto/testpb.proto --go_out=. --go-grpc_out=.
protoc pkg/types/types.proto -I. --go_out=. --go_opt=$(module_opt) --go-grpc_out=. --go-grpc_opt=$(module_opt)
protoc internal/testutil/testutil.proto -I. --go_out=. --go_opt=$(module_opt) --go-grpc_out=. --go-grpc_opt=$(module_opt)
protoc pkg/metrics/metrics.proto -I. --go_out=. --go_opt=$(module_opt)

# Code generating, formatting, vetting
.PHONY: fmt vet generate
Expand Down
17 changes: 11 additions & 6 deletions api/v1alpha1/buildcluster_types.go
Expand Up @@ -16,6 +16,7 @@ type ComponentsSpec struct {
Agent AgentSpec `json:"agent"`
Scheduler SchedulerSpec `json:"scheduler,omitempty"` // +optional
Monitor MonitorSpec `json:"monitor,omitempty"` // +optional
Cache CacheSpec `json:"cache,omitempty"` // +optional
}

type IngressSpec struct {
Expand Down Expand Up @@ -52,8 +53,6 @@ type AgentSpec struct {
// +kubebuilder:default:="gcr.io/kubecc/agent:latest"
Image string `json:"image"` // +optional
AdditionalLabels map[string]string `json:"additionalLabels,omitempty"` // +optional
// +kubebuilder:default:=debug
LogLevel string `json:"logLevel"` // +optional
// +kubebuilder:default:=Always
ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy"` // +optional
}
Expand All @@ -64,8 +63,6 @@ type SchedulerSpec struct {
// +kubebuilder:default:="gcr.io/kubecc/scheduler:latest"
Image string `json:"image"` // +optional
AdditionalLabels map[string]string `json:"additionalLabels,omitempty"` // +optional
// +kubebuilder:default:=debug
LogLevel string `json:"logLevel"` // +optional
// +kubebuilder:default:=Always
ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy"` // +optional
}
Expand All @@ -76,8 +73,16 @@ type MonitorSpec struct {
// +kubebuilder:default:="gcr.io/kubecc/monitor:latest"
Image string `json:"image"` // +optional
AdditionalLabels map[string]string `json:"additionalLabels,omitempty"` // +optional
// +kubebuilder:default:=debug
LogLevel string `json:"logLevel"` // +optional
// +kubebuilder:default:=Always
ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy"` // +optional
}

type CacheSpec struct {
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"` // +optional
Resources v1.ResourceRequirements `json:"resources,omitempty"` // +optional
// +kubebuilder:default:="gcr.io/kubecc/monitor:latest"
Image string `json:"image"` // +optional
AdditionalLabels map[string]string `json:"additionalLabels,omitempty"` // +optional
// +kubebuilder:default:=Always
ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy"` // +optional
}
Expand Down
29 changes: 29 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 4 additions & 13 deletions bake.hcl
@@ -1,29 +1,20 @@
group "default" {
targets = ["manager", "kubecc", "environment"]
}

target "manager" {
dockerfile = "images/manager/Dockerfile"
tags = ["gcr.io/kubecc/manager"]
platforms = ["linux/amd64"]
context = "."
cache-from = ["type=local,src=build/cache/manager"]
cache-to = ["type=local,dest=build/cache/manager"]
targets = ["kubecc", "environment"]
}

target "kubecc" {
dockerfile = "images/kubecc/Dockerfile"
tags = ["gcr.io/kubecc/kubecc"]
platforms = ["linux/amd64"]
platforms = ["linux/amd64", "linux/arm64"]
context = "."
cache-from = ["type=local,src=build/cache/kubecc"]
cache-to = ["type=local,dest=build/cache/kubecc"]
}

target "kubecc" {
target "environment" {
dockerfile = "images/environment/Dockerfile"
tags = ["gcr.io/kubecc/environment"]
platforms = ["linux/amd64"]
platforms = ["linux/amd64", "linux/arm64"]
context = "."
cache-from = ["type=local,src=build/cache/environment"]
cache-to = ["type=local,dest=build/cache/environment"]
Expand Down
22 changes: 7 additions & 15 deletions cmd/kcctl/commands/monitor.go
@@ -1,17 +1,15 @@
package commands

import (
"bytes"
"encoding/json"
"strings"

monitormetrics "github.com/cobalt77/kubecc/pkg/apps/monitor/metrics"
"github.com/cobalt77/kubecc/pkg/clients"
"github.com/cobalt77/kubecc/pkg/metrics"
"github.com/cobalt77/kubecc/pkg/servers"
"github.com/cobalt77/kubecc/pkg/types"
"github.com/cobalt77/kubecc/pkg/ui"
"github.com/spf13/cobra"
"github.com/tinylib/msgp/msgp"
)

// monitorCmd represents the monitor command.
Expand All @@ -20,19 +18,13 @@ var monitorCmd = &cobra.Command{
Short: "Commands to interact with the monitor",
}

func onValueChanged(tb *ui.TextBox) func(*monitormetrics.StoreContents) {
return func(contents *monitormetrics.StoreContents) {
func onValueChanged(tb *ui.TextBox) func(*metrics.StoreContents) {
return func(contents *metrics.StoreContents) {
printable := map[string]interface{}{}
for _, bucket := range contents.Buckets {
jsonContents := map[string]string{}
for k, v := range bucket.Data {
buf := new(bytes.Buffer)
_, err := msgp.UnmarshalAsJSON(buf, v)
if err != nil {
jsonContents[k] = "<error>"
} else {
jsonContents[k] = buf.String()
}
jsonContents[k] = v.String()
}
printable[bucket.Name] = jsonContents
}
Expand All @@ -56,11 +48,11 @@ var listenCmd = &cobra.Command{
if err != nil {
cliLog.Fatal(err)
}
client := types.NewExternalMonitorClient(cc)
listener := metrics.NewListener(cliContext, client)
client := types.NewMonitorClient(cc)
listener := clients.NewListener(cliContext, client)
tb := &ui.TextBox{}

listener.OnValueChanged(monitormetrics.MetaBucket, onValueChanged(tb)).
listener.OnValueChanged(metrics.MetaBucket, onValueChanged(tb)).
OrExpired(func() metrics.RetryOptions {
tb.SetText("-- KEY EXPIRED -- \n\n" + tb.Paragraph.Text)
return metrics.NoRetry
Expand Down
7 changes: 2 additions & 5 deletions cmd/kcctl/commands/root.go
Expand Up @@ -6,7 +6,6 @@ import (
"os"

"github.com/cobalt77/kubecc/internal/logkc"
"github.com/cobalt77/kubecc/internal/zapkc"
"github.com/cobalt77/kubecc/pkg/config"
"github.com/cobalt77/kubecc/pkg/identity"
"github.com/cobalt77/kubecc/pkg/meta"
Expand All @@ -24,10 +23,8 @@ var (

// rootCmd represents the base command when called without any subcommands.
var rootCmd = &cobra.Command{
Use: "kcctl",
Short: "A brief description of your application",
Long: fmt.Sprintf("%s\n%s", zapkc.Yellow.Add(logkc.BigAsciiTextColored), `
The kubecc CLI utility`),
Use: "kcctl",
Long: fmt.Sprintf("%s\n%s", logkc.BigAsciiTextColored, `The kubecc CLI utility`),
}

// Execute adds all child commands to the root command and sets flags appropriately.
Expand Down
38 changes: 22 additions & 16 deletions cmd/kcctl/commands/status.go
Expand Up @@ -2,18 +2,17 @@ package commands

import (
"context"
"io"

"github.com/cobalt77/kubecc/internal/logkc"
"github.com/cobalt77/kubecc/pkg/config"
"github.com/cobalt77/kubecc/pkg/clients"
"github.com/cobalt77/kubecc/pkg/identity"
"github.com/cobalt77/kubecc/pkg/meta"
"github.com/cobalt77/kubecc/pkg/metrics"
"github.com/cobalt77/kubecc/pkg/metrics/common"
"github.com/cobalt77/kubecc/pkg/servers"
"github.com/cobalt77/kubecc/pkg/types"
"github.com/cobalt77/kubecc/pkg/ui"
"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
)

// statusCmd represents the status command.
Expand All @@ -27,32 +26,39 @@ Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
conf := (&config.ConfigMapProvider{}).Load().Kcctl

cc, err := servers.Dial(cliContext, conf.MonitorAddress)
cc, err := servers.Dial(cliContext, cliConfig.MonitorAddress,
servers.WithTLS(!cliConfig.DisableTLS))
if err != nil {
cliLog.Fatal(err)
}
ctx := meta.NewContext(
meta.WithProvider(identity.Component, meta.WithValue(types.CLI)),
meta.WithProvider(identity.UUID),
meta.WithProvider(logkc.Logger, meta.WithValue(logkc.New(types.CLI,
logkc.WithWriter(io.Discard),
meta.WithProvider(logkc.Logger, meta.WithValue(logkc.New(
types.CLI,
logkc.WithLogLevel(zapcore.ErrorLevel),
))),
)
client := types.NewExternalMonitorClient(cc)
listener := metrics.NewListener(ctx, client)
client := types.NewMonitorClient(cc)
listener := clients.NewListener(ctx, client)
display := ui.NewStatusDisplay()

listener.OnProviderAdded(func(pctx context.Context, uuid string) {
display.AddAgent(pctx, uuid)
listener.OnValueChanged(uuid, func(qp *common.QueueParams) {
display.Update(uuid, qp)
info, err := client.Whois(ctx, &types.WhoisRequest{
UUID: uuid,
})
listener.OnValueChanged(uuid, func(qs *common.QueueStatus) {
display.Update(uuid, qs)
if err != nil {
return
}
if info.Component != types.Agent && info.Component != types.Consumerd {
return
}

display.AddAgent(pctx, info)
listener.OnValueChanged(uuid, func(qp *metrics.UsageLimits) {
display.Update(uuid, qp)
})
listener.OnValueChanged(uuid, func(ts *common.TaskStatus) {
listener.OnValueChanged(uuid, func(ts *metrics.TaskStatus) {
display.Update(uuid, ts)
})
<-pctx.Done()
Expand Down
22 changes: 5 additions & 17 deletions cmd/kubecc/components/agent/agent.go
@@ -1,8 +1,6 @@
package commands

import (
"net"

"github.com/cobalt77/kubecc/internal/logkc"
"github.com/cobalt77/kubecc/internal/sleep"
sleeptoolchain "github.com/cobalt77/kubecc/internal/sleep/toolchain"
Expand All @@ -13,6 +11,7 @@ import (
"github.com/cobalt77/kubecc/pkg/host"
"github.com/cobalt77/kubecc/pkg/identity"
"github.com/cobalt77/kubecc/pkg/meta"
"github.com/cobalt77/kubecc/pkg/metrics"
"github.com/cobalt77/kubecc/pkg/servers"
"github.com/cobalt77/kubecc/pkg/toolchains"
"github.com/cobalt77/kubecc/pkg/tracing"
Expand All @@ -38,12 +37,6 @@ func run(cmd *cobra.Command, args []string) {
)
lg := meta.Log(ctx)

srv := servers.NewServer(ctx)
listener, err := net.Listen("tcp", conf.ListenAddress)
if err != nil {
lg.With(zap.Error(err)).Fatalw("Error listening on socket")
}

schedulerCC, err := servers.Dial(ctx, conf.SchedulerAddress)
lg.With("address", schedulerCC.Target()).Info("Dialing scheduler")
if err != nil {
Expand All @@ -57,10 +50,10 @@ func run(cmd *cobra.Command, args []string) {
}

schedulerClient := types.NewSchedulerClient(schedulerCC)
monitorClient := types.NewInternalMonitorClient(monitorCC)
monitorClient := types.NewMonitorClient(monitorCC)

a := agent.NewAgentServer(ctx,
agent.WithUsageLimits(&types.UsageLimits{
agent.WithUsageLimits(&metrics.UsageLimits{
ConcurrentProcessLimit: int32(conf.UsageLimits.ConcurrentProcessLimit),
QueuePressureMultiplier: conf.UsageLimits.QueuePressureMultiplier,
QueueRejectMultiplier: conf.UsageLimits.QueueRejectMultiplier,
Expand All @@ -77,15 +70,10 @@ func run(cmd *cobra.Command, args []string) {
agent.WithSchedulerClient(schedulerClient),
agent.WithMonitorClient(monitorClient),
)
types.RegisterAgentServer(srv, a)
go a.StartMetricsProvider()

mgr := servers.NewStreamManager(ctx, a)
go mgr.Run()
go a.StartMetricsProvider()
err = srv.Serve(listener)
if err != nil {
lg.With(zap.Error(err)).Error("GRPC error")
}
mgr.Run()
}

var Command = &cobra.Command{
Expand Down
16 changes: 9 additions & 7 deletions cmd/kubecc/components/consumerd/consumerd.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cobalt77/kubecc/pkg/host"
"github.com/cobalt77/kubecc/pkg/identity"
"github.com/cobalt77/kubecc/pkg/meta"
"github.com/cobalt77/kubecc/pkg/metrics"
"github.com/cobalt77/kubecc/pkg/servers"
"github.com/cobalt77/kubecc/pkg/toolchains"
"github.com/cobalt77/kubecc/pkg/tracing"
Expand Down Expand Up @@ -46,17 +47,18 @@ func run(cmd *cobra.Command, args []string) {
lg.With(zap.Error(err)).Fatal("Error dialing scheduler")
}

monitorCC, err := servers.Dial(ctx, conf.MonitorAddress)
monitorCC, err := servers.Dial(ctx, conf.MonitorAddress,
servers.WithTLS(!conf.DisableTLS))
lg.With("address", monitorCC.Target()).Info("Dialing monitor")
if err != nil {
lg.With(zap.Error(err)).Fatal("Error dialing monitor")
}

schedulerClient := types.NewSchedulerClient(schedulerCC)
monitorClient := types.NewInternalMonitorClient(monitorCC)
monitorClient := types.NewMonitorClient(monitorCC)

d := consumerd.NewConsumerdServer(ctx,
consumerd.WithUsageLimits(&types.UsageLimits{
consumerd.WithUsageLimits(&metrics.UsageLimits{
ConcurrentProcessLimit: int32(conf.UsageLimits.ConcurrentProcessLimit),
QueuePressureMultiplier: conf.UsageLimits.QueuePressureMultiplier,
QueueRejectMultiplier: conf.UsageLimits.QueueRejectMultiplier,
Expand All @@ -70,13 +72,13 @@ func run(cmd *cobra.Command, args []string) {
},
),
consumerd.WithToolchainRunners(cctoolchain.AddToStore, sleeptoolchain.AddToStore),
consumerd.WithSchedulerClient(schedulerClient, schedulerCC),
consumerd.WithSchedulerClient(schedulerClient),
consumerd.WithMonitorClient(monitorClient),
)

mgr := servers.NewStreamManager(ctx, d)
go d.StartMetricsProvider()
go mgr.Run()
// mgr := servers.NewStreamManager(ctx, d)
// go d.StartMetricsProvider()
// go mgr.Run()

listener, err := net.Listen("tcp", conf.ListenAddress)
if err != nil {
Expand Down