fix(snapshot): Enable snapshot streaming after bulkload #8938
base: main
Conversation
worker/draft.go (Outdated)
// Need a delay otherwise it interferes with starting of the Raft loop
time.AfterFunc(1*time.Second, func() {
Can I ask how it might interfere with the starting of the Raft loop?
Yeah, and we should avoid it. This will turn into a bad idea soon enough.
When the node initializes, it begins in the follower state, even in a single-alpha cluster. This pause ensures that the leader election process completes, allowing the node to acquire leadership. Without this pause, the node tries to capture a snapshot but fails the check in n.AmLeader(), resulting in no snapshot being taken.
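To make that timing concrete, here is a small runnable toy model of the explanation above; node, AmLeader, and trySnapshot are stand-ins for the dgraph internals, not the real API:

```go
package main

import (
	"log"
	"sync/atomic"
	"time"
)

// node is a stand-in for dgraph's worker node; only the timing matters here.
type node struct{ leader atomic.Bool }

func (n *node) AmLeader() bool { return n.leader.Load() }

func (n *node) trySnapshot() {
	// Without the startup delay, this check runs before the election has
	// finished, AmLeader() is still false, and no snapshot is taken.
	if !n.AmLeader() {
		log.Println("not leader yet; skipping snapshot")
		return
	}
	log.Println("leader; taking snapshot")
}

func main() {
	n := &node{}
	// Simulate the single-node election completing shortly after startup.
	time.AfterFunc(200*time.Millisecond, func() { n.leader.Store(true) })
	// The 1s delay from the diff gives the election time to finish first.
	time.AfterFunc(1*time.Second, n.trySnapshot)
	time.Sleep(2 * time.Second)
}
```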
We need more tests as we discussed, plus a few questions and minor comments on the dgraphtest package.
@@ -626,6 +702,32 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
	return &GrpcClient{Dgraph: client}, cleanup, nil
}

// ClientForAlpha returns a grpc client that can talk to a specific Alpha in the cluster
func (c *LocalCluster) ClientForAlpha(id int) (*GrpcClient, func(), error) {
We should define this function such that the existing (*LocalCluster).Client function can use it too. This feels like duplicated code.
Defined one for conn; clients are a little too high-level to be able to use one for the other. Let me know if the modified piece is an OK abstraction.
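For readers following along, a rough sketch of the kind of shared connection helper being discussed; the type, names, and delegation are illustrative, not the actual dgraphtest code:

```go
package sketch

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// localCluster stands in for dgraphtest.LocalCluster, reduced to the alpha
// addresses needed to show the shared helper.
type localCluster struct {
	alphaAddrs []string
}

// connForAlpha holds the one copy of the dialing logic.
func (c *localCluster) connForAlpha(id int) (*grpc.ClientConn, error) {
	if id < 0 || id >= len(c.alphaAddrs) {
		return nil, fmt.Errorf("invalid alpha id %d", id)
	}
	return grpc.Dial(c.alphaAddrs[id],
		grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// client picks any alpha by delegating to the same helper, so a Client and a
// ClientForAlpha no longer duplicate connection setup.
func (c *localCluster) client() (*grpc.ClientConn, error) {
	return c.connForAlpha(0)
}
```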
I have added more tests and extended the dgraphtest package to support some of the functions required for the new tests. This PR is ready for another round of reviews.
As discussed, we need to think about more test cases. A few minor comments and questions.
	return fmt.Errorf("failed to assign state. error: %s", resp.Errors[0].Message)
}

if resp, err := parseAssignIdResponse(body); err == nil && resp.StartId != "" {
What if there is an error here?
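If the parse error should not be swallowed, one possible shape of the fix, as a fragment in the style of the hunk above (parseAssignIdResponse and StartId come from the diff; the error wrapping is illustrative):

```go
resp, err := parseAssignIdResponse(body)
if err != nil {
	return errors.Wrap(err, "error parsing assignId response")
}
if resp.StartId != "" {
	// ...continue as before...
}
```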
@@ -92,13 +93,29 @@ type dnode interface {
	zeroURL(*LocalCluster) (string, error)
}

type nodeType interface {
Why did we define a new interface? Can we not use the existing interface?
if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
	return nil, errors.Wrap(err, "error creating bulk dir")
}
if err != nil {
Why are we checking for the error here again?
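If the second check is indeed leftover from an earlier revision, the hunk would shrink to something like this (a sketch, assuming no other err is in scope):

```go
// err is scoped to the if statement, so no later `if err != nil` can see it;
// the trailing check would be dead (or would test an unrelated err).
if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
	return nil, errors.Wrap(err, "error creating bulk dir")
}
```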
@@ -493,3 +493,28 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
		return nil
	}
}

func (c *LocalCluster) CopyBulkLoadDirsToAlphaMounts() error {
Why are we doing the copy when we can just let the data stream?
This is for the test where we want to copy the p directories into each of the alphas in the cluster and start them. This is different from the case where we let the data stream from one alpha to all the other alphas (covered in TestBulkLoaderSnapshotPDirinAlpha0).
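A minimal sketch of that copy-based setup, under assumed paths and bulk output layout (os.CopyFS requires Go 1.23+; the real CopyBulkLoadDirsToAlphaMounts may differ):

```go
package sketch

import (
	"fmt"
	"os"
	"path/filepath"
)

// copyPDirToMounts replicates one bulkloaded p directory into every alpha's
// mount before the cluster starts, as in the copy-based test above.
func copyPDirToMounts(bulkOutDir string, alphaMounts []string) error {
	src := filepath.Join(bulkOutDir, "0", "p") // assumed bulk output layout
	for _, mount := range alphaMounts {
		dst := filepath.Join(mount, "p")
		if err := os.CopyFS(dst, os.DirFS(src)); err != nil {
			return fmt.Errorf("copying %s to %s: %w", src, dst, err)
		}
	}
	return nil
}
```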
@@ -183,6 +184,13 @@ func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig {
	return cc
}

// WithBulkLoadpDirIn sets the p dir for the alphas. This controls
I think this option is not clear; it took me some time to make sense of it. We should try to come up with a better name and possible values.
// Start and query each alpha
for i := 0; i < 3; i++ {
nit: remove the newlines
@@ -1837,6 +1852,11 @@ func (n *node) InitAndStartNode() {
	n.SetRaft(raft.StartNode(n.Cfg, peers))
	// Trigger election, so this node can become the leader of this single-node cluster.
	n.canCampaign = true
	// Also trigger a snapshot so that this node can take a snapshot if required
Can we loop on whether a leader is chosen instead?
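The loop-based alternative could look like this sketch: poll until the node reports leadership or a timeout expires, instead of sleeping for a fixed second. isLeader stands in for dgraph's n.AmLeader(); the interval and timeout are illustrative.

```go
package sketch

import "time"

// waitForLeadership polls for leadership rather than relying on a fixed
// delay, returning false if the election never completes in time.
func waitForLeadership(isLeader func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if isLeader() {
			return true
		}
		time.Sleep(50 * time.Millisecond)
	}
	return false
}
```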
Description: Enable snapshot streaming after bulkload

Summary: Previously, the bulkloaded p directory couldn't stream to a new alpha due to a commitTs of zero. In this PR, the commitTs is sourced from the p directory, allowing the alpha to create a snapshot and subsequently stream it to another alpha.
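A minimal sketch of where such a commitTs could come from, assuming the p directory is a Badger store (this uses Badger's public MaxVersion; dgraph's actual bookkeeping is more involved):

```go
package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	// Open the bulkloaded p directory read-only.
	db, err := badger.Open(badger.DefaultOptions("./p").WithReadOnly(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// MaxVersion is the highest commit timestamp present in the store; a
	// non-zero value is what lets the alpha create and stream a snapshot.
	fmt.Println("max commit ts:", db.MaxVersion())
}
```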
Tests:

1. Create a p directory using bulkload. Then, initiate one alpha using the bulkloaded p directory and start a second alpha without any p directory. Query both alphas to ensure the snapshot has been successfully generated and that the data is accessible from both instances.
2. Create a p directory using bulkload. Then, copy the bulkloaded p directory into all alphas and start the cluster. Query all alphas to ensure that the data is accessible from all instances.
3. Move the zero timestamp and load a p directory using bulkload. Then use this p directory on a fresh cluster (both zero and alpha are new). Validate that the query doesn't work without moving the timestamp of the new zero.

Closes: https://dgraph.atlassian.net/browse/DGRAPHCORE-214
Docs: NA