- バックエンドエンジニア
- アナリスト
- 内部統制(J-SOX)
- Other occupations (8)
- Development
- Business
- Other
こんにちは、MA基盤チームの田島です。私達のチームでは複数のワークフローエンジンを利用し、メールやLINEなどへの配信を含むバッチ処理を行っていました。今回それらのワークフローエンジンをすべてDigdagに統一しました。そして実行環境としてGKEのAutopilot環境を選択したことにより、柔軟にスケールするバッチ処理基盤を実現しましたのでそれについて紹介します。 また、その中で得られた運用Tipsについても合わせて紹介します。
Digdag on GKE Autopilotの構成
今回構築したDigdag on GKE Autopilot環境の最終構成は次のとおりです。
GKE Standard環境における、Digdagの構築はすでに弊社の別チームで行われており、スケーリング部分以外はほぼそれを踏襲する形で構築しました。以下は当時の発表資料です。
参考にした構成から一部拡張した部分について、またAutopilot環境だからこその利点についてなどを含め、改めて構成を紹介します。
Digdagの4つの役割
Digdagは役割ごとに以下の「Worker」「Scheduler」「Web」「API」のDeploymentを作成し、クラスタを構成しています。
Worker
Digdagではワークフローのなかの1つ1つの処理のことをタスクと呼びます。Workerは実際にタスクを実行する役割を担います。DigdagのタスクはPostgreSQL(CloudSQL)に一度キューという形で登録され、Workerは登録されているタスクのうち実行可能なタスクを取得して実行します。
WorkerのDeploymentのマニフェストは次のとおりです。Digdag起動時に disable-scheduler を指定することで、次で紹介するSchedulerの役割を除外しています。Kubernetes関連のオプションに関しては後ほど紹介します。
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
run: digdag-worker
name: digdag-worker
namespace: digdag
spec:
progressDeadlineSeconds: 600
revisionHistoryLimit: 10
selector:
matchLabels:
run: digdag-worker
strategy:
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
type: RollingUpdate
template:
metadata:
labels:
run: digdag-worker
spec:
serviceAccountName: digdag
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
terminationGracePeriodSeconds: 5400
volumes:
- name: digdag-config-volume
configMap:
name: digdag-config
containers:
- name: digdag-worker
image: <YOUR_DIGDAG_IMAGE>
imagePullPolicy: Always
volumeMounts:
- name: digdag-config-volume
mountPath: /etc/config
command: ["/bin/bash"]
args:
- "-cx"
- |
digdag server \
--disable-scheduler \
--log-level <LOG_LEVEL> \
--max-task-threads <MAX_TASK_THREADS> \
--config /etc/config/digdag.properties \
-p environment=<ENVIRONMENT> \
-X database.host=<POSTGRES_IP> \
-X database.password=$POSTGRES_PASSWORD \
-X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY \
-X archive.gcs.bucket=<DIGDAG_ARCHIVE_BUCKET> \
-X log-server.gcs.bucket=<DIGDAG_LOG_BUCKET> \
-X agent.command_executor.type=kubernetes \
-X agent.command_executor.kubernetes.config_storage.in.gcs.bucket=<DIGDAG_ARCHIVE_BUCKET> \
-X agent.command_executor.kubernetes.config_storage.out.gcs.bucket=<DIGDAG_ARCHIVE_BUCKET> \
-X agent.command_executor.kubernetes.name=<KUBERNETS_CLUSTER_NAME> \
-X agent.command_executor.kubernetes.<KUBERNETS_CLUSTER_NAME>.master=$KUBERNETS_MASTER \
-X agent.command_executor.kubernetes.<KUBERNETS_CLUSTER_NAME>.certs_ca_data=`cat /var/run/secrets/kubernetes.io/serviceaccount/ca.crt | base64 -w 0` \
-X agent.command_executor.kubernetes.<KUBERNETS_CLUSTER_NAME>.oauth_token=`cat /var/run/secrets/kubernetes.io/serviceaccount/token` \
-X agent.command_executor.kubernetes.<KUBERNETS_CLUSTER_NAME>.namespace=digdag \
-X agent.command_executor.kubernetes.config_storage.in.type=gcs \
-X agent.command_executor.kubernetes.config_storage.out.type=gcs \
-X agent.command_executor.kubernetes.config_storage.out.gcs.direct_upload_expiration=<GCS_DIRECT_UPLOAD_EXPIRATION> \
-X executor.task_ttl=<TASK_TTL>
resources:
requests:
cpu: 1000m
memory: 2Gi
limits:
cpu: 1000m
memory: 2Gi
また、configmapは以下のように定義しています。
configMapGenerator:
- name: digdag-config
namespace: digdag
files:
- config/digdag.properties
server.bind=0.0.0.0
server.port=8080
database.type=postgresql
database.port=5432
database.user=digdag
database.database=digdag
archive.type=gcs
log-server.type=gcs
Scheduler
Digdagでは、ワークフローごとに実行のスケジューリングを行うことができます。これもまた、PostgreSQLにスケジュールが登録されます。そして、Schedulerは実行時間になったワークフローの実行を開始します。
SchedulerのDeploymentのマニフェストは次のとおりです。Digdag起動時に disable-executor-loop を指定することでWorkerの役割を除外しています。
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
run: digdag-scheduler
name: digdag-scheduler
namespace: digdag
spec:
progressDeadlineSeconds: 600
revisionHistoryLimit: 10
selector:
matchLabels:
run: digdag-scheduler
strategy:
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
type: RollingUpdate
template:
metadata:
labels:
run: digdag-scheduler
spec:
serviceAccountName: digdag
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
volumes:
- name: digdag-config-volume
configMap:
name: digdag-config
containers:
- name: digdag-scheduler
image: <YOUR_DIGDAG_IMAGE>
imagePullPolicy: Always
volumeMounts:
- name: digdag-config-volume
mountPath: /etc/config
command: ["/bin/bash"]
args:
- "-cx"
- |
digdag server \
--disable-local-agent \
--disable-executor-loop \
--log-level <LOG_LEVEL> \
--config /etc/config/digdag.properties \
-p environment=<ENVIRONMENT> \
-X database.host=<POSTGRES_IP> \
-X database.password=$POSTGRES_PASSWORD \
-X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY \
-X archive.gcs.bucket=<DIGDAG_ARCHIVE_BUCKET> \
-X log-server.gcs.bucket=<DIGDAG_LOG_BUCKET>
resources:
requests:
cpu: 200m
memory: 300Mi
limits:
cpu: 200m
memory: 300Mi
Web
DigdagにはDigdag UIと言って、ワークフローをGUIから確認・実行できるものがあります。そのDigdag UIを提供するのがWebになります。また、Digdag UI上からリクエストされるAPIの処理もこのWebが担います。
WebのDeploymentのマニフェストは次のとおりです。Digdag起動時にdisable-scheduler と disable-executor-loop を指定することで、SchedulerとWorkerの役割を除外しています。そして、外部からアクセスできるようポートの設定をしています。
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
run: digdag-web
name: digdag-web
namespace: digdag
spec:
progressDeadlineSeconds: 600
revisionHistoryLimit: 10
selector:
matchLabels:
run: digdag-web
strategy:
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
type: RollingUpdate
template:
metadata:
labels:
run: digdag-web
spec:
serviceAccountName: digdag
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
terminationGracePeriodSeconds: 20
securityContext: {}
volumes:
- name: digdag-config-volume
configMap:
name: digdag-config
containers:
- name: digdag-web
image: <YOUR_DIGDAG_IMAGE>
imagePullPolicy: Always
volumeMounts:
- name: digdag-config-volume
mountPath: /etc/config
command: ["/bin/bash"]
args:
- "-cx"
- |
digdag server \
--disable-local-agent \
--disable-scheduler \
--disable-executor-loop \
--log-level <LOG_LEVEL> \
--config /etc/config/digdag.properties \
-p environment=<ENVIRONMENT> \
-X server.http.io-idle-timeout=60 \
-X server.http.no-request-timeout=30 \
-X database.host=<POSTGRES_IP> \
-X database.password=<POSTGRES_PASSWORD> \
-X digdag.secret-encryption-key=<SECRET_ENCRYPTION_KEY> \
-X archive.gcs.bucket=<DIGDAG_ARCHIVE_BUCKET> \
-X log-server.gcs.bucket=<DIGDAG_LOG_BUCKET>
ports:
- containerPort: 8080
protocol: TCP
resources:
requests:
cpu: 1000m
memory: 4Gi
limits:
cpu: 100m
memory: 4Gi
readinessProbe:
httpGet:
path: /
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 4
successThreshold: 1
failureThreshold: 3
livenessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 4
successThreshold: 1
failureThreshold: 3
API
DigdagはDigdagClientを利用したり先程紹介したDigdag UIを利用してコントロールします。クライアントやUIはDigdagサーバへのAPIリクエストをすることでDigdagを操作します。そのAPIを直接利用したいというケースがあったため、API専用のDigdagを今回作成しました。例えば私達のチームではAPIを利用して、特定のワークフローの完了待ちをすると言った処理を別のDigdagや、同一のDigdagから行っています。
最初はWebと同じサーバーを利用していましたが、Webの処理によりPodが落ちるということがたまに発生していました。Webだけであれば画面が使えなくなるだけですから、数秒から数分でPodが復旧すれば問題ありませんでした。しかし、APIの場合では他のアプリケーションから参照されるため、それでは困るケースがあり役割を分離しました。APIのDeploymentはWebのものとほぼ同じ構成となります。
Kubernetes Command Executor
Workerでのタスク実行の問題
タスクの実行はWorkerで処理すると説明しました。私達のチームではメールやLINE・PUSH通知などの配信をしたり、データマートの集計をしたりと様々な種類のバッチ処理がDigdagで実行されます。中には大量のデータを処理するようなものもあれば、単純にHTTPリクエストするだけのものなどワークロードがバラバラです。そのため、Workerは高負荷なタスクに合わせて作成しておく必要があります。それにより、高負荷なタスクが無い場合にはWorkerのPodがオーバースペックになるため、コスト的にかなりのデメリットになります。
Command Executor
この課題を解決するためにKubernetes Command Executorを利用しました。DigdagはCommand ExecutorといってKubernetes等の環境でShellやRuby/Pythonといった処理を実行できる機能があります。
2022年1月にリリースされたDigdag v0.10.4にて、Command Executorのプラグイン化がリリースされました。それにより設定ファイル等でどのCommand Executorを利用するかが選択できるようになりました。Command Executorには現在以下の種類が存在します。
続きはこちら