diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index f9372276..3c78dfd6 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "io/fs" "net" "os" "path/filepath" @@ -49,7 +50,7 @@ var ( // DefaultDisconnectTimeout is default timeout for discconnecting DefaultDisconnectTimeout = time.Millisecond * 100 // MaxWaitReady defines maximum duration of waiting for socket file - MaxWaitReady = time.Second * 10 + MaxWaitReady = time.Second * 3 ) var ( @@ -132,20 +133,37 @@ func (c *Client) SetDisconnectTimeout(t time.Duration) { c.disconnectTimeout = t } +// SetMsgCallback sets the callback for incoming messages. func (c *Client) SetMsgCallback(cb adapter.MsgCallback) { log.Debug("SetMsgCallback") c.msgCallback = cb } -// WaitReady checks socket file existence and waits for it if necessary +// WaitReady checks if the socket file exists and if it does not exist waits for +// it for the duration defined by MaxWaitReady. func (c *Client) WaitReady() error { + socketDir, _ := filepath.Split(c.socketPath) + dirChain := strings.Split(filepath.ToSlash(filepath.Clean(socketDir)), "/") + + dir := "/" + for _, dirElem := range dirChain { + dir = filepath.Join(dir, dirElem) + if err := waitForDir(dir); err != nil { + return err + } + log.Debugf("dir ready: %v", dir) + } + // check if socket already exists if _, err := os.Stat(c.socketPath); err == nil { return nil // socket exists, we are ready - } else if !os.IsNotExist(err) { + } else if !errors.Is(err, fs.ErrNotExist) { + log.Debugf("error is: %+v", err) return err // some other error occurred } + log.Debugf("waiting for file: %v", c.socketPath) + // socket does not exist, watch for it watcher, err := fsnotify.NewWatcher() if err != nil { @@ -158,7 +176,9 @@ func (c *Client) WaitReady() error { }() // start directory watcher - if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil { + d := filepath.Dir(c.socketPath) + if err := watcher.Add(d); err != nil { + log.Debugf("watcher add(%v) error: %v", d, err) return err } @@ -166,9 +186,11 @@ func (c *Client) WaitReady() error { for { select { case <-timeout.C: + log.Debugf("watcher timeout after: %v", MaxWaitReady) return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath) case e := <-watcher.Errors: + log.Debugf("watcher error: %+v", e) return e case ev := <-watcher.Events: @@ -181,6 +203,56 @@ func (c *Client) WaitReady() error { } } +func waitForDir(dir string) error { + // check if dir already exists + if _, err := os.Stat(dir); err == nil { + return nil // dir exists, we are ready + } else if !errors.Is(err, fs.ErrNotExist) { + log.Debugf("error is: %+v", err) + return err // some other error occurred + } + + log.Debugf("waiting for dir: %v", dir) + + // dir does not exist, watch for it + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer func() { + if err := watcher.Close(); err != nil { + log.Debugf("failed to close file watcher: %v", err) + } + }() + + // start watching directory + d := filepath.Dir(dir) + if err := watcher.Add(d); err != nil { + log.Debugf("watcher add (%v) error: %v", d, err) + return err + } + + timeout := time.NewTimer(MaxWaitReady) + for { + select { + case <-timeout.C: + log.Debugf("watcher timeout after: %v", MaxWaitReady) + return fmt.Errorf("timeout waiting (%s) for directory: %s", MaxWaitReady, dir) + + case e := <-watcher.Errors: + log.Debugf("watcher error: %+v", e) + return e + + case ev := <-watcher.Events: + log.Debugf("watcher event: %+v", ev) + if ev.Name == dir && (ev.Op&fsnotify.Create) == fsnotify.Create { + // socket created, we are ready + return nil + } + } + } +} + func (c *Client) Connect() error { // check if socket exists if _, err := os.Stat(c.socketPath); os.IsNotExist(err) { @@ -465,7 +537,7 @@ func (c *Client) readerLoop() { // getMsgReplyHeader gets message ID and context from the message reply header // -// Message reply has following structure: +// Message reply has the following structure: // // type msgReplyHeader struct { // MsgID uint16