Skip to content

Commit

Permalink
Fix bug where the status endpoint can become non-responsive (PR #5053)
Browse files Browse the repository at this point in the history
# Description

This PR prevents the status endpoint from hanging when the DB is in trouble.

It introduces a fully redundant, local cache of the compiler queue size.
I'm in general not a fan of redundant data like this, but the only alternative I see is to bake a timeout into the status endpoint so it only reports data it can collect within e.g. `<10ms`.

# Self Check:

Strike through any lines that are not applicable (`~~line~~`) then check the box

- [ ] Attached issue to pull request
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [x] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
- [ ] ~~End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )~~
  • Loading branch information
wouterdb authored and inmantaci committed Nov 1, 2022
1 parent 13c0f16 commit 2698fb9
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 10 deletions.
5 changes: 5 additions & 0 deletions changelogs/unreleased/fix-hanging-status-endpoint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
description: Fix bug where the status endpoint can become non-responsive
change-type: patch
destination-branches: [master, iso5, iso4]
sections:
bugfix: "{{description}}"
2 changes: 1 addition & 1 deletion src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3607,7 +3607,7 @@ async def get_next_compiles_for_environment(cls, environment_id: uuid.UUID) -> "
@classmethod
async def get_next_compiles_count(cls) -> int:
    """Get the number of compiles in the queue for ALL environments.

    A compile is considered queued when it has not been handled yet and has
    no completion timestamp: ``completed IS NULL`` means it is still pending.
    (The earlier ``completed IS NOT NULL`` variant counted finished compiles
    instead of pending ones.)
    """
    result = await cls._fetch_int(f"SELECT count(*) FROM {cls.table_name()} WHERE NOT handled AND completed IS NULL")
    return result

@classmethod
Expand Down
10 changes: 8 additions & 2 deletions src/inmanta/server/services/compilerservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,11 @@ def __init__(self) -> None:
self._global_lock = asyncio.locks.Lock()
self.listeners: List[CompileStateListener] = []
self._scheduled_full_compiles: Dict[uuid.UUID, Tuple[TaskMethod, str]] = {}
# cache the queue count for DB-less and lock-less access to number of tasks
self._queue_count_cache: int = 0

async def get_status(self) -> Dict[str, ArgumentTypes]:
    """Return the status of the compiler service.

    Reads the locally cached queue count (``_queue_count_cache``) instead of
    querying the database, so the status endpoint stays responsive even when
    the DB is slow or unavailable — which is the point of this fix.
    """
    return {"task_queue": self._queue_count_cache, "listeners": len(self.listeners)}

def add_listener(self, listener: CompileStateListener) -> None:
self.listeners.append(listener)
Expand Down Expand Up @@ -642,7 +644,7 @@ async def _queue(self, compile: data.Compile) -> None:
# don't execute any compiles in a halted environment
if env.halted:
return

self._queue_count_cache += 1
if compile.environment not in self._recompiles or self._recompiles[compile.environment].done():
task = self.add_background_task(self._run(compile))
self._recompiles[compile.environment] = task
Expand Down Expand Up @@ -677,7 +679,9 @@ async def notify(listener: CompileStateListener) -> None:

async def _recover(self) -> None:
"""Restart runs after server restart"""
# one run per env max to get started
runs = await data.Compile.get_next_run_all()
self._queue_count_cache = await data.Compile.get_next_compiles_count() - len(runs)
for run in runs:
await self._queue(run)
unhandled = await data.Compile.get_unhandled_compiles()
Expand Down Expand Up @@ -749,6 +753,7 @@ async def _run(self, compile: data.Compile) -> None:
Runs a compile request. At completion, looks for similar compile requests based on _compile_merge_key and marks
those as completed as well.
"""
self._queue_count_cache -= 1
await self._auto_recompile_wait(compile)

compile_merge_key: Hashable = CompilerService._compile_merge_key(compile)
Expand Down Expand Up @@ -784,6 +789,7 @@ async def _run(self, compile: data.Compile) -> None:
if self.is_stopping():
return
self.add_background_task(self._notify_listeners(compile))
self._queue_count_cache -= len(merge_candidates)
for merge_candidate in merge_candidates:
self.add_background_task(self._notify_listeners(merge_candidate))
await self._dequeue(compile.environment)
Expand Down
6 changes: 4 additions & 2 deletions src/inmanta/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,11 @@ async def fun_wrapper() -> bool:
raise ValueError("value of INMANTA_RETRY_LIMITED_MULTIPLIER must be bigger or equal to 1.")
hard_timeout = timeout * multiplier
start = time.time()
while time.time() - start < hard_timeout and not (await fun_wrapper()):
result = await fun_wrapper()
while time.time() - start < hard_timeout and not result:
await asyncio.sleep(interval)
if not (await fun_wrapper()):
result = await fun_wrapper()
if not result:
raise asyncio.TimeoutError(f"Wait condition was not reached after hard limit of {hard_timeout} seconds")
if time.time() - start > timeout:
raise asyncio.TimeoutError(
Expand Down
63 changes: 58 additions & 5 deletions tests/server/test_compilerservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,22 @@ class HangRunner(object):
compile runner mock, hang until released
"""

def __init__(self):
def __init__(self, compile: data.Compile):
self.lock = Semaphore(0)
self.started = False
self.done = False
self.version = None
self.request = compile

async def run(self, force_update: Optional[bool] = False):
print("Start Run: ", self.request.id, self.request.environment)

now = datetime.datetime.now().astimezone()
await self.request.update_fields(started=now)
self.started = True
await self.lock.acquire()
self.done = True
print("END Run: ", self.request.id, self.request.environment)
return True, None

def release(self):
Expand All @@ -236,14 +242,28 @@ def __init__(self):
self.locks = {}

def _get_compile_runner(self, compile: data.Compile, project_dir: str):
print("Get Run: ", compile.remote_id, compile.id)
runner = HangRunner()
runner = HangRunner(compile)
self.locks[compile.remote_id] = runner
return runner

def get_runner(self, remote_id: uuid.UUID) -> HangRunner:
return self.locks.get(remote_id)

async def compiler_cache_consistent(expected: int) -> None:
async def inner() -> bool:
not_done = await data.Compile.get_next_compiles_count()
running = sum(1 for task in cs._recompiles.values() if not task.done())
print(
expected,
cs._queue_count_cache,
not_done - running,
not_done,
running,
)
return cs._queue_count_cache == (not_done - running) == expected

await retry_limited(inner, 1)

# manual setup of server
server = Server()
cs = HookedCompilerService()
Expand All @@ -264,7 +284,7 @@ async def request_compile(env: data.Environment) -> uuid.UUID:
results = await data.Compile.get_by_remote_id(env.id, u1)
assert len(results) == 1
assert results[0].remote_id == u1
print("request: ", u1, results[0].id)
print("request: ", results[0].id, env.id)
return u1

# setup projects in the database
Expand Down Expand Up @@ -316,9 +336,19 @@ async def isdone():

await retry_limited(isdone, 1)

await compiler_cache_consistent(5)

# run through env1, entire sequence
for i in range(4):
await check_compile_in_sequence(env1, e1, i)
if i < 2:
# First one is never queued, so not counted
# Last iteration here doesn't de-queue an item, but allows it to complete
# So don't handle last 2
await compiler_cache_consistent(4 - i)

await compiler_cache_consistent(3)

collector.verify(e1)
print("env1 done")

Expand All @@ -328,6 +358,7 @@ async def isdone():
# progress two steps into env2
for i in range(2):
await check_compile_in_sequence(env2, e2, i)
await compiler_cache_consistent(2 - i)

assert not collector.seen
print(collector.preseen)
Expand All @@ -353,10 +384,13 @@ async def isdone():
collector = Collector()
cs.add_listener(collector)

# one in cache, one running
await compiler_cache_consistent(1)

# complete the sequence, expect re-run of third compile
for i in range(3):
print(i)
await check_compile_in_sequence(env2, e2[2:], i)
await compiler_cache_consistent(0)

# all are re-run, entire sequence present
collector.verify(e2)
Expand Down Expand Up @@ -796,6 +830,8 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert len(result.result["queue"]) == 1
assert result.result["queue"][0]["remote_id"] == str(remote_id1)
assert result.code == 200
# None in the queue, all running
assert compilerslice._queue_count_cache == 0

# request a compile
remote_id2 = uuid.uuid4()
Expand All @@ -806,6 +842,8 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert len(result.result["queue"]) == 2
assert result.result["queue"][1]["remote_id"] == str(remote_id2)
assert result.code == 200
# 1 in the queue, 1 running
assert compilerslice._queue_count_cache == 1

# request a compile with do_export=True
remote_id3 = uuid.uuid4()
Expand All @@ -815,6 +853,8 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert len(result.result["queue"]) == 3
assert result.result["queue"][2]["remote_id"] == str(remote_id3)
assert result.code == 200
# 2 in the queue, 1 running
assert compilerslice._queue_count_cache == 2

# request a compile with do_export=False -> expect merge with compile2
remote_id4 = uuid.uuid4()
Expand All @@ -824,6 +864,8 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert len(result.result["queue"]) == 4
assert result.result["queue"][3]["remote_id"] == str(remote_id4)
assert result.code == 200
# 3 in the queue, 1 running
assert compilerslice._queue_count_cache == 3

# request a compile with do_export=True -> expect merge with compile3, expect force_update == True for the compile
remote_id5 = uuid.uuid4()
Expand All @@ -836,6 +878,8 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert result.result["queue"][4]["remote_id"] == str(remote_id5)
assert result.result["queue"][5]["remote_id"] == str(remote_id6)
assert result.code == 200
# 5 in the queue, 1 running
assert compilerslice._queue_count_cache == 5

# finish a compile and wait for service to take on next
await run_compile_and_wait_until_compile_is_done(compilerslice, mocked_compiler_service_block, env.id)
Expand All @@ -845,11 +889,15 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert len(result.result["queue"]) == 5
assert result.result["queue"][0]["remote_id"] == str(remote_id2)
assert result.code == 200
# 4 in the queue, 1 running
assert compilerslice._queue_count_cache == 4

# finish second compile
await run_compile_and_wait_until_compile_is_done(compilerslice, mocked_compiler_service_block, env.id)

assert await compilerslice.get_report(compile_id2) == await compilerslice.get_report(compile_id4)
# 2 in the queue, 1 running
assert compilerslice._queue_count_cache == 2

# finish third compile
# prevent race conditions where compile is not yet in queue
Expand All @@ -867,6 +915,9 @@ async def test_compileservice_queue(mocked_compiler_service_block: queue.Queue,
assert await compilerslice.get_report(compile_id3) == await compilerslice.get_report(compile_id5)
assert await compilerslice.get_report(compile_id3) == await compilerslice.get_report(compile_id6)

# 0 in the queue, 0 running
assert compilerslice._queue_count_cache == 0

# api should return none
result = await client.get_compile_queue(environment)
assert len(result.result["queue"]) == 0
Expand All @@ -879,6 +930,7 @@ async def test_compilerservice_halt(mocked_compiler_service_block, server, clien
result = await client.get_compile_queue(environment)
assert result.code == 200
assert len(result.result["queue"]) == 0
assert compilerslice._queue_count_cache == 0

await client.halt_environment(environment)

Expand All @@ -889,6 +941,7 @@ async def test_compilerservice_halt(mocked_compiler_service_block, server, clien
result = await client.get_compile_queue(environment)
assert result.code == 200
assert len(result.result["queue"]) == 1
assert compilerslice._queue_count_cache == 0

result = await client.is_compiling(environment)
assert result.code == 204
Expand Down

0 comments on commit 2698fb9

Please sign in to comment.