Skip to content

Commit

Permalink
debug thread-safe zerolog usage, rs/zerolog#555, try to remove local …
Browse files Browse the repository at this point in the history
…logger in upload.go
  • Loading branch information
Slach committed Jun 14, 2023
1 parent 733e689 commit fd90f25
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions pkg/backup/upload.go
Expand Up @@ -453,8 +453,7 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, table
for disk := range table.Parts {
capacity += len(table.Parts[disk])
}
logger := log.With().Str("logger", "uploadTableData").Logger()
logger.Debug().Msgf("start %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity)
log.Debug().Msgf("start %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity)
s := semaphore.NewWeighted(int64(b.cfg.General.UploadConcurrency))
g, ctx := errgroup.WithContext(ctx)
var uploadedBytes int64
Expand All @@ -479,7 +478,7 @@ breakByError:
continue
}
if err := s.Acquire(ctx, 1); err != nil {
logger.Error().Msgf("can't acquire semaphore during Upload data parts: %v", err)
log.Error().Msgf("can't acquire semaphore during Upload data parts: %v", err)
break breakByError
}
backupPath := b.getLocalBackupDataPathForTable(backupName, disk, dbAndTablePath)
Expand All @@ -499,17 +498,17 @@ breakByError:
return nil
}
}
logger.Debug().Msgf("start upload %d files to %s", len(partFiles), remotePath)
log.Debug().Msgf("start upload %d files to %s", len(partFiles), remotePath)
if uploadPathBytes, err := b.dst.UploadPath(ctx, 0, backupPath, partFiles, remotePath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
logger.Error().Msgf("UploadPath return error: %v", err)
log.Error().Msgf("UploadPath return error: %v", err)
return fmt.Errorf("can't upload: %v", err)
} else {
atomic.AddInt64(&uploadedBytes, uploadPathBytes)
if b.resume {
b.resumableState.AppendToState(remotePathFull, uploadPathBytes)
}
}
logger.Debug().Msgf("finish upload %d files to %s", len(partFiles), remotePath)
log.Debug().Msgf("finish upload %d files to %s", len(partFiles), remotePath)
return nil
})
} else {
Expand All @@ -525,13 +524,13 @@ breakByError:
return nil
}
}
logger.Debug().Msgf("start upload %d files to %s", len(localFiles), remoteDataFile)
log.Debug().Msgf("start upload %d files to %s", len(localFiles), remoteDataFile)
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
err := retry.RunCtx(ctx, func(ctx context.Context) error {
return b.dst.UploadCompressedStream(ctx, backupPath, localFiles, remoteDataFile)
})
if err != nil {
logger.Error().Msgf("UploadCompressedStream return error: %v", err)
log.Error().Msgf("UploadCompressedStream return error: %v", err)
return fmt.Errorf("can't upload: %v", err)
}
remoteFile, err := b.dst.StatFile(ctx, remoteDataFile)
Expand All @@ -542,7 +541,7 @@ breakByError:
if b.resume {
b.resumableState.AppendToState(remoteDataFile, remoteFile.Size())
}
logger.Debug().Msgf("finish upload to %s", remoteDataFile)
log.Debug().Msgf("finish upload to %s", remoteDataFile)
return nil
})
}
Expand All @@ -551,7 +550,7 @@ breakByError:
if err := g.Wait(); err != nil {
return nil, 0, fmt.Errorf("one of uploadTableData go-routine return error: %v", err)
}
logger.Debug().Msgf("finish %s.%s with concurrency=%d len(table.Parts[...])=%d uploadedFiles=%v, uploadedBytes=%v", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity, uploadedFiles, uploadedBytes)
log.Debug().Msgf("finish %s.%s with concurrency=%d len(table.Parts[...])=%d uploadedFiles=%v, uploadedBytes=%v", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity, uploadedFiles, uploadedBytes)
return uploadedFiles, uploadedBytes, nil
}

Expand Down

0 comments on commit fd90f25

Please sign in to comment.