From 4425ce362da1cc8d084d16eb586a23bf3493b8b0 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Tue, 8 Nov 2022 14:41:31 +0100 Subject: [PATCH 1/2] Fix WaitReady if directory does not exist Signed-off-by: Ondrej Fabry --- adapter/socketclient/socketclient.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index f9372276..bbf799e4 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "io/fs" "net" "os" "path/filepath" @@ -49,7 +50,7 @@ var ( // DefaultDisconnectTimeout is default timeout for discconnecting DefaultDisconnectTimeout = time.Millisecond * 100 // MaxWaitReady defines maximum duration of waiting for socket file - MaxWaitReady = time.Second * 10 + MaxWaitReady = time.Second * 3 ) var ( @@ -132,17 +133,20 @@ func (c *Client) SetDisconnectTimeout(t time.Duration) { c.disconnectTimeout = t } +// SetMsgCallback sets the callback for incoming messages. func (c *Client) SetMsgCallback(cb adapter.MsgCallback) { log.Debug("SetMsgCallback") c.msgCallback = cb } -// WaitReady checks socket file existence and waits for it if necessary +// WaitReady checks if the socket file exists and if it does not exist waits for +// it for the duration defined by MaxWaitReady. func (c *Client) WaitReady() error { // check if socket already exists if _, err := os.Stat(c.socketPath); err == nil { return nil // socket exists, we are ready - } else if !os.IsNotExist(err) { + } else if !errors.Is(err, fs.ErrNotExist) { + log.Debugf("error is: %+v", err) return err // some other error occurred } @@ -166,9 +170,11 @@ func (c *Client) WaitReady() error { for { select { case <-timeout.C: + log.Debugf("watcher timeout after: %v", MaxWaitReady) return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath) case e := <-watcher.Errors: + log.Debugf("watcher error: %+v", e) return e case ev := <-watcher.Events: @@ -465,7 +471,7 @@ func (c *Client) readerLoop() { // getMsgReplyHeader gets message ID and context from the message reply header // -// Message reply has following structure: +// Message reply has the following structure: // // type msgReplyHeader struct { // MsgID uint16 From 3aefc97db42a07748ec28284537a1b6bfbe6f635 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Tue, 15 Nov 2022 14:09:18 +0100 Subject: [PATCH 2/2] Watch directories recursively in WaitReady Signed-off-by: Ondrej Fabry --- adapter/socketclient/socketclient.go | 68 +++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index bbf799e4..3c78dfd6 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -142,6 +142,18 @@ func (c *Client) SetMsgCallback(cb adapter.MsgCallback) { // WaitReady checks if the socket file exists and if it does not exist waits for // it for the duration defined by MaxWaitReady. func (c *Client) WaitReady() error { + socketDir, _ := filepath.Split(c.socketPath) + dirChain := strings.Split(filepath.ToSlash(filepath.Clean(socketDir)), "/") + + dir := "/" + for _, dirElem := range dirChain { + dir = filepath.Join(dir, dirElem) + if err := waitForDir(dir); err != nil { + return err + } + log.Debugf("dir ready: %v", dir) + } + // check if socket already exists if _, err := os.Stat(c.socketPath); err == nil { return nil // socket exists, we are ready @@ -150,6 +162,8 @@ func (c *Client) WaitReady() error { return err // some other error occurred } + log.Debugf("waiting for file: %v", c.socketPath) + // socket does not exist, watch for it watcher, err := fsnotify.NewWatcher() if err != nil { @@ -162,7 +176,9 @@ func (c *Client) WaitReady() error { }() // start directory watcher - if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil { + d := filepath.Dir(c.socketPath) + if err := watcher.Add(d); err != nil { + log.Debugf("watcher add(%v) error: %v", d, err) return err } @@ -187,6 +203,56 @@ func (c *Client) WaitReady() error { } } +func waitForDir(dir string) error { + // check if dir already exists + if _, err := os.Stat(dir); err == nil { + return nil // dir exists, we are ready + } else if !errors.Is(err, fs.ErrNotExist) { + log.Debugf("error is: %+v", err) + return err // some other error occurred + } + + log.Debugf("waiting for dir: %v", dir) + + // dir does not exist, watch for it + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer func() { + if err := watcher.Close(); err != nil { + log.Debugf("failed to close file watcher: %v", err) + } + }() + + // start watching directory + d := filepath.Dir(dir) + if err := watcher.Add(d); err != nil { + log.Debugf("watcher add (%v) error: %v", d, err) + return err + } + + timeout := time.NewTimer(MaxWaitReady) + for { + select { + case <-timeout.C: + log.Debugf("watcher timeout after: %v", MaxWaitReady) + return fmt.Errorf("timeout waiting (%s) for directory: %s", MaxWaitReady, dir) + + case e := <-watcher.Errors: + log.Debugf("watcher error: %+v", e) + return e + + case ev := <-watcher.Events: + log.Debugf("watcher event: %+v", ev) + if ev.Name == dir && (ev.Op&fsnotify.Create) == fsnotify.Create { + // socket created, we are ready + return nil + } + } + } +} + func (c *Client) Connect() error { // check if socket exists if _, err := os.Stat(c.socketPath); os.IsNotExist(err) {