From 04b1a7b6448d2b79580a10a4b35a86fd49302210 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Mon, 23 Jan 2023 23:20:34 -0500 Subject: [PATCH 01/12] feat(import): Switching graphql Api to Rest Api for auto imports --- internal/jobs/repo/import.go | 117 ++++++++++++++++++++++++++++---- internal/warehouse/warehouse.go | 4 ++ 2 files changed, 109 insertions(+), 12 deletions(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index 1842aeff5..e3902c9a2 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -5,6 +5,10 @@ import ( "database/sql" "encoding/json" "fmt" + "os" + "time" + + "github.com/google/go-github/v41/github" "github.com/google/uuid" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" @@ -13,7 +17,7 @@ import ( "github.com/mergestat/mergestat/internal/db" "github.com/mergestat/sqlq" "github.com/pkg/errors" - "time" + "golang.org/x/oauth2" ) type githubRepository struct { @@ -57,7 +61,7 @@ func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { // if the execution fails for some reason, only that import is marked as failed // the job still continues executing. var importError error - if importError = handleImport(ctx, queries.WithTx(tx), mergestat, imp); importError != nil { + if importError = handleImport(ctx, pool, queries.WithTx(tx), mergestat, imp); importError != nil { logger.Warnf("import(%s) failed: %v", imp.ID, importError.Error()) _ = tx.Rollback(ctx) } else { @@ -86,9 +90,25 @@ func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { } // handleImport handles execution of a given import configuration -func handleImport(ctx context.Context, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) { - var query, repoOwner string +func handleImport(ctx context.Context, pool *pgxpool.Pool, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) { + var repoOwner, ghToken string var removeDeletedRepos, defaultSyncTypes = true, make([]string, 0) //nolint:ineffassign + var repos []*githubRepository + + if ghToken, err = fetchGitHubTokenFromDB(ctx, pool); err != nil { + return err + } + + ts := oauth2.StaticTokenSource( + &oauth2.Token{AccessToken: ghToken}, + ) + + tc := oauth2.NewClient(ctx, ts) + if len(ghToken) <= 0 { + tc = nil + } + + client := github.NewClient(tc) // determine the kind of import (Org vs. User) and parse the settings switch imp.Type { @@ -105,7 +125,10 @@ func handleImport(ctx context.Context, qry *db.Queries, mergestat *sqlx.DB, imp repoOwner = settings.Org removeDeletedRepos = settings.RemoveDeletedRepos defaultSyncTypes = settings.DefaultSyncTypes - query = `SELECT login, name, topics FROM github_org_repos(?)` + + if repos, err = fetchGitHubReposByOrg(ctx, client, repoOwner, ghToken); err != nil { + return err + } case "GITHUB_USER": var settings struct { User string `json:"user"` @@ -119,17 +142,14 @@ func handleImport(ctx context.Context, qry *db.Queries, mergestat *sqlx.DB, imp repoOwner = settings.User removeDeletedRepos = settings.RemoveDeletedRepos defaultSyncTypes = settings.DefaultSyncTypes - query = `SELECT login, name, topics FROM github_user_repos(?)` + + if repos, err = fetchGitHubReposByUser(ctx, client, repoOwner, ghToken); err != nil { + return err + } default: return errors.Errorf("unknown import type: %s", imp.Type) } - // execute the mergestat query - this will scan a list of repos to be imported - var repos []*githubRepository - if err = mergestat.SelectContext(ctx, &repos, query, repoOwner); err != nil { - return errors.Wrapf(err, "failed to fetch repositories") - } - // remove any deleted repositories if removeDeletedRepos { var params = db.DeleteRemovedReposParams{Column1: imp.ID, Column2: repoUrls(repos)} @@ -218,3 +238,76 @@ func difference(existing, new []string) []string { return diff } + +// TODO(ramirocastillo):Move this fn to the helper package +// fetchGitHubTokenFromDB is a temporary helper function for retrieving the most recently added GITHUB_PAT service credential from the DB. +// It's "temporary" because the way credentials are managed and retrieved will likely need to be much more robust in the future. +func fetchGitHubTokenFromDB(ctx context.Context, pool *pgxpool.Pool) (string, error) { + encryptionSecret := os.Getenv("ENCRYPTION_SECRET") + row := pool.QueryRow(context.TODO(), "SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1", encryptionSecret) + var credentials []byte + if err := row.Scan(&credentials); err != nil && !errors.Is(err, pgx.ErrNoRows) { + return "", fmt.Errorf("could not retrieve GitHub PAT from database: %v", err) + } + + if credentials == nil { + // default to the `GITHUB_TOKEN` env var if nothing in the DB + credentials = []byte(os.Getenv("GITHUB_TOKEN")) + } + + return string(credentials), nil +} + +// fetchGitHubReposByOrg fetch all repos from a given organization, is a github token is not +// provided will search only public repos +func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) { + var repositories []*githubRepository + var repos []*github.Repository + var err error + var typeOfRepo string + + if len(ghToken) <= 0 { + typeOfRepo = "public" + } + + if repos, _, err = client.Repositories.ListByOrg(ctx, repoOwner, &github.RepositoryListByOrgOptions{Type: typeOfRepo}); err != nil { + return repositories, err + } + + for _, repo := range repos { + var topics []byte + if topics, err = json.Marshal(repo.Topics); err != nil { + return repositories, err + } + + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + } + return repositories, err +} + +// fetchGitHubReposByUser fetch all repos from a given user, is a github token is not +// provided will search only public repos +func fetchGitHubReposByUser(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) { + var repositories []*githubRepository + var repos []*github.Repository + var err error + var typeOfRepo string + + if len(ghToken) <= 0 { + typeOfRepo = "public" + } + + if repos, _, err = client.Repositories.List(ctx, repoOwner, &github.RepositoryListOptions{Type: typeOfRepo}); err != nil { + return repositories, err + } + + for _, repo := range repos { + var topics []byte + if topics, err = json.Marshal(repo.Topics); err != nil { + return repositories, err + } + + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + } + return repositories, err +} diff --git a/internal/warehouse/warehouse.go b/internal/warehouse/warehouse.go index c2d5c60dc..2b9ee0e14 100644 --- a/internal/warehouse/warehouse.go +++ b/internal/warehouse/warehouse.go @@ -32,6 +32,10 @@ func New(ctx context.Context, db *db.Queries, pgpool *pgxpool.Pool, logger *zero ) tc := oauth2.NewClient(ctx, ts) + if len(ghToken) <= 0 { + tc = nil + } + client := github.NewClient(tc) pool := pool.Init(pgpool) queries := queries.NewQuerier(db) From 753fb2ac14ceb010ed91f83b1bc003d20102edde Mon Sep 17 00:00:00 2001 From: Ramiro Date: Tue, 24 Jan 2023 10:06:36 -0500 Subject: [PATCH 02/12] chore(import): Fetching all repos possible --- internal/jobs/repo/import.go | 66 ++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index e3902c9a2..cce9a6461 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -258,56 +258,86 @@ func fetchGitHubTokenFromDB(ctx context.Context, pool *pgxpool.Pool) (string, er return string(credentials), nil } -// fetchGitHubReposByOrg fetch all repos from a given organization, is a github token is not +// fetchGitHubReposByOrg fetch all repos from a given organization, if a github token is not // provided will search only public repos func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) { var repositories []*githubRepository var repos []*github.Repository var err error var typeOfRepo string + var resp *github.Response if len(ghToken) <= 0 { typeOfRepo = "public" } + opt := &github.RepositoryListByOrgOptions{Type: typeOfRepo} - if repos, _, err = client.Repositories.ListByOrg(ctx, repoOwner, &github.RepositoryListByOrgOptions{Type: typeOfRepo}); err != nil { - return repositories, err - } - - for _, repo := range repos { - var topics []byte - if topics, err = json.Marshal(repo.Topics); err != nil { + for { + if repos, resp, err = client.Repositories.ListByOrg(ctx, repoOwner, opt); err != nil { return repositories, err } - repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + for _, repo := range repos { + var topics []byte + if topics, err = json.Marshal(repo.Topics); err != nil { + return repositories, err + } + + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + } + if resp == nil { + break + } + + if resp.NextPage == 0 { + break + } + opt.Page = resp.NextPage + continue + } return repositories, err } -// fetchGitHubReposByUser fetch all repos from a given user, is a github token is not +// fetchGitHubReposByUser fetch all repos from a given user, if a github token is not // provided will search only public repos func fetchGitHubReposByUser(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) { var repositories []*githubRepository var repos []*github.Repository var err error var typeOfRepo string + var resp *github.Response if len(ghToken) <= 0 { typeOfRepo = "public" } + opt := &github.RepositoryListOptions{Type: typeOfRepo} - if repos, _, err = client.Repositories.List(ctx, repoOwner, &github.RepositoryListOptions{Type: typeOfRepo}); err != nil { - return repositories, err - } - - for _, repo := range repos { - var topics []byte - if topics, err = json.Marshal(repo.Topics); err != nil { + for { + if repos, resp, err = client.Repositories.List(ctx, repoOwner, opt); err != nil { return repositories, err } - repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + for _, repo := range repos { + var topics []byte + if topics, err = json.Marshal(repo.Topics); err != nil { + return repositories, err + } + + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + } + + if resp == nil { + break + } + + if resp.NextPage == 0 { + break + } + opt.Page = resp.NextPage + continue + } + return repositories, err } From 1b92942c57cd7493e33f129d4228e5b514320d33 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 09:35:58 -0500 Subject: [PATCH 03/12] chore(importer): Adding support for topics --- cmd/worker/main.go | 17 +++++++++-------- internal/jobs/repo/import.go | 25 ++++++++++++++++--------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 227025795..a8f13c3fb 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -5,13 +5,6 @@ import ( "database/sql" "errors" "fmt" - "github.com/mergestat/mergestat/internal/cron" - "github.com/mergestat/mergestat/internal/jobs/repo" - "github.com/mergestat/mergestat/internal/syncer" - "github.com/mergestat/mergestat/internal/timeout" - "github.com/mergestat/sqlq" - "github.com/mergestat/sqlq/runtime/embed" - "github.com/mergestat/sqlq/schema" "net/http" _ "net/http/pprof" "net/url" @@ -22,6 +15,14 @@ import ( "syscall" "time" + "github.com/mergestat/mergestat/internal/cron" + "github.com/mergestat/mergestat/internal/jobs/repo" + "github.com/mergestat/mergestat/internal/syncer" + "github.com/mergestat/mergestat/internal/timeout" + "github.com/mergestat/sqlq" + "github.com/mergestat/sqlq/runtime/embed" + "github.com/mergestat/sqlq/schema" + "github.com/go-git/go-git/v5" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" @@ -247,7 +248,7 @@ func main() { var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{Queues: queues}) // register job handlers for types implemented by this worker - _ = worker.Register("repos/auto-import", repo.AutoImport(pool, db)) + _ = worker.Register("repos/auto-import", repo.AutoImport(&logger, pool, db)) // TODO all of the following "params" should be configurable // either via the database/app or possibly with env vars diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index cce9a6461..0f6117f76 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -17,6 +17,7 @@ import ( "github.com/mergestat/mergestat/internal/db" "github.com/mergestat/sqlq" "github.com/pkg/errors" + "github.com/rs/zerolog" "golang.org/x/oauth2" ) @@ -32,7 +33,7 @@ func (repo *githubRepository) URL() string { // AutoImport implements the githubRepository auto-import job to automatically // sync githubRepository from user- or org- accounts. -func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { +func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { var queries = db.New(pool) return func(ctx context.Context, job *sqlq.Job) (err error) { @@ -61,7 +62,7 @@ func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { // if the execution fails for some reason, only that import is marked as failed // the job still continues executing. var importError error - if importError = handleImport(ctx, pool, queries.WithTx(tx), mergestat, imp); importError != nil { + if importError = handleImport(ctx, l, pool, queries.WithTx(tx), mergestat, imp); importError != nil { logger.Warnf("import(%s) failed: %v", imp.ID, importError.Error()) _ = tx.Rollback(ctx) } else { @@ -90,7 +91,7 @@ func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { } // handleImport handles execution of a given import configuration -func handleImport(ctx context.Context, pool *pgxpool.Pool, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) { +func handleImport(ctx context.Context, logger *zerolog.Logger, pool *pgxpool.Pool, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) { var repoOwner, ghToken string var removeDeletedRepos, defaultSyncTypes = true, make([]string, 0) //nolint:ineffassign var repos []*githubRepository @@ -168,6 +169,8 @@ func handleImport(ctx context.Context, pool *pgxpool.Pool, qry *db.Queries, merg // upsert all fetched repositories for _, repo := range repos { + logger.Debug().Msgf(string(repo.Topics)) + var opts = db.UpsertRepoParams{ Repo: fmt.Sprintf("https://github.com/%s/%s", repoOwner, repo.Name), IsGithub: sql.NullBool{Bool: true, Valid: true}, @@ -278,12 +281,14 @@ func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner } for _, repo := range repos { - var topics []byte - if topics, err = json.Marshal(repo.Topics); err != nil { + + jsonStr, _ := json.Marshal(&repo.Topics) + if err != nil { return repositories, err } - repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), + Topics: string(jsonStr)}) } if resp == nil { break @@ -319,12 +324,14 @@ func fetchGitHubReposByUser(ctx context.Context, client *github.Client, repoOwne } for _, repo := range repos { - var topics []byte - if topics, err = json.Marshal(repo.Topics); err != nil { + + jsonStr, err := json.Marshal(&repo.Topics) + if err != nil { return repositories, err } - repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), Topics: string(topics)}) + repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL), + Topics: string(jsonStr)}) } if resp == nil { From 60c2e051d983004810aa8af3327869d1be152233 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 11:55:38 -0500 Subject: [PATCH 04/12] chore(import): Adding our logging --- internal/jobs/repo/import.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index 0f6117f76..1f4d64300 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -37,7 +37,8 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. var queries = db.New(pool) return func(ctx context.Context, job *sqlq.Job) (err error) { - var logger = job.Logger() + // We are using our own logger instead of the sqlq logger + // var logger = job.Logger() // start sending periodic keep-alive pings! go job.SendKeepAlive(ctx, job.KeepAlive-(5*time.Second)) //nolint:errcheck @@ -45,13 +46,13 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. // fetch a list of all configured imports that are due now var imports []db.ListRepoImportsDueForImportRow if imports, err = queries.ListRepoImportsDueForImport(ctx); err != nil { - logger.Errorf("failed to list repo import job: %v", err) + l.Error().Msgf("failed to list repo import job: %v", err) return errors.Wrapf(sqlq.ErrSkipRetry, "failed to list repo import job: %v", err) } - logger.Infof("handling %d import(s)", len(imports)) + l.Info().Msgf("handling %d import(s)", len(imports)) for _, imp := range imports { - logger.Infof("executing import %s", imp.ID) + l.Info().Msgf("executing import %s", imp.ID) var tx pgx.Tx // each import is executed within its own transaction if tx, err = pool.Begin(ctx); err != nil { @@ -63,14 +64,14 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. // the job still continues executing. var importError error if importError = handleImport(ctx, l, pool, queries.WithTx(tx), mergestat, imp); importError != nil { - logger.Warnf("import(%s) failed: %v", imp.ID, importError.Error()) + l.Warn().Msgf("import(%s) failed: %v", imp.ID, importError.Error()) _ = tx.Rollback(ctx) } else { // if the import was successful, commit the changes if err = tx.Commit(ctx); err != nil { return errors.Wrapf(err, "failed to commit database transaction") } - logger.Infof("import(%s) was successful", imp.ID) + l.Info().Msgf("import(%s) was successful", imp.ID) } var importStatus = db.UpdateImportStatusParams{Status: "SUCCESS", ID: imp.ID} @@ -169,8 +170,6 @@ func handleImport(ctx context.Context, logger *zerolog.Logger, pool *pgxpool.Poo // upsert all fetched repositories for _, repo := range repos { - logger.Debug().Msgf(string(repo.Topics)) - var opts = db.UpsertRepoParams{ Repo: fmt.Sprintf("https://github.com/%s/%s", repoOwner, repo.Name), IsGithub: sql.NullBool{Bool: true, Valid: true}, @@ -282,7 +281,7 @@ func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner for _, repo := range repos { - jsonStr, _ := json.Marshal(&repo.Topics) + jsonStr, err := json.Marshal(&repo.Topics) if err != nil { return repositories, err } From ab578cc5ab9b9dec20eb18ffa44f86fc17dfedef Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 14:06:21 -0500 Subject: [PATCH 05/12] chore(importer): Adding db logs and sqlc token query --- cmd/worker/main.go | 2 +- internal/db/models.go | 130 ++++++++++++++++++ internal/db/querier.go | 1 + internal/db/queries.sql | 3 + internal/db/queries.sql.go | 11 ++ internal/jobs/repo/import.go | 32 +++-- ...00014_add_schema_introspection_view.up.sql | 2 +- 7 files changed, 165 insertions(+), 16 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index a8f13c3fb..a14c8938f 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -248,7 +248,7 @@ func main() { var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{Queues: queues}) // register job handlers for types implemented by this worker - _ = worker.Register("repos/auto-import", repo.AutoImport(&logger, pool, db)) + _ = worker.Register("repos/auto-import", repo.AutoImport(&logger, pool)) // TODO all of the following "params" should be configurable // either via the database/app or possibly with env vars diff --git a/internal/db/models.go b/internal/db/models.go index f32861bed..ba3ec6b4e 100644 --- a/internal/db/models.go +++ b/internal/db/models.go @@ -533,6 +533,73 @@ type GithubStargazer struct { MergestatSyncedAt time.Time } +type GitleaksRepoDetection struct { + // foreign key for public.repos.id + RepoID uuid.UUID + // description of the detection + Description interface{} + // detection start line + StartLine interface{} + // detection end line + EndLine interface{} + // detection start column + StartColumn interface{} + // detection end column + EndColumn interface{} + // detection match + Match interface{} + // detection secret + Secret interface{} + // detection file + File interface{} + // detected symlink file + SymlinkFile interface{} + // detection commit + Commit interface{} + // detection entropy + Entropy interface{} + // detection author + Author interface{} + // detection email + Email interface{} + // detection date + Date interface{} + // detection message + Message interface{} + // detection tags + Tags interface{} + // detection rule id + RuleID interface{} + // detection fingerprint + Fingerprint interface{} +} + +// scan output of a Gitleaks repo scan +type GitleaksRepoScan struct { + // foreign key for public.repos.id + RepoID uuid.UUID + // JSON output of a Gitleaks scan + Results pgtype.JSONB +} + +type GosecRepoDetection struct { + RepoID uuid.UUID + Severity interface{} + Confidence interface{} + CweID interface{} + RuleID interface{} + Details interface{} + File interface{} + Line interface{} + Column interface{} + Nosec interface{} +} + +type GosecRepoScan struct { + RepoID uuid.UUID + Issues pgtype.JSONB +} + type MergestatLatestRepoSync struct { ID int64 CreatedAt time.Time @@ -542,6 +609,13 @@ type MergestatLatestRepoSync struct { DoneAt sql.NullTime } +type MergestatQueryHistory struct { + ID uuid.UUID + RunAt sql.NullTime + RunBy string + Query string +} + // Table for "dynamic" repo imports - regularly loading from a GitHub org for example type MergestatRepoImport struct { ID uuid.UUID @@ -615,6 +689,31 @@ type MergestatRepoSyncTypeGroup struct { ConcurrentSyncs sql.NullInt32 } +// @name labels +type MergestatRepoSyncTypeLabel struct { + Label string + Description sql.NullString + Color string +} + +// @name labelAssociations +type MergestatRepoSyncTypeLabelAssociation struct { + Label string + RepoSyncType string +} + +type MergestatSchemaIntrospection struct { + Schema interface{} + TableName interface{} + TableType interface{} + ColumnName interface{} + OrdinalPosition interface{} + IsNullable interface{} + DataType interface{} + UdtName interface{} + ColumnDescription string +} + type MergestatServiceAuthCredential struct { ID uuid.UUID CreatedAt time.Time @@ -748,3 +847,34 @@ type TrivyRepoVulnerability struct { // timestamp when record was synced into the MergeStat database MergestatSyncedAt time.Time } + +type YelpDetectSecretsRepoDetection struct { + // foreign key for public.repos.id + RepoID uuid.UUID + // detection type + Type interface{} + // detection filename + Filename interface{} + // detection is verified + IsVerified interface{} + // detection line number + LineNumber interface{} + // detection secret + HashedSecret interface{} + // detection version + Version interface{} + // detection generated at + GeneratedAt interface{} + // detection filters used + FiltersUsed interface{} + // detection plugins used + PluginsUsed interface{} +} + +// scan output of a Yelp detect-secrets repo scan +type YelpDetectSecretsRepoScan struct { + // foreign key for public.repos.id + RepoID uuid.UUID + // JSON output of a Yelp detect-secrets scan + Results pgtype.JSONB +} diff --git a/internal/db/querier.go b/internal/db/querier.go index ebba7849f..1bbdbe99f 100644 --- a/internal/db/querier.go +++ b/internal/db/querier.go @@ -19,6 +19,7 @@ type Querier interface { // This allows us to make sure all repo syncs complete before we reschedule a new batch. // We have now also added a concept of type groups which allows us to apply this same logic but by each group type which is where the PARTITION BY clause comes into play EnqueueAllSyncs(ctx context.Context) error + FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error) GetRepoIDsFromRepoImport(ctx context.Context, arg GetRepoIDsFromRepoImportParams) ([]uuid.UUID, error) GetRepoImportByID(ctx context.Context, id uuid.UUID) (MergestatRepoImport, error) GetRepoUrlFromImport(ctx context.Context, importid uuid.UUID) ([]string, error) diff --git a/internal/db/queries.sql b/internal/db/queries.sql index c19e87592..304df669d 100644 --- a/internal/db/queries.sql +++ b/internal/db/queries.sql @@ -86,6 +86,9 @@ INSERT INTO mergestat.repo_sync_logs (log_type, message, repo_sync_queue_id) VAL -- name: SetSyncJobStatus :exec SELECT mergestat.set_sync_job_status(@Status::TEXT, @ID::BIGINT); +-- name: FetchGitHubToken :one +SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1; + -- We use a CTE here to retrieve all the repo_sync_jobs that were previously enqueued, to make sure that we *do not* re-enqueue anything new until the previously enqueued jobs are *completed*. -- This allows us to make sure all repo syncs complete before we reschedule a new batch. -- We have now also added a concept of type groups which allows us to apply this same logic but by each group type which is where the PARTITION BY clause comes into play diff --git a/internal/db/queries.sql.go b/internal/db/queries.sql.go index d7182789a..bc272d8e4 100644 --- a/internal/db/queries.sql.go +++ b/internal/db/queries.sql.go @@ -160,6 +160,17 @@ func (q *Queries) EnqueueAllSyncs(ctx context.Context) error { return err } +const fetchGitHubToken = `-- name: FetchGitHubToken :one +SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1 +` + +func (q *Queries) FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error) { + row := q.db.QueryRow(ctx, fetchGitHubToken, pgpSymDecrypt) + var pgp_sym_decrypt string + err := row.Scan(&pgp_sym_decrypt) + return pgp_sym_decrypt, err +} + const getRepoIDsFromRepoImport = `-- name: GetRepoIDsFromRepoImport :many SELECT id FROM public.repos WHERE repo_import_id = $1::uuid AND repo = ANY($2::TEXT[]) ` diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index 1f4d64300..826416a90 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -13,7 +13,6 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - "github.com/jmoiron/sqlx" "github.com/mergestat/mergestat/internal/db" "github.com/mergestat/sqlq" "github.com/pkg/errors" @@ -33,12 +32,11 @@ func (repo *githubRepository) URL() string { // AutoImport implements the githubRepository auto-import job to automatically // sync githubRepository from user- or org- accounts. -func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc { +func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool) sqlq.HandlerFunc { var queries = db.New(pool) return func(ctx context.Context, job *sqlq.Job) (err error) { - // We are using our own logger instead of the sqlq logger - // var logger = job.Logger() + var logger = job.Logger() // start sending periodic keep-alive pings! go job.SendKeepAlive(ctx, job.KeepAlive-(5*time.Second)) //nolint:errcheck @@ -46,12 +44,15 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. // fetch a list of all configured imports that are due now var imports []db.ListRepoImportsDueForImportRow if imports, err = queries.ListRepoImportsDueForImport(ctx); err != nil { + logger.Errorf("failed to list repo import job: %v", err) l.Error().Msgf("failed to list repo import job: %v", err) return errors.Wrapf(sqlq.ErrSkipRetry, "failed to list repo import job: %v", err) } + logger.Infof("handling %d import(s)", len(imports)) l.Info().Msgf("handling %d import(s)", len(imports)) for _, imp := range imports { + logger.Infof("executing import %s", imp.ID) l.Info().Msgf("executing import %s", imp.ID) var tx pgx.Tx // each import is executed within its own transaction @@ -63,7 +64,8 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. // if the execution fails for some reason, only that import is marked as failed // the job still continues executing. var importError error - if importError = handleImport(ctx, l, pool, queries.WithTx(tx), mergestat, imp); importError != nil { + if importError = handleImport(ctx, queries.WithTx(tx), imp); importError != nil { + logger.Warnf("import(%s) failed: %v", imp.ID, importError.Error()) l.Warn().Msgf("import(%s) failed: %v", imp.ID, importError.Error()) _ = tx.Rollback(ctx) } else { @@ -71,6 +73,7 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. if err = tx.Commit(ctx); err != nil { return errors.Wrapf(err, "failed to commit database transaction") } + logger.Infof("import(%s) was successful", imp.ID) l.Info().Msgf("import(%s) was successful", imp.ID) } @@ -92,12 +95,12 @@ func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq. } // handleImport handles execution of a given import configuration -func handleImport(ctx context.Context, logger *zerolog.Logger, pool *pgxpool.Pool, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) { +func handleImport(ctx context.Context, qry *db.Queries, imp db.ListRepoImportsDueForImportRow) (err error) { var repoOwner, ghToken string var removeDeletedRepos, defaultSyncTypes = true, make([]string, 0) //nolint:ineffassign var repos []*githubRepository - if ghToken, err = fetchGitHubTokenFromDB(ctx, pool); err != nil { + if ghToken, err = fetchGitHubTokenFromDB(ctx, qry); err != nil { return err } @@ -244,17 +247,18 @@ func difference(existing, new []string) []string { // TODO(ramirocastillo):Move this fn to the helper package // fetchGitHubTokenFromDB is a temporary helper function for retrieving the most recently added GITHUB_PAT service credential from the DB. // It's "temporary" because the way credentials are managed and retrieved will likely need to be much more robust in the future. -func fetchGitHubTokenFromDB(ctx context.Context, pool *pgxpool.Pool) (string, error) { +func fetchGitHubTokenFromDB(ctx context.Context, qry *db.Queries) (string, error) { + var credentials string + var err error encryptionSecret := os.Getenv("ENCRYPTION_SECRET") - row := pool.QueryRow(context.TODO(), "SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1", encryptionSecret) - var credentials []byte - if err := row.Scan(&credentials); err != nil && !errors.Is(err, pgx.ErrNoRows) { - return "", fmt.Errorf("could not retrieve GitHub PAT from database: %v", err) + + if credentials, err = qry.FetchGitHubToken(ctx, encryptionSecret); err != nil { + return credentials, fmt.Errorf("could not retrieve GitHub PAT from database: %v", err) } - if credentials == nil { + if len(credentials) <= 0 { // default to the `GITHUB_TOKEN` env var if nothing in the DB - credentials = []byte(os.Getenv("GITHUB_TOKEN")) + credentials = os.Getenv("GITHUB_TOKEN") } return string(credentials), nil diff --git a/migrations/900000000000014_add_schema_introspection_view.up.sql b/migrations/900000000000014_add_schema_introspection_view.up.sql index fc63d98b3..108105d99 100644 --- a/migrations/900000000000014_add_schema_introspection_view.up.sql +++ b/migrations/900000000000014_add_schema_introspection_view.up.sql @@ -14,7 +14,7 @@ CREATE OR REPLACE VIEW mergestat.schema_introspection AS ( c.data_type, c.udt_name, pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) as column_description - FROM information_schema.tables as t + FROM information_schema.tables AS t INNER JOIN information_schema.columns AS c ON (t.table_name = c.table_name AND t.table_schema = c.table_schema) WHERE t.table_schema IN ('public', 'mergestat') ); From 00dba2584a37db22216ee4c9ce1448319eadf302 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 14:25:20 -0500 Subject: [PATCH 06/12] fix(sqlfluff): Omitting rules and generate test files to match interface --- internal/mocks/mock_pool.go | 2 +- internal/mocks/mock_queries.go | 15 +++++++++++++++ internal/mocks/sources.go | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/mocks/mock_pool.go b/internal/mocks/mock_pool.go index 8a4cc0737..92bc9b1f6 100644 --- a/internal/mocks/mock_pool.go +++ b/internal/mocks/mock_pool.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ../../pool/pool.go +// Source: ../pool/pool.go // Package mocks is a generated GoMock package. package mocks diff --git a/internal/mocks/mock_queries.go b/internal/mocks/mock_queries.go index 825d474a8..e702fa986 100644 --- a/internal/mocks/mock_queries.go +++ b/internal/mocks/mock_queries.go @@ -109,6 +109,21 @@ func (mr *MockQuerierMockRecorder) EnqueueAllSyncs(ctx interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueAllSyncs", reflect.TypeOf((*MockQuerier)(nil).EnqueueAllSyncs), ctx) } +// FetchGitHubToken mocks base method. +func (m *MockQuerier) FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchGitHubToken", ctx, pgpSymDecrypt) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchGitHubToken indicates an expected call of FetchGitHubToken. +func (mr *MockQuerierMockRecorder) FetchGitHubToken(ctx, pgpSymDecrypt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchGitHubToken", reflect.TypeOf((*MockQuerier)(nil).FetchGitHubToken), ctx, pgpSymDecrypt) +} + // GetRepoIDsFromRepoImport mocks base method. func (m *MockQuerier) GetRepoIDsFromRepoImport(ctx context.Context, arg db.GetRepoIDsFromRepoImportParams) ([]uuid.UUID, error) { m.ctrl.T.Helper() diff --git a/internal/mocks/sources.go b/internal/mocks/sources.go index 5933eabeb..fb1b48fda 100644 --- a/internal/mocks/sources.go +++ b/internal/mocks/sources.go @@ -1,5 +1,5 @@ package mocks -//go:generate mockgen -source ../../pool/pool.go -destination mock_pool.go -package=mocks +//go:generate mockgen -source ../pool/pool.go -destination mock_pool.go -package=mocks //go:generate mockgen -source ../../queries/queries.go -destination mock_queries.go -package=mocks //go:generate mockgen --destination mock_tx.go --package=mocks --build_flags=--mod=mod github.com/jackc/pgx/v4 Tx From 997ec1def5c95cf0483eb695660a3cfa22e54a65 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 14:29:44 -0500 Subject: [PATCH 07/12] fix(sqlfluff): Omitting rule --- .sqlfluff | 4 ++++ .../900000000000014_add_schema_introspection_view.up.sql | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.sqlfluff b/.sqlfluff index d799c851c..a04120f1b 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -11,7 +11,11 @@ templater = raw # Comma separated list of rules to check, default to all rules = all # Comma separated list of rules to exclude, or None +<<<<<<< HEAD exclude_rules = L016,L044,L006,L028,L014,L025,L010,L029,L027,L030,L022,L034,L026,L031 +======= +exclude_rules = L016,L044,L006,L028,L014,L025,L010,L029,L027,L030,L022,L031 +>>>>>>> 440e58c (fix(sqlfluff): Omitting rule) # The depth to recursively parse to (0 for unlimited) recurse = 0 # Below controls SQLFluff output, see max_line_length for SQL output diff --git a/migrations/900000000000014_add_schema_introspection_view.up.sql b/migrations/900000000000014_add_schema_introspection_view.up.sql index 108105d99..faba64bdc 100644 --- a/migrations/900000000000014_add_schema_introspection_view.up.sql +++ b/migrations/900000000000014_add_schema_introspection_view.up.sql @@ -13,7 +13,7 @@ CREATE OR REPLACE VIEW mergestat.schema_introspection AS ( c.is_nullable, c.data_type, c.udt_name, - pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) as column_description + pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) AS column_description FROM information_schema.tables AS t INNER JOIN information_schema.columns AS c ON (t.table_name = c.table_name AND t.table_schema = c.table_schema) WHERE t.table_schema IN ('public', 'mergestat') From 132f342942c4ab72c978afa971df804cde37a5bd Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 14:46:21 -0500 Subject: [PATCH 08/12] fix(sqlfluff): Unsaved changes creating differences --- .sqlfluff | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.sqlfluff b/.sqlfluff index a04120f1b..b83fbc777 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -8,14 +8,9 @@ nocolor = False dialect = postgres # One of [raw|jinja|python|placeholder] templater = raw -# Comma separated list of rules to check, default to all rules = all # Comma separated list of rules to exclude, or None -<<<<<<< HEAD exclude_rules = L016,L044,L006,L028,L014,L025,L010,L029,L027,L030,L022,L034,L026,L031 -======= -exclude_rules = L016,L044,L006,L028,L014,L025,L010,L029,L027,L030,L022,L031 ->>>>>>> 440e58c (fix(sqlfluff): Omitting rule) # The depth to recursively parse to (0 for unlimited) recurse = 0 # Below controls SQLFluff output, see max_line_length for SQL output From a2389d643d428be0f02b253fec55768c9441b7b2 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Wed, 25 Jan 2023 14:58:34 -0500 Subject: [PATCH 09/12] fix(importer): Adding better error handling for githubtoken --- internal/jobs/repo/import.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index 826416a90..734de5b78 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -252,7 +252,7 @@ func fetchGitHubTokenFromDB(ctx context.Context, qry *db.Queries) (string, error var err error encryptionSecret := os.Getenv("ENCRYPTION_SECRET") - if credentials, err = qry.FetchGitHubToken(ctx, encryptionSecret); err != nil { + if credentials, err = qry.FetchGitHubToken(ctx, encryptionSecret); err != nil && !errors.Is(err, pgx.ErrNoRows) { return credentials, fmt.Errorf("could not retrieve GitHub PAT from database: %v", err) } From a35162b107682d7513eb2305f4283af6c21a56d9 Mon Sep 17 00:00:00 2001 From: Ramiro Date: Thu, 26 Jan 2023 10:09:35 -0500 Subject: [PATCH 10/12] fix(import): Fixing default syncs --- internal/jobs/repo/import.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index 734de5b78..cad0fc301 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -157,7 +157,7 @@ func handleImport(ctx context.Context, qry *db.Queries, imp db.ListRepoImportsDu // remove any deleted repositories if removeDeletedRepos { - var params = db.DeleteRemovedReposParams{Column1: imp.ID, Column2: repoUrls(repos)} + var params = db.DeleteRemovedReposParams{Column1: imp.ID, Column2: repoUrls(repos, repoOwner)} if err = qry.DeleteRemovedRepos(ctx, params); err != nil { return errors.Wrapf(err, "failed to remove deleted repositories") } @@ -188,7 +188,7 @@ func handleImport(ctx context.Context, qry *db.Queries, imp db.ListRepoImportsDu // (optional) configure default sync types if len(defaultSyncTypes) > 0 { // batch is a collection of newly added repositories - var batch = difference(existing, repoUrls(repos)) + var batch = difference(existing, repoUrls(repos, repoOwner)) // convert batch into a collection of repo ids var ids []uuid.UUID @@ -215,10 +215,10 @@ func handleImport(ctx context.Context, qry *db.Queries, imp db.ListRepoImportsDu return qry.MarkRepoImportAsUpdated(ctx, imp.ID) } -func repoUrls(repos []*githubRepository) []string { +func repoUrls(repos []*githubRepository, repoOwner string) []string { var ret = make([]string, len(repos)) for i, repo := range repos { - ret[i] = repo.URL() + ret[i] = fmt.Sprintf("https://github.com/%s/%s", repoOwner, repo.Name) } return ret } From 1e00fd8c4aa166a9e3fc2b57b632efe78809f21f Mon Sep 17 00:00:00 2001 From: Ramiro Date: Thu, 26 Jan 2023 10:17:08 -0500 Subject: [PATCH 11/12] chore(sqlfluff): deleted line --- .sqlfluff | 1 + 1 file changed, 1 insertion(+) diff --git a/.sqlfluff b/.sqlfluff index b83fbc777..d799c851c 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -8,6 +8,7 @@ nocolor = False dialect = postgres # One of [raw|jinja|python|placeholder] templater = raw +# Comma separated list of rules to check, default to all rules = all # Comma separated list of rules to exclude, or None exclude_rules = L016,L044,L006,L028,L014,L025,L010,L029,L027,L030,L022,L034,L026,L031 From dcc483507a8b1daa1a55738a7d4084312b54a69d Mon Sep 17 00:00:00 2001 From: Ramiro Date: Thu, 26 Jan 2023 10:45:11 -0500 Subject: [PATCH 12/12] chore(importer): Removing unnecesary continues --- internal/jobs/repo/import.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go index cad0fc301..ac19365b7 100644 --- a/internal/jobs/repo/import.go +++ b/internal/jobs/repo/import.go @@ -301,7 +301,6 @@ func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner break } opt.Page = resp.NextPage - continue } return repositories, err @@ -345,7 +344,6 @@ func fetchGitHubReposByUser(ctx context.Context, client *github.Client, repoOwne break } opt.Page = resp.NextPage - continue }