diff --git a/gocron.go b/gocron.go index 67bb956a..dcb30d3f 100644 --- a/gocron.go +++ b/gocron.go @@ -45,6 +45,7 @@ var ( ErrInvalidInterval = errors.New(".Every() interval must be greater than 0") ErrInvalidIntervalType = errors.New(".Every() interval must be int, time.Duration, or string") ErrInvalidIntervalUnitsSelection = errors.New(".Every(time.Duration) and .Cron() cannot be used with units (e.g. .Seconds())") + ErrInvalidFunctionParameters = errors.New("length of function parameters must match job function parameters") ErrAtTimeNotSupported = errors.New("the At() method is not supported for this time unit") ErrWeekdayNotSupported = errors.New("weekday is not supported for time unit") diff --git a/job.go b/job.go index 1b3d7a5f..213480b8 100644 --- a/job.go +++ b/job.go @@ -397,7 +397,6 @@ func (j *Job) SingletonMode() { defer j.mu.Unlock() j.runConfig.mode = singletonMode j.jobFunction.limiter = &singleflight.Group{} - } // shouldRun evaluates if this job should run again @@ -410,10 +409,14 @@ func (j *Job) shouldRun() bool { // LastRun returns the time the job was run last func (j *Job) LastRun() time.Time { + j.mu.RLock() + defer j.mu.RUnlock() return j.lastRun } func (j *Job) setLastRun(t time.Time) { + j.mu.Lock() + defer j.mu.Unlock() j.lastRun = t } @@ -432,9 +435,17 @@ func (j *Job) setNextRun(t time.Time) { // RunCount returns the number of time the job ran so far func (j *Job) RunCount() int { + j.mu.RLock() + defer j.mu.RUnlock() return j.runCount } +func (j *Job) incrementRunCount() { + j.mu.Lock() + defer j.mu.Unlock() + j.runCount++ +} + func (j *Job) stop() { j.mu.Lock() defer j.mu.Unlock() diff --git a/scheduler.go b/scheduler.go index 20ee9457..c9bc7bfa 100644 --- a/scheduler.go +++ b/scheduler.go @@ -79,7 +79,7 @@ func (s *Scheduler) StartAsync() { } } -//start starts the scheduler, scheduling and running jobs +// start starts the scheduler, scheduling and running jobs func (s *Scheduler) start() { go s.executor.start() s.setRunning(true) @@ -342,7 +342,6 @@ func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekda } func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun { - if job.getInterval() == 1 { lastRunDayPlusJobAtTime := s.roundToMidnight(lastRun).Add(job.getAtTime(lastRun)) @@ -533,6 +532,21 @@ func (s *Scheduler) run(job *Job) { return } + job = s.addJobDetails(job) + if job.error != nil { + // delete the job from the scheduler as this job + // cannot be executed + s.RemoveByReference(job) + return + // return job.error + } + + s.executor.jobFunctions <- job.jobFunction.copy() + job.setLastRun(s.now()) + job.incrementRunCount() +} + +func (s *Scheduler) addJobDetails(job *Job) *Job { job.mu.Lock() defer job.mu.Unlock() @@ -544,13 +558,11 @@ func (s *Scheduler) run(job *Job) { job.parameters[job.parametersLen] = job.copy() default: // something is really wrong and we should never get here - return + job.error = wrapOrError(job.error, ErrInvalidFunctionParameters) } } - s.executor.jobFunctions <- job.jobFunction.copy() - job.setLastRun(s.now()) - job.runCount++ + return job } func (s *Scheduler) runContinuous(job *Job) { diff --git a/scheduler_test.go b/scheduler_test.go index 8c3ff05a..021be345 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -52,7 +52,6 @@ func TestImmediateExecution(t *testing.T) { case <-semaphore: // test passed } - } func TestScheduler_Every_InvalidInterval(t *testing.T) { @@ -75,7 +74,6 @@ func TestScheduler_Every_InvalidInterval(t *testing.T) { assert.EqualError(t, err, tc.expectedError) }) } - } func TestScheduler_EveryRandom(t *testing.T) { @@ -172,7 +170,6 @@ func TestScheduler_Every(t *testing.T) { s.Stop() assert.Equal(t, 2, counter) }) - } func TestExecutionSeconds(t *testing.T) { @@ -392,7 +389,6 @@ func TestWeekdayAt(t *testing.T) { } func TestScheduler_Remove(t *testing.T) { - t.Run("remove from non-running", func(t *testing.T) { s := NewScheduler(time.UTC) s.TagsUnique() @@ -784,7 +780,6 @@ func TestClearUnique(t *testing.T) { } func TestSetUnit(t *testing.T) { - testCases := []struct { desc string timeUnit schedulingUnit @@ -1036,7 +1031,7 @@ func _getMinutes(i int) time.Duration { } func TestScheduler_Do(t *testing.T) { - var testCases = []struct { + testCases := []struct { description string evalFunc func(*Scheduler) }{ @@ -1239,7 +1234,6 @@ func TestCalculateMonths(t *testing.T) { } func TestScheduler_SingletonMode(t *testing.T) { - testCases := []struct { description string removeJob bool @@ -1250,7 +1244,6 @@ func TestScheduler_SingletonMode(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - s := NewScheduler(time.UTC) var trigger int32 @@ -1273,11 +1266,9 @@ func TestScheduler_SingletonMode(t *testing.T) { s.Stop() }) } - } func TestScheduler_SingletonModeAll(t *testing.T) { - testCases := []struct { description string removeJob bool @@ -1288,7 +1279,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - s := NewScheduler(time.UTC) s.SingletonModeAll() @@ -1313,7 +1303,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) { s.Stop() }) } - } func TestScheduler_LimitRunsTo(t *testing.T) { @@ -1404,7 +1393,8 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { // 1s - job 1 hits the limit and is skipped // 2s - job 1 & 2 run // 3s - job 1 hits the limit and is skipped - {"reschedule mode", 2, RescheduleMode, 4, false, + { + "reschedule mode", 2, RescheduleMode, 4, false, func() { semaphore <- true time.Sleep(200 * time.Millisecond) @@ -1416,7 +1406,8 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { // 1s - job 1 runs twice, the blocked run and the regularly scheduled run // 2s - jobs 1 & 3 run // 3s - jobs 2 & 3 run, job 1 hits the limit and waits - {"wait mode", 2, WaitMode, 8, false, + { + "wait mode", 2, WaitMode, 8, false, func() { semaphore <- true time.Sleep(100 * time.Millisecond) @@ -1424,7 +1415,8 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { }, // Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped - {"wait mode - with job removal", 2, WaitMode, 8, true, + { + "wait mode - with job removal", 2, WaitMode, 8, true, func() { semaphore <- true time.Sleep(100 * time.Millisecond) @@ -1434,7 +1426,6 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - s := NewScheduler(time.UTC) s.SetMaxConcurrentJobs(tc.maxConcurrentJobs, tc.mode) @@ -1510,7 +1501,6 @@ func TestScheduler_TagsUnique(t *testing.T) { _, err = s.Every("1s").Tag(bar).Do(func() {}) assert.EqualError(t, err, ErrTagsUnique(bar).Error()) - } func TestScheduler_MultipleTagsChained(t *testing.T) { @@ -1671,21 +1661,29 @@ func TestScheduler_Update(t *testing.T) { func TestScheduler_RunByTag(t *testing.T) { var ( - s = NewScheduler(time.Local) - count = 0 - wg sync.WaitGroup + s = NewScheduler(time.Local) + wg sync.WaitGroup + counterMutex sync.RWMutex + count = 0 ) s.Every(1).Day().StartAt(time.Now().Add(time.Hour)).Tag("tag").Do(func() { + counterMutex.Lock() + defer counterMutex.Unlock() count++ wg.Done() }) - wg.Add(1) + wg.Add(3) s.StartAsync() + + assert.NoError(t, s.RunByTag("tag")) + assert.NoError(t, s.RunByTag("tag")) assert.NoError(t, s.RunByTag("tag")) wg.Wait() - assert.Equal(t, 1, count) + counterMutex.RLock() + defer counterMutex.RUnlock() + assert.Equal(t, 3, count) assert.Error(t, s.RunByTag("wrong-tag")) } @@ -1837,7 +1835,6 @@ func TestScheduler_WaitForSchedules(t *testing.T) { } func TestScheduler_LenWeekDays(t *testing.T) { - testCases := []struct { description string weekDays []time.Weekday @@ -1860,7 +1857,6 @@ func TestScheduler_LenWeekDays(t *testing.T) { assert.Equal(t, len(j.scheduledWeekdays), tc.finalLen) }) } - } func TestScheduler_CallNextWeekDay(t *testing.T) { @@ -1869,7 +1865,7 @@ func TestScheduler_CallNextWeekDay(t *testing.T) { } const wantTimeUntilNextRun = time.Hour * 24 * 2 - var lastRun = januaryFirst2020At(0, 0, 0) + lastRun := januaryFirst2020At(0, 0, 0) testCases := []struct { description string @@ -1881,7 +1877,6 @@ func TestScheduler_CallNextWeekDay(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - s := NewScheduler(time.UTC) s.Every(1) @@ -1895,10 +1890,8 @@ func TestScheduler_CallNextWeekDay(t *testing.T) { got := s.durationToNextRun(lastRun, job).duration assert.Equal(t, wantTimeUntilNextRun, got) - }) } - } func TestScheduler_Midday(t *testing.T) { @@ -1986,7 +1979,6 @@ func TestScheduler_CheckNextWeekDay(t *testing.T) { job.lastRun = secondLastRun gotSecond := s.durationToNextRun(secondLastRun, job).duration assert.Equal(t, wantTimeUntilNextSecondRun, gotSecond) - }) } @@ -2043,18 +2035,14 @@ func TestScheduler_CheckEveryWeekHigherThanOne(t *testing.T) { } else if tc.caseTest == 3 { assert.Equal(t, wantTimeUntilNextRunTwoWeeksLessOneDay, got) } - } job.runCount++ } - }) } - } func TestScheduler_StartImmediately(t *testing.T) { - testCases := []struct { description string scheduler *Scheduler @@ -2110,7 +2098,6 @@ func TestScheduler_CheckCalculateDaysOfMonth(t *testing.T) { func TestScheduler_CheckSetBehaviourBeforeJobCreated(t *testing.T) { s := NewScheduler(time.UTC) s.Month(1, 2).Every(1).Do(func() {}) - } func TestScheduler_MonthLastDayAtTime(t *testing.T) { @@ -2124,7 +2111,6 @@ func TestScheduler_MonthLastDayAtTime(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - s := NewScheduler(time.UTC) got := s.durationToNextRun(tc.job.LastRun(), tc.job).duration assert.Equalf(t, tc.wantTimeUntilNextRun, got, fmt.Sprintf("expected %s / got %s", tc.wantTimeUntilNextRun.String(), got.String()))