New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support handle events order by sender #33
Conversation
01af2eb
to
6e70fd9
Compare
6e70fd9
to
d8384d7
Compare
eventbus/mysql/eventbus.go
Outdated
|
||
// 按EventCreatedAt 排序 | ||
sort.Sort(ByEventCreatedAt(eventPOs)) | ||
// 按sender分组 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是否应该有个开关,如果业务场景大量事件sender相同,并发消费会失效。框架之前有设计事件发送类型,可以考虑实现SendTypeFIFO(保序事件)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SendTypeFIFO定义:
Line 38 in eb0f0ef
SendTypeFIFO SendType = "FIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行 |
eventbus/mysql/eventbus.go
Outdated
senderEvents[e.Event.GetSender()] = append(_events, e) | ||
} | ||
events := make(chan []*EventPO, len(senderEvents)) | ||
for _, _events := range senderEvents { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ByEventCreatedAt用基数排序实现,把sender也纳入排序,就不用搞个map中间变量了
eventbus/mysql/eventbus_test.go
Outdated
|
||
func TestSortEvent(t *testing.T) { | ||
e1 := &EventPO{Event: &dddfirework.DomainEvent{Sender: "1"}, EventCreatedAt: time.Now()} | ||
time.Sleep(1 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不建议在ut里sleep,可通过计算实现,比如:time.Now().Add(1 * time.Millisecond)
d8384d7
to
c19ce1c
Compare
eventbus/mysql/eventbus.go
Outdated
@@ -423,11 +424,33 @@ func (e *EventBus) doRetryStrategy(service *ServicePO, remainIDs, failedIDs []in | |||
} | |||
|
|||
func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (failed, panics []int64) { | |||
events := make(chan *EventPO, len(eventPOs)) | |||
|
|||
// 按EventCreatedAt 排序 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
只有FIFO事件需要排序,FIFO不一定是主要场景,是否不对所有事件排序而是只对需要排序的事件排序?另外,考虑是否考虑sql查询的时候直接排序,只对limit结果做排序的话全局是否仍然会出现乱序
c19ce1c
to
c2145b3
Compare
c2145b3
to
7be8efa
Compare
No description provided.