/
kubelet.go
242 lines (209 loc) · 6.15 KB
/
kubelet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package kubernetes
import (
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"github.com/newrelic/nri-discovery-kubernetes/internal/config"
"github.com/newrelic/nri-discovery-kubernetes/internal/http"
"github.com/newrelic/nri-discovery-kubernetes/internal/utils"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
podsPath = "/pods"
clusterNameEnvVar = "CLUSTER_NAME"
nodeNameEnvVar = "NRK8S_NODE_NAME"
)
type (
// PortsMap stores container ports indexed by name.
PortsMap map[string]int32
// LabelsMap stores Pod labels.
LabelsMap map[string]string
// AnnotationsMap stores Pod annotations.
AnnotationsMap map[string]string
)
// ContainerInfo represents discovery-specific format for found Pods via Kubelet API.
type ContainerInfo struct {
Name string
ID string
Image string
ImageID string
Ports PortsMap
PodLabels LabelsMap
PodAnnotations AnnotationsMap
PodIP string
PodName string
NodeName string
NodeIP string
Namespace string
Cluster string
}
// Kubelet defines what functionality kubelet client provides.
type Kubelet interface {
FindContainers(namespaces []string) ([]ContainerInfo, error)
}
type kubelet struct {
client http.Client
NodeName string
ClusterName string
}
func (kube *kubelet) FindContainers(namespaces []string) ([]ContainerInfo, error) {
allPods, err := kube.getPods()
if err != nil {
return nil, err
}
pods := filterByNamespace(allPods, namespaces)
return getContainers(kube.ClusterName, kube.NodeName, pods), nil
}
func (kube *kubelet) getPods() ([]corev1.Pod, error) {
resp, err := kube.client.Get(podsPath)
if err != nil {
err = fmt.Errorf("failed to execute request against kubelet: %s ", err)
return []corev1.Pod{}, err
}
pods := &corev1.PodList{}
err = json.Unmarshal(resp, pods)
if err != nil {
err = fmt.Errorf("failed to unmarshall result in to list of pods: %s", err)
}
return pods.Items, err
}
func filterByNamespace(allPods []corev1.Pod, namespaces []string) []corev1.Pod {
if len(namespaces) == 0 {
return allPods
}
var result []corev1.Pod
for _, pod := range allPods {
if utils.Contains(namespaces, pod.Namespace) {
result = append(result, pod)
}
}
return result
}
func getContainers(clusterName string, nodeName string, pods []corev1.Pod) []ContainerInfo {
var containers []ContainerInfo
for _, pod := range pods {
if pod.Status.Phase != corev1.PodRunning {
continue
}
for idx, cs := range pod.Status.ContainerStatuses {
if cs.State.Running == nil {
continue
}
ports := getPorts(pod, idx)
c := ContainerInfo{
Name: cs.Name,
ID: cs.ContainerID,
Image: cs.Image,
ImageID: cs.ImageID,
Ports: ports,
PodIP: pod.Status.PodIP,
PodLabels: pod.Labels,
PodAnnotations: pod.Annotations,
PodName: pod.Name,
NodeName: nodeName,
NodeIP: pod.Status.HostIP,
Namespace: pod.Namespace,
Cluster: clusterName,
}
containers = append(containers, c)
}
}
return containers
}
func getPorts(pod corev1.Pod, containerIndex int) PortsMap {
ports := make(PortsMap)
if len(pod.Spec.Containers) > 0 &&
len(pod.Spec.Containers[containerIndex].Ports) > 0 {
// we add the port index and if available the name.
// you can then use either to refer to the value
for portIndex, port := range pod.Spec.Containers[containerIndex].Ports {
ports[strconv.Itoa(portIndex)] = port.ContainerPort
if len(port.Name) > 0 {
ports[port.Name] = port.ContainerPort
}
}
}
return ports
}
func getClusterName() string {
// there is no way at the moment to get the cluster name from the Kubelet API
clusterName := os.Getenv(clusterNameEnvVar)
return clusterName
}
// NewKubelet validates and constructs Kubelet client.
func NewKubelet(host string, port int, useTLS bool, autoConfig bool, timeout time.Duration) (Kubelet, error) {
restConfig, err := rest.InClusterConfig()
// not inside the cluster?
if err != nil {
kConfigPath := filepath.Join(utils.HomeDir(), ".kube", "config")
restConfig, err = clientcmd.BuildConfigFromFlags("", kConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to get cluster configuration: %s", err)
}
}
clusterName := getClusterName()
nodeName, isNodeNameSet := os.LookupEnv(nodeNameEnvVar)
if autoConfig && isNodeNameSet {
client, err := http.NewKubeletClient(nodeName, timeout)
if err != nil {
logrus.WithError(err).Warn("failed to initialize kubelet client")
} else {
kubelet := &kubelet{
client: client,
NodeName: nodeName,
ClusterName: clusterName,
}
return kubelet, nil
}
}
// host provided by cmd line arg has higher precedence.
// if host cmd line arg is not provided use NRK8S_NODE_NAME in case is set, otherwise localhost.
kubeletHost := host
if isNodeNameSet && !config.IsFlagPassed(config.Host) {
kubeletHost = nodeName
}
hostURL := makeURL(kubeletHost, port, useTLS)
// Allow kubelet to use self-signed serving certificate.
restConfig.Insecure = true
// When Insecure == true, make sure no CA certificate is set, otherwise creating transport fails.
//
// https://github.com/kubernetes/client-go/blob/09dbda0b387fa7a9f71c5086e9f8f0529d7a0436/transport/transport.go#L66
restConfig.CAFile = ""
restConfig.CAData = nil
tr, err := rest.TransportFor(restConfig)
if err != nil {
return nil, fmt.Errorf("creating HTTP transport config from kubeconfig: %w", err)
}
httpClient := http.NewClient(hostURL, tr)
kubelet := &kubelet{
client: httpClient,
NodeName: kubeletHost,
ClusterName: clusterName,
}
return kubelet, nil
}
// NewKubeletWithClient constructs Kubelet client with given HTTP client.
func NewKubeletWithClient(httpClient http.Client) (Kubelet, error) {
k := &kubelet{
client: httpClient,
}
return k, nil
}
func makeURL(host string, port int, useTLS bool) url.URL {
scheme := "http"
if useTLS {
scheme = "https"
}
kubeletURL := url.URL{
Scheme: scheme,
Host: host + ":" + strconv.Itoa(port),
}
return kubeletURL
}