Skip to content

Commit

Permalink
Fix bug in handling sub events of replication.TransactionPayloadEvent (
Browse files Browse the repository at this point in the history
  • Loading branch information
froot committed May 10, 2024
1 parent eb2c6d1 commit e35272c
Showing 1 changed file with 98 additions and 91 deletions.
189 changes: 98 additions & 91 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func (c *Canal) runSyncBinlog() error {
return err
}

savePos := false
force := false

for {
ev, err := s.GetEvent(c.ctx)
if err != nil {
Expand Down Expand Up @@ -69,110 +66,120 @@ func (c *Canal) runSyncBinlog() error {
}
}

savePos = false
force = false
pos := c.master.Position()
err = c.handleEvent(ev)
if err != nil {
return err
}
}
}

curPos := pos.Pos
func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
savePos := false
force := false
pos := c.master.Position()
var err error

// next binlog pos
pos.Pos = ev.Header.LogPos
curPos := pos.Pos

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
// TODO: If we meet any DDL query, we must save too.
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
// next binlog pos
pos.Pos = ev.Header.LogPos

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
// TODO: If we meet any DDL query, we must save too.
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
if err != nil {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
return nil
case *replication.TransactionPayloadEvent:
// handle subevent row by row
ev := ev.Event.(*replication.TransactionPayloadEvent)
for _, subEvent := range ev.Events {
err = c.handleEvent(subEvent)
if err != nil {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
continue
case *replication.TransactionPayloadEvent:
// handle subevent row by row
ev := ev.Event.(*replication.TransactionPayloadEvent)
for _, subEvent := range ev.Events {
err = c.handleRowsEvent(subEvent)
if err != nil {
c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err)
}
return nil
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
case *replication.MariadbGTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsQueryEvent:
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
return nil
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
continue
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
case *replication.MariadbGTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsQueryEvent:
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
}
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
default:
continue
}
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
default:
return nil
}

if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)

if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
}

return nil
}

type node struct {
Expand Down

0 comments on commit e35272c

Please sign in to comment.