diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 227025795..a14c8938f 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -5,13 +5,6 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"github.com/mergestat/mergestat/internal/cron"
-	"github.com/mergestat/mergestat/internal/jobs/repo"
-	"github.com/mergestat/mergestat/internal/syncer"
-	"github.com/mergestat/mergestat/internal/timeout"
-	"github.com/mergestat/sqlq"
-	"github.com/mergestat/sqlq/runtime/embed"
-	"github.com/mergestat/sqlq/schema"
 	"net/http"
 	_ "net/http/pprof"
 	"net/url"
@@ -22,6 +15,14 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/mergestat/mergestat/internal/cron"
+	"github.com/mergestat/mergestat/internal/jobs/repo"
+	"github.com/mergestat/mergestat/internal/syncer"
+	"github.com/mergestat/mergestat/internal/timeout"
+	"github.com/mergestat/sqlq"
+	"github.com/mergestat/sqlq/runtime/embed"
+	"github.com/mergestat/sqlq/schema"
+
 	"github.com/go-git/go-git/v5"
 	"github.com/golang-migrate/migrate/v4"
 	_ "github.com/golang-migrate/migrate/v4/database/postgres"
@@ -247,7 +248,7 @@ func main() {
 	var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{Queues: queues})
 
 	// register job handlers for types implemented by this worker
-	_ = worker.Register("repos/auto-import", repo.AutoImport(pool, db))
+	_ = worker.Register("repos/auto-import", repo.AutoImport(&logger, pool))
 
 	// TODO all of the following "params" should be configurable
 	// either via the database/app or possibly with env vars
diff --git a/internal/db/models.go b/internal/db/models.go
index f32861bed..ba3ec6b4e 100644
--- a/internal/db/models.go
+++ b/internal/db/models.go
@@ -533,6 +533,73 @@ type GithubStargazer struct {
 	MergestatSyncedAt time.Time
 }
 
+type GitleaksRepoDetection struct {
+	// foreign key for public.repos.id
+	RepoID uuid.UUID
+	// description of the detection
+	Description interface{}
+	// detection start line
+	StartLine interface{}
+	// detection end line
+	EndLine interface{}
+	// detection start column
+	StartColumn interface{}
+	// detection end column
+	EndColumn interface{}
+	// detection match
+	Match interface{}
+	// detection secret
+	Secret interface{}
+	// detection file
+	File interface{}
+	// detected symlink file
+	SymlinkFile interface{}
+	// detection commit
+	Commit interface{}
+	// detection entropy
+	Entropy interface{}
+	// detection author
+	Author interface{}
+	// detection email
+	Email interface{}
+	// detection date
+	Date interface{}
+	// detection message
+	Message interface{}
+	// detection tags
+	Tags interface{}
+	// detection rule id
+	RuleID interface{}
+	// detection fingerprint
+	Fingerprint interface{}
+}
+
+// scan output of a Gitleaks repo scan
+type GitleaksRepoScan struct {
+	// foreign key for public.repos.id
+	RepoID uuid.UUID
+	// JSON output of a Gitleaks scan
+	Results pgtype.JSONB
+}
+
+type GosecRepoDetection struct {
+	RepoID uuid.UUID
+	Severity interface{}
+	Confidence interface{}
+	CweID interface{}
+	RuleID interface{}
+	Details interface{}
+	File interface{}
+	Line interface{}
+	Column interface{}
+	Nosec interface{}
+}
+
+type GosecRepoScan struct {
+	RepoID uuid.UUID
+	Issues pgtype.JSONB
+}
+
 type MergestatLatestRepoSync struct {
 	ID int64
 	CreatedAt time.Time
@@ -542,6 +609,13 @@
 	RepoSyncQueueID int64
 	Status string
 	StartedAt sql.NullTime
 	DoneAt sql.NullTime
 }
 
+type MergestatQueryHistory struct {
+	ID uuid.UUID
+	RunAt sql.NullTime
+	RunBy string
+	Query string
+}
+
 // Table for "dynamic" repo imports - regularly loading from a GitHub org for example
 type MergestatRepoImport struct {
 	ID uuid.UUID
@@ -615,6 +689,31 @@ type MergestatRepoSyncTypeGroup struct {
 	ConcurrentSyncs sql.NullInt32
 }
 
+// @name labels
+type MergestatRepoSyncTypeLabel struct {
+	Label string
+	Description sql.NullString
+	Color string
+}
+
+// @name labelAssociations
+type MergestatRepoSyncTypeLabelAssociation struct {
+	Label string
+	RepoSyncType string
+}
+
+type MergestatSchemaIntrospection struct {
+	Schema interface{}
+	TableName interface{}
+	TableType interface{}
+	ColumnName interface{}
+	OrdinalPosition interface{}
+	IsNullable interface{}
+	DataType interface{}
+	UdtName interface{}
+	ColumnDescription string
+}
+
 type MergestatServiceAuthCredential struct {
 	ID uuid.UUID
 	CreatedAt time.Time
@@ -748,3 +847,34 @@ type TrivyRepoVulnerability struct {
 	// timestamp when record was synced into the MergeStat database
 	MergestatSyncedAt time.Time
 }
+
+type YelpDetectSecretsRepoDetection struct {
+	// foreign key for public.repos.id
+	RepoID uuid.UUID
+	// detection type
+	Type interface{}
+	// detection filename
+	Filename interface{}
+	// detection is verified
+	IsVerified interface{}
+	// detection line number
+	LineNumber interface{}
+	// detection secret
+	HashedSecret interface{}
+	// detection version
+	Version interface{}
+	// detection generated at
+	GeneratedAt interface{}
+	// detection filters used
+	FiltersUsed interface{}
+	// detection plugins used
+	PluginsUsed interface{}
+}
+
+// scan output of a Yelp detect-secrets repo scan
+type YelpDetectSecretsRepoScan struct {
+	// foreign key for public.repos.id
+	RepoID uuid.UUID
+	// JSON output of a Yelp detect-secrets scan
+	Results pgtype.JSONB
+}
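The two *RepoScan models above store raw tool output in a pgtype.JSONB column. Below is a minimal decoding sketch, assuming pgtype v1 semantics (a JSONB value exposes Bytes and Status); the gitleaksFinding struct is hypothetical and only approximates Gitleaks' output fields, it is not the tool's exact schema:

    package example

    import (
    	"encoding/json"
    	"fmt"

    	"github.com/jackc/pgtype"
    )

    // gitleaksFinding is an illustrative subset of a Gitleaks finding;
    // the real output carries many more fields.
    type gitleaksFinding struct {
    	Description string `json:"Description"`
    	File        string `json:"File"`
    	RuleID      string `json:"RuleID"`
    }

    // decodeFindings unpacks the Results column of a GitleaksRepoScan row.
    func decodeFindings(results pgtype.JSONB) ([]gitleaksFinding, error) {
    	// NULL/undefined columns carry no bytes to decode
    	if results.Status != pgtype.Present {
    		return nil, nil
    	}
    	var findings []gitleaksFinding
    	if err := json.Unmarshal(results.Bytes, &findings); err != nil {
    		return nil, fmt.Errorf("decode gitleaks results: %w", err)
    	}
    	return findings, nil
    }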
diff --git a/internal/db/querier.go b/internal/db/querier.go
index ebba7849f..1bbdbe99f 100644
--- a/internal/db/querier.go
+++ b/internal/db/querier.go
@@ -19,6 +19,7 @@ type Querier interface {
 	// This allows us to make sure all repo syncs complete before we reschedule a new batch.
 	// We have now also added a concept of type groups which allows us to apply this same logic but by each group type which is where the PARTITION BY clause comes into play
 	EnqueueAllSyncs(ctx context.Context) error
+	FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error)
 	GetRepoIDsFromRepoImport(ctx context.Context, arg GetRepoIDsFromRepoImportParams) ([]uuid.UUID, error)
 	GetRepoImportByID(ctx context.Context, id uuid.UUID) (MergestatRepoImport, error)
 	GetRepoUrlFromImport(ctx context.Context, importid uuid.UUID) ([]string, error)
diff --git a/internal/db/queries.sql b/internal/db/queries.sql
index c19e87592..304df669d 100644
--- a/internal/db/queries.sql
+++ b/internal/db/queries.sql
@@ -86,6 +86,9 @@ INSERT INTO mergestat.repo_sync_logs (log_type, message, repo_sync_queue_id) VAL
 -- name: SetSyncJobStatus :exec
 SELECT mergestat.set_sync_job_status(@Status::TEXT, @ID::BIGINT);
 
+-- name: FetchGitHubToken :one
+SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1;
+
 -- We use a CTE here to retrieve all the repo_sync_jobs that were previously enqueued, to make sure that we *do not* re-enqueue anything new until the previously enqueued jobs are *completed*.
 -- This allows us to make sure all repo syncs complete before we reschedule a new batch.
 -- We have now also added a concept of type groups which allows us to apply this same logic but by each group type which is where the PARTITION BY clause comes into play
diff --git a/internal/db/queries.sql.go b/internal/db/queries.sql.go
index d7182789a..bc272d8e4 100644
--- a/internal/db/queries.sql.go
+++ b/internal/db/queries.sql.go
@@ -160,6 +160,17 @@ func (q *Queries) EnqueueAllSyncs(ctx context.Context) error {
 	return err
 }
 
+const fetchGitHubToken = `-- name: FetchGitHubToken :one
+SELECT pgp_sym_decrypt(credentials, $1) FROM mergestat.service_auth_credentials WHERE type = 'GITHUB_PAT' ORDER BY created_at DESC LIMIT 1
+`
+
+func (q *Queries) FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error) {
+	row := q.db.QueryRow(ctx, fetchGitHubToken, pgpSymDecrypt)
+	var pgp_sym_decrypt string
+	err := row.Scan(&pgp_sym_decrypt)
+	return pgp_sym_decrypt, err
+}
+
 const getRepoIDsFromRepoImport = `-- name: GetRepoIDsFromRepoImport :many
 SELECT id FROM public.repos WHERE repo_import_id = $1::uuid AND repo = ANY($2::TEXT[])
 `
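The generated FetchGitHubToken above takes the pgp_sym_decrypt key as its only argument and returns at most one decrypted credential. A rough usage sketch, mirroring how the worker-side helper later in this diff calls it (the fetchToken wrapper name is illustrative only; pgx.ErrNoRows simply means no credential has been stored yet):

    package example

    import (
    	"context"
    	"os"

    	"github.com/jackc/pgx/v4"
    	"github.com/jackc/pgx/v4/pgxpool"
    	"github.com/mergestat/mergestat/internal/db"
    	"github.com/pkg/errors"
    )

    // fetchToken returns the stored GitHub PAT, or "" when none exists.
    func fetchToken(ctx context.Context, pool *pgxpool.Pool) (string, error) {
    	queries := db.New(pool)
    	token, err := queries.FetchGitHubToken(ctx, os.Getenv("ENCRYPTION_SECRET"))
    	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
    		// a real failure, not just an empty credentials table
    		return "", err
    	}
    	return token, nil
    }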
diff --git a/internal/jobs/repo/import.go b/internal/jobs/repo/import.go
index 1842aeff5..ac19365b7 100644
--- a/internal/jobs/repo/import.go
+++ b/internal/jobs/repo/import.go
@@ -5,15 +5,19 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"os"
+	"time"
+
+	"github.com/google/go-github/v41/github"
 	"github.com/google/uuid"
 	"github.com/jackc/pgtype"
 	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
-	"github.com/jmoiron/sqlx"
 	"github.com/mergestat/mergestat/internal/db"
 	"github.com/mergestat/sqlq"
 	"github.com/pkg/errors"
-	"time"
+	"github.com/rs/zerolog"
+	"golang.org/x/oauth2"
 )
 
 type githubRepository struct {
@@ -28,7 +32,7 @@ func (repo *githubRepository) URL() string {
 
 // AutoImport implements the githubRepository auto-import job to automatically
 // sync githubRepository from user- or org- accounts.
-func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc {
+func AutoImport(l *zerolog.Logger, pool *pgxpool.Pool) sqlq.HandlerFunc {
 	var queries = db.New(pool)
 
 	return func(ctx context.Context, job *sqlq.Job) (err error) {
@@ -41,12 +45,15 @@ func AutoImport(pool *pgxpool.Pool, mergestat *sqlx.DB) sqlq.HandlerFunc {
 
 		var imports []db.ListRepoImportsDueForImportRow
 		if imports, err = queries.ListRepoImportsDueForImport(ctx); err != nil {
 			logger.Errorf("failed to list repo import job: %v", err)
+			l.Error().Msgf("failed to list repo import job: %v", err)
 			return errors.Wrapf(sqlq.ErrSkipRetry, "failed to list repo import job: %v", err)
 		}
 
 		logger.Infof("handling %d import(s)", len(imports))
+		l.Info().Msgf("handling %d import(s)", len(imports))
 		for _, imp := range imports {
 			logger.Infof("executing import %s", imp.ID)
+			l.Info().Msgf("executing import %s", imp.ID)
 			var tx pgx.Tx // each import is executed within its own transaction
 			if tx, err = pool.Begin(ctx); err != nil {
@@ -57,8 +64,9 @@
 			// if the execution fails for some reason, only that import is marked as failed
 			// the job still continues executing.
 			var importError error
-			if importError = handleImport(ctx, queries.WithTx(tx), mergestat, imp); importError != nil {
+			if importError = handleImport(ctx, queries.WithTx(tx), imp); importError != nil {
 				logger.Warnf("import(%s) failed: %v", imp.ID, importError.Error())
+				l.Warn().Msgf("import(%s) failed: %v", imp.ID, importError.Error())
 				_ = tx.Rollback(ctx)
 			} else {
 				// if the import was successful, commit the changes
@@ -66,6 +74,7 @@
 					return errors.Wrapf(err, "failed to commit database transaction")
 				}
 				logger.Infof("import(%s) was successful", imp.ID)
+				l.Info().Msgf("import(%s) was successful", imp.ID)
 			}
 
 			var importStatus = db.UpdateImportStatusParams{Status: "SUCCESS", ID: imp.ID}
@@ -86,9 +95,25 @@
 }
 
 // handleImport handles execution of a given import configuration
-func handleImport(ctx context.Context, qry *db.Queries, mergestat *sqlx.DB, imp db.ListRepoImportsDueForImportRow) (err error) {
-	var query, repoOwner string
+func handleImport(ctx context.Context, qry *db.Queries, imp db.ListRepoImportsDueForImportRow) (err error) {
+	var repoOwner, ghToken string
 	var removeDeletedRepos, defaultSyncTypes = true, make([]string, 0) //nolint:ineffassign
+	var repos []*githubRepository
+
+	if ghToken, err = fetchGitHubTokenFromDB(ctx, qry); err != nil {
+		return err
+	}
+
+	ts := oauth2.StaticTokenSource(
+		&oauth2.Token{AccessToken: ghToken},
+	)
+
+	tc := oauth2.NewClient(ctx, ts)
+	if len(ghToken) <= 0 {
+		tc = nil
+	}
+
+	client := github.NewClient(tc)
 
 	// determine the kind of import (Org vs. User) and parse the settings
 	switch imp.Type {
@@ -105,7 +130,10 @@
 		repoOwner = settings.Org
 		removeDeletedRepos = settings.RemoveDeletedRepos
 		defaultSyncTypes = settings.DefaultSyncTypes
-		query = `SELECT login, name, topics FROM github_org_repos(?)`
+
+		if repos, err = fetchGitHubReposByOrg(ctx, client, repoOwner, ghToken); err != nil {
+			return err
+		}
 	case "GITHUB_USER":
 		var settings struct {
 			User string `json:"user"`
@@ -119,20 +147,17 @@
 		repoOwner = settings.User
 		removeDeletedRepos = settings.RemoveDeletedRepos
 		defaultSyncTypes = settings.DefaultSyncTypes
-		query = `SELECT login, name, topics FROM github_user_repos(?)`
+
+		if repos, err = fetchGitHubReposByUser(ctx, client, repoOwner, ghToken); err != nil {
+			return err
+		}
 	default:
 		return errors.Errorf("unknown import type: %s", imp.Type)
 	}
 
-	// execute the mergestat query - this will scan a list of repos to be imported
-	var repos []*githubRepository
-	if err = mergestat.SelectContext(ctx, &repos, query, repoOwner); err != nil {
-		return errors.Wrapf(err, "failed to fetch repositories")
-	}
-
 	// remove any deleted repositories
 	if removeDeletedRepos {
-		var params = db.DeleteRemovedReposParams{Column1: imp.ID, Column2: repoUrls(repos)}
+		var params = db.DeleteRemovedReposParams{Column1: imp.ID, Column2: repoUrls(repos, repoOwner)}
 		if err = qry.DeleteRemovedRepos(ctx, params); err != nil {
 			return errors.Wrapf(err, "failed to remove deleted repositories")
 		}
@@ -163,7 +188,7 @@
 	// (optional) configure default sync types
 	if len(defaultSyncTypes) > 0 {
 		// batch is a collection of newly added repositories
-		var batch = difference(existing, repoUrls(repos))
+		var batch = difference(existing, repoUrls(repos, repoOwner))
 
 		// convert batch into a collection of repo ids
 		var ids []uuid.UUID
@@ -190,10 +215,10 @@
 	return qry.MarkRepoImportAsUpdated(ctx, imp.ID)
 }
 
-func repoUrls(repos []*githubRepository) []string {
+func repoUrls(repos []*githubRepository, repoOwner string) []string {
 	var ret = make([]string, len(repos))
 	for i, repo := range repos {
-		ret[i] = repo.URL()
+		ret[i] = fmt.Sprintf("https://github.com/%s/%s", repoOwner, repo.Name)
 	}
 	return ret
 }
@@ -218,3 +243,109 @@
 	return diff
 }
+
+// TODO(ramirocastillo): Move this fn to the helper package
+// fetchGitHubTokenFromDB is a temporary helper function for retrieving the most recently added GITHUB_PAT service credential from the DB.
+// It's "temporary" because the way credentials are managed and retrieved will likely need to be much more robust in the future.
+func fetchGitHubTokenFromDB(ctx context.Context, qry *db.Queries) (string, error) {
+	var credentials string
+	var err error
+	encryptionSecret := os.Getenv("ENCRYPTION_SECRET")
+
+	if credentials, err = qry.FetchGitHubToken(ctx, encryptionSecret); err != nil && !errors.Is(err, pgx.ErrNoRows) {
+		return credentials, fmt.Errorf("could not retrieve GitHub PAT from database: %v", err)
+	}
+
+	if len(credentials) <= 0 {
+		// default to the `GITHUB_TOKEN` env var if nothing in the DB
+		credentials = os.Getenv("GITHUB_TOKEN")
+	}
+
+	return string(credentials), nil
+}
+
+// fetchGitHubReposByOrg fetches all repos from a given organization; if a GitHub token is not
+// provided, only public repos are searched
+func fetchGitHubReposByOrg(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) {
+	var repositories []*githubRepository
+	var repos []*github.Repository
+	var err error
+	var typeOfRepo string
+	var resp *github.Response
+
+	if len(ghToken) <= 0 {
+		typeOfRepo = "public"
+	}
+	opt := &github.RepositoryListByOrgOptions{Type: typeOfRepo}
+
+	for {
+		if repos, resp, err = client.Repositories.ListByOrg(ctx, repoOwner, opt); err != nil {
+			return repositories, err
+		}
+
+		for _, repo := range repos {
+			jsonStr, err := json.Marshal(&repo.Topics)
+			if err != nil {
+				return repositories, err
+			}
+
+			repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL),
+				Topics: string(jsonStr)})
+		}
+		if resp == nil {
+			break
+		}
+
+		if resp.NextPage == 0 {
+			break
+		}
+		opt.Page = resp.NextPage
+	}
+	return repositories, err
+}
+
+// fetchGitHubReposByUser fetches all repos from a given user; if a GitHub token is not
+// provided, only public repos are searched
+func fetchGitHubReposByUser(ctx context.Context, client *github.Client, repoOwner string, ghToken string) ([]*githubRepository, error) {
+	var repositories []*githubRepository
+	var repos []*github.Repository
+	var err error
+	var typeOfRepo string
+	var resp *github.Response
+
+	if len(ghToken) <= 0 {
+		typeOfRepo = "public"
+	}
+	opt := &github.RepositoryListOptions{Type: typeOfRepo}
+
+	for {
+		if repos, resp, err = client.Repositories.List(ctx, repoOwner, opt); err != nil {
+			return repositories, err
+		}
+
+		for _, repo := range repos {
+			jsonStr, err := json.Marshal(&repo.Topics)
+			if err != nil {
+				return repositories, err
+			}
+
+			repositories = append(repositories, &githubRepository{Name: *repo.Name, Owner: string(*repo.Owner.OrganizationsURL),
+				Topics: string(jsonStr)})
+		}
+
+		if resp == nil {
+			break
+		}
+
+		if resp.NextPage == 0 {
+			break
+		}
+		opt.Page = resp.NextPage
+	}
+
+	return repositories, err
+}
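Both fetchers above follow the standard go-github pagination loop: request a page, collect results, then follow resp.NextPage until it reaches zero. A condensed sketch of that pattern against the same v41 client; the PerPage option and the nil-safe GetName accessor are suggestions, not part of this change:

    package example

    import (
    	"context"

    	"github.com/google/go-github/v41/github"
    )

    // listOrgRepoNames pages through every repository in an org.
    func listOrgRepoNames(ctx context.Context, client *github.Client, org string) ([]string, error) {
    	var names []string
    	opt := &github.RepositoryListByOrgOptions{
    		// larger pages mean fewer round trips per import
    		ListOptions: github.ListOptions{PerPage: 100},
    	}
    	for {
    		repos, resp, err := client.Repositories.ListByOrg(ctx, org, opt)
    		if err != nil {
    			return nil, err
    		}
    		for _, r := range repos {
    			// GetName is nil-safe, unlike dereferencing r.Name directly
    			names = append(names, r.GetName())
    		}
    		if resp == nil || resp.NextPage == 0 {
    			break // last page reached
    		}
    		opt.Page = resp.NextPage
    	}
    	return names, nil
    }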
diff --git a/internal/mocks/mock_pool.go b/internal/mocks/mock_pool.go
index 8a4cc0737..92bc9b1f6 100644
--- a/internal/mocks/mock_pool.go
+++ b/internal/mocks/mock_pool.go
@@ -1,5 +1,5 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: ../../pool/pool.go
+// Source: ../pool/pool.go
 
 // Package mocks is a generated GoMock package.
 package mocks
diff --git a/internal/mocks/mock_queries.go b/internal/mocks/mock_queries.go
index 825d474a8..e702fa986 100644
--- a/internal/mocks/mock_queries.go
+++ b/internal/mocks/mock_queries.go
@@ -109,6 +109,21 @@ func (mr *MockQuerierMockRecorder) EnqueueAllSyncs(ctx interface{}) *gomock.Call
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueAllSyncs", reflect.TypeOf((*MockQuerier)(nil).EnqueueAllSyncs), ctx)
 }
 
+// FetchGitHubToken mocks base method.
+func (m *MockQuerier) FetchGitHubToken(ctx context.Context, pgpSymDecrypt string) (string, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "FetchGitHubToken", ctx, pgpSymDecrypt)
+	ret0, _ := ret[0].(string)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// FetchGitHubToken indicates an expected call of FetchGitHubToken.
+func (mr *MockQuerierMockRecorder) FetchGitHubToken(ctx, pgpSymDecrypt interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchGitHubToken", reflect.TypeOf((*MockQuerier)(nil).FetchGitHubToken), ctx, pgpSymDecrypt)
+}
+
 // GetRepoIDsFromRepoImport mocks base method.
 func (m *MockQuerier) GetRepoIDsFromRepoImport(ctx context.Context, arg db.GetRepoIDsFromRepoImportParams) ([]uuid.UUID, error) {
 	m.ctrl.T.Helper()
diff --git a/internal/mocks/sources.go b/internal/mocks/sources.go
index 5933eabeb..fb1b48fda 100644
--- a/internal/mocks/sources.go
+++ b/internal/mocks/sources.go
@@ -1,5 +1,5 @@
 package mocks
 
-//go:generate mockgen -source ../../pool/pool.go -destination mock_pool.go -package=mocks
+//go:generate mockgen -source ../pool/pool.go -destination mock_pool.go -package=mocks
 //go:generate mockgen -source ../../queries/queries.go -destination mock_queries.go -package=mocks
 //go:generate mockgen --destination mock_tx.go --package=mocks --build_flags=--mod=mod github.com/jackc/pgx/v4 Tx
diff --git a/internal/warehouse/warehouse.go b/internal/warehouse/warehouse.go
index c2d5c60dc..2b9ee0e14 100644
--- a/internal/warehouse/warehouse.go
+++ b/internal/warehouse/warehouse.go
@@ -32,6 +32,10 @@ func New(ctx context.Context, db *db.Queries, pgpool *pgxpool.Pool, logger *zero
 	)
 
 	tc := oauth2.NewClient(ctx, ts)
+	if len(ghToken) <= 0 {
+		tc = nil
+	}
+
 	client := github.NewClient(tc)
 	pool := pool.Init(pgpool)
 	queries := queries.NewQuerier(db)
diff --git a/migrations/900000000000014_add_schema_introspection_view.up.sql b/migrations/900000000000014_add_schema_introspection_view.up.sql
index fc63d98b3..faba64bdc 100644
--- a/migrations/900000000000014_add_schema_introspection_view.up.sql
+++ b/migrations/900000000000014_add_schema_introspection_view.up.sql
@@ -13,8 +13,8 @@ CREATE OR REPLACE VIEW mergestat.schema_introspection AS (
         c.is_nullable,
         c.data_type,
         c.udt_name,
-        pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) as column_description
-    FROM information_schema.tables as t
+        pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) AS column_description
+    FROM information_schema.tables AS t
     INNER JOIN information_schema.columns AS c ON (t.table_name = c.table_name AND t.table_schema = c.table_schema)
     WHERE t.table_schema IN ('public', 'mergestat')
 );
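The view above surfaces one row per column across the public and mergestat schemas. A hypothetical pgx query against it, using the column names reflected in the MergestatSchemaIntrospection model earlier in this diff (listColumns is illustrative, not part of the change):

    package example

    import (
    	"context"
    	"fmt"

    	"github.com/jackc/pgx/v4/pgxpool"
    )

    // listColumns prints name, type and description for one table's columns.
    func listColumns(ctx context.Context, pool *pgxpool.Pool, table string) error {
    	rows, err := pool.Query(ctx,
    		`SELECT column_name, data_type, column_description
    		 FROM mergestat.schema_introspection
    		 WHERE table_name = $1
    		 ORDER BY ordinal_position`, table)
    	if err != nil {
    		return err
    	}
    	defer rows.Close()
    	for rows.Next() {
    		var name, dataType string
    		var description *string // col_description() can be NULL
    		if err := rows.Scan(&name, &dataType, &description); err != nil {
    			return err
    		}
    		desc := ""
    		if description != nil {
    			desc = *description
    		}
    		fmt.Printf("%s\t%s\t%s\n", name, dataType, desc)
    	}
    	return rows.Err()
    }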