Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance notification centre with better event handling #91

Merged
merged 5 commits into from Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 5 additions & 10 deletions adapter/adapter.go
Expand Up @@ -24,16 +24,17 @@ import (
"github.com/layer5io/meshery-adapter-library/meshes"
meshkitCfg "github.com/layer5io/meshkit/config"
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/utils/events"
)

// Interface Handler is extended by adapters, and used in package api/grpc that implements the MeshServiceServer.
type Handler interface {
GetName() string // Returns the name of the adapter.
GetComponentInfo(interface{}) error // Returns the component info.
// CreateInstance(*chan interface{}) error // Instantiates clients used in deploying and managing mesh instances, e.g. Kubernetes clients.
ApplyOperation(context.Context, OperationRequest, *chan interface{}) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest, hchan *chan interface{}) (string, error)
ApplyOperation(context.Context, OperationRequest) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest) (string, error)

// Need not implement this method and can be reused
StreamErr(*meshes.EventsResponse, error) // Streams an error event, e.g. to a channel
Expand All @@ -46,12 +47,6 @@ type Adapter struct {
Config meshkitCfg.Handler
KubeconfigHandler meshkitCfg.Handler
Log logger.Handler
Channel *chan interface{}
EventStreamer *events.EventStreamer
mx sync.Mutex
}

func (h *Adapter) SetChannel(hchan *chan interface{}) {
h.mx.Lock()
defer h.mx.Unlock()
h.Channel = hchan
}
8 changes: 4 additions & 4 deletions adapter/logger.go
Expand Up @@ -57,19 +57,19 @@ func (s *adapterLogger) GetComponentInfo(svc interface{}) error {
// return err
// }

func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest, hchan *chan interface{}) error {
func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest) error {
s.log.Info("Applying operation ", op.OperationName)
err := s.next.ApplyOperation(ctx, op, hchan)
err := s.next.ApplyOperation(ctx, op)
if err != nil {
s.log.Error(err)
}
return err
}

// ProcessOAM wraps the Handler's ProcessOAM method along with relevant logging
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest, hchan *chan interface{}) (string, error) {
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest) (string, error) {
s.log.Info("Process OAM components")
msg, err := s.next.ProcessOAM(ctx, oamRequest, hchan)
msg, err := s.next.ProcessOAM(ctx, oamRequest)
if err != nil {
s.log.Error(err)
}
Expand Down
14 changes: 12 additions & 2 deletions adapter/stream.go
Expand Up @@ -19,11 +19,21 @@ import "github.com/layer5io/meshery-adapter-library/meshes"
func (h *Adapter) StreamErr(e *meshes.EventsResponse, err error) {
h.Log.Error(err)
e.EventType = 2
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventStreamer.Publish(e)
h.Log.Info("Event stored and sent successfully")
}()
}

func (h *Adapter) StreamInfo(e *meshes.EventsResponse) {
h.Log.Info("Sending event")
e.EventType = 0
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventStreamer.Publish(e)
h.Log.Info("Event stored and sent successfully")
}()
}
5 changes: 3 additions & 2 deletions api/grpc/grpc.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/layer5io/meshery-adapter-library/adapter"
"github.com/layer5io/meshery-adapter-library/api/tracing"
"github.com/layer5io/meshery-adapter-library/meshes"
"github.com/layer5io/meshkit/utils/events"

"fmt"

Expand All @@ -50,8 +51,8 @@ type Service struct {
StartedAt time.Time `json:"startedat"`
TraceURL string `json:"traceurl"`

Handler adapter.Handler
Channel chan interface{}
Handler adapter.Handler
EventStreamer *events.EventStreamer

meshes.UnimplementedMeshServiceServer
}
Expand Down
13 changes: 6 additions & 7 deletions api/grpc/handlers.go
Expand Up @@ -59,7 +59,7 @@ func (s *Service) ApplyOperation(ctx context.Context, req *meshes.ApplyRuleReque
OperationID: req.OperationId,
K8sConfigs: req.KubeConfigs,
}
err := s.Handler.ApplyOperation(ctx, operation, &s.Channel)
err := s.Handler.ApplyOperation(ctx, operation)
if err != nil {
return &meshes.ApplyRuleResponse{
Error: err.Error(),
Expand Down Expand Up @@ -97,8 +97,10 @@ func (s *Service) SupportedOperations(ctx context.Context, req *meshes.Supported

// StreamEvents is the handler function for the method StreamEvents.
func (s *Service) StreamEvents(ctx *meshes.EventsRequest, srv meshes.MeshService_StreamEventsServer) error {
clientchan := make(chan interface{}, 10)
go s.EventStreamer.Subscribe(clientchan)
for {
data := <-s.Channel
data := <-clientchan
event := &meshes.EventsResponse{
OperationId: data.(*meshes.EventsResponse).OperationId,
EventType: meshes.EventType(data.(*meshes.EventsResponse).EventType),
Expand All @@ -110,11 +112,8 @@ func (s *Service) StreamEvents(ctx *meshes.EventsRequest, srv meshes.MeshService
Component: data.(*meshes.EventsResponse).Component,
ComponentName: data.(*meshes.EventsResponse).ComponentName,
}

if err := srv.Send(event); err != nil {
// to prevent loosing the event, will re-add to the channel
go func() {
s.Channel <- data
}()
return err
}
time.Sleep(500 * time.Millisecond)
Expand All @@ -131,7 +130,7 @@ func (s *Service) ProcessOAM(ctx context.Context, srv *meshes.ProcessOAMRequest)
K8sConfigs: srv.KubeConfigs,
}

msg, err := s.Handler.ProcessOAM(ctx, operation, &s.Channel)
msg, err := s.Handler.ProcessOAM(ctx, operation)
return &meshes.ProcessOAMResponse{Message: msg}, err
}

Expand Down
11 changes: 7 additions & 4 deletions go.mod
Expand Up @@ -14,19 +14,20 @@ require (
github.com/cenkalti/backoff/v4 v4.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34
github.com/layer5io/meshkit v0.5.16
github.com/layer5io/meshkit v0.5.37
github.com/layer5io/service-mesh-performance v0.3.4
github.com/spf13/viper v1.11.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc v0.11.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.11.0
go.opentelemetry.io/otel/sdk v1.3.0
golang.org/x/text v0.3.7
google.golang.org/grpc v1.45.0
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
)

require (
cuelang.org/go v0.4.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd // indirect
Expand All @@ -41,6 +42,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
github.com/cockroachdb/apd/v2 v2.0.1 // indirect
github.com/containerd/containerd v1.6.1 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -64,7 +66,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down Expand Up @@ -101,6 +103,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mpvl/unique v0.0.0-20150818121801-cbe035fff7de // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
Expand Down Expand Up @@ -137,7 +140,7 @@ require (
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
google.golang.org/api v0.74.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
gopkg.in/gorp.v1 v1.7.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
Expand Down