Skip to content

Commit

Permalink
Initialise New Scheduler (#2050)
Browse files Browse the repository at this point in the history
* Make legacyScheduler generic

* linting

* linting

* wip

* wip

* wip

* wip

* wip

* remove nats from e2e

* remove more nats config

* fixed submit test

* linting

* go mod tidy

* increase sleep to 60

* wip

* wip

* linting

* unused context

* remove extra queued-jobs-iterator

* wip

* wip

* linting passes

* documentation

* unit tests

* doc

* more tests

* linting

* more doc, revert files modified in error

* more linting

* more linting

* fix tests

* merge master

* doc improvements

* wip

* Update internal/scheduler/scheduler.go

Co-authored-by: Albin Severinson <albin@severinson.org>

* fixes following review

* wip

* renamed publisher_test

* wip

* linting

* fixes after merge

* Update internal/scheduler/scheduling_algo.go

Co-authored-by: Clif Houck <me@clifhouck.com>

* code review comments

* mocks

* wip

* wip

* wip

* doc for job repo

* doc for job repo

* addressed some warnings

* restore nodes table

* filter out queues with no jobs to schedule

* wip

* wip

* removed nats from main makefiles

* removed extra config

* renamed file

* code review comments

* linting

* linting

* more linting

* more linting

* another nats reference

* another nats reference

* linting

* fix tests

* fix tests

* fix tests

* job repo test done

* wip

* merged master

* merged master

* fix changes following merge

* restore package

* fixed proto package names

* linting

* linting

* fix null pointer in test

* linting

* linting

* doc

* unit tests

* moved mock generation into its own package

* moved mock generation into its own package

* wip

* go lint

* linting

* fixed package names

* wip

* linting

* linting

* linting

* move master check into a separate function

* don't modify restapi

* doc

* Update internal/armada/configuration/types.go

Co-authored-by: Albin Severinson <albin@severinson.org>

* Update internal/scheduler/database/job_repository.go

Co-authored-by: Albin Severinson <albin@severinson.org>

* Update internal/scheduler/reports.go

Co-authored-by: Albin Severinson <albin@severinson.org>

* wip

* compilation fixes

* updated proto

* add job ids

* wip

* tests for api

* linting

* add grpc mock

* fix tests

* fix tests

* fix tests

* formatting

* formatting

* doc

* more doc

* wip

* remove active_job_ids from streaming lease call

* wip

* nodes should be pointer not struct

* wip

* added job run lookup

* wip

* linting

* linting

* linting

* fix tests

* fix tests

* executor tests

* linting

* linting

* doc

* fix tests

* wip

* wip

* wip

* wip

* linting

* import order

* custom pulsar marshallers

* config

* more changes

* commands

* linting

* added tests for hooks

* lots of compilation fixes

* go mod tidy

* fix failing test

* test for queue_repository

* remove unneeded file

* linting

* flush db always

* flush db always

* review comments

* linting

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Albin Severinson <albin@severinson.org>
Co-authored-by: Clif Houck <me@clifhouck.com>
  • Loading branch information
4 people committed Jan 25, 2023
1 parent db473c0 commit 6b27ffe
Show file tree
Hide file tree
Showing 45 changed files with 884 additions and 313 deletions.
24 changes: 24 additions & 0 deletions cmd/scheduler/cmd/main.go
@@ -0,0 +1,24 @@
package cmd

import (
"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/scheduler"
)

// runCmd returns the "run" subcommand, whose RunE handler is runScheduler.
func runCmd() *cobra.Command {
	return &cobra.Command{
		Use:   "run",
		Short: "Runs the scheduler",
		RunE:  runScheduler,
	}
}

// runScheduler is the RunE handler of the "run" subcommand. It loads the
// scheduler configuration via loadConfig and hands it to scheduler.Run,
// returning whichever error occurs first.
func runScheduler(_ *cobra.Command, _ []string) error {
	cfg, loadErr := loadConfig()
	if loadErr != nil {
		return loadErr
	}
	return scheduler.Run(cfg)
}
51 changes: 51 additions & 0 deletions cmd/scheduler/cmd/migrate_database.go
@@ -0,0 +1,51 @@
package cmd

import (
"context"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/armadaproject/armada/internal/common/database"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)

// migrateDbCmd returns the "migrateDatabase" subcommand. It registers a
// persistent "timeout" duration flag (default five minutes) and uses
// migrateDatabase as its RunE handler.
func migrateDbCmd() *cobra.Command {
	migrate := &cobra.Command{
		Use:   "migrateDatabase",
		Short: "migrates the scheduler database to the latest version",
		RunE:  migrateDatabase,
	}

	flags := migrate.PersistentFlags()
	flags.Duration(
		"timeout",
		5*time.Minute,
		"Duration after which the migration will fail if it has not been created")

	return migrate
}

// migrateDatabase is the RunE handler of the "migrateDatabase" subcommand.
//
// It loads the scheduler configuration, opens a connection using
// config.Postgres and runs schedulerdb.Migrate under a context that is
// cancelled once the duration given by the "timeout" flag has elapsed.
// Any failure is returned to cobra with a message describing the failed step.
func migrateDatabase(cmd *cobra.Command, _ []string) error {
	// The "timeout" flag lives on the cobra command. Unless it is bound, viper
	// knows nothing about it and GetDuration returns 0, which would produce a
	// context that has already expired before the migration starts.
	if err := viper.BindPFlag("timeout", cmd.Flags().Lookup("timeout")); err != nil {
		return errors.WithMessage(err, "Failed to bind timeout flag")
	}
	timeout := viper.GetDuration("timeout")
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	config, err := loadConfig()
	if err != nil {
		return err
	}

	start := time.Now()
	log.Info("Beginning scheduler database migration")
	// NOTE(review): the connection is never closed here; presumably acceptable
	// because the process exits right after the command returns — confirm.
	db, err := database.OpenPgxConn(config.Postgres)
	if err != nil {
		return errors.WithMessage(err, "Failed to connect to database")
	}
	if err := schedulerdb.Migrate(ctx, db); err != nil {
		return errors.WithMessage(err, "Failed to migrate scheduler database")
	}
	log.Infof("Scheduler database migrated in %s", time.Since(start))
	return nil
}
48 changes: 48 additions & 0 deletions cmd/scheduler/cmd/root.go
@@ -0,0 +1,48 @@
package cmd

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/armadaproject/armada/internal/common"
commonconfig "github.com/armadaproject/armada/internal/common/config"
"github.com/armadaproject/armada/internal/scheduler"
)

const (
	// CustomConfigLocation is the viper key that loadConfig reads to obtain
	// the list of user-specified configuration file paths.
	CustomConfigLocation string = "config"
)

// RootCmd builds the top-level "scheduler" command and attaches the "run" and
// "migrateDatabase" subcommands to it.
//
// It registers a persistent string-slice flag named CustomConfigLocation that
// carries the paths of additional configuration files, and binds that flag to
// viper so that loadConfig, which reads the value through viper, sees what the
// user passed on the command line.
func RootCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:          "scheduler",
		SilenceUsage: true,
		Short:        "The main armada scheduler",
	}

	// The flag name must match the key loadConfig looks up; registering it
	// under any other name leaves user-specified config files unread.
	cmd.PersistentFlags().StringSlice(
		CustomConfigLocation,
		[]string{},
		"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)")

	configFlag := cmd.PersistentFlags().Lookup(CustomConfigLocation)
	if err := viper.BindPFlag(CustomConfigLocation, configFlag); err != nil {
		// Only reachable if the flag registered just above cannot be found,
		// which would be a programming error rather than a runtime condition.
		panic(err)
	}

	cmd.AddCommand(
		runCmd(),
		migrateDbCmd(),
	)

	return cmd
}

// loadConfig populates a scheduler.Configuration from "./config/scheduler"
// plus any paths stored under the CustomConfigLocation viper key, then
// validates it. Validation errors are logged and also returned alongside the
// loaded configuration.
func loadConfig() (scheduler.Configuration, error) {
	var cfg scheduler.Configuration
	common.LoadConfig(&cfg, "./config/scheduler", viper.GetStringSlice(CustomConfigLocation))

	// TODO: once we're happy with this we can move it to common app startup
	err := commonconfig.Validate(cfg)
	if err != nil {
		commonconfig.LogValidationErrors(err)
	}
	return cfg, err
}
59 changes: 2 additions & 57 deletions cmd/scheduler/main.go
@@ -1,67 +1,12 @@
package main

import (
"context"
"os"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/armadaproject/armada/cmd/scheduler/cmd"
"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/database"
"github.com/armadaproject/armada/internal/scheduler"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)

const (
CustomConfigLocation string = "config"
MigrateDatabase string = "migrateDatabase"
)

func init() {
pflag.StringSlice(
CustomConfigLocation,
[]string{},
"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)",
)
pflag.Bool(MigrateDatabase, false, "Migrate database instead of running scheduler")
pflag.Parse()
}

func main() {
common.ConfigureLogging()
common.BindCommandlineArguments()

var config scheduler.Configuration
userSpecifiedConfigs := viper.GetStringSlice(CustomConfigLocation)

common.LoadConfig(&config, "./config/scheduler", userSpecifiedConfigs)

if viper.GetBool(MigrateDatabase) {
migrateDatabase(&config)
} else {
if err := scheduler.Run(&config); err != nil {
log.Errorf("failed to run scheduler: %s", err)
os.Exit(1)
}
}
}

func migrateDatabase(config *scheduler.Configuration) {
start := time.Now()
log.Info("Beginning scheduler database migration")
db, err := database.OpenPgxPool(config.Postgres)
if err != nil {
panic(errors.WithMessage(err, "Failed to connect to database"))
}
err = schedulerdb.Migrate(context.Background(), db)
if err != nil {
panic(errors.WithMessage(err, "Failed to migrate scheduler database"))
}
taken := time.Now().Sub(start)
log.Infof("Scheduler database migrated in %dms", taken.Milliseconds())
os.Exit(0)
_ = cmd.RootCmd().Execute()
}
103 changes: 86 additions & 17 deletions config/scheduler/config.yaml
@@ -1,25 +1,94 @@
cyclePeriod: 10s
executorTimeout: 1h
databaseFetchSize: 1000
pulsarSendTimeout: 5s
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
postgres:
maxOpenConns: 20
maxIdleConns: 5
connMaxLifetime: 30m
connection:
host: localhost
host: postgres
port: 5432
user: postgres
password: psw
dbname: postgres
sslmode: disable

metrics:
port: 9003

pulsar:
URL: "pulsar://localhost:6650"
jobsetEventsTopic: "events"
receiveTimeout: 5s
backoffTime: 1s

subscriptionName: "scheduler"
batchSize: 10000
batchDuration: 500ms
leader:
mode: standalone
grpc:
#port: 50052
keepaliveParams:
maxConnectionIdle: 5m
time: 120s
timeout: 20s
keepaliveEnforcementPolicy:
minTime: 10s
permitWithoutStream: true
scheduling:
preemption:
enabled: true
priorityClasses:
armada-default:
priority: 1000
maximalResourceFractionPerQueue:
memory: 0.99
cpu: 0.99
armada-preemptible:
priority: 900
maximalResourceFractionPerQueue:
memory: 0.99
cpu: 0.99
defaultPriorityClass: armada-default
queueLeaseBatchSize: 1000
minimumResourceToSchedule:
memory: 1000000 # 1MB
cpu: 0.1
maximalResourceFractionToSchedulePerQueue:
memory: 1.0
cpu: 1.0
maximalResourceFractionPerQueue:
memory: 1.0
cpu: 1.0
maximalClusterFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumJobsToSchedule: 5000
maxQueueReportsToStore: 1000
MaxJobReportsToStore: 10000
defaultJobLimits:
cpu: 1
memory: 1Gi
ephemeral-storage: 8Gi
defaultJobTolerations:
- key: "armadaproject.io/armada"
operator: "Equal"
value: "true"
effect: "NoSchedule"
defaultJobTolerationsByPriorityClass:
"":
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
value: "true"
effect: "NoSchedule"
armada-default:
- key: "armadaproject.io/pc-armada-default"
operator: "Equal"
value: "true"
effect: "NoSchedule"
armada-preemptible:
- key: "armadaproject.io/pc-armada-preemptible"
operator: "Equal"
value: "true"
effect: "NoSchedule"
maxRetries: 5
resourceScarcity:
cpu: 1.0
indexedResources:
- cpu
- memory
gangIdAnnotation: armadaproject.io/gangId
gangCardinalityAnnotation: armadaproject.io/gangCardinality

20 changes: 0 additions & 20 deletions e2e/pulsartest_client/app_test.go
Expand Up @@ -33,26 +33,6 @@ func TestNew(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, app)

// Invalid compression type
pc = cfg.PulsarConfig{
URL: "pulsar://localhost:6650",
CompressionType: "nocompression",
JobsetEventsTopic: "events",
}
app, err = pt.New(pt.Params{Pulsar: pc}, "submit")
assert.Error(t, err)
assert.Nil(t, app)

// Invalid compression level
pc = cfg.PulsarConfig{
URL: "pulsar://localhost:6650",
CompressionLevel: "veryCompressed",
JobsetEventsTopic: "events",
}
app, err = pt.New(pt.Params{Pulsar: pc}, "submit")
assert.Error(t, err)
assert.Nil(t, app)

// Invalid command type
pc = cfg.PulsarConfig{
URL: "pulsar://localhost:6650",
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Expand Up @@ -87,6 +87,7 @@ require (
github.com/go-openapi/strfmt v0.21.3
github.com/go-openapi/swag v0.22.3
github.com/go-openapi/validate v0.21.0
github.com/go-playground/validator/v10 v10.11.1
github.com/golang/mock v1.6.0
github.com/goreleaser/goreleaser v1.11.5
github.com/jessevdk/go-flags v1.5.0
Expand Down Expand Up @@ -199,6 +200,8 @@ require (
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-openapi/inflect v0.19.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
Expand Down Expand Up @@ -256,6 +259,7 @@ require (
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down

0 comments on commit 6b27ffe

Please sign in to comment.