- Web Engineer
- アウトバウンド営業
- Webエンジニア(経験者)
- Other occupations (17)
- Development
- Business
※弊社エンジニアの記事になります。
はじめに
Airflowは分析基盤でよく使われるワークフロー管理プラットフォーム、僕なりに例えると「リッチなcrontab」ですね。
いろいろなタスクをOperatorというクラスを使って定義するのですが、今回はよく使う便利なOperatorをまとめてみました。
DummyOperator
何も実行しないOperatorです。
まだ具体的なタスクの実装ができていない場合の仮のタスクとしたり、複数タスクをまとめる終了タスクとして使用したり、条件分岐したあとの何もしないタスクが必要な時に使用したりします。
また、DAGを途中から実行したいことがよくあるのですが、その時に選択できる中間ポイントとして使うこともあります。
以下の例だとstart_bのDownstreamをClearするとtask_b1とtask_b2だけ実行することができます。
最初にAirflowを勉強していた時は何の意味があるんだ? と思っていましたが、たくさんのタスクのフローを定義しているとほぼ必ず必要になるOperatorです。
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(task_id='start', dag=dag)
start_a = DummyOperator(task_id='start_a', dag=dag)
start_b = DummyOperator(task_id='start_b', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> start_a >> [task_a1, task_a2, task_a3] >> start_b >> [task_b1, task_b2] >> end
BashOperator
Bashを実行するOperatorです。
システムコマンドや.sh
ファイルを実行することができます。
Bashゆえに何でもできてしまうのですが、Bashばかり使っていると逆にわかりにくくなってしまうことも多いです。
from airflow.operators.bash_operator import BashOperator
task = BashOperator(
task_id='echo'
bash_command='echo hello'
dag=dag
)
PythonOperator
Python関数を実行するOperatorです。
Pythonで処理を書けるのでいろいろなことができます。
もちろんpandasやnumpy, sklearnを使ってなにかすることができますが、あまりに重いような処理はGKEなど別の環境で動かしたほうがいいこともあります。
from airflow.operators.python_operator import PythonOperator
def my_python_function():
# ここにPythonのコードを書く
print("Hello from PythonOperator!")
python_task = PythonOperator(
task_id='my_python_task',
python_callable=my_python_function,
dag=dag
)
BranchPythonOperator
Python関数の実行結果によって、次に実行するべきタスクを自動で決定することができます。
条件付きのワークフローが必要な場面でとても便利です。
python_callableで渡す関数は、次に実行するタスクのtask_idを返却するようにします。
例えば、対象のデータソースが更新されているかチェックして、更新していなければ実行をスキップするようにして、効率の良いデータパイプラインを作ることができます。
from airflow.operators.python_operator import BranchPythonOperator
def check_dataset_update_time():
# ここで条件判定を行う
if some_condition:
return 'do_task'
else:
return 'skip_task'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=check_dataset_update_time,
dag=dag
)
do_task = DummyOperator(task_id='do_task', dag=dag)
skip_task = DummyOperator(task_id='skip_task', dag=dag)
branch_task >> [do_task, skip_task]
…
記事の続きは下のリンクをクリック!
https://rightcode.co.jp/blog/information-technology/airflow-operator-syain
【2024年卒】新卒採用エントリー開始しました!
特設ページはこちら:https://rightcode.co.jp/recruit/entry-2024
※募集は終了致しました。次回の募集までもうしばらくお待ちください
インターン募集!未経験ok、チャレンジ精神ある方求む
メディア運営:https://rightcode.co.jp/recruit/intern-media
社長と一杯飲みながらお話しませんか?(転職者向け)
特設ページはこちら: https://rightcode.co.jp/gohan-sake-president-talk
もっとワクワクしたいあなたへ
現在、ライトコードでは「WEBエンジニア」「スマホアプリエンジニア」「ゲームエンジニア」、「デザイナー」「WEBディレクター」「エンジニアリングマネージャー」「営業」などを積極採用中です!
有名WEBサービスやアプリの受託開発などの企画、開発案件が目白押しの状況です。
- もっと大きなことに挑戦したい!
- エンジニアとしてもっと成長したい!
- モダンな技術に触れたい!
現状に満足していない方は、まずは、エンジニアとしても第一線を走り続ける弊社代表と気軽にお話してみませんか?
ネット上では、ちょっとユルそうな会社に感じると思いますが(笑)、
実は技術力に定評があり、沢山の実績を残している会社ということをお伝えしたいと思っております。
- ライトコードの魅力を知っていただきたい!
- 社風や文化なども知っていただきたい!
- 技術に対して熱意のある方に入社していただきたい!
一度、【Wantedly内の弊社ページ】や【コーポレートサイト】をのぞいてみてください。
【コーポレートサイト】https://rightcode.co.jp/
【採用募集】https://rightcode.co.jp/recruit(こちらからの応募がスムーズ)
【wantedlyぺージ】https://www.wantedly.com/companies/rightcode