Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some error wrapping to sync API, use background context for sync #1363

Merged
merged 2 commits into from Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 12 additions & 9 deletions syncapi/storage/shared/syncserver.go
Expand Up @@ -451,7 +451,7 @@ func (d *Database) addPDUDeltaToResponse(
wantFullState bool,
res *types.Response,
) (joinedRoomIDs []string, err error) {
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -577,20 +577,23 @@ func (d *Database) IncrementalSync(
joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
)
if err != nil {
return nil, fmt.Errorf("d.addPDUDeltaToResponse: %w", err)
}
} else {
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
ctx, nil, device.UserID, gomatrixserverlib.Join,
)
}
if err != nil {
return nil, err
if err != nil {
return nil, fmt.Errorf("d.CurrentRoomState.SelectRoomIDsWithMembership: %w", err)
}
}

err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
}

return res, nil
Expand Down Expand Up @@ -632,7 +635,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
if err != nil {
return
}
Expand Down Expand Up @@ -719,15 +722,15 @@ func (d *Database) CompleteSync(
ctx, res, device.UserID, numRecentEventsPerRoom,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
}

// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
}

return res, nil
Expand All @@ -753,7 +756,7 @@ func (d *Database) addInvitesToResponse(
ctx, txn, userID, r,
)
if err != nil {
return err
return fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
}
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent)
Expand Down
24 changes: 14 additions & 10 deletions syncapi/sync/requestpool.go
Expand Up @@ -18,6 +18,7 @@ package sync

import (
"context"
"fmt"
"net/http"
"time"

Expand Down Expand Up @@ -204,31 +205,34 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// See if we have any new tasks to do for the send-to-device messaging.
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since)
if err != nil {
return nil, err
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
}

// TODO: handle ignored users
if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else {
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
}
if err != nil {
return res, err
if err != nil {
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
}

accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
if err != nil {
return res, err
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return res, err
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
}
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
if err != nil {
return res, err
return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
}

// Before we return the sync response, make sure that we take action on
Expand All @@ -238,7 +242,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Handle the updates and deletions in the database.
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since)
if err != nil {
return res, err
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
}
}
if len(events) > 0 {
Expand All @@ -263,7 +267,7 @@ func (rp *RequestPool) appendDeviceLists(
) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, err
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}

return data, nil
Expand Down Expand Up @@ -329,7 +333,7 @@ func (rp *RequestPool) appendAccountData(
req.ctx, userID, r, accountDataFilter,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
}

if len(dataTypes) == 0 {
Expand Down