This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: add pullBinlogs interface for dm-worker to get local/grpc relay log #2216

Closed
wants to merge 10 commits into from

Conversation

lichunzhu
Contributor

What problem does this PR solve?

Close #2214

What is changed and how it works?

  1. Add the PullBinlogs interface on the server side.
  2. Add PullBinlogs on the client side for the local way.

Check List

Tests

  • Unit test
  • Integration test

Code changes

  • Has exported function/method change
  • Has exported variable/fields change

Related changes

  • Need to cherry-pick to the release branch

@ti-chi-bot
Member

ti-chi-bot commented Oct 13, 2021

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • Ehco1996

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@lichunzhu lichunzhu requested a review from glorv October 13, 2021 06:12
@lichunzhu
Contributor Author

/cc @D3Hunter

@ti-chi-bot
Member

@lichunzhu: GitHub didn't allow me to request PR reviews from the following users: D3Hunter.

Note that only pingcap members and repo collaborators can review this PR, and authors cannot review their own PRs.

In response to this:

/cc @D3Hunter

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

dm/proto/dmworker.proto (outdated, resolved)
dm/worker/server.go (outdated, resolved)
dm/worker/server.go (outdated, resolved)
dm/worker/subtask.go (resolved)
Collaborator

@lance6716 left a comment

will review later

dm/proto/dmworker.proto (outdated, resolved)
dm/proto/dmworker.proto (outdated, resolved)
dm/worker/server.go (outdated, resolved)
dm/worker/source_worker.go (outdated, resolved)
@@ -940,3 +940,42 @@ func (s *Server) GetWorkerCfg(ctx context.Context, req *pb.GetWorkerCfgRequest)
resp.Cfg, err = s.cfg.Toml()
return resp, err
}

// PullBinlogs will start a goroutine to continuously parse binlogs in relay dir and send them in streaming way.
func (s *Server) PullBinlogs(req *pb.PullBinlogReq, stream pb.Worker_PullBinlogsServer) error {
Contributor

@D3Hunter Oct 14, 2021

This API operates on the SourceWorker layer, and we may have more than one SourceWorker later.

Maybe it (or part of its implementation) could move into the SourceWorker layer instead of the Server layer.

Contributor Author

We only register one gRPC service in the dm-worker Server layer, so I put this function here. How would we register the gRPC service if this function lived in the SourceWorker layer?

Contributor

@D3Hunter Oct 15, 2021

Or move part of the implementation there and have the server call it, to separate the API from the work (which I think belongs to SourceWorker).


message PullBinlogReq {
// Specifies which source of binlog to pull.
string source = 1;
Contributor

What's the value of source: the source name or the server ID?

There's another layer of sequence in the relay dir. Can it handle that?

<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001

Contributor Author

  1. Source is the source name. The UUID is a detail that relay has to handle, so at the dm-worker layer I think we'd better use source.
  2. The current relay can already handle this, so I think it's okay.

Contributor

Relay log filenames may be the same under different relay sub-directories, so for position-based sync a location may be ambiguous:

<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001
--|_mysql-000001
|-- another-server-under-load-balancer.000001
--|_mysql-000001
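One way to keep position-based locations unambiguous is to store the relay sub-directory together with the file name and offset. The sketch below assumes sub-directory names of the form <uuid>.<seq>, as in the tree above. It also assumes the numeric suffix grows each time relay opens a new sub-directory. relayPos, subDirSeq and less are hypothetical helpers and are not part of DM.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// relayPos is a hypothetical location that names the relay sub-directory as
// well as the binlog file, so equal file names under different
// sub-directories stay distinct.
type relayPos struct {
	SubDir string // e.g. "7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001"
	File   string // e.g. "mysql-000001"
	Pos    uint32
}

// subDirSeq extracts the numeric suffix of a "<uuid>.<seq>" sub-directory name.
func subDirSeq(subDir string) (int, error) {
	i := strings.LastIndexByte(subDir, '.')
	if i < 0 {
		return 0, fmt.Errorf("no sequence suffix in %q", subDir)
	}
	return strconv.Atoi(subDir[i+1:])
}

// less orders locations by sub-directory sequence first, then file name
// (assumed to have fixed-width numeric suffixes), then offset.
func less(a, b relayPos) (bool, error) {
	sa, err := subDirSeq(a.SubDir)
	if err != nil {
		return false, err
	}
	sb, err := subDirSeq(b.SubDir)
	if err != nil {
		return false, err
	}
	if sa != sb {
		return sa < sb, nil
	}
	if a.File != b.File {
		return a.File < b.File, nil
	}
	return a.Pos < b.Pos, nil
}
```

If a location held only (File, Pos), the two entries in the tree above could not be told apart. Adding SubDir makes them distinct and gives them an order, provided the sequence-suffix assumption holds.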

Contributor Author

@lichunzhu Oct 15, 2021

I see what you mean. Relay replicated by binlog position shouldn't support switching between the MySQL leader and followers, so I think using Source is not a problem.

@Ehco1996 Ehco1996 added this to the v2.1.0 milestone Oct 14, 2021
return terror.ErrWorkerPullBinlogsInvalidRequest.Generate("worker doesn't enable relay")
}

ch, ech := s.worker.relayHolder.PullBinlogs(ctx, req)
Contributor

s.worker -> w?

dm/proto/dmworker.proto (resolved)
dm/proto/dmworker.proto (resolved)
Contributor

@Ehco1996 left a comment

rest LGTM

dm/proto/dmworker.proto (resolved)
@lichunzhu
Contributor Author

ping @lance6716 @Ehco1996 @D3Hunter

Contributor

@Ehco1996 left a comment

LGTM

@ti-chi-bot ti-chi-bot added the status/LGT1 One reviewer already commented LGTM label Oct 19, 2021
@@ -222,7 +223,8 @@ type Syncer struct {
}

// NewSyncer creates a new Syncer.
-func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
+func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client,
+	pullLocalBinlogs func(context.Context, *pb.PullBinlogReq) (chan *replication.BinlogEvent, chan error)) *Syncer {
Contributor

Why does the syncer need to care about the PullBinlog API?

Contributor

@Ehco1996 Oct 20, 2021

Because the streamerController in the syncer needs it to pull and control binlogs 🤔

Contributor Author

In this PR, the reader is created on the relay side, so the syncer needs access to relay's createReader method.

Contributor

@D3Hunter Oct 20, 2021

Even with that design, it would be better to abstract it rather than passing a function (with its full signature) around; that is hard to extend and to read.

Location startFrom = 2;
}

message PullBinlogResp {
Collaborator

Do we need to include an error field here? What if the source does not exist on the target worker?
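There are two common answers. In gRPC-Go, when a server-streaming handler returns a non-nil error, the stream ends and the client's Recv returns that error as a status. The handler in this PR already uses that path when relay is disabled. The other option is an in-band error field, which fits when failures should travel as ordinary messages. The sketch below shows the in-band variant using a hypothetical Go mirror of PullBinlogResp. The field names and the slice-based "stream" are assumptions that keep the example self-contained.

```go
package main

import "errors"

// PullBinlogResp is a hypothetical Go mirror of the proto message with an
// added error field; exactly one of Event or Error is expected to be set.
type PullBinlogResp struct {
	Event []byte // raw binlog event bytes
	Error string // non-empty when the worker could not serve the request
}

// serve stands in for the server handler; bound maps each source name that
// this worker serves to its pending events.
func serve(source string, bound map[string][][]byte) []PullBinlogResp {
	events, ok := bound[source]
	if !ok {
		return []PullBinlogResp{{Error: "source " + source + " not found on this worker"}}
	}
	resps := make([]PullBinlogResp, 0, len(events))
	for _, ev := range events {
		resps = append(resps, PullBinlogResp{Event: ev})
	}
	return resps
}

// consume is the client side: it counts events and stops at the first in-band error.
func consume(resps []PullBinlogResp) (int, error) {
	n := 0
	for _, r := range resps {
		if r.Error != "" {
			return n, errors.New(r.Error)
		}
		n++
	}
	return n, nil
}
```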

@ti-chi-bot
Member

@lichunzhu: PR needs rebase.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@lichunzhu lichunzhu removed this from the v2.1.0 milestone Oct 27, 2021
@lichunzhu
Contributor Author

Will merge this in the next sprint.

@lichunzhu lichunzhu closed this Nov 25, 2021
@lance6716
Collaborator

BTW, the ideal interface in my mind would use the MySQL replication protocol, so that someday we can decouple the "relay log" functionality from DM worker, and DM worker will transparently connect to either relay log nodes or the real upstream.

cc @sunzhaoyang

@lichunzhu lichunzhu deleted the addBinlogInterface branch November 25, 2021 03:21
Labels
needs-rebase size/XXL status/LGT1 One reviewer already commented LGTM
Projects
None yet
Development

Successfully merging this pull request may close these issues.

relay: refactor dm-worker relay logic
6 participants