Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Podresources: add Watch() API #94612

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
130 changes: 115 additions & 15 deletions pkg/kubelet/apis/podresources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package podresources

import (
"context"
"sync"

"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
Expand All @@ -29,24 +30,61 @@ type DevicesProvider interface {
UpdateAllocatedDevices()
}

// TODO fix the name
// must be compatible with pod.Manager
type PodNotifier interface {
AddPod(pod *v1.Pod)
UpdatePod(pod *v1.Pod)
DeletePod(pod *v1.Pod)
}

// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetPods() []*v1.Pod
}

type podInfo struct {
Action v1alpha1.WatchPodAction
Pod *v1.Pod
}

// podResourcesServer implements PodResourcesListerServer
type podResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
podSource chan podInfo
lock sync.RWMutex
sinkId int
podSinks map[int]chan podInfo
}

// NewPodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1alpha1.PodResourcesListerServer {
return &podResourcesServer{
func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) (v1alpha1.PodResourcesListerServer, PodNotifier) {
p := &podResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
podSource: make(chan podInfo),
podSinks: make(map[int]chan podInfo),
}
go p.dispatchPods()
return p, p
}

func (p *podResourcesServer) makePodResources(pod *v1.Pod) *v1alpha1.PodResources {
pRes := v1alpha1.PodResources{
Name: pod.Name,
Namespace: pod.Namespace,
Containers: make([]*v1alpha1.ContainerResources, len(pod.Spec.Containers)),
}

for j, container := range pod.Spec.Containers {
pRes.Containers[j] = &v1alpha1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
}
}
return &pRes
}

// List returns information about the resources assigned to pods on the node
Expand All @@ -56,22 +94,84 @@ func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodReso
p.devicesProvider.UpdateAllocatedDevices()

for i, pod := range pods {
pRes := v1alpha1.PodResources{
Name: pod.Name,
Namespace: pod.Namespace,
Containers: make([]*v1alpha1.ContainerResources, len(pod.Spec.Containers)),
}

for j, container := range pod.Spec.Containers {
pRes.Containers[j] = &v1alpha1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
}
}
podResources[i] = &pRes
podResources[i] = p.makePodResources(pod)
}

return &v1alpha1.ListPodResourcesResponse{
PodResources: podResources,
}, nil
}

func (p *podResourcesServer) AddPod(pod *v1.Pod) {
p.podSource <- podInfo{
Action: v1alpha1.WatchPodAction_ADDED,
Pod: pod,
}
}

func (p *podResourcesServer) UpdatePod(pod *v1.Pod) {
p.podSource <- podInfo{
Action: v1alpha1.WatchPodAction_MODIFIED,
Pod: pod,
}
}

func (p *podResourcesServer) DeletePod(pod *v1.Pod) {
p.podSource <- podInfo{
Action: v1alpha1.WatchPodAction_DELETED,
Pod: pod,
}
}

func (p *podResourcesServer) dispatchPods() {
for {
info := <-p.podSource

p.lock.RLock()
for _, ch := range p.podSinks {
ch <- info
}
p.lock.RUnlock()
}
}

func (p *podResourcesServer) registerWatcher() (int, chan podInfo) {
p.lock.Lock()
defer p.lock.Unlock()
sinkChan := make(chan podInfo)
sinkId := p.sinkId
p.sinkId++
p.podSinks[sinkId] = sinkChan
return sinkId, sinkChan
}

func (p *podResourcesServer) unregisterWatcher(sinkId int) {
p.lock.Lock()
defer p.lock.Unlock()
// TODO: sink close?
delete(p.podSinks, sinkId)
}

func (p *podResourcesServer) makeWatchPodResponse(info podInfo) *v1alpha1.WatchPodResourcesResponse {
resp := v1alpha1.WatchPodResourcesResponse{
Action: info.Action,
PodResources: []*v1alpha1.PodResources{
p.makePodResources(info.Pod),
},
}
return &resp
}

func (p *podResourcesServer) Watch(req *v1alpha1.WatchPodResourcesRequest, srv v1alpha1.PodResourcesLister_WatchServer) error {
sinkId, sinkChan := p.registerWatcher()
defer p.unregisterWatcher(sinkId)
for {
pod := <-sinkChan
resp := p.makeWatchPodResponse(pod)
err := srv.Send(resp)
if err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/kubelet/apis/podresources/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestListPodResources(t *testing.T) {
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("UpdateAllocatedDevices").Return()
server := NewPodResourcesServer(m, m)
server, _ := NewPodResourcesServer(m, m)
resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
Expand Down