From a81f0be55be2322b87cae0fd7850b76f28b29ae6 Mon Sep 17 00:00:00 2001 From: Francois Gouteroux Date: Mon, 9 May 2022 12:20:16 +0200 Subject: [PATCH] receive: fix deadlock on interrupt in routerOnly mode (#5339) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix receive router deadlock on interrupt Signed-off-by: François Gouteroux * Update changelog Signed-off-by: François Gouteroux --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e2226e19e..6bd3e46d08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased ### Fixed +- [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode ### Added diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 553e8303b2..3719f862a3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -217,12 +217,13 @@ func runReceive( reloadGRPCServer := make(chan struct{}, 1) // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. hashringChangedChan := make(chan struct{}, 1) - // uploadC signals when new blocks should be uploaded. - uploadC := make(chan struct{}, 1) - // uploadDone signals when uploading has finished. - uploadDone := make(chan struct{}, 1) if enableIngestion { + // uploadC signals when new blocks should be uploaded. + uploadC := make(chan struct{}, 1) + // uploadDone signals when uploading has finished. + uploadDone := make(chan struct{}, 1) + level.Debug(logger).Log("msg", "setting up tsdb") { if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { @@ -370,7 +371,9 @@ func setupAndRunGRPCServer(g *run.Group, } } return nil - }, func(error) {}) + }, func(error) { + defer close(reloadGRPCServer) + }) return nil @@ -516,7 +519,6 @@ func startTSDBAndUpload(g *run.Group, // TSDBs reload logic, listening on hashring changes. cancel := make(chan struct{}) g.Add(func() error { - defer close(reloadGRPCServer) defer close(uploadC) // Before quitting, ensure the WAL is flushed and the DBs are closed.