Skip to content

Commit

Permalink
Merge pull request kiali#3669 from xeviknal/3664-3654-3653-1.29
Browse files Browse the repository at this point in the history
[1.29] Cherry-picking kiali-3664 kiali-3654 kiali-3653
  • Loading branch information
xeviknal committed Feb 3, 2021
2 parents 92b558e + 40bc2eb commit 26b55ff
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 20 deletions.
23 changes: 23 additions & 0 deletions business/dashboards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,29 @@ func TestDiscoveryMatcherWithComposition(t *testing.T) {
assert.Equal("dashboard2", runtimes[0].DashboardRefs[0].Template)
}

// TestGetCustomDashboardRefs checks that GetCustomDashboardRefs returns a single
// runtime ("Runtime 1") holding one dashboard reference when the requested
// namespace defines no dashboards and two dashboards exist in istio-system.
func TestGetCustomDashboardRefs(t *testing.T) {
	assert := assert.New(t)

	// Setup mocks: no dashboards in the target namespace, two in istio-system.
	service, k8s, prom := setupService()
	d1 := fakeDashboard("1")
	d2 := fakeDashboard("2")
	k8s.On("GetDashboards", "my-namespace").Return([]v1alpha1.MonitoringDashboard{}, nil)
	k8s.On("GetDashboards", "istio-system").Return([]v1alpha1.MonitoringDashboard{*d1, *d2}, nil)
	// NOTE(review): presumably only dashboard 1 is discovered because "my_metric_1_1"
	// is the sole mocked metric matching a fake dashboard — confirm against fakeDashboard.
	prom.MockMetricsForLabels([]string{"my_metric_1_1", "request_count", "tcp_received", "tcp_sent"})
	pods := []*models.Pod{}

	runtimes := service.GetCustomDashboardRefs("my-namespace", "app", "", pods)

	// Both namespaces must have been queried, and Prometheus exactly once.
	k8s.AssertNumberOfCalls(t, "GetDashboards", 2)
	prom.AssertNumberOfCalls(t, "GetMetricsForLabels", 1)

	// assert.Len only records the failure; bail out so the indexing below cannot
	// panic and bury the assertion message.
	if !assert.Len(runtimes, 1) {
		return
	}
	assert.Equal("Runtime 1", runtimes[0].Name)
	if !assert.Len(runtimes[0].DashboardRefs, 1) {
		return
	}
	assert.Equal("dashboard1", runtimes[0].DashboardRefs[0].Template)
	assert.Equal("Dashboard 1", runtimes[0].DashboardRefs[0].Title)
}

func fakeDashboard(id string) *v1alpha1.MonitoringDashboard {
return &v1alpha1.MonitoringDashboard{
ObjectMeta: v1.ObjectMeta{
Expand Down
13 changes: 12 additions & 1 deletion business/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (in *HealthService) getNamespaceServiceHealth(namespace string, services []
health.Requests.AggregateInbound(sample)
}
}

for _, health := range allHealth {
health.Requests.CombineReporters()
}
return allHealth
}

Expand Down Expand Up @@ -259,6 +261,9 @@ func fillAppRequestRates(allHealth models.NamespaceAppHealth, rates model.Vector
health.Requests.AggregateOutbound(sample)
}
}
for _, health := range allHealth {
health.Requests.CombineReporters()
}
}

// fillWorkloadRequestRates aggregates requests rates from metrics fetched from Prometheus, and stores the result in the health map.
Expand All @@ -275,6 +280,9 @@ func fillWorkloadRequestRates(allHealth models.NamespaceWorkloadHealth, rates mo
health.Requests.AggregateOutbound(sample)
}
}
for _, health := range allHealth {
health.Requests.CombineReporters()
}
}

func (in *HealthService) getServiceRequestsHealth(namespace, service, rateInterval string, queryTime time.Time) (models.RequestHealth, error) {
Expand All @@ -283,6 +291,7 @@ func (in *HealthService) getServiceRequestsHealth(namespace, service, rateInterv
for _, sample := range inbound {
rqHealth.AggregateInbound(sample)
}
rqHealth.CombineReporters()
return rqHealth, err
}

Expand All @@ -295,6 +304,7 @@ func (in *HealthService) getAppRequestsHealth(namespace, app, rateInterval strin
for _, sample := range outbound {
rqHealth.AggregateOutbound(sample)
}
rqHealth.CombineReporters()
return rqHealth, err
}

Expand All @@ -307,5 +317,6 @@ func (in *HealthService) getWorkloadRequestsHealth(namespace, workload, rateInte
for _, sample := range outbound {
rqHealth.AggregateOutbound(sample)
}
rqHealth.CombineReporters()
return rqHealth, err
}
5 changes: 5 additions & 0 deletions business/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ var (
"destination_service": "httpbin.tutorial.svc.cluster.local",
"request_protocol": "http",
"response_code": "200",
"reporter": "source",
},
Value: model.SampleValue(5),
Timestamp: model.Now(),
Expand All @@ -270,6 +271,7 @@ var (
"destination_service": "httpbin.tutorial.svc.cluster.local",
"request_protocol": "http",
"response_code": "400",
"reporter": "source",
},
Value: model.SampleValue(3.5),
Timestamp: model.Now(),
Expand All @@ -280,6 +282,7 @@ var (
"destination_service": "httpbin.tutorial.svc.cluster.local",
"request_protocol": "grpc",
"grpc_response_status": "0",
"reporter": "source",
},
Value: model.SampleValue(5),
Timestamp: model.Now(),
Expand All @@ -290,6 +293,7 @@ var (
"destination_service": "httpbin.tutorial.svc.cluster.local",
"request_protocol": "grpc",
"grpc_response_status": "7",
"reporter": "source",
},
Value: model.SampleValue(3.5),
Timestamp: model.Now(),
Expand Down Expand Up @@ -345,6 +349,7 @@ var (
"request_protocol": "http",
"source_service": "unknown",
"response_code": "500",
"reporter": "source",
},
Value: model.SampleValue(1.6),
Timestamp: model.Now(),
Expand Down
63 changes: 51 additions & 12 deletions kubernetes/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"

"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -214,26 +215,64 @@ func (in *K8SClient) GetProxyStatus() ([]*ProxyStatus, error) {
return nil, errors.New("unable to find any Pilot instances")
}

wg := sync.WaitGroup{}
wg.Add(len(istiods))
errChan := make(chan error, len(istiods))
syncChan := make(chan map[string][]byte, len(istiods))

healthyIstiods := 0
result := map[string][]byte{}
for _, istiod := range istiods {
res, err := in.k8s.CoreV1().RESTClient().Get().
Namespace(istiod.Namespace).
Resource("pods").
SubResource("proxy").
Name(istiod.Name).
Suffix("/debug/syncz").
DoRaw(in.ctx)
if istiod.Status.Phase != "Running" {
wg.Add(-1)
continue
}
healthyIstiods = healthyIstiods + 1

go func(name, namespace string) {
defer wg.Done()
res, err := in.k8s.CoreV1().Pods(namespace).
ProxyGet(
"http",
name,
"8080",
"/debug/syncz",
map[string]string{}).
DoRaw(in.ctx)

if err != nil {
errChan <- fmt.Errorf("%s: %s", name, err.Error())
} else {
syncChan <- map[string][]byte{name: res}
}
}(istiod.Name, istiod.Namespace)
}

if err != nil {
return nil, err
wg.Wait()
close(errChan)
close(syncChan)

errs := ""
for err := range errChan {
if errs != "" {
errs = errs + "; "
}
errs = errs + err.Error()
}
errs = "Error fetching the proxy-status in the following pods: " + errs

if len(res) > 0 {
result[istiod.Name] = res
for status := range syncChan {
for pilot, sync := range status {
result[pilot] = sync
}
}

return getStatus(result)
// If there is one sync, we consider it as valid
if len(result) > 0 {
return getStatus(result)
}

return nil, errors.New(errs)
}

func getStatus(statuses map[string][]byte) ([]*ProxyStatus, error) {
Expand Down
68 changes: 63 additions & 5 deletions models/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package models

import (
"github.com/prometheus/common/model"

"github.com/kiali/kiali/log"
)

// NamespaceAppHealth is an alias of map of app name x health
Expand All @@ -25,7 +27,12 @@ type AppHealth struct {
}

func NewEmptyRequestHealth() RequestHealth {
return RequestHealth{Inbound: make(map[string]map[string]float64), Outbound: make(map[string]map[string]float64)}
return RequestHealth{
Inbound: make(map[string]map[string]float64),
Outbound: make(map[string]map[string]float64),
inboundSource: make(map[string]map[string]float64),
inboundDestination: make(map[string]map[string]float64),
}
}

// EmptyAppHealth create an empty AppHealth
Expand Down Expand Up @@ -87,18 +94,69 @@ type ProxyStatus struct {
// - Inbound//Outbound are the rates of requests by protocol and status_code.
// Example: Inbound: { "http": {"200": 1.5, "400": 2.3}, "grpc": {"1": 1.2} }
type RequestHealth struct {
Inbound map[string]map[string]float64 `json:"inbound"`
Outbound map[string]map[string]float64 `json:"outbound"`
Inbound map[string]map[string]float64 `json:"inbound"`
Outbound map[string]map[string]float64 `json:"outbound"`
inboundSource map[string]map[string]float64
inboundDestination map[string]map[string]float64
}

// AggregateInbound adds the provided metric sample to internal inbound counters and updates error ratios
func (in *RequestHealth) AggregateInbound(sample *model.Sample) {
aggregate(sample, in.Inbound)
// Samples need to be aggregated by source or destination reporter, but not accumulated both
reporter := string(sample.Metric[model.LabelName("reporter")])
switch reporter {
case "source":
aggregate(sample, in.inboundSource)
case "destination":
aggregate(sample, in.inboundDestination)
default:
log.Tracef("Inbound metric without reporter %v ", sample)
aggregate(sample, in.Inbound)
}
}

// AggregateOutbound adds the provided metric sample to internal outbound counters and updates error ratios
func (in *RequestHealth) AggregateOutbound(sample *model.Sample) {
aggregate(sample, in.Outbound)
// Outbound traffic will be aggregated per source reporter
reporter := string(sample.Metric[model.LabelName("reporter")])
if reporter == "source" {
aggregate(sample, in.Outbound)
}
}

// CombineReporters consolidates the inbound rates that were collected separately
// per reporter into Inbound.
//
// Rates present in both reports are duplicates of each other and therefore are
// not summed. Rates present in only one report are kept. When both reporters
// provide a rate for the same protocol and code, the higher one wins.
func (in *RequestHealth) CombineReporters() {
	// protocolRates returns the Inbound entry for a protocol, creating it on first use.
	protocolRates := func(protocol string) map[string]float64 {
		rates, found := in.Inbound[protocol]
		if !found {
			rates = make(map[string]float64)
			in.Inbound[protocol] = rates
		}
		return rates
	}

	// Start from the source reporter: its rates overwrite whatever Inbound
	// already holds for the same protocol and code.
	for protocol, codes := range in.inboundSource {
		merged := protocolRates(protocol)
		for code, rate := range codes {
			merged[code] = rate
		}
	}

	// Fold in the destination reporter: take its rate when Inbound has no entry
	// for that protocol and code, or when it is higher than the current one
	// (i.e. destination reports errors that the source does not).
	for protocol, codes := range in.inboundDestination {
		merged := protocolRates(protocol)
		for code, rate := range codes {
			if current, found := merged[code]; !found || rate > current {
				merged[code] = rate
			}
		}
	}
}

func aggregate(sample *model.Sample, requests map[string]map[string]float64) {
Expand Down
14 changes: 12 additions & 2 deletions prometheus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prometheus
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -84,9 +85,17 @@ func NewClientForConfig(cfg config.PrometheusConfig) (*Client, error) {
}

// make a copy of the prometheus DefaultRoundTripper to avoid race condition (issue #3518)
defaultRoundTripper := *api.DefaultRoundTripper.(*http.Transport)
// Do not copy the struct itself, it contains a lock. Re-create it from scratch instead.
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
}

transportConfig, err := httputil.CreateTransport(&auth, &defaultRoundTripper, httputil.DefaultTimeout)
transportConfig, err := httputil.CreateTransport(&auth, roundTripper, httputil.DefaultTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,6 +284,7 @@ func (in *Client) GetMetricsForLabels(labels []string) ([]string, error) {
// Arbitrarily set time range. Meaning that discovery works with metrics produced within last hour
end := time.Now()
start := end.Add(-time.Hour)
log.Tracef("[Prom] GetMetricsForLabels: %v", labels)
results, warnings, err := in.api.Series(in.ctx, labels, start, end)
if warnings != nil && len(warnings) > 0 {
log.Warningf("GetMetricsForLabels. Prometheus Warnings: [%s]", strings.Join(warnings, ","))
Expand Down
3 changes: 3 additions & 0 deletions prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func fetchHistogramValues(ctx context.Context, api prom_v1.API, metricName, labe
queries := buildHistogramQueries(metricName, labels, grouping, rateInterval, avg, quantiles)
histogram := make(map[string]model.Vector, len(queries))
for k, query := range queries {
log.Tracef("[Prom] fetchHistogramValues: %s", query)
result, warnings, err := api.Query(ctx, query, queryTime)
if warnings != nil && len(warnings) > 0 {
log.Warningf("fetchHistogramValues. Prometheus Warnings: [%s]", strings.Join(warnings, ","))
Expand Down Expand Up @@ -93,6 +94,7 @@ func buildHistogramQueries(metricName, labels, grouping, rateInterval string, av
}

func fetchRange(ctx context.Context, api prom_v1.API, query string, bounds prom_v1.Range) Metric {
log.Tracef("[Prom] fetchRange: %s", query)
result, warnings, err := api.QueryRange(ctx, query, bounds)
if warnings != nil && len(warnings) > 0 {
log.Warningf("fetchRange. Prometheus Warnings: [%s]", strings.Join(warnings, ","))
Expand Down Expand Up @@ -172,6 +174,7 @@ func getItemRequestRates(ctx context.Context, api prom_v1.API, namespace, item,

func getRequestRatesForLabel(ctx context.Context, api prom_v1.API, time time.Time, labels, ratesInterval string) (model.Vector, error) {
query := fmt.Sprintf("rate(istio_requests_total{%s}[%s]) > 0", labels, ratesInterval)
log.Tracef("[Prom] getRequestRatesForLabel: %s", query)
promtimer := internalmetrics.GetPrometheusProcessingTimePrometheusTimer("Metrics-GetRequestRates")
result, warnings, err := api.Query(ctx, query, time)
if warnings != nil && len(warnings) > 0 {
Expand Down
5 changes: 5 additions & 0 deletions prometheus/prometheustest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (o *PromClientMock) MockWorkloadRequestRates(namespace, wkld string, in, ou
o.On("GetWorkloadRequestRates", namespace, wkld, mock.AnythingOfType("string"), mock.AnythingOfType("time.Time")).Return(in, out, nil)
}

// MockMetricsForLabels registers an expectation on GetMetricsForLabels: a call
// with any []string argument returns the given metrics and a nil error.
func (o *PromClientMock) MockMetricsForLabels(metrics []string) {
	anyLabels := mock.AnythingOfType("[]string")
	expectation := o.On("GetMetricsForLabels", anyLabels)
	expectation.Return(metrics, nil)
}

func (o *PromClientMock) GetAllRequestRates(namespace, ratesInterval string, queryTime time.Time) (model.Vector, error) {
args := o.Called(namespace, ratesInterval, queryTime)
return args.Get(0).(model.Vector), args.Error(1)
Expand Down

0 comments on commit 26b55ff

Please sign in to comment.