Skip to content

Commit

Permalink
Extend remote config watch with context and callback
Browse files Browse the repository at this point in the history
  • Loading branch information
cavus700 committed Apr 5, 2024
1 parent 947eb59 commit ba872b6
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package viper

import (
"bytes"
"context"
"encoding/csv"
"errors"
"fmt"
Expand Down Expand Up @@ -143,6 +144,16 @@ func DecodeHook(hook mapstructure.DecodeHookFunc) DecoderConfigOption {
}
}

type RemoteConfigEvent uint

const (
RemoteConfigEvent_Unknown RemoteConfigEvent = iota
// Remote config was updated
RemoteConfigEvent_Updated
// Remote config watch routine stopped
RemoteConfigEvent_Stopped
)

// Viper is a prioritized configuration registry. It
// maintains a set of configuration sources, fetches
// values to populate those, and provides them according
Expand Down Expand Up @@ -217,7 +228,8 @@ type Viper struct {
aliases map[string]string
typeByDefValue bool

onConfigChange func(fsnotify.Event)
onConfigChange func(fsnotify.Event)
onRemoteConfigChange func(RemoteConfigEvent)

logger *slog.Logger

Expand Down Expand Up @@ -432,6 +444,14 @@ func (v *Viper) OnConfigChange(run func(in fsnotify.Event)) {
v.onConfigChange = run
}

// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func OnRemoteConfigChange(run func(RemoteConfigEvent)) { v.OnRemoteConfigChange(run) }

// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func (v *Viper) OnRemoteConfigChange(run func(RemoteConfigEvent)) {
v.onRemoteConfigChange = run
}

// WatchConfig starts watching a config file for changes.
func WatchConfig() { v.WatchConfig() }

Expand Down Expand Up @@ -1973,7 +1993,11 @@ func (v *Viper) WatchRemoteConfig() error {
}

func (v *Viper) WatchRemoteConfigOnChannel() error {
return v.watchKeyValueConfigOnChannel()
return v.watchKeyValueConfigOnChannelWithContext(context.TODO())
}

func (v *Viper) WatchRemoteConfigOnChannelWithContext(ctx context.Context) error {
return v.watchKeyValueConfigOnChannelWithContext(ctx)
}

// Retrieve the first found remote configuration.
Expand Down Expand Up @@ -2010,20 +2034,32 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]any, error)
return v.kvstore, err
}

// Retrieve the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannel() error {
// Watch the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannelWithContext(ctx context.Context) error {
if len(v.remoteProviders) == 0 {
return RemoteConfigError("No Remote Providers")
}

for _, rp := range v.remoteProviders {
respc, _ := RemoteConfig.WatchChannel(rp)
// Todo: Add quit channel
go func(rc <-chan *RemoteResponse) {
for {
b := <-rc
reader := bytes.NewReader(b.Value)
v.unmarshalReader(reader, v.kvstore)
select {
case b := <-rc:
reader := bytes.NewReader(b.Value)
if err := v.unmarshalReader(reader, v.kvstore); err != nil {
v.logger.Error(fmt.Errorf("unmarshal remote config update: %w", err).Error())
continue
}
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Updated)
}
case <-ctx.Done():
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Stopped)
}
return
}
}
}(respc)
return nil
Expand Down

0 comments on commit ba872b6

Please sign in to comment.