forked from helm/acceptance-testing
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Kubectl.py
21 lines (18 loc) · 949 Bytes
/
Kubectl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import common
from Kind import kind_auth_wrap
class Kubectl(common.CommandRunner):
def service_has_ip(self, namespace, service_name):
cmd = 'kubectl get services --namespace='+namespace
cmd += ' | grep '+service_name
cmd += ' | awk \'{print $3}\' | grep \'\(.\).*\\1\''
self.run_command(kind_auth_wrap(cmd))
def persistent_volume_claim_is_bound(self, namespace, pvc_name):
cmd = 'kubectl get pvc --namespace='+namespace
cmd += ' | grep '+pvc_name
cmd += ' | awk \'{print $2}\' | grep ^Bound'
self.run_command(kind_auth_wrap(cmd))
def pods_with_prefix_are_running(self, namespace, pod_prefix, num_expected):
cmd = '[ `kubectl get pods --namespace='+namespace
cmd += ' | grep ^'+pod_prefix+' | awk \'{print $2 "--" $3}\''
cmd += ' | grep -E "^([1-9][0-9]*)/\\1--Running" | wc -l` == '+num_expected+' ]'
self.run_command(kind_auth_wrap(cmd))