Skip to content

Commit

Permalink
Improve schedule worker debugging and tests (#101)
Browse files Browse the repository at this point in the history
**What**
- Add some more test statements to make the validation more strict.  It
  now ensures that the one time schedule is not reschedule in the future
  while the repeating schedule does exist. It also ensures that "empty
  queue" is returned as expected.
- Using snake case on logrus fields because it works better with loki,
  which will then detect these as actual log fields which can leverage
  the full loki query language
- Add more log statements with task metadata to the scheduleTask
  workflow. This improves the ability to follow the flow of the logs
  when we need to view both manager and worker logs together.

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
  • Loading branch information
LucasRoesler committed Mar 22, 2021
1 parent 79f6eb5 commit 7ef81a2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
12 changes: 6 additions & 6 deletions pkg/db/tracing.go
Expand Up @@ -52,8 +52,8 @@ func (d traceableDB) ExecContext(ctx context.Context, query string, args ...inte
defer func() {
d.FinishSpan(span, err)
}()
span.SetTag("sql", query)
logrus.WithTime(time.Now()).WithField("sql.method", "ExecContext").Debug(query)
span.LogKV("sql", query)
logrus.WithTime(time.Now()).WithField("sql_method", "ExecContext").Debug(query)

result, err = d.SQLDB.ExecContext(ctx, query, args...)
return result, err
Expand All @@ -67,9 +67,9 @@ func (d traceableDB) QueryContext(ctx context.Context, query string, args ...int
defer func() {
d.FinishSpan(span, err)
}()
span.SetTag("sql", query)
span.LogKV("sql", query)

logrus.WithTime(time.Now()).WithField("sql.method", "QueryContext").Debug(query)
logrus.WithTime(time.Now()).WithField("sql_method", "QueryContext").Debug(query)
rows, err = d.SQLDB.QueryContext(ctx, query, args...)
return rows, err
}
Expand All @@ -81,9 +81,9 @@ func (d traceableDB) QueryRowContext(ctx context.Context, query string, args ...
span, ctx := d.StartSpan(ctx, "QueryRowContext")
defer d.FinishSpan(span, nil)

span.SetTag("sql", query)
span.LogKV("sql", query)

logrus.WithTime(time.Now()).WithField("sql.method", "QueryRowContext").Debug(query)
logrus.WithTime(time.Now()).WithField("sql_method", "QueryRowContext").Debug(query)
return d.SQLDB.QueryRowContext(ctx, query, args...)
}

Expand Down
26 changes: 19 additions & 7 deletions pkg/queue/workers/schedule_worker.go
Expand Up @@ -100,7 +100,7 @@ func (w *scheduleWorker) iteration(ctx context.Context, tracer opentracing.Trace
queue.ScheduleWorkerMetrics.WorkingGauge.Inc()
defer queue.ScheduleWorkerMetrics.WorkingGauge.Dec()

logrus.Debug("starting task scheduling iteration...")
logrus.Debug("starting task scheduling iteration")
for {
// check if the iteration was cancelled
err = ctx.Err()
Expand All @@ -109,7 +109,7 @@ func (w *scheduleWorker) iteration(ctx context.Context, tracer opentracing.Trace
return err
}

logrus.Debug("trying to find a task to schedule...")
logrus.Debug("trying to find a task to schedule")
err = w.scheduleTask(ctx)
if err == ErrScheduleQueueIsEmpty {
return nil
Expand All @@ -133,6 +133,7 @@ func (w *scheduleWorker) scheduleTask(ctx context.Context) (err error) {
}
}()

logrus.Debug("looking for ready schedules")
tx, err := w.db.BeginTx(ctx, nil)
builder := squirrel.StatementBuilder.
PlaceholderFormat(squirrel.Dollar).
Expand Down Expand Up @@ -212,9 +213,12 @@ func (w *scheduleWorker) scheduleTask(ctx context.Context) (err error) {
span.SetTag("task.queue", taskQueue)
span.SetTag("task.spec", string(specBytes))

logrus := logrus.WithField("type", taskType).WithField("queue", taskQueue)
logrus := logrus.WithField("type", taskType).
WithField("queue", taskQueue).
WithField("schedule_id", scheduleID).
WithField("schedule_cron", cronSchedule)

logrus.Debug("adding the task to the queue...")
logrus.Debug("adding the task to the queue")
task := queue.TaskEnqueueRequest{
TaskBase: queue.TaskBase{
Queue: taskQueue,
Expand All @@ -232,7 +236,7 @@ func (w *scheduleWorker) scheduleTask(ctx context.Context) (err error) {

logrus.Debug("task has been scheduled successfully")

logrus.Debug("calculating and updating the next execution time...")
logrus.Debug("calculating and updating the next execution time")

var nextExecution *time.Time
if cronSchedule != "" {
Expand All @@ -245,7 +249,7 @@ func (w *scheduleWorker) scheduleTask(ctx context.Context) (err error) {
nextExecution = &t
}

_, err = builder.
res, err := builder.
Update("schedules").
Set("next_execution_time", nextExecution).
Where(squirrel.Eq{
Expand All @@ -255,7 +259,15 @@ func (w *scheduleWorker) scheduleTask(ctx context.Context) (err error) {
if err != nil {
return err
}
logrus.Debug("the new execution time is set")

affected, err := res.RowsAffected()
if err != nil {
return err
}

logrus.WithField("affected", affected).
WithField("next_execution_time", nextExecution).
Debug("the new execution time is set")

queue.ScheduleWorkerMetrics.ProcessedCounter.With(labels).Inc()
return nil
Expand Down
21 changes: 20 additions & 1 deletion pkg/queue/workers/schedule_worker_test.go
Expand Up @@ -78,9 +78,19 @@ func TestScheduleTask(t *testing.T) {
w := newScheduleWorker(db, qm, time.Second)
err = w.scheduleTask(ctx)
require.NoError(t, err)

time.Sleep(time.Second)
err = w.scheduleTask(ctx)
require.NoError(t, err)

time.Sleep(time.Second)
err = w.scheduleTask(ctx)
require.Equal(t, err, ErrScheduleQueueIsEmpty)

time.Sleep(time.Second)
err = w.scheduleTask(ctx)
require.Equal(t, err, ErrScheduleQueueIsEmpty)

require.Len(t, qm.q, 2)

task1 := qm.q[0]
Expand All @@ -94,10 +104,19 @@ func TestScheduleTask(t *testing.T) {
require.Equal(t, string(taskSpec2), string(task2.Spec))

// checking that the execution time has changed
dbtest.EqualCount(t, db, 0, "schedules", squirrel.Eq{
dbtest.EqualCount(t, db, 0, "schedules", squirrel.LtOrEq{
"next_execution_time": now,
})

dbtest.EqualCount(t, db, 1, "schedules", squirrel.And{
squirrel.Gt{"next_execution_time": now},
squirrel.Eq{"schedule_id": scheduleID1},
})

dbtest.EqualCount(t, db, 1, "schedules", squirrel.Eq{
"next_execution_time": nil,
})

// these should stay zero because they are not incremented in the scheduleTask function
require.Equal(t, float64(0), testutil.ToFloat64(queue.ScheduleWorkerMetrics.ActiveGauge))
require.Equal(t, float64(0), testutil.ToFloat64(queue.ScheduleWorkerMetrics.WorkingGauge))
Expand Down

0 comments on commit 7ef81a2

Please sign in to comment.