Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WaitReady if directory does not exist #84

Merged
merged 4 commits into from Nov 24, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 77 additions & 5 deletions adapter/socketclient/socketclient.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -49,7 +50,7 @@ var (
// DefaultDisconnectTimeout is default timeout for discconnecting
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration of waiting for socket file
MaxWaitReady = time.Second * 10
MaxWaitReady = time.Second * 3
)

var (
Expand Down Expand Up @@ -132,20 +133,37 @@ func (c *Client) SetDisconnectTimeout(t time.Duration) {
c.disconnectTimeout = t
}

// SetMsgCallback sets the callback for incoming messages.
func (c *Client) SetMsgCallback(cb adapter.MsgCallback) {
log.Debug("SetMsgCallback")
c.msgCallback = cb
}

// WaitReady checks socket file existence and waits for it if necessary
// WaitReady checks if the socket file exists and if it does not exist waits for
// it for the duration defined by MaxWaitReady.
func (c *Client) WaitReady() error {
socketDir, _ := filepath.Split(c.socketPath)
dirChain := strings.Split(filepath.ToSlash(filepath.Clean(socketDir)), "/")

dir := "/"
for _, dirElem := range dirChain {
dir = filepath.Join(dir, dirElem)
if err := waitForDir(dir); err != nil {
return err
}
log.Debugf("dir ready: %v", dir)
}

// check if socket already exists
if _, err := os.Stat(c.socketPath); err == nil {
return nil // socket exists, we are ready
} else if !os.IsNotExist(err) {
} else if !errors.Is(err, fs.ErrNotExist) {
log.Debugf("error is: %+v", err)
return err // some other error occurred
}

log.Debugf("waiting for file: %v", c.socketPath)

// socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -158,17 +176,21 @@ func (c *Client) WaitReady() error {
}()

// start directory watcher
if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil {
d := filepath.Dir(c.socketPath)
if err := watcher.Add(d); err != nil {
log.Debugf("watcher add(%v) error: %v", d, err)
return err
}

timeout := time.NewTimer(MaxWaitReady)
for {
select {
case <-timeout.C:
log.Debugf("watcher timeout after: %v", MaxWaitReady)
return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath)

case e := <-watcher.Errors:
log.Debugf("watcher error: %+v", e)
return e

case ev := <-watcher.Events:
Expand All @@ -181,6 +203,56 @@ func (c *Client) WaitReady() error {
}
}

func waitForDir(dir string) error {
// check if dir already exists
if _, err := os.Stat(dir); err == nil {
return nil // dir exists, we are ready
} else if !errors.Is(err, fs.ErrNotExist) {
log.Debugf("error is: %+v", err)
return err // some other error occurred
}

log.Debugf("waiting for dir: %v", dir)

// dir does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
log.Debugf("failed to close file watcher: %v", err)
}
}()

// start watching directory
d := filepath.Dir(dir)
if err := watcher.Add(d); err != nil {
log.Debugf("watcher add (%v) error: %v", d, err)
return err
}

timeout := time.NewTimer(MaxWaitReady)
for {
select {
case <-timeout.C:
log.Debugf("watcher timeout after: %v", MaxWaitReady)
return fmt.Errorf("timeout waiting (%s) for directory: %s", MaxWaitReady, dir)

case e := <-watcher.Errors:
log.Debugf("watcher error: %+v", e)
return e

case ev := <-watcher.Events:
log.Debugf("watcher event: %+v", ev)
if ev.Name == dir && (ev.Op&fsnotify.Create) == fsnotify.Create {
// socket created, we are ready
return nil
}
}
}
}

func (c *Client) Connect() error {
// check if socket exists
if _, err := os.Stat(c.socketPath); os.IsNotExist(err) {
Expand Down Expand Up @@ -465,7 +537,7 @@ func (c *Client) readerLoop() {

// getMsgReplyHeader gets message ID and context from the message reply header
//
// Message reply has following structure:
// Message reply has the following structure:
//
// type msgReplyHeader struct {
// MsgID uint16
Expand Down