/
progress.go
123 lines (101 loc) · 3.33 KB
/
progress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2021-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbft
import (
"fmt"
"net/http"
"github.com/couchbase/cbgt"
"github.com/couchbase/cbgt/rest"
)
// ProgressStatsHandler is a REST handler that provides stats relevant to
// infer indexing progress.
type ProgressStatsHandler struct {
mgr *cbgt.Manager
}
func NewProgressStatsHandler(mgr *cbgt.Manager) *ProgressStatsHandler {
return &ProgressStatsHandler{mgr: mgr}
}
func (h *ProgressStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
indexName := rest.IndexNameLookup(req)
if indexName == "" {
rest.ShowError(w, req, "index name is required", http.StatusBadRequest)
return
}
indexProgressStats, _ := gatherIndexProgressStats(h.mgr, indexName)
docCount, _ := indexProgressStats["doc_count"].(uint64)
totSeqReceived, _ := indexProgressStats["tot_seq_received"].(uint64)
numMutationsToIndex, _ := indexProgressStats["num_mutations_to_index"].(uint64)
rv := struct {
Status string `json:"status"`
DocCount uint64 `json:"doc_count"`
TotSeqReceived uint64 `json:"tot_seq_received"`
NumMutationsToIndex uint64 `json:"num_mutations_to_index"`
}{
Status: "ok",
DocCount: docCount,
NumMutationsToIndex: numMutationsToIndex,
TotSeqReceived: totSeqReceived,
}
rest.MustEncode(w, rv)
}
// ---------------------------------------------------------------
func gatherIndexProgressStats(mgr *cbgt.Manager, indexName string) (
map[string]interface{}, error) {
if mgr == nil {
return nil, fmt.Errorf("manager not available")
}
indexDef, pindexImplType, err := mgr.GetIndexDef(indexName, false)
if err != nil || indexDef == nil {
return nil,
fmt.Errorf("unable to obtain index def for `%v`, err: %v", indexName, err)
}
count, err := pindexImplType.Count(mgr, indexName, "")
if err != nil {
return nil, err
}
rv := map[string]interface{}{}
rv["doc_count"] = count
sourcePartitionSeqs := GetSourcePartitionSeqs(SourceSpec{
SourceType: indexDef.SourceType,
SourceName: indexDef.SourceName,
SourceUUID: indexDef.SourceUUID,
SourceParams: indexDef.SourceParams,
Server: mgr.Server(),
})
destPartitionSeqs := map[string]cbgt.UUIDSeq{}
_, pindexes := mgr.CurrentMaps()
for _, pindex := range pindexes {
if pindex.IndexName != indexDef.Name {
continue
}
if pindex.Dest != nil {
destForwarder, ok := pindex.Dest.(*cbgt.DestForwarder)
if !ok {
continue
}
partitionSeqsProvider, ok :=
destForwarder.DestProvider.(PartitionSeqsProvider)
if !ok {
continue
}
if partitionSeqs, err := partitionSeqsProvider.PartitionSeqs(); err == nil {
for partitionId, uuidSeq := range partitionSeqs {
destPartitionSeqs[partitionId] = uuidSeq
}
}
}
}
totSeqReceived, numMutationsToIndex, err := obtainDestSeqsForIndex(
indexDef, sourcePartitionSeqs, destPartitionSeqs)
if err != nil {
return rv, err
}
rv["tot_seq_received"] = totSeqReceived
rv["num_mutations_to_index"] = numMutationsToIndex
return rv, nil
}