diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 0816767a35..28bf3bde5a 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -163,9 +163,7 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() - if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { + if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } @@ -179,15 +177,21 @@ func (c *Controller) Start(ctx context.Context) error { continue } - // use a context with timeout for launching sources and syncing caches. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() + if err := func() error { + // use a context with timeout for launching sources and syncing caches. + sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + + // WaitForSync waits for a definitive timeout, and returns if there + // is an error or a timeout + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) + c.Log.Error(err, "Could not wait for Cache to sync") + return err + } - // WaitForSync waits for a definitive timeout, and returns if there - // is an error or a timeout - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) - c.Log.Error(err, "Could not wait for Cache to sync") + return nil + }(); err != nil { return err } } diff --git a/pkg/source/source.go b/pkg/source/source.go index fe0e47150a..72e71e852a 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -215,7 +215,10 @@ func (cs *Channel) Start( } dst := make(chan event.GenericEvent, cs.DestBufferSize) + + cs.destLock.Lock() cs.dest = append(cs.dest, dst) + cs.destLock.Unlock() cs.once.Do(func() { // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source @@ -238,9 +241,6 @@ func (cs *Channel) Start( } }() - cs.destLock.Lock() - defer cs.destLock.Unlock() - return nil }