Skip to content

Commit

Permalink
Chart: Provision Standalone Dag Processor (#23711)
Browse files Browse the repository at this point in the history
Add Dag Processor templates and values to schema, at the moment
the standalone dag processor is disabled by default.
  • Loading branch information
gmsantos committed Jul 6, 2022
1 parent 3dedbd3 commit 6642f80
Show file tree
Hide file tree
Showing 13 changed files with 1,291 additions and 17 deletions.
19 changes: 19 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,17 @@ Create the name of the triggerer service account to use
{{- end -}}
{{- end -}}

{{/*
Create the name of the dag processor service account to use
*/}}
{{- define "dagProcessor.serviceAccountName" -}}
{{- if .Values.dagProcessor.serviceAccount.create -}}
{{ default (printf "%s-dag-processor" (include "airflow.fullname" .)) .Values.dagProcessor.serviceAccount.name }}
{{- else -}}
{{ default "default" .Values.dagProcessor.serviceAccount.name }}
{{- end -}}
{{- end -}}

{{/*
Create the name of the pgbouncer service account to use
*/}}
Expand Down Expand Up @@ -655,6 +666,14 @@ Create the name of the cleanup service account to use
airflow jobs check --job-type TriggererJob --hostname $(hostname)
{{- end }}

{{define "dag_processor_liveness_check_command"}}
- sh
- -c
- |
CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --hostname $(hostname)
{{- end }}

{{ define "registry_docker_config" -}}
{{- $host := .Values.registry.connection.host }}
{{- $email := .Values.registry.connection.email }}
Expand Down
214 changes: 214 additions & 0 deletions chart/templates/dag-processor/dag-processor-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

################################
## Airflow Dag Processor Deployment
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- if .Values.dagProcessor.enabled }}
{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
{{- $topologySpreadConstraints := or .Values.dagProcessor.topologySpreadConstraints .Values.topologySpreadConstraints }}
{{- $securityContext := include "airflowSecurityContext" (list . .Values.dagProcessor) }}
kind: Deployment
apiVersion: apps/v1
metadata:
name: {{ .Release.Name }}-dag-processor
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- with .Values.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.dagProcessor.replicas }}
selector:
matchLabels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- if .Values.dagProcessor.strategy }}
strategy:
{{- toYaml .Values.dagProcessor.strategy | nindent 4 }}
{{- end }}
template:
metadata:
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- with .Values.labels }}
{{- toYaml . | nindent 8 }}
{{- end }}
annotations:
checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }}
checksum/pgbouncer-config-secret: {{ include (print $.Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }}
checksum/airflow-config: {{ include (print $.Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }}
checksum/extra-configmaps: {{ include (print $.Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }}
checksum/extra-secrets: {{ include (print $.Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }}
{{- if .Values.dagProcessor.safeToEvict }}
cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
{{- end }}
{{- if .Values.airflowPodAnnotations }}
{{- toYaml .Values.airflowPodAnnotations | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.podAnnotations }}
{{- toYaml .Values.dagProcessor.podAnnotations | nindent 8 }}
{{- end }}
spec:
{{- if .Values.dagProcessor.priorityClassName }}
priorityClassName: {{ .Values.dagProcessor.priorityClassName }}
{{- end }}
nodeSelector:
{{- toYaml $nodeSelector | nindent 8 }}
affinity:
{{- if $affinity }}
{{- toYaml $affinity | nindent 8 }}
{{- else }}
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
component: dag-processor
topologyKey: kubernetes.io/hostname
weight: 100
{{- end }}
tolerations:
{{- toYaml $tolerations | nindent 8 }}
topologySpreadConstraints:
{{- toYaml $topologySpreadConstraints | nindent 8 }}
terminationGracePeriodSeconds: {{ .Values.dagProcessor.terminationGracePeriodSeconds }}
restartPolicy: Always
serviceAccountName: {{ include "dagProcessor.serviceAccountName" . }}
securityContext: {{ $securityContext | nindent 8 }}
{{- if or .Values.registry.secretName .Values.registry.connection }}
imagePullSecrets:
- name: {{ template "registry_secret" . }}
{{- end }}
initContainers:
{{- if .Values.dagProcessor.waitForMigrations.enabled }}
- name: wait-for-airflow-migrations
resources:
{{- toYaml .Values.dagProcessor.resources | nindent 12 }}
image: {{ template "airflow_image_for_migrations" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
volumeMounts:
- name: config
mountPath: {{ template "airflow_config_path" . }}
subPath: airflow.cfg
readOnly: true
args:
{{- include "wait-for-migrations-command" . | nindent 10 }}
envFrom:
{{- include "custom_airflow_environment_from" . | default "\n []" | nindent 10 }}
env:
{{- include "custom_airflow_environment" . | nindent 10 }}
{{- include "standard_airflow_environment" . | nindent 10 }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.extraInitContainers }}
{{- toYaml .Values.dagProcessor.extraInitContainers | nindent 8 }}
{{- end }}
containers:
- name: dag-processor
image: {{ template "airflow_image" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
{{- if .Values.dagProcessor.command }}
command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.args }}
args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }}
{{- end }}
resources:
{{ toYaml .Values.dagProcessor.resources | nindent 12 }}
volumeMounts:
{{- if .Values.dagProcessor.extraVolumeMounts }}
{{ toYaml .Values.dagProcessor.extraVolumeMounts | nindent 12 }}
{{- end }}
- name: logs
mountPath: {{ template "airflow_logs" . }}
- name: config
mountPath: {{ template "airflow_config_path" . }}
subPath: airflow.cfg
readOnly: true
{{- if .Values.airflowLocalSettings }}
- name: config
mountPath: {{ template "airflow_local_setting_path" . }}
subPath: airflow_local_settings.py
readOnly: true
{{- end }}
{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }}
{{- include "airflow_dags_mount" . | nindent 12 }}
{{- end }}
envFrom:
{{- include "custom_airflow_environment_from" . | default "\n []" | nindent 10 }}
env:
{{- include "custom_airflow_environment" . | nindent 10 }}
{{- include "standard_airflow_environment" . | nindent 10 }}
livenessProbe:
initialDelaySeconds: {{ .Values.dagProcessor.livenessProbe.initialDelaySeconds }}
timeoutSeconds: {{ .Values.dagProcessor.livenessProbe.timeoutSeconds }}
failureThreshold: {{ .Values.dagProcessor.livenessProbe.failureThreshold }}
periodSeconds: {{ .Values.dagProcessor.livenessProbe.periodSeconds }}
exec:
command:
{{- if .Values.dagProcessor.livenessProbe.command }}
{{ toYaml .Values.dagProcessor.livenessProbe.command | nindent 16 }}
{{- else }}
{{- include "dag_processor_liveness_check_command" . | nindent 16 }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "git_sync_container" . | indent 8 }}
{{- end }}
{{- if .Values.dagProcessor.extraContainers }}
{{- toYaml .Values.dagProcessor.extraContainers | nindent 8 }}
{{- end }}
volumes:
- name: config
configMap:
name: {{ template "airflow_config" . }}
{{- if .Values.dags.persistence.enabled }}
- name: dags
persistentVolumeClaim:
claimName: {{ template "airflow_dags_volume_claim" . }}
{{- else if .Values.dags.gitSync.enabled }}
- name: dags
emptyDir: {}
{{- end }}
{{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
{{- include "git_sync_ssh_key_volume" . | indent 8 }}
{{- end }}
{{- if .Values.dagProcessor.extraVolumes }}
{{- toYaml .Values.dagProcessor.extraVolumes | nindent 8 }}
{{- end }}
{{- if .Values.logs.persistence.enabled }}
- name: logs
persistentVolumeClaim:
claimName: {{ template "airflow_logs_volume_claim" . }}
{{- else }}
- name: logs
emptyDir: {}
{{- end }}
{{- end }}
{{- end }}
41 changes: 41 additions & 0 deletions chart/templates/dag-processor/dag-processor-serviceaccount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

################################
## Airflow Dag Processor ServiceAccount
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- if and .Values.dagProcessor.serviceAccount.create .Values.dagProcessor.enabled }}
kind: ServiceAccount
apiVersion: v1
metadata:
name: {{ include "dagProcessor.serviceAccountName" . }}
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- with .Values.labels }}
{{ toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.dagProcessor.serviceAccount.annotations}}
annotations:
{{ toYaml . | nindent 4 }}
{{- end }}
{{- end }}
{{- end }}
16 changes: 10 additions & 6 deletions chart/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
#################################

# Are we using a local executor?
{{- $local := eq .Values.executor "LocalExecutor" }}
{{- $local := contains "Local" .Values.executor }}
# Is persistence enabled on the _workers_?
# This is important because in $local mode, the scheduler assumes the role of the worker
{{- $persistence := .Values.workers.persistence.enabled }}
# If we're using a StatefulSet
{{- $stateful := and $local $persistence }}
# We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode
{{- $localOrDagProcessorDisabled := or (not .Values.dagProcessor.enabled) $local }}
# If we're using elasticsearch logging
{{- $elasticsearch := .Values.elasticsearch.enabled }}
{{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }}
Expand Down Expand Up @@ -141,7 +143,7 @@ spec:
{{- include "custom_airflow_environment" . | indent 10 }}
{{- include "standard_airflow_environment" . | indent 10 }}
{{- end }}
{{- if .Values.dags.gitSync.enabled }}
{{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }}
{{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 8 }}
{{- end }}
{{- if .Values.scheduler.extraInitContainers }}
Expand Down Expand Up @@ -202,13 +204,13 @@ spec:
subPath: airflow_local_settings.py
readOnly: true
{{- end }}
{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }}
{{- include "airflow_dags_mount" . | nindent 12 }}
{{- end }}
{{- if and $localOrDagProcessorDisabled (or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled) }}
{{- include "airflow_dags_mount" . | nindent 12 }}
{{- end }}
{{- if .Values.scheduler.extraVolumeMounts }}
{{ toYaml .Values.scheduler.extraVolumeMounts | indent 12 }}
{{- end }}
{{- if .Values.dags.gitSync.enabled }}
{{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }}
{{- include "git_sync_container" . | indent 8 }}
{{- end }}
{{- if .Values.scheduler.logGroomerSidecar.enabled }}
Expand Down Expand Up @@ -239,6 +241,7 @@ spec:
- name: config
configMap:
name: {{ template "airflow_config" . }}
{{- if $localOrDagProcessorDisabled }}
{{- if .Values.dags.persistence.enabled }}
- name: dags
persistentVolumeClaim:
Expand All @@ -250,6 +253,7 @@ spec:
{{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
{{- include "git_sync_ssh_key_volume" . | indent 8 }}
{{- end }}
{{- end }}
{{- if .Values.scheduler.extraVolumes }}
{{ toYaml .Values.scheduler.extraVolumes | indent 8 }}
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion chart/templates/triggerer/triggerer-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
## Airflow Triggerer ServiceAccount
#################################
{{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
{{- if and .Values.triggerer.serviceAccount.create }}
{{- if and .Values.triggerer.serviceAccount.create .Values.triggerer.enabled }}
kind: ServiceAccount
apiVersion: v1
metadata:
Expand Down

0 comments on commit 6642f80

Please sign in to comment.