Another optimization attempt.
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Jan 26, 2022
1 parent 273cb92 commit 870e237
Showing 3 changed files with 96 additions and 60 deletions.
142 changes: 89 additions & 53 deletions prometheus/cache/cache.go
@@ -43,25 +43,67 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with
// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers.
// NOTE(bwplotka): Experimental, API and behaviour can change.
type CachedTGatherer struct {
metrics map[uint64]*dto.Metric
metricFamilyByName map[string]*dto.MetricFamily
metricFamilyByName map[string]*family
mMu sync.RWMutex
}

func NewCachedTGatherer() *CachedTGatherer {
return &CachedTGatherer{
metrics: make(map[uint64]*dto.Metric),
metricFamilyByName: map[string]*dto.MetricFamily{},
metricFamilyByName: map[string]*family{},
}
}

type family struct {
*dto.MetricFamily

metricsByHash map[uint64]*metric
touched bool
}

type metric struct {
*dto.Metric
touched bool
}

// normalizeMetricFamilies returns a MetricFamily slice with empty
// MetricFamilies pruned and the remaining MetricFamilies sorted by name within
// the slice, with the contained Metrics sorted within each MetricFamily.
func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
if cap(mf.Metric) < len(mf.metricsByHash) {
mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash))
}
mf.Metric = mf.Metric[:0]
for _, m := range mf.metricsByHash {
mf.Metric = append(mf.Metric, m.Metric)
}
sort.Sort(internal.MetricSorter(mf.Metric))
}

for _, mf := range metricFamiliesByName {
sort.Sort(internal.MetricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
if len(mf.Metric) > 0 {
names = append(names, name)
}
}
sort.Strings(names)
result := make([]*dto.MetricFamily, 0, len(names))
for _, name := range names {
result = append(result, metricFamiliesByName[name].MetricFamily)
}
return result
}

// Gather implements TransactionalGatherer interface.
func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
c.mMu.RLock()

// BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families
// BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families
// this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now.
return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
return normalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
}

type Key struct {
@@ -123,13 +165,6 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key)
c.mMu.Lock()
defer c.mMu.Unlock()

currMetrics := c.metrics
currMetricFamilies := c.metricFamilyByName
if reset {
currMetrics = make(map[uint64]*dto.Metric, len(inserts))
currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName))
}

errs := prometheus.MultiError{}
for i := range inserts {
// TODO(bwplotka): Validate more about this insert?
@@ -141,22 +176,25 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key)
// Update metric family.
mf, ok := c.metricFamilyByName[inserts[i].FQName]
if !ok {
mf = &dto.MetricFamily{}
mf = &family{
MetricFamily: &dto.MetricFamily{},
metricsByHash: map[uint64]*metric{},
}
mf.Name = &inserts[i].FQName
} else if reset {
// Reset metric slice, since we want to start from scratch.
mf.Metric = mf.Metric[:0]
}
mf.touched = true
mf.Type = inserts[i].ValueType.ToDTO()
mf.Help = &inserts[i].Help

currMetricFamilies[inserts[i].FQName] = mf
c.metricFamilyByName[inserts[i].FQName] = mf

// Update metric pointer.
hSum := inserts[i].hash()
m, ok := c.metrics[hSum]
m, ok := mf.metricsByHash[hSum]
if !ok {
m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))}
m = &metric{
Metric: &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))},
}
for j := range inserts[i].LabelNames {
m.Label = append(m.Label, &dto.LabelPair{
Name: &inserts[i].LabelNames[j],
@@ -165,6 +203,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key)
}
sort.Sort(internal.LabelPairSorter(m.Label))
}
m.touched = true

switch inserts[i].ValueType {
case prometheus.CounterValue:
@@ -202,16 +241,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key)
if inserts[i].Timestamp != nil {
m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000))
}
currMetrics[hSum] = m

if !reset && ok {
// If we did update without reset and we found metric in previous
// map, we know metric pointer exists in metric family map, so just continue.
continue
}

// Will be sorted later anyway, so just append.
mf.Metric = append(mf.Metric, m)
mf.metricsByHash[hSum] = m
}

for _, del := range deletions {
@@ -220,42 +250,48 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key)
continue
}

hSum := del.hash()
m, ok := currMetrics[hSum]
mf, ok := c.metricFamilyByName[del.FQName]
if !ok {
continue
}
delete(currMetrics, hSum)

mf, ok := currMetricFamilies[del.FQName]
if !ok {
// Impossible, but well...
errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues))
hSum := del.hash()
if _, ok := mf.metricsByHash[hSum]; !ok {
continue
}

toDel := -1
for i := range mf.Metric {
if mf.Metric[i] == m {
toDel = i
break
}
}

if toDel == -1 {
errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues))
if len(mf.metricsByHash) == 1 {
delete(c.metricFamilyByName, del.FQName)
continue
}

if len(mf.Metric) == 1 {
delete(currMetricFamilies, del.FQName)
continue
delete(mf.metricsByHash, hSum)
}

if reset {
for name, mf := range c.metricFamilyByName {
if !mf.touched {
delete(c.metricFamilyByName, name)
continue
}
for hash, m := range mf.metricsByHash {
if !m.touched {
delete(mf.metricsByHash, hash)
continue
}
}
if len(mf.metricsByHash) == 0 {
delete(c.metricFamilyByName, name)
}
}
}

mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...)
for _, mf := range c.metricFamilyByName {
mf.touched = false
for _, m := range mf.metricsByHash {
m.touched = false
}
}

c.metrics = currMetrics
c.metricFamilyByName = currMetricFamilies
return errs.MaybeUnwrap()
}
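
Aside from the diff itself, here is a minimal usage sketch of the cached gatherer after this change (not part of the commit). It assumes Insert embeds Key and exposes Help, ValueType, Value and Timestamp, as the field accesses in Update above suggest; the package is experimental, so treat the exact field layout as an assumption.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/cache"
)

func main() {
	c := cache.NewCachedTGatherer()

	// Describe the desired cache state in one transaction. With reset=true,
	// any family or metric not touched by these inserts is pruned afterwards,
	// which is what the new "touched" bookkeeping in Update implements.
	inserts := []cache.Insert{
		{
			Key: cache.Key{ // Assumed embedding; Key's fields are collapsed in the diff.
				FQName:      "http_requests_total",
				LabelNames:  []string{"code"},
				LabelValues: []string{"200"},
			},
			Help:      "Total HTTP requests.",
			ValueType: prometheus.CounterValue,
			Value:     1329, // Assumed field name; the value handling is collapsed in the diff.
		},
	}
	if err := c.Update(true, inserts, nil); err != nil {
		panic(err)
	}

	// Gather returns the cached, normalized families plus a done() callback
	// that releases the read lock it takes.
	mfs, done, err := c.Gather()
	if err != nil {
		panic(err)
	}
	defer done()
	fmt.Println("cached metric families:", len(mfs))
}

To expose this over HTTP alongside a classic Registry, the comment at the top of cache.go points at the NewMultiTRegistry and ToTransactionalGatherer helpers.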
2 changes: 1 addition & 1 deletion prometheus/cache/cache_test.go
@@ -220,7 +220,7 @@ func BenchmarkCachedTGatherer_Update(b *testing.B) {
b.Error("update:", err)
}

if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 {
if len(c.metricFamilyByName) != 1e3 || len(c.metricFamilyByName["realistic_longer_name_123"].metricsByHash) != 1e3 {
// Ensure we did not generate duplicates.
panic("generated data set gave wrong numbers")
}
12 changes: 6 additions & 6 deletions prometheus/internal/metric.go
@@ -35,18 +35,18 @@ func (s LabelPairSorter) Less(i, j int) bool {
return s[i].GetName() < s[j].GetName()
}

// metricSorter is a sortable slice of *dto.Metric.
type metricSorter []*dto.Metric
// MetricSorter is a sortable slice of *dto.Metric.
type MetricSorter []*dto.Metric

func (s metricSorter) Len() int {
func (s MetricSorter) Len() int {
return len(s)
}

func (s metricSorter) Swap(i, j int) {
func (s MetricSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s metricSorter) Less(i, j int) bool {
func (s MetricSorter) Less(i, j int) bool {
if len(s[i].Label) != len(s[j].Label) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
@@ -84,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool {
// the slice, with the contained Metrics sorted within each MetricFamily.
func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
sort.Sort(MetricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
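For context on why metricSorter becomes exported: the new normalizeMetricFamilies in prometheus/cache needs to sort metric slices the same way the registry does, and prometheus/internal is importable only from within client_golang. A small illustrative sketch (not part of the commit; it only compiles inside the client_golang module, and the proto.String label construction is just for the example):

package cache

import (
	"sort"

	"github.com/golang/protobuf/proto"
	dto "github.com/prometheus/client_model/go"

	"github.com/prometheus/client_golang/prometheus/internal"
)

// sortByLabels orders metrics within a family deterministically,
// as normalizeMetricFamilies does above.
func sortByLabels(metrics []*dto.Metric) {
	sort.Sort(internal.MetricSorter(metrics))
}

func exampleSort() []*dto.Metric {
	ms := []*dto.Metric{
		{Label: []*dto.LabelPair{{Name: proto.String("code"), Value: proto.String("500")}}},
		{Label: []*dto.LabelPair{{Name: proto.String("code"), Value: proto.String("200")}}},
	}
	sortByLabels(ms) // The metric labelled code="200" now comes first.
	return ms
}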
