Skip to content

Commit

Permalink
Fix WaitReady if directory does not exist (#84)
Browse files Browse the repository at this point in the history
* Fix WaitReady if directory does not exist

Signed-off-by: Ondrej Fabry <ofabry@cisco.com>

* Watch directories recursively in WaitReady

Signed-off-by: Ondrej Fabry <ofabry@cisco.com>

Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
  • Loading branch information
ondrej-fabry committed Nov 24, 2022
1 parent 344b3ab commit d8a7de3
Showing 1 changed file with 77 additions and 5 deletions.
82 changes: 77 additions & 5 deletions adapter/socketclient/socketclient.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -49,7 +50,7 @@ var (
// DefaultDisconnectTimeout is default timeout for discconnecting
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration of waiting for socket file
MaxWaitReady = time.Second * 10
MaxWaitReady = time.Second * 3
)

var (
Expand Down Expand Up @@ -132,20 +133,37 @@ func (c *Client) SetDisconnectTimeout(t time.Duration) {
c.disconnectTimeout = t
}

// SetMsgCallback sets the callback for incoming messages.
func (c *Client) SetMsgCallback(cb adapter.MsgCallback) {
log.Debug("SetMsgCallback")
c.msgCallback = cb
}

// WaitReady checks socket file existence and waits for it if necessary
// WaitReady checks if the socket file exists and if it does not exist waits for
// it for the duration defined by MaxWaitReady.
func (c *Client) WaitReady() error {
socketDir, _ := filepath.Split(c.socketPath)
dirChain := strings.Split(filepath.ToSlash(filepath.Clean(socketDir)), "/")

dir := "/"
for _, dirElem := range dirChain {
dir = filepath.Join(dir, dirElem)
if err := waitForDir(dir); err != nil {
return err
}
log.Debugf("dir ready: %v", dir)
}

// check if socket already exists
if _, err := os.Stat(c.socketPath); err == nil {
return nil // socket exists, we are ready
} else if !os.IsNotExist(err) {
} else if !errors.Is(err, fs.ErrNotExist) {
log.Debugf("error is: %+v", err)
return err // some other error occurred
}

log.Debugf("waiting for file: %v", c.socketPath)

// socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -158,17 +176,21 @@ func (c *Client) WaitReady() error {
}()

// start directory watcher
if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil {
d := filepath.Dir(c.socketPath)
if err := watcher.Add(d); err != nil {
log.Debugf("watcher add(%v) error: %v", d, err)
return err
}

timeout := time.NewTimer(MaxWaitReady)
for {
select {
case <-timeout.C:
log.Debugf("watcher timeout after: %v", MaxWaitReady)
return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath)

case e := <-watcher.Errors:
log.Debugf("watcher error: %+v", e)
return e

case ev := <-watcher.Events:
Expand All @@ -181,6 +203,56 @@ func (c *Client) WaitReady() error {
}
}

func waitForDir(dir string) error {
// check if dir already exists
if _, err := os.Stat(dir); err == nil {
return nil // dir exists, we are ready
} else if !errors.Is(err, fs.ErrNotExist) {
log.Debugf("error is: %+v", err)
return err // some other error occurred
}

log.Debugf("waiting for dir: %v", dir)

// dir does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
log.Debugf("failed to close file watcher: %v", err)
}
}()

// start watching directory
d := filepath.Dir(dir)
if err := watcher.Add(d); err != nil {
log.Debugf("watcher add (%v) error: %v", d, err)
return err
}

timeout := time.NewTimer(MaxWaitReady)
for {
select {
case <-timeout.C:
log.Debugf("watcher timeout after: %v", MaxWaitReady)
return fmt.Errorf("timeout waiting (%s) for directory: %s", MaxWaitReady, dir)

case e := <-watcher.Errors:
log.Debugf("watcher error: %+v", e)
return e

case ev := <-watcher.Events:
log.Debugf("watcher event: %+v", ev)
if ev.Name == dir && (ev.Op&fsnotify.Create) == fsnotify.Create {
// socket created, we are ready
return nil
}
}
}
}

func (c *Client) Connect() error {
// check if socket exists
if _, err := os.Stat(c.socketPath); os.IsNotExist(err) {
Expand Down Expand Up @@ -465,7 +537,7 @@ func (c *Client) readerLoop() {

// getMsgReplyHeader gets message ID and context from the message reply header
//
// Message reply has following structure:
// Message reply has the following structure:
//
// type msgReplyHeader struct {
// MsgID uint16
Expand Down

0 comments on commit d8a7de3

Please sign in to comment.