Skip to content

Commit

Permalink
Merge pull request #91 from Revolyssup/noti
Browse files Browse the repository at this point in the history
Enhance notification centre with better event handling
  • Loading branch information
Revolyssup committed Aug 26, 2022
2 parents c5fb562 + 922ec74 commit 7f8bd73
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 115 deletions.
15 changes: 5 additions & 10 deletions adapter/adapter.go
Expand Up @@ -24,16 +24,17 @@ import (
"github.com/layer5io/meshery-adapter-library/meshes"
meshkitCfg "github.com/layer5io/meshkit/config"
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/utils/events"
)

// Interface Handler is extended by adapters, and used in package api/grpc that implements the MeshServiceServer.
type Handler interface {
GetName() string // Returns the name of the adapter.
GetComponentInfo(interface{}) error // Returns the component info.
// CreateInstance(*chan interface{}) error // Instantiates clients used in deploying and managing mesh instances, e.g. Kubernetes clients.
ApplyOperation(context.Context, OperationRequest, *chan interface{}) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest, hchan *chan interface{}) (string, error)
ApplyOperation(context.Context, OperationRequest) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest) (string, error)

// Need not implement this method and can be reused
StreamErr(*meshes.EventsResponse, error) // Streams an error event, e.g. to a channel
Expand All @@ -46,12 +47,6 @@ type Adapter struct {
Config meshkitCfg.Handler
KubeconfigHandler meshkitCfg.Handler
Log logger.Handler
Channel *chan interface{}
EventStreamer *events.EventStreamer
mx sync.Mutex
}

func (h *Adapter) SetChannel(hchan *chan interface{}) {
h.mx.Lock()
defer h.mx.Unlock()
h.Channel = hchan
}
8 changes: 4 additions & 4 deletions adapter/logger.go
Expand Up @@ -57,19 +57,19 @@ func (s *adapterLogger) GetComponentInfo(svc interface{}) error {
// return err
// }

func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest, hchan *chan interface{}) error {
func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest) error {
s.log.Info("Applying operation ", op.OperationName)
err := s.next.ApplyOperation(ctx, op, hchan)
err := s.next.ApplyOperation(ctx, op)
if err != nil {
s.log.Error(err)
}
return err
}

// ProcessOAM wraps the Handler's ProcessOAM method along with relevant logging
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest, hchan *chan interface{}) (string, error) {
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest) (string, error) {
s.log.Info("Process OAM components")
msg, err := s.next.ProcessOAM(ctx, oamRequest, hchan)
msg, err := s.next.ProcessOAM(ctx, oamRequest)
if err != nil {
s.log.Error(err)
}
Expand Down
14 changes: 12 additions & 2 deletions adapter/stream.go
Expand Up @@ -19,11 +19,21 @@ import "github.com/layer5io/meshery-adapter-library/meshes"
func (h *Adapter) StreamErr(e *meshes.EventsResponse, err error) {
h.Log.Error(err)
e.EventType = 2
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventStreamer.Publish(e)
h.Log.Info("Event stored and sent successfully")
}()
}

func (h *Adapter) StreamInfo(e *meshes.EventsResponse) {
h.Log.Info("Sending event")
e.EventType = 0
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventStreamer.Publish(e)
h.Log.Info("Event stored and sent successfully")
}()
}
5 changes: 3 additions & 2 deletions api/grpc/grpc.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/layer5io/meshery-adapter-library/adapter"
"github.com/layer5io/meshery-adapter-library/api/tracing"
"github.com/layer5io/meshery-adapter-library/meshes"
"github.com/layer5io/meshkit/utils/events"

"fmt"

Expand All @@ -50,8 +51,8 @@ type Service struct {
StartedAt time.Time `json:"startedat"`
TraceURL string `json:"traceurl"`

Handler adapter.Handler
Channel chan interface{}
Handler adapter.Handler
EventStreamer *events.EventStreamer

meshes.UnimplementedMeshServiceServer
}
Expand Down
13 changes: 6 additions & 7 deletions api/grpc/handlers.go
Expand Up @@ -59,7 +59,7 @@ func (s *Service) ApplyOperation(ctx context.Context, req *meshes.ApplyRuleReque
OperationID: req.OperationId,
K8sConfigs: req.KubeConfigs,
}
err := s.Handler.ApplyOperation(ctx, operation, &s.Channel)
err := s.Handler.ApplyOperation(ctx, operation)
if err != nil {
return &meshes.ApplyRuleResponse{
Error: err.Error(),
Expand Down Expand Up @@ -97,8 +97,10 @@ func (s *Service) SupportedOperations(ctx context.Context, req *meshes.Supported

// StreamEvents is the handler function for the method StreamEvents.
func (s *Service) StreamEvents(ctx *meshes.EventsRequest, srv meshes.MeshService_StreamEventsServer) error {
clientchan := make(chan interface{}, 10)
go s.EventStreamer.Subscribe(clientchan)
for {
data := <-s.Channel
data := <-clientchan
event := &meshes.EventsResponse{
OperationId: data.(*meshes.EventsResponse).OperationId,
EventType: meshes.EventType(data.(*meshes.EventsResponse).EventType),
Expand All @@ -110,11 +112,8 @@ func (s *Service) StreamEvents(ctx *meshes.EventsRequest, srv meshes.MeshService
Component: data.(*meshes.EventsResponse).Component,
ComponentName: data.(*meshes.EventsResponse).ComponentName,
}

if err := srv.Send(event); err != nil {
// to prevent loosing the event, will re-add to the channel
go func() {
s.Channel <- data
}()
return err
}
time.Sleep(500 * time.Millisecond)
Expand All @@ -131,7 +130,7 @@ func (s *Service) ProcessOAM(ctx context.Context, srv *meshes.ProcessOAMRequest)
K8sConfigs: srv.KubeConfigs,
}

msg, err := s.Handler.ProcessOAM(ctx, operation, &s.Channel)
msg, err := s.Handler.ProcessOAM(ctx, operation)
return &meshes.ProcessOAMResponse{Message: msg}, err
}

Expand Down
11 changes: 7 additions & 4 deletions go.mod
Expand Up @@ -14,19 +14,20 @@ require (
github.com/cenkalti/backoff/v4 v4.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34
github.com/layer5io/meshkit v0.5.16
github.com/layer5io/meshkit v0.5.37
github.com/layer5io/service-mesh-performance v0.3.4
github.com/spf13/viper v1.11.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc v0.11.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.11.0
go.opentelemetry.io/otel/sdk v1.3.0
golang.org/x/text v0.3.7
google.golang.org/grpc v1.45.0
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
)

require (
cuelang.org/go v0.4.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd // indirect
Expand All @@ -41,6 +42,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
github.com/cockroachdb/apd/v2 v2.0.1 // indirect
github.com/containerd/containerd v1.6.1 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -64,7 +66,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down Expand Up @@ -101,6 +103,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mpvl/unique v0.0.0-20150818121801-cbe035fff7de // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
Expand Down Expand Up @@ -137,7 +140,7 @@ require (
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
google.golang.org/api v0.74.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
gopkg.in/gorp.v1 v1.7.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
Expand Down

0 comments on commit 7f8bd73

Please sign in to comment.