diff --git a/cmd/new-ui/v1beta1/main.go b/cmd/new-ui/v1beta1/main.go index 190bd774ae1..0636532d218 100644 --- a/cmd/new-ui/v1beta1/main.go +++ b/cmd/new-ui/v1beta1/main.go @@ -67,6 +67,7 @@ func main() { http.HandleFunc("/katib/edit_template/", kuh.EditTemplate) http.HandleFunc("/katib/delete_template/", kuh.DeleteTemplate) http.HandleFunc("/katib/fetch_namespaces", kuh.FetchNamespaces) + http.HandleFunc("/katib/fetch_trial_logs/", kuh.FetchTrialLogs) log.Printf("Serving at %s:%s", *host, *port) if err := http.ListenAndServe(fmt.Sprintf("%s:%s", *host, *port), nil); err != nil { diff --git a/manifests/v1beta1/components/ui/rbac.yaml b/manifests/v1beta1/components/ui/rbac.yaml index c549bf351d3..85798dc2d2a 100644 --- a/manifests/v1beta1/components/ui/rbac.yaml +++ b/manifests/v1beta1/components/ui/rbac.yaml @@ -19,6 +19,18 @@ rules: - suggestions verbs: - "*" + - apiGroups: + - "" + resources: + - pods + verbs: + - list + - apiGroups: + - "" + resources: + - pods/log + verbs: + - get --- apiVersion: v1 kind: ServiceAccount diff --git a/manifests/v1beta1/installs/katib-with-kubeflow/kubeflow-katib-roles.yaml b/manifests/v1beta1/installs/katib-with-kubeflow/kubeflow-katib-roles.yaml index 6394146705e..57b0fbaf318 100644 --- a/manifests/v1beta1/installs/katib-with-kubeflow/kubeflow-katib-roles.yaml +++ b/manifests/v1beta1/installs/katib-with-kubeflow/kubeflow-katib-roles.yaml @@ -34,6 +34,18 @@ rules: - deletecollection - patch - update + - apiGroups: + - "" + resources: + - pods + verbs: + - list + - apiGroups: + - "" + resources: + - pods/log + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/pkg/new-ui/v1beta1/backend.go b/pkg/new-ui/v1beta1/backend.go index 2594395cac2..6e6817eddb3 100644 --- a/pkg/new-ui/v1beta1/backend.go +++ b/pkg/new-ui/v1beta1/backend.go @@ -17,8 +17,11 @@ limitations under the License. package v1beta1 import ( + "bytes" + "context" "encoding/json" "errors" + "io" "log" "net/http" "path/filepath" @@ -29,10 +32,19 @@ import ( experimentv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" suggestionv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" + trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" api_pb_v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" consts "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/util/v1beta1/katibclient" corev1 "k8s.io/api/core/v1" + + common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" + mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client/config" ) func NewKatibUIHandler(dbManagerAddr string) *KatibUIHandler { @@ -574,3 +586,149 @@ func (k *KatibUIHandler) FetchTrial(w http.ResponseWriter, r *http.Request) { return } } + +// FetchTrialLogs fetches logs for a trial in specific namespace. +func (k *KatibUIHandler) FetchTrialLogs(w http.ResponseWriter, r *http.Request) { + namespaces, ok := r.URL.Query()["namespace"] + if !ok { + log.Printf("No namespace provided in Query parameters! Provide a 'namespace' param") + err := errors.New("no 'namespace' provided") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + trialNames, ok := r.URL.Query()["trialName"] + if !ok { + log.Printf("No trialName provided in Query parameters! Provide a 'trialName' param") + err := errors.New("no 'trialName' provided") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + trialName := trialNames[0] + namespace := namespaces[0] + + user, err := IsAuthorized(consts.ActionTypeGet, namespace, consts.PluralTrial, "", trialName, trialsv1beta1.SchemeGroupVersion, k.katibClient.GetClient(), r) + if user == "" && err != nil { + log.Printf("No user provided in kubeflow-userid header.") + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } else if err != nil { + log.Printf("The user: %s is not authorized to get trial: %s in namespace: %s \n", user, trialName, namespace) + http.Error(w, err.Error(), http.StatusForbidden) + return + } + + trial := &trialsv1beta1.Trial{} + if err := k.katibClient.GetClient().Get(context.Background(), types.NamespacedName{Name: trialName, Namespace: namespace}, trial); err != nil { + log.Printf("GetLogs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // TODO: Use controller-runtime client instead of kubernetes client to get logs, once this is available + clientset, err := createKubernetesClientset() + if err != nil { + log.Printf("GetLogs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + podName, err := fetchMasterPodName(clientset, trial) + if err != nil { + log.Printf("GetLogs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + user, err = IsAuthorized(consts.ActionTypeGet, namespace, corev1.ResourcePods.String(), "log", podName, corev1.SchemeGroupVersion, k.katibClient.GetClient(), r) + if user == "" && err != nil { + log.Printf("No user provided in kubeflow-userid header.") + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } else if err != nil { + log.Printf("The user: %s is not authorized to get pod logs: %s in namespace: %s \n", user, podName, namespace) + http.Error(w, err.Error(), http.StatusForbidden) + return + } + + podLogOpts := apiv1.PodLogOptions{} + podLogOpts.Container = trial.Spec.PrimaryContainerName + if trial.Spec.MetricsCollector.Collector.Kind == common.StdOutCollector { + podLogOpts.Container = mccommon.MetricLoggerCollectorContainerName + } + + logs, err := fetchPodLogs(clientset, namespace, podName, podLogOpts) + if err != nil { + log.Printf("GetLogs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + response, err := json.Marshal(logs) + if err != nil { + log.Printf("Marshal logs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if _, err = w.Write(response); err != nil { + log.Printf("Write logs failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// createKubernetesClientset returns kubernetes clientset +func createKubernetesClientset() (*kubernetes.Clientset, error) { + cfg, err := config.GetConfig() + if err != nil { + return nil, err + } + clientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + return clientset, nil +} + +// fetchMasterPodName returns name of the master pod for a trial +func fetchMasterPodName(clientset *kubernetes.Clientset, trial *trialsv1beta1.Trial) (string, error) { + selectionLabel := consts.LabelTrialName + "=" + trial.ObjectMeta.Name + for primaryKey, primaryValue := range trial.Spec.PrimaryPodLabels { + selectionLabel = selectionLabel + "," + primaryKey + "=" + primaryValue + } + + podList, err := clientset.CoreV1().Pods(trial.ObjectMeta.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selectionLabel}) + if err != nil { + return "", err + } + + if len(podList.Items) == 0 { + return "", errors.New(`Logs for the trial could not be found. +Was 'retain: true' specified in the Experiment definition? +An example can be found here: https://github.com/kubeflow/katib/blob/7bf39225f7235ee4ba6cf285ecc2c455c6471234/examples/v1beta1/argo/argo-workflow.yaml#L33`) + } + if len(podList.Items) > 1 { + return "", errors.New("More than one master replica found") + } + + return podList.Items[0].Name, nil +} + +// fetchPodLogs returns logs of a pod for the given job name and namespace +func fetchPodLogs(clientset *kubernetes.Clientset, namespace string, podName string, podLogOpts apiv1.PodLogOptions) (string, error) { + req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts) + podLogs, err := req.Stream(context.Background()) + if err != nil { + return "", err + } + defer podLogs.Close() + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return "", err + } + str := buf.String() + + return str, nil +}