Skip to content

Commit

Permalink
Add stats monitor for calculating NanoCores
Browse files Browse the repository at this point in the history
Signed-off-by: James Sturtevant <jstur@microsoft.com>
Signed-off-by: James Sturtevant <jsturtevant@gmail.com>
Signed-off-by: James Sturtevant <jstur@microsoft.com>
  • Loading branch information
jsturtevant committed Mar 29, 2024
1 parent 124456e commit 7fe6d03
Show file tree
Hide file tree
Showing 14 changed files with 921 additions and 430 deletions.
105 changes: 12 additions & 93 deletions internal/cri/server/container_stats_list.go
Expand Up @@ -27,13 +27,11 @@ import (
cg1 "github.com/containerd/cgroups/v3/cgroup1/stats"
cg2 "github.com/containerd/cgroups/v3/cgroup2/stats"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/containerd/containerd/v2/api/services/tasks/v1"
"github.com/containerd/containerd/v2/api/types"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
"github.com/containerd/containerd/v2/internal/cri/store/stats"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
)
Expand Down Expand Up @@ -143,84 +141,15 @@ func (c *criService) toCRIContainerStats(
return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err)
}

if cs.Cpu != nil && cs.Cpu.UsageCoreNanoSeconds != nil {
// this is a calculated value and should be computed for all OSes
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.Cpu.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err)
}
cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
if cs.Cpu != nil && cntr.Stats != nil {
// this is a calculated value every 10s
cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: cntr.Stats.UsageNanoCores}
}
containerStats.Stats = append(containerStats.Stats, cs)
}
return containerStats, nil
}

// getUsageNanoCores converts a cumulative CPU usage sample into a usage rate
// expressed in nano cores.
//
// containerID selects the entry to look up; isSandbox chooses between the
// sandbox store and the container store. currentUsageCoreNanoSeconds is the
// cumulative CPU usage observed at currentTimestamp.
//
// The rate is computed against the sample previously cached on the store entry.
// The result is 0 when:
//   - no sample has been cached yet (the current sample is cached),
//   - the interval since the cached sample is zero or negative (the cached
//     sample is left untouched), or
//   - the cumulative counter is lower than the cached one (the current sample
//     is cached so the next call has a valid baseline).
//
// An error is returned when the entry cannot be found in its store or the
// cached sample cannot be updated.
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
	var oldStats *stats.ContainerStats

	if isSandbox {
		sandbox, err := c.sandboxStore.Get(containerID)
		if err != nil {
			return 0, fmt.Errorf("failed to get sandbox container: %s: %w", containerID, err)
		}
		oldStats = sandbox.Stats
	} else {
		container, err := c.containerStore.Get(containerID)
		if err != nil {
			return 0, fmt.Errorf("failed to get container ID: %s: %w", containerID, err)
		}
		oldStats = container.Stats
	}

	// First observation: there is nothing to diff against, so only record the
	// baseline and report no usage.
	if oldStats == nil {
		newStats := &stats.ContainerStats{
			UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
			Timestamp:            currentTimestamp,
		}
		if isSandbox {
			err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
			if err != nil {
				return 0, fmt.Errorf("failed to update sandbox stats container ID: %s: %w", containerID, err)
			}
		} else {
			err := c.containerStore.UpdateContainerStats(containerID, newStats)
			if err != nil {
				return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
			}
		}
		return 0, nil
	}

	nanoSeconds := currentTimestamp.UnixNano() - oldStats.Timestamp.UnixNano()

	// zero or negative interval
	if nanoSeconds <= 0 {
		return 0, nil
	}

	// Both counters are unsigned: if the cumulative value went backwards the
	// subtraction would wrap around and yield an enormous bogus rate. Report 0
	// for that interval instead and fall through to re-baseline the cache.
	var newUsageNanoCores uint64
	if currentUsageCoreNanoSeconds >= oldStats.UsageCoreNanoSeconds {
		newUsageNanoCores = uint64(float64(currentUsageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
			float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
	}

	newStats := &stats.ContainerStats{
		UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
		Timestamp:            currentTimestamp,
	}
	if isSandbox {
		err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
		if err != nil {
			return 0, fmt.Errorf("failed to update sandbox container stats: %s: %w", containerID, err)
		}
	} else {
		err := c.containerStore.UpdateContainerStats(containerID, newStats)
		if err != nil {
			return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
		}
	}

	return newUsageNanoCores, nil
}

func (c *criService) normalizeContainerStatsFilter(filter *runtime.ContainerStatsFilter) {
if cntr, err := c.containerStore.Get(filter.GetId()); err == nil {
filter.Id = cntr.ID
Expand Down Expand Up @@ -301,14 +230,15 @@ func (c *criService) windowsContainerMetrics(
}

if stats != nil {
s, err := typeurl.UnmarshalAny(stats.Data)
data, err := convertMetric(stats)
if err != nil {
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
return nil, err
}
wstats := s.(*wstats.Statistics).GetWindows()
wstats := data.(*wstats.Statistics).GetWindows()
if wstats == nil {
return nil, errors.New("windows stats is empty")
}

if wstats.Processor != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: (protobuf.FromTimestamp(wstats.Timestamp)).UnixNano(),
Expand Down Expand Up @@ -357,23 +287,12 @@ func (c *criService) linuxContainerMetrics(
}

if stats != nil {
var data interface{}
switch {
case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)):
data = &cg1.Metrics{}
case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)):
data = &cg2.Metrics{}
case typeurl.Is(stats.Data, (*wstats.Statistics)(nil)):
data = &wstats.Statistics{}
default:
return nil, errors.New("cannot convert metric data to cgroups.Metrics or windows.Statistics")
}

if err := typeurl.UnmarshalTo(stats.Data, data); err != nil {
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
data, err := convertMetric(stats)
if err != nil {
return nil, err
}

cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, data, protobuf.FromTimestamp(stats.Timestamp))
cpuStats, err := c.cpuContainerStats(data, protobuf.FromTimestamp(stats.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
}
Expand Down Expand Up @@ -440,7 +359,7 @@ func getAvailableBytesV2(memory *cg2.MemoryStat, workingSetBytes uint64) uint64
return 0
}

func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) {
func (c *criService) cpuContainerStats(stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) {
switch metrics := stats.(type) {
case *cg1.Metrics:
metrics.GetCPU().GetUsage()
Expand Down
51 changes: 0 additions & 51 deletions internal/cri/server/container_stats_list_test.go
Expand Up @@ -32,57 +32,6 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// TestContainerMetricsCPUNanoCoreUsage feeds two cumulative CPU samples taken
// one second apart into getUsageNanoCores and checks the reported nano core
// usage after each, as well as that a stats sample is cached on the container.
func TestContainerMetricsCPUNanoCoreUsage(t *testing.T) {
	svc := newTestCRIService()
	firstSampleTime := time.Now()
	secondSampleTime := firstSampleTime.Add(time.Second)
	containerID := "ID"

	cases := []struct {
		desc                        string
		firstCPUValue               uint64
		secondCPUValue              uint64
		expectedNanoCoreUsageFirst  uint64
		expectedNanoCoreUsageSecond uint64
	}{
		{
			desc:                        "metrics",
			firstCPUValue:               50,
			secondCPUValue:              500,
			expectedNanoCoreUsageFirst:  0,
			expectedNanoCoreUsageSecond: 450,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.desc, func(t *testing.T) {
			// A freshly created container carries no cached stats.
			created, err := containerstore.NewContainer(
				containerstore.Metadata{ID: containerID},
			)
			assert.NoError(t, err)
			assert.Nil(t, created.Stats)
			err = svc.containerStore.Add(created)
			assert.NoError(t, err)

			// First sample: only the baseline is recorded.
			usage, err := svc.getUsageNanoCores(containerID, false, tc.firstCPUValue, firstSampleTime)
			assert.NoError(t, err)

			afterFirst, err := svc.containerStore.Get(containerID)
			assert.NoError(t, err)
			assert.NotNil(t, afterFirst.Stats)

			assert.Equal(t, tc.expectedNanoCoreUsageFirst, usage)

			// Second sample: the rate is computed against the baseline.
			usage, err = svc.getUsageNanoCores(containerID, false, tc.secondCPUValue, secondSampleTime)
			assert.NoError(t, err)
			assert.Equal(t, tc.expectedNanoCoreUsageSecond, usage)

			afterSecond, err := svc.containerStore.Get(containerID)
			assert.NoError(t, err)
			assert.NotNil(t, afterSecond.Stats)
		})
	}
}

func TestGetWorkingSet(t *testing.T) {
for _, test := range []struct {
desc string
Expand Down
6 changes: 5 additions & 1 deletion internal/cri/server/sandbox_stats_linux.go
Expand Up @@ -59,10 +59,14 @@ func (c *criService) podSandboxStats(
if stats != nil {
timestamp := time.Now()

cpuStats, err := c.cpuContainerStats(meta.ID, true /* isSandbox */, stats, timestamp)
cpuStats, err := c.cpuContainerStats(stats, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
}
if cpuStats != nil && sandbox.Stats != nil {
// this is a calculated value every 10s
cpuStats.UsageNanoCores = &runtime.UInt64Value{Value: sandbox.Stats.UsageNanoCores}
}
podSandboxStats.Linux.Cpu = cpuStats

memoryStats, err := c.memoryContainerStats(meta.ID, stats, timestamp)
Expand Down
80 changes: 19 additions & 61 deletions internal/cri/server/sandbox_stats_windows.go
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd/v2/api/types"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
"github.com/containerd/containerd/v2/internal/cri/store/stats"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
Expand Down Expand Up @@ -86,8 +85,6 @@ func (c *criService) podSandboxStats(
ProcessCount: &runtime.UInt64Value{Value: pidCount},
}

c.saveSandBoxMetrics(podSandboxStats.Attributes.Id, podSandboxStats)

return podSandboxStats, nil
}

Expand Down Expand Up @@ -132,6 +129,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
return nil, nil, fmt.Errorf("failed to covert container metrics for sandbox with id %s: %w", sandbox.ID, err)
}

containerNanoCoreTotal := uint64(0)
windowsContainerStats := make([]*runtime.WindowsContainerStats, 0, len(statsMap))
for _, cntr := range containers {
containerMetric := statsMap[cntr.ID]
Expand All @@ -153,9 +151,14 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}

// Calculate NanoCores for container
if containerStats.Cpu != nil && containerStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(containerStats.Cpu.UsageCoreNanoSeconds.Value, cntr.Stats, containerStats.Cpu.Timestamp)
containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
if containerStats.Cpu != nil {
containerNanoCores := uint64(0)
if cntr.Stats != nil {
containerNanoCores = cntr.Stats.UsageNanoCores
}

containerNanoCoreTotal += containerNanoCores
containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: containerNanoCores}
}

// On Windows we need to add up all the podStatsData to get the Total for the Pod as there isn't something
Expand Down Expand Up @@ -189,9 +192,16 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}

// Calculate NanoCores for pod after adding containers cpu including the pods cpu
if podRuntimeStats.Cpu != nil && podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(podRuntimeStats.Cpu.UsageCoreNanoSeconds.Value, sandbox.Stats, podRuntimeStats.Cpu.Timestamp)
podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
if podRuntimeStats.Cpu != nil {
sandboxNanoCores := uint64(0)
if sandbox.Stats != nil {
sandboxNanoCores = sandbox.Stats.UsageNanoCores
}
// There is not a cgroup equivalent on windows where we can get the total cpu usage for the pod
// The sandbox container stats are for the "sandbox" container only
// To get the total for the pod we need to add the sandbox container stats to the total of the containers
// This could possibly change when internal/cri/server/podsandbox/sandbox_stats.go is implemented
podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: sandboxNanoCores + containerNanoCoreTotal}
}

return podRuntimeStats, windowsContainerStats, nil
Expand Down Expand Up @@ -312,22 +322,6 @@ func (c *criService) convertToCRIStats(stats *wstats.Statistics) (*runtime.Windo
return &cs, nil
}

// getUsageNanoCores computes the CPU usage rate, in nano cores, between a
// previously cached sample and a new one.
//
// usageCoreNanoSeconds is the new cumulative CPU usage, newtimestamp is the
// time of that sample in nanoseconds since the Unix epoch, and oldStats is the
// previously cached sample (may be nil).
//
// It returns 0 when there is no previous sample, when the interval between the
// samples is zero or negative, or when the cumulative counter is lower than
// the cached one.
func getUsageNanoCores(usageCoreNanoSeconds uint64, oldStats *stats.ContainerStats, newtimestamp int64) uint64 {
	if oldStats == nil {
		return 0
	}

	nanoSeconds := newtimestamp - oldStats.Timestamp.UnixNano()

	// zero or negative interval
	if nanoSeconds <= 0 {
		return 0
	}

	// The counters are unsigned, so a counter that went backwards would wrap
	// around in the subtraction below and produce an enormous bogus rate.
	if usageCoreNanoSeconds < oldStats.UsageCoreNanoSeconds {
		return 0
	}

	return uint64(float64(usageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
		float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
}

func windowsNetworkUsage(ctx context.Context, sandbox sandboxstore.Sandbox, timestamp time.Time) *runtime.WindowsNetworkUsage {
eps, err := hcn.GetNamespaceEndpointIds(sandbox.NetNSPath)
if err != nil {
Expand Down Expand Up @@ -362,42 +356,6 @@ func windowsNetworkUsage(ctx context.Context, sandbox sandboxstore.Sandbox, time
return networkUsage
}

// saveSandBoxMetrics caches the CPU sample of a sandbox, and of every container
// reported in its Windows stats, in the sandbox and container stores.
//
// Nothing is saved when sandboxStats carries no Windows CPU usage. Containers
// without CPU usage are skipped individually so the remaining containers are
// still cached. The first store update that fails aborts the operation and its
// error is returned.
func (c *criService) saveSandBoxMetrics(sandboxID string, sandboxStats *runtime.PodSandboxStats) error {
	// we may not have stats since container hasn't started yet so skip saving to cache
	if sandboxStats == nil || sandboxStats.Windows == nil || sandboxStats.Windows.Cpu == nil ||
		sandboxStats.Windows.Cpu.UsageCoreNanoSeconds == nil {
		return nil
	}

	newStats := &stats.ContainerStats{
		UsageCoreNanoSeconds: sandboxStats.Windows.Cpu.UsageCoreNanoSeconds.Value,
		Timestamp:            time.Unix(0, sandboxStats.Windows.Cpu.Timestamp),
	}
	err := c.sandboxStore.UpdateContainerStats(sandboxID, newStats)
	if err != nil {
		return err
	}

	// We queried the stats when getting sandbox stats. We need to save the query to cache
	for _, cntr := range sandboxStats.Windows.Containers {
		// we may not have stats since container hasn't started yet so skip saving to cache.
		// Only this container is skipped; returning here would silently drop
		// the samples of every container that follows it in the list.
		if cntr == nil || cntr.Cpu == nil || cntr.Cpu.UsageCoreNanoSeconds == nil {
			continue
		}

		newStats := &stats.ContainerStats{
			UsageCoreNanoSeconds: cntr.Cpu.UsageCoreNanoSeconds.Value,
			Timestamp:            time.Unix(0, cntr.Cpu.Timestamp),
		}
		err = c.containerStore.UpdateContainerStats(cntr.Attributes.Id, newStats)
		if err != nil {
			return err
		}
	}

	return nil
}

func (c *criService) getSandboxPidCount(ctx context.Context, sandbox sandboxstore.Sandbox) (uint64, error) {
var pidCount uint64

Expand Down

0 comments on commit 7fe6d03

Please sign in to comment.