
Reshard by redistributing samples to new queues #13769

Open

wants to merge 2 commits into main
Conversation

darshanime
Contributor

During resharding, try to redistribute the samples amongst the new queues instead of waiting for them all to be sent out, which is prone to tail latency.

closes #7230

@darshanime darshanime marked this pull request as draft March 14, 2024 07:23
@darshanime darshanime marked this pull request as ready for review March 14, 2024 13:39
@machine424
Collaborator

Thanks, I'll take a look at this.

@bwplotka
Member

(Hello from bug scrub meeting).

Friendly ping @machine424 if you want to review, but it feels that:

@darshanime
Contributor Author

+ @csmarchbanks @cstyan, you might want to review this too as remote maintainers

@cstyan
Member

cstyan commented Mar 26, 2024

@darshanime this is on my review list for this week 👍

@cstyan cstyan left a comment
Member

This seems like a good change, but we should do some testing. As Bartek mentioned, we could run into unintended consequences: general throughput is a hot path, and resharding already has the potential to cause problems with how long it takes.

Can we hedge the redistribution? Like, only redistribute if we have more than # of shards * batch size samples buffered, or if the flushing takes longer than X seconds (the flush deadline perhaps?)
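
For illustration, the kind of guard I mean could look something like this (standalone sketch; the function and parameter names are just placeholders, not code from this PR):

import "time"

// shouldRedistribute is a hypothetical guard (not part of this PR): only
// redistribute when more samples are buffered than one full batch per shard,
// or when a graceful drain has already run past the flush deadline.
func shouldRedistribute(buffered, numShards, maxSamplesPerSend int, drainStart time.Time, flushDeadline time.Duration) bool {
    if buffered > numShards*maxSamplesPerSend {
        return true
    }
    return time.Since(drainStart) > flushDeadline
}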

Comment on lines 1260 to 1316
func (s *shards) reshard(numShards int) bool {
    // Exclusive lock to ensure that this does not run concurrently with enqueue.
    s.mtx.Lock()
    defer s.mtx.Unlock()

    newQueues := make([]*queue, numShards)
    for i := 0; i < numShards; i++ {
        newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity)
    }

    for i, queue := range s.queues {
        queue.batchMtx.Lock()

        for _, ts := range queue.batch {
            queueIndex := uint64(ts.ref) % uint64(len(newQueues))
            added := newQueues[queueIndex].Append(ts)
            if !added {
                // We are not able to add; release every batch lock taken so far
                // and revert to the start/stop loop.
                for _, locked := range s.queues[:i+1] {
                    locked.batchMtx.Unlock()
                }
                return false
            }
        }
    }

    // We have successfully moved all the samples, now we can delete the old queues.
    for _, queue := range s.queues {
        close(queue.batchQueue)
        queue.batchMtx.Unlock()
    }

    // Wait until flushDeadline for all the runShard goroutines to terminate.
    select {
    case <-s.done:
    case <-time.After(s.qm.flushDeadline):
        // Cancel the current context so as to unblock client calls.
        s.hardShutdown()
        <-s.done
    }

    s.queues = newQueues
    var hardShutdownCtx context.Context
    hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
    s.running.Store(int32(numShards))
    s.done = make(chan struct{})
    for i := 0; i < numShards; i++ {
        go s.runShard(hardShutdownCtx, i, newQueues[i])
    }

    return true
}
@cstyan
Member

This function feels like it's doing a little bit too much IMO. Is there any refactoring we can do so we can reuse stop and start?

What do you think of this; rename this function to redistributeSamples, have it return the same bool indicating whether it was successful, and also update shards.stop to take a bool force. In stop we would skip the graceful shutdown attempt and go straight to the unclean shutdown if force is true. Then we can just check successful in the reshardLoop, call stop appropriately, but also always call start?
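
Roughly the flow I have in mind, sketched against a stand-in interface rather than the real shards type (names here are illustrative only):

// shardPool is a stand-in for the real shards type, purely to illustrate the
// proposed control flow.
type shardPool interface {
    redistributeSamples(numShards int) bool
    stop(force bool) // force == true skips the graceful drain
    start(numShards int)
}

// reshardOnce sketches the suggestion: try to move the buffered samples first,
// then stop the old shards (forcefully only when the samples have already been
// moved), and always start the new ones.
func reshardOnce(s shardPool, numShards int) {
    successful := s.redistributeSamples(numShards)
    s.stop(successful)
    s.start(numShards)
}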

@cstyan
Member

Doing this ☝️ might mean we need to rearrange where the locks are acquired.

@cstyan
Member

@darshanime we had a chance to test this out in a dev environment yesterday. No obvious issues: we were still able to achieve the same throughput, and it looked like we were resharding within a slightly smaller spread of shard replicas, which is what I would expect to see.

However, one thing we did notice: the metric for the # of active shards is not being set properly anymore. We could set it here in this function, but I think it's probably better to set it on line 1135/1136, after we get the successful return value from calling reshard.
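
Something along these lines, as a standalone sketch (the gauge wiring is illustrative, not the actual field names in the diff):

import "github.com/prometheus/client_golang/prometheus"

// recordShardCount is a hypothetical helper showing the idea: only update the
// shard-count gauge once reshard has reported success.
func recordShardCount(successful bool, numShards int, shardGauge prometheus.Gauge) {
    if successful {
        shardGauge.Set(float64(numShards))
    }
}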

@cstyan
Member

cstyan commented Apr 16, 2024

bump @darshanime

Signed-off-by: darshanime <deathbullet@gmail.com>
Signed-off-by: darshanime <deathbullet@gmail.com>
@darshanime
Contributor Author

Thanks for the review, @cstyan

Can we hedge the redistribution? Like, only redistribute if we have more than # of shards * batch size samples buffered, or if the flushing takes longer than X seconds (the flush deadline perhaps?)

Currently, we have two ways of resharding: triggering a soft shutdown and waiting for all the queues to drain (prone to tail latency amplification), and, if that fails, triggering a hard shutdown and dropping samples. This PR adds a third way: redistributing the samples to the new shards. The original issue we're trying to address is reducing the impact of resharding, both in the time it takes to complete and in the samples dropped. With that in mind, IMO we should attempt the redistribution first (since it adds no latency and drops no samples), and fall back to the other two ways if it fails.
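
To make the ordering concrete, the intended fallback chain looks roughly like this (illustrative sketch with stand-in function values, not the actual code in the diff):

// reshardFlow sketches the intended ordering of the three strategies; the
// function values are stand-ins, not the real shards API.
func reshardFlow(redistribute, softShutdown func() bool, hardShutdown func()) {
    if redistribute() {
        // Samples moved straight into the new queues: no drain latency, nothing dropped.
        return
    }
    if softShutdown() {
        // Existing behaviour: wait (up to the flush deadline) for the old queues to drain.
        return
    }
    // Existing last resort: hard shutdown, dropping whatever is still buffered.
    hardShutdown()
}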

We can hedge the redistribution wrt # of shards * batch size, but what is the right number for the threshold? Small backlogs are easy and cheap to redistribute, and large backlogs are where we would reap the most benefit of redistribution. Can we add this hedging later after gaining some experience running this in the wild? I can add a flag to disable redistribution completely, if required.

This function feels like it's doing a little bit too much IMO. Is there any refactoring we can do so we can reuse stop and start... What do you think of this; rename this function to redistributeSamples

👍 I've broken up the reshard function and created a new function for redistributing the samples.

In stop we would skip the graceful shutdown attempt and go straight to the unclean shutdown if force is true.

I think this may lead to more dropped samples if the redistribution fails? Note that when we attempt redistribution and it fails, we haven't spent any time yet, so retaining the original soft -> hard shutdown won't change the current latency characteristics. I can add skipping the soft shutdown behind a flag, wdyt?

the metric for the # of active shards is not being set properly anymore.

Thanks, fixed. I've added it to the reshard function so that it can be unit tested easily

@cstyan
Member

cstyan commented May 13, 2024

I think this may lead to more dropped samples if the redistribution fails? Note that when we attempt redistribution and it fails, we haven't spent any time yet, so retaining the original soft -> hard shutdown won't change the current latency characteristics. I can add skipping the soft shutdown behind a flag, wdyt?

True. The fact that both QueueManager and queue have Append functions was a bit misleading given I haven't read some of this code in a while. The former will block until samples are successfully enqueued or the hard shutdown is initiated, while the latter will fail if the queue is full and is supposed to be retried.
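
For anyone following along, the difference in toy form (these are illustrations, not the real implementations):

// blockingAppend keeps trying until the item is enqueued or shutdown fires,
// like QueueManager.Append.
func blockingAppend(ch chan int, v int, shutdown <-chan struct{}) bool {
    select {
    case ch <- v:
        return true
    case <-shutdown:
        return false
    }
}

// tryAppend gives up immediately when the buffer is full and leaves retrying
// to the caller, like queue.Append.
func tryAppend(ch chan int, v int) bool {
    select {
    case ch <- v:
        return true
    default:
        return false
    }
}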

Can we add this hedging later after gaining some experience running this in the wild? I can add a flag to disable redistribution completely, if required.

This seems like a reasonable compromise. Let me think about it some more.

Comment on lines +1266 to +1270
for i := 0; i < numShards; i++ {
    newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity)
}

successful := s.redistributeSamples(newQueues)
@cstyan
Member

One thing I'm thinking about: with the way things are written currently, if we are downsharding we could possibly get successful = false quite often when attempting to redistribute.

If we're downsharding and all the existing queues are at full Capacity, the new, smaller number of shards won't be able to buffer that many samples. In which case, we either want to know that ahead of time and not attempt to redistribute, or have a way to ensure queues that are having samples redistributed to them can send when they're full.

Additionally, we're potentially doubling the amount of memory used by enqueued samples for the duration of the resharding operation.
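
For the ahead-of-time option, a cheap pre-check could be as simple as comparing the backlog against the new total capacity (sketch only; the names are illustrative):

// fitsInNewQueues is a hypothetical pre-check: redistribution can only succeed
// if the buffered samples fit into the new shards' combined capacity. A real
// check would also have to account for per-queue skew from the ref-based hashing.
func fitsInNewQueues(buffered, newNumShards, perQueueCapacity int) bool {
    return buffered <= newNumShards*perQueueCapacity
}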

@cstyan cstyan left a comment
Member

Are we not also missing a call to runShard? We're just creating new queues but passing them to the existing shards?

Nevermind, found it.

Comment on lines +1296 to +1298
for i := 0; i < numShards; i++ {
    go s.runShard(hardShutdownCtx, i, newQueues[i])
}
@cstyan
Member

Yeah, potentially we want to start these immediately after creating the queues; otherwise we might fail to redistribute when downsharding. Something like this ordering (untested sketch based on the diff above):
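
// Untested sketch of the reordering: start the new runShard loops as soon as
// the queues exist, so they can begin draining redistributed samples while the
// old queues are still being emptied. The new hardShutdownCtx would then have
// to be created before this loop rather than after redistribution.
for i := 0; i < numShards; i++ {
    newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity)
    go s.runShard(hardShutdownCtx, i, newQueues[i])
}

successful := s.redistributeSamples(newQueues)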

Successfully merging this pull request may close these issues:

Reduce the impact of remote write resharding