Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syntactic indexing: enqueuer and scheduler #62485

Open
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

keynmol
Copy link
Contributor

@keynmol keynmol commented May 7, 2024

This PR introduces three main components:

  • Enqueuer – a thin layer responsible for actually inserting the syntactic indexing records into the database.
  • Scheduler - a service that
    • Identifies repositories that haven't been processed in a while
    • Identifies policies that match those repositories (policies that have syntactic indexing enabled)
    • Identifies commits that match any of the policies
    • And finally, enqueues the jobs to index the discovered repositories and commits
  • Scheduler job – a periodic routine that triggers Scheduler on with specified interval. This job runs as part of the main Worker service, and only schedules jobs if the experimental syntactic indexing feature is enabled.

Refactoring:

  • Making some methods public in policies.Service to make it easier to test logic that depends on glob matching of repository names (this matching requires a separate state to be updated)
  • Extracting some test utilities into a separate package

TODO:

  • [ ] Tests for policy iterator - we're currently investigating whether policy iterator is needed at all, as pagination of policies is in question.
  • Tests for Scheduler
  • Wire in experimental feature flag

Test plan

  • New tests for all components

@cla-bot cla-bot bot added the cla-signed label May 7, 2024
@github-actions github-actions bot added team/graph Graph Team (previously Code Intel/Language Tools/Language Platform) team/product-platform labels May 7, 2024
@keynmol keynmol force-pushed the syntactic-indexing-enqueuer branch from 11d5272 to 03414ce Compare May 13, 2024 11:11
@keynmol keynmol changed the title Syntactic indexing: enqueuer Syntactic indexing: enqueuer and scheduler May 20, 2024
Comment on lines 220 to 225
for policyID, patterns := range map[int][]string{
1100: {"github.com/*"},
} {
if err := store.UpdateReposMatchingPatterns(ctx, patterns, policyID, nil); err != nil {
t.Fatalf("unexpected error while updating repositories matching patterns: %s", err)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varungandhi-src I need to use this method to make sure there's data in database created for policy matcher to pick up a repository based on pattern matching.

To use it I had to make policies/store package non-internal. It's not great for encapsulation, but I'm not aware of an idiomatic Go way to make this work.

An alternative would be to change this particular policy to one matching repository by ID explicitly, slightly weakening the tests.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idiomatic way would likely be to only expose a public interface with the method (or handful of methods) of interest. Sometimes that can be tricky though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes that can be tricky though

Yeah, I think exposing this particular method is not worth it.

I think I can expose a new service whose sole purpose is to be used in the periodic job that updates the repo <--> policy matching cache.

So store will remain internal, and the service will delegate to it. If if works out, I'll extract it to a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to our call, I realised it was much easier than I thought initially: 42022f2

Copy link
Contributor

Caution

License checking failed, please read: how to deal with third parties licensing.

@keynmol keynmol marked this pull request as ready for review May 23, 2024 11:30
Copy link
Contributor

Caution

License checking failed, please read: how to deal with third parties licensing.

Copy link
Contributor

@varungandhi-src varungandhi-src left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some initial comments; will take a deeper look at more of the code shortly.

Comment on lines +24 to +31
func InitRawDB(observationCtx *observation.Context) (*sql.DB, error) {
rawDB, err := initDatabaseMemo.Init(observationCtx)
if err != nil {
return nil, err
}

return rawDB, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func InitRawDB(observationCtx *observation.Context) (*sql.DB, error) {
rawDB, err := initDatabaseMemo.Init(observationCtx)
if err != nil {
return nil, err
}
return rawDB, err
}
func InitRawDB(observationCtx *observation.Context) (*sql.DB, error) {
return initDatabaseMemo.Init(observationCtx)
}

I'm a bit confused though. Why does this need to be called separately/why is InitDB not called during the normal initialization?

Comment on lines +22 to +25
type EnqueueOptions struct {
force bool
bypassLimit bool
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add doc comments for these fields.

}

type operations struct {
queueIndex *observation.Operation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
queueIndex *observation.Operation
queueIndexes *observation.Operation

nit: Following existing conventions, this should match the method name exactly

}

return &operations{
queueIndex: op("QueueIndex"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
queueIndex: op("QueueIndex"),
queueIndexes: op("QueueIndexes"),

if err != nil {
return nil, errors.Wrap(err, "gitserver.ResolveRevision")
}
commit := string(commitID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's delay converting this to a string until the very end. The api.CommitID type says it is a 40-character SHA, that's useful to know compared to just the name commit.

Comment on lines +47 to +80
offset := 0

for {

options := policiesshared.GetConfigurationPoliciesOptions{
RepositoryID: p.RepositoryID,
ForSyntacticIndexing: forSyntacticIndexing,
ForPreciseIndexing: forPreciseIndexing,
Limit: p.BatchSize,
Offset: offset,
}

policies, totalCount, err := p.Service.GetConfigurationPolicies(ctx, options)

if err != nil {
return err
}

if len(policies) == 0 {
break
}

handlerError := handle(policies)

if handlerError != nil {
return handlerError // propagate error from the handler
}

offset = offset + len(policies)

if offset >= totalCount {
break
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
offset := 0
for {
options := policiesshared.GetConfigurationPoliciesOptions{
RepositoryID: p.RepositoryID,
ForSyntacticIndexing: forSyntacticIndexing,
ForPreciseIndexing: forPreciseIndexing,
Limit: p.BatchSize,
Offset: offset,
}
policies, totalCount, err := p.Service.GetConfigurationPolicies(ctx, options)
if err != nil {
return err
}
if len(policies) == 0 {
break
}
handlerError := handle(policies)
if handlerError != nil {
return handlerError // propagate error from the handler
}
offset = offset + len(policies)
if offset >= totalCount {
break
}
}
options := policiesshared.GetConfigurationPoliciesOptions{
RepositoryID: p.RepositoryID,
ForSyntacticIndexing: forSyntacticIndexing,
ForPreciseIndexing: forPreciseIndexing,
Limit: p.BatchSize,
}
for offset := 0; ; {
options.Offset = offset
policies, totalCount, err := p.Service.GetConfigurationPolicies(ctx, options)
if err != nil {
return err
}
if len(policies) == 0 {
break
}
if handlerError := handle(policies); handlerError != nil {
return handlerError
}
if offset = offset + len(policies); offset >= totalCount {
break
}
}

nit: Please don't add whitespace after every statement.

Comment on lines +21 to +22
InsertIndexes(ctx context.Context, indexes []SyntacticIndexingJob) ([]SyntacticIndexingJob, error)
IsQueued(ctx context.Context, repositoryID int, commit string) (bool, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on pushing this logic into a single method? Otherwise, this API creates the risk of TOCTOU because something might get queued in-between the IsQueued check and the Insert check. AFAICT, that's impossible given that these are currently invoked from the same goroutine, and there's only going to be one such goroutine, and the worst case scenario is that there will be some redundant work, but it seems unnecessarily error-prone.

Comment on lines +108 to +109
// TODO: add operations and record this operation
// s.operations.indexesInserted.Add(float64(len(ids)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need fixing?

Comment on lines +104 to +122
ids, err := basestore.ScanInts(tx.Query(ctx, sqlf.Sprintf(insertIndexQuery, sqlf.Join(values, ","))))
if err != nil {
return err
}
// TODO: add operations and record this operation
// s.operations.indexesInserted.Add(float64(len(ids)))

authzConds, err := database.AuthzQueryConds(ctx, database.NewDBWith(s.logger, s.db))
if err != nil {
return err
}

queries := make([]*sqlf.Query, 0, len(ids))
for _, id := range ids {
queries = append(queries, sqlf.Sprintf("%d", id))
}

indexes, err = scanIndexes(tx.Query(ctx, sqlf.Sprintf(getIndexesByIDsQuery, sqlf.Join(queries, ", "), authzConds)))
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this code. It looks like we're performing insertions into syntactic_scip_indexing_jobs and returning the ids for the just-inserted rows. Then we're reading more data from those rows in a separate query (through a view). Can we expand the RETURNING clause to return more columns during insertion? I don't understand what role the extra authzConds is playing here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed team/graph Graph Team (previously Code Intel/Language Tools/Language Platform) team/product-platform
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants