k8s fixes
oavdeev committed Sep 17, 2021
1 parent f8a4c4e commit d401db6
Showing 2 changed files with 125 additions and 75 deletions.
42 changes: 32 additions & 10 deletions metaflow/plugins/aws/eks/kubernetes.py
@@ -260,15 +260,15 @@ def wait(self, echo=None):
         stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")
 
         def wait_for_launch(job):
-            status = job.status
+            status = job.get_status()
             echo(
                 "Task is starting (Status %s)..." % status,
                 "stderr",
                 job_id=job.id,
             )
             t = time.time()
             while True:
-                new_status = job.status
+                new_status = job.get_status()
                 if status != new_status or (time.time() - t) > 30:
                     status = new_status
                     echo(
@@ -277,7 +277,7 @@ def wait_for_launch(job):
                         job_id=job.id,
                     )
                     t = time.time()
-                if job.is_running or job.is_done:
+                if job.check_pod_is_running() or job.check_is_done():
                     break
                 time.sleep(1)
 
@@ -317,7 +317,7 @@ def _print_available(tail, stream, should_persist=False):
                 now = time.time()
                 log_update_delay = update_delay(now - start_time)
                 next_log_update = now + log_update_delay
-                is_running = self._job.is_running
+                is_running = self._job.check_pod_is_running()
 
             # This sleep should never delay log updates. On the other hand,
             # we should exit this loop when the task has finished without
@@ -337,8 +337,32 @@ def _print_available(tail, stream, should_persist=False):
         _print_available(stdout_tail, "stdout")
         _print_available(stderr_tail, "stderr")
 
-        if self._job.has_failed:
-            exit_code, reason = self._job.reason
+        try:
+            # Now the pod is no longer running, but we need to wait for the job
+            # status to update. *Usually* that happens pretty much immediately,
+            # but since it is technically done by the job controller
+            # asynchronously, there's room for a race condition where the pod
+            # is done but the job is still "active". It is more likely to
+            # happen when the control plane is overloaded, e.g. on minikube/kind.
+            self._job.wait_done(timeout_seconds=20)
+        except TimeoutError:
+            # We shouldn't really get here unless the K8S control plane is
+            # really unhealthy.
+            echo(
+                "Pod is not running but the job is not done or failed, last job state: %s" % self._job._job,
+                "stderr",
+                job_id=self._job.id,
+            )
+            echo(
+                "last pod state: %s" % self._job._pod,
+                "stderr",
+                job_id=self._job.id,
+            )
+            # Kill the job if it is still running by throwing an exception.
+            raise KubernetesKilledException("Task failed!")
+
+        if self._job.check_has_failed():
+            exit_code, reason = self._job.get_done_reason()
             msg = next(
                 msg
                 for msg in [
@@ -358,10 +382,8 @@ def _print_available(tail, stream, should_persist=False):
                 "%s. This could be a transient error. "
                 "Use @retry to retry." % msg
             )
-        elif not self._job.is_done:
-            # Kill the job if it is still running by throwing an exception.
-            raise KubernetesKilledException("Task failed!")
-        exit_code, _ = self._job.reason
+
+        exit_code, _ = self._job.get_done_reason()
         echo(
             "Task finished with exit code %s." % exit_code,
             "stderr",
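The fix hinges on KubernetesJob.wait_done(timeout_seconds=...), whose implementation is not shown in this diff. Below is a minimal sketch of the polling pattern the call site implies, assuming check_is_done() consults the Job's terminal status the same way the loop conditions above do; the FakeJob class and its timing are hypothetical stand-ins, not Metaflow code.

import time


# Hypothetical stand-in for the KubernetesJob handle used above; the real
# class wraps the Kubernetes API and is not part of this commit.
class FakeJob:
    def __init__(self, done_after):
        # Pretend the job controller marks the job done after `done_after`
        # seconds, emulating its asynchronous status update.
        self._done_at = time.time() + done_after

    def check_is_done(self):
        # The real method would inspect the Job object's status (its
        # succeeded/failed counts) through the Kubernetes API.
        return time.time() >= self._done_at

    def wait_done(self, timeout_seconds):
        # Poll until the job controller reports a terminal state. The pod
        # can finish before the Job object reflects it, so a single read is
        # not enough; the timeout keeps us from spinning forever when the
        # control plane never converges.
        deadline = time.time() + timeout_seconds
        while not self.check_is_done():
            if time.time() >= deadline:
                raise TimeoutError(
                    "job did not reach a terminal state in %ss" % timeout_seconds
                )
            time.sleep(1)


job = FakeJob(done_after=3)
job.wait_done(timeout_seconds=20)  # returns after ~3s; raises TimeoutError if stuck
print("job is done")

Raising TimeoutError from the poll is what lets the except TimeoutError branch above tell an unconverged control plane apart from an ordinary task failure.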
