✨ Allow configuring cache sync timeouts #1247

Merged

Conversation

@varshaprasad96 (Member) commented Nov 5, 2020:

This PR allows users to configure the timeout for cache syncs while starting the controller.

Closes: #1219
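
For context, usage after this change might look roughly like the sketch below. This is a minimal sketch and not code from the PR itself; it assumes the new option is surfaced as CacheSyncTimeout on controller.Options, and addController, mgr, and r are placeholder names.

package example

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// addController registers a controller that gives up waiting for its caches
// to sync after 30 seconds instead of blocking indefinitely.
func addController(mgr manager.Manager, r reconcile.Reconciler) error {
	_, err := controller.New("example-controller", mgr, controller.Options{
		Reconciler:       r,
		CacheSyncTimeout: 30 * time.Second,
	})
	return err
}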

@k8s-ci-robot added the cncf-cla: yes label (Indicates the PR's author has signed the CNCF CLA) Nov 5, 2020
@k8s-ci-robot (Contributor):

Hi @varshaprasad96. Thanks for your PR.

I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@k8s-ci-robot added the needs-ok-to-test label (Indicates a PR that requires an org member to verify it is safe to test) Nov 5, 2020
@k8s-ci-robot added the size/M label (Denotes a PR that changes 30-99 lines, ignoring generated files) Nov 5, 2020
@varshaprasad96 (Member Author):

/assign @alvaroaleman

@jmrodri (Contributor) left a comment:


A couple of questions and a suggestion to add another comment.

Resolved review threads: pkg/controller/controller.go, pkg/internal/controller/controller.go, pkg/source/source_test.go (two threads)
@alvaroaleman (Member) left a comment:


/ok-to-test

Resolved review threads: pkg/controller/controller.go, pkg/internal/controller/controller.go, pkg/source/source.go
@@ -247,13 +248,24 @@ var _ = Describe("Source", func() {
 It("should return an error if syncing fails", func(done Done) {
 	f := false
 	instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
-	err := instance.WaitForSync(nil)
+	err := instance.WaitForSync(context.Background(), time.Duration(10)*time.Second)
Member:

Can we add an integration test where we give this an impossibly low timeout (1 nanosecond or so) and verify it fails, then verify it succeeds with a reasonable timeout?

Also, please add a simple unit test that verifies the defaulting of the timeout.
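
For reference, such a defaulting check could be a small sketch like the one below. This is not code from the PR: it assumes the option lands on controller.Options as CacheSyncTimeout, that the internal controller type (imported here as internalcontroller) is reachable from the test as in the existing controller tests, that m and rec are the manager and reconciler from the surrounding suite, and that the default value is two minutes.

It("should default CacheSyncTimeout when it is not set", func() {
	c, err := controller.New("default-cache-sync-timeout", m, controller.Options{Reconciler: rec})
	Expect(err).NotTo(HaveOccurred())

	// The public Controller interface does not expose the field, so look at
	// the internal controller type directly.
	ctrl, ok := c.(*internalcontroller.Controller)
	Expect(ok).To(BeTrue())

	// Assumed default of two minutes.
	Expect(ctrl.CacheSyncTimeout).To(Equal(2 * time.Minute))
})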

Member Author:

I have added a test case where the caches do not sync and the configured timeout produces an error (this verifies that we don't loop forever waiting for caches to sync). However, verifying the case with a reasonable timeout requires passing a context to controller.Start(...) and waiting for the channel to close once the caches have synced. Currently, the fake informer does not use the context; it only mocks whether caches are synced with a boolean value, so that Start() is not blocked. I suppose that's why all the other test cases pass a cancelled context. Modifying the fake informer to accept the context would break the other test cases. Is there a workaround for this, or is the current test sufficient?
cc: @joelanford

Member:

See my other comment; writing an integration test that uses a real cache and not the fake informer should be doable here.

Resolved review thread: pkg/source/source_test.go
@k8s-ci-robot added the ok-to-test (Indicates a non-member PR verified by an org member that is safe to test) and needs-rebase (Indicates a PR cannot be merged because it has merge conflicts with HEAD) labels, and removed the needs-ok-to-test (Indicates a PR that requires an org member to verify it is safe to test) label Nov 5, 2020
@varshaprasad96 force-pushed the add/cache-timeout branch 2 times, most recently from 3886222 to 3382fcd on November 10, 2020 03:33
@k8s-ci-robot removed the needs-rebase label (Indicates a PR cannot be merged because it has merge conflicts with HEAD) Nov 10, 2020
Resolved review thread: pkg/internal/controller/controller.go
Expect(err).NotTo(HaveOccurred())
sync := false
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{
Member:

Instead of using the FakeInformer, please use the cache you construct in line 127 (the test case you copied this from seems to be bugged: it constructs a cache and then never uses it).

To test the positive case (we correctly sync when there is a reasonable timeout), you could do the following:

  • Start off similar to this test: construct a cache and set it up in startWatches
  • Build a wrapper struct for the Source that closes a channel after WaitForSync has been called (see below)
  • Start the cache in a distinct goroutine (as that is blocking)
  • Start the controller in a distinct goroutine (as that is blocking, too)
  • Wait for the channel in the source or a timeout via select
  • Cancel the context used to start the cache and controller

The wrapper would be something like this:

type signalingSourceWrapper struct {
  cacheSyncDone chan struct{}
  source.SyncingSource
}

func (s *signalingSourceWrapper) WaitForSync(ctx context.Context) error {
  defer func() { close(s.cacheSyncDone) }()
  return s.SyncingSource.WaitForSync(ctx)
}

Let me know if you have further questions on this

Member Author:

Oops, sorry, I had missed this. Does the current implementation look fine or shall I modify it?

Member Author:

@alvaroaleman, can you please help here:
I tried to do the following:

  1. Created a wrapper around SyncingSource as that in turn wraps Source:
type SignallingSourceWrapper struct {
	cacheSyncDone chan struct{}
	source.SyncingSource
}

func NewSignallingSource(cacheChannel chan struct{}, src source.SyncingSource) source.SyncingSource {
	return &SignallingSourceWrapper{cacheSyncDone: cacheChannel, SyncingSource: src}
}

func (s *SignallingSourceWrapper) WaitForSync(ctx context.Context) error {
	defer func() {
		close(s.cacheSyncDone)
	}()
	return s.SyncingSource.WaitForSync(ctx)
}

In the test, I tried the following as you have suggested:

// Configure a new cache:
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctx := context.TODO()


// Set it up with start watches. Create a new `channel` for SignallingSource:
channel := make(chan struct{})
src := source.NewKindWithCache(&appsv1.Deployment{}, c)
ctrl.startWatches = []watchDescription{{
	src: NewSignallingSource(channel, src),
}}

// Start cache and controller in a separate go routine:
go c.Start(ctx)
go ctrl.Start(ctx)

// Wait for channel to close:
select {
	case <-channel:
		// channel has closed, cache has synced.
	default:
		// there is a timeout. cache did not sync
}

ctx.Done()

When I check for the condition where the cache is synced, there is always a timeout, and I am also seeing this error:

watch of *v1.Deployment ended with: very short watch: pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: Unexpected watch close - watch lasted less than a second and no items received

I'm not able to figure out why it always times out, since the context passed to start the caches and the controller is closed at the end. The channel also seems to still be open. Is there something I'm missing?

Member:

Can you check the error return of the cache and controller?

Member:

Also, the default case needs to not be default but something that actually times out, i.e.:

timeoutCtx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
select {
	case <-channel:
		// channel has closed, cache has synced.
	case <-timeoutCtx.Done():
		// there is a timeout. cache did not sync
}

Member:

Ah. Well, that simplifies the whole thing; we can then simply start the controller synchronously and verify we get that error.

Member Author:

Yeah, the same error does occur when the controller is started synchronously without any timeout set. But doesn't the informer not getting synced also mean that the caches are not synced? If so, how can we verify the case where everything works fine and the caches are synced when no timeout is set?

Member:

So the reason this happens is that the cache isn't started properly by the time we start the controller. Since cache.Start() is blocking, it cannot easily be used to figure out when it has finished starting. You could probably use Eventually with cache.List() for something that is not a Deployment to verify that it has finished starting before starting the controller (it's important that it's a different object than whatever the controller watches, as the List creates and syncs an informer for that object type).
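
A rough sketch of that idea (not from the PR; it assumes the suite's cfg rest config, that the controller does not watch ConfigMaps, and that corev1 is k8s.io/api/core/v1):

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start the cache in the background; Start blocks until ctx is cancelled.
go func() {
	defer GinkgoRecover()
	Expect(c.Start(ctx)).To(Succeed())
}()

// List an object type the controller does not watch; once this succeeds,
// the cache has started and the controller can be started safely.
Eventually(func() error {
	return c.List(ctx, &corev1.ConfigMapList{})
}).Should(Succeed())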

Member Author:

When I start the cache in a separate goroutine, wait for it to start, then start the controller in another goroutine, I get the error pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: watch of *v1.ReplicaSet ended with: very short watch: pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: Unexpected watch close - watch lasted less than a second and no items received

Why is it required that, when we set a watch, there is a change in the state of the resource?

Member:

Not sure why you are seeing that error; it is not required to have any change in the state of the resources for this to work. What works for me is this:

    It("should error when cache sync timeout occurs", func(done Done) {
      c, err := cache.New(cfg, cache.Options{})
      Expect(err).NotTo(HaveOccurred())
      ctrl.CacheSyncTimeout = 10 * time.Millisecond
      ctrl.startWatches = []watchDescription{{src: source.NewKindWithCache(&appsv1.Deployment{}, c)}}

      err = ctrl.Start(context.TODO())
      Expect(err).To(HaveOccurred())
      Expect(err.Error()).To(ContainSubstring("cache did not sync"))

      close(done)
    })

    It("should not error when cache sync time out is of reasonable value", func(done Done) {                                                                                                                                                                                                                                   
      ctx, cancel := context.WithCancel(context.Background())
      defer cancel()

      ctrl.CacheSyncTimeout = 1 * time.Second
      c, err := cache.New(cfg, cache.Options{})
      Expect(err).NotTo(HaveOccurred())

      sourceSynced := make(chan struct{})
      ctrl.startWatches = []watchDescription{{
        src: &syncingSourceWrapper{
          SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), 
          synced:        sourceSynced,
        },
      }}  

      // Start the cache and controller asynchronously and wait for the source to sync
      go func() {
        defer GinkgoRecover()
        Expect(c.Start(ctx)).To(Succeed())
      }() 
      go func() {
        defer GinkgoRecover()
        Expect(ctrl.Start(ctx)).To(Succeed())
      }() 
      <-sourceSynced

      close(done)
    }, 10.0)

Using this wrapper:

type syncingSourceWrapper struct {
  source.SyncingSource
  synced chan struct{}
}

func (ssw *syncingSourceWrapper) WaitForSync(ctx context.Context) error {
  if err := ssw.SyncingSource.WaitForSync(ctx); err != nil {
    return err
  }
  close(ssw.synced)
  return nil
} 

@varshaprasad96 force-pushed the add/cache-timeout branch 2 times, most recently from a255c3c to d190c63 on November 12, 2020 21:02
@@ -156,7 +164,7 @@ func (c *Controller) Start(ctx context.Context) error {
 	// caches.
 	for _, watch := range c.startWatches {
 		c.Log.Info("Starting EventSource", "source", watch.src)
-		if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
+		if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
Contributor:

This context should be set up for each Start call. Otherwise there could be an erroneous timeout if you have a bunch of watches.

Suggested change:
-		if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
+		// use a context with timeout for launching sources and syncing caches.
+		watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
+		defer cancel()
+		if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {

Comment on lines 149 to 152
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

Contributor:

Suggested change:
-	// use a context with timeout for launching sources and syncing caches.
-	sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
-	defer cancel()

@@ -169,7 +177,7 @@ func (c *Controller) Start(ctx context.Context) error {
 		if !ok {
 			continue
 		}
-		if err := syncingSource.WaitForSync(ctx); err != nil {
+		if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
@estroz (Contributor) commented Nov 20, 2020:

Suggested change:
-		if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
+		// Use a context with timeout for syncing sources.
+		sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
+		defer cancel()
+		if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {

varshaprasad96 and others added 2 commits January 5, 2021 12:20
This PR allows users to configure timeout for cache syncs
while starting the controller.
Co-authored-by: Alvaro Aleman <alvaroaleman@users.noreply.github.com>
@alvaroaleman (Member) left a comment:


/lgtm
/approve

Thank you for your work on this :)

@k8s-ci-robot added the lgtm label ("Looks good to me", indicates that a PR is ready to be merged) Jan 8, 2021
@k8s-ci-robot (Contributor):

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: alvaroaleman, varshaprasad96

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot added the approved label (Indicates a PR has been approved by an approver from all required OWNERS files) Jan 8, 2021
@k8s-ci-robot merged commit 66537ca into kubernetes-sigs:master on Jan 8, 2021