Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer 環境で DAG を管理する方法について説明します。
Cloud Composer は、Cloud Storage バケットを使用して Cloud Composer 環境の DAG を保存します。環境では、このバケットから Airflow ワーカーやスケジューラなどの Airflow コンポーネントに DAG が同期されます。
準備
- Apache Airflow では強固な DAG 分離が行われないため、DAG の干渉を回避するために本番環境とテスト環境を個別に管理することをおすすめします。詳細については、DAG のテストをご覧ください。
- アカウントに DAG の管理に十分な権限が付与されていることを確認してください。
- DAG に対する変更は 3~5 分以内に Airflow に反映されます。タスクのステータスは、Airflow ウェブ インターフェースで確認できます。
環境のバケットにアクセスする
環境に関連付けられたバケットにアクセスするには:
コンソール
- Google Cloud コンソールで、[環境] ページに移動します。 
- 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の - /dagsフォルダの内容が表示されます。
gcloud
gcloud CLI には、環境のバケットに DAG を追加および削除するための別個のコマンドがあります。
環境のバケットを操作する際に、Google Cloud CLI を使用することもできます。環境のバケットのアドレスを取得するには、次の gcloud CLI コマンドを実行します。
gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"
以下のように置き換えます。
- ENVIRONMENT_NAMEを環境の名前にする。
- LOCATIONは、環境が配置されているリージョン。
例:
gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"
API
environments.get API リクエストを作成します。Environment リソースでは、dagGcsPrefix リソースの EnvironmentConfig リソースは環境のバケットのアドレスです。
例:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
google-auth ライブラリを使用して認証情報を取得し、requests ライブラリを使用して REST API を呼び出します。
DAG を追加または更新する
DAG を追加または更新するには、DAG の Python .py ファイルを環境のバケット内の /dags フォルダに移動します。
コンソール
- Google Cloud コンソールで、[環境] ページに移動します。 
- 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の - /dagsフォルダの内容が表示されます。
- [ファイルをアップロード] をクリックします。次に、ブラウザのダイアログを使用して DAG の Python - .pyファイルを選択し、確認します。
gcloud
gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"
以下のように置き換えます。
- ENVIRONMENT_NAMEを環境の名前にする。
- LOCATIONは、環境が配置されているリージョン。
- LOCAL_FILE_TO_UPLOADは、DAG の Python- .pyファイルに置き換えます。
例:
gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"
アクティブな DAG の実行を含む DAG を更新する
DAG 実行がアクティブな DAG を更新する場合は、次のようになります。
- 現在実行中のすべてのタスクは、元の DAG ファイルを使用して完了します。
- スケジュールされているが、現在実行されていないすべてのタスクでは、更新された DAG ファイルが使用されます。
- 更新された DAG ファイルに存在しないタスクは、すべて削除済みとしてマークされます。
頻繁なスケジュールを実行する DAG を更新する
DAG ファイルをアップロードした後、Airflow がこのファイルを読み込んで DAG を更新するまでには時間がかかります。DAG が頻度の高いスケジュールで実行される場合は、DAG が更新されたバージョンの DAG ファイルを使用するようにしてください。手順は次のとおりです。
- DAG を Airflow UI で一時停止します。 
- 更新した DAG ファイルをアップロードします。 
- Airflow UI に更新が表示されるまで待ちます。これは、DAG がスケジューラによって正しく解析され、Airflow データベースで更新されたことを意味します。 - Airflow UI に更新された DAG が表示されても、Airflow ワーカーに更新された DAG ファイルがあることは保証されません。これは、DAG ファイルがスケジューラとワーカーに対して別々に同期されるためです。 
- 待機時間を延長して、DAG ファイルが環境内のすべてのワーカーと同期されるようにすることをおすすめします。同期は毎分数回行われます。正常な環境では、約 20~30 秒待つとすべてのワーカーが同期します。 
- (省略可)すべてのワーカーに DAG ファイルの新しいバージョンが含まれていることを確認するには、個々のワーカーのログを調べます。手順は次のとおりです。 - Google Cloud コンソールで、環境の [ログ] タブを開きます。 
- [Composer のログ] > [インフラストラクチャ] > [Cloud Storage の同期] 項目に移動し、環境内のすべてのワーカーのログを検査します。新しい DAG ファイルをアップロードした後のタイムスタンプを持つ最新の - Syncing dags directoryログアイテムを探します。それ以降の- Finished syncingアイテムが表示されている場合は、このワーカーで DAG が正常に同期されています。
 
- DAG の一時停止を解除します。 
DAG を再解析する
DAG は環境のバケットに保存されるため、各 DAG は最初に DAG プロセッサに同期され、その後、DAG プロセッサによって 短い遅延で解析されます。Airflow UI などを使用して DAG を手動で再解析すると、DAG プロセッサは使用可能な DAG の現在のバージョンを再解析します。これは、環境のバケットにアップロードした DAG の最新バージョンではない可能性があります。
オンデマンドの再解析は、解析に時間がかかる場合にのみ使用することをおすすめします。たとえば、環境に多数のファイルがある場合や、Airflow 構成オプションで 長い DAG 解析間隔が構成されている場合に、この問題が発生することがあります。
環境内の DAG を削除する
DAG を削除するには、DAG の Python .py ファイルを環境のバケット内の環境の /dags フォルダから削除します。
コンソール
- Google Cloud コンソールで、[環境] ページに移動します。 
- 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の - /dagsフォルダの内容が表示されます。
- DAG ファイルを選択し、[削除] をクリックして、オペレーションを確認します。 
gcloud
gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE
以下のように置き換えます。
- ENVIRONMENT_NAMEを環境の名前にする。
- LOCATIONは、環境が配置されているリージョン。
- DAG_FILEは、DAG の Python- .pyファイルに置き換えます。
例:
gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py
Airflow UI から DAG を削除する
Airflow ウェブ インターフェースから DAG のメタデータを削除するには、次のように操作します。
Airflow UI
- 環境の Airflow UI に移動します。
- DAG で、[DAG を削除] をクリックします。
gcloud
gcloud CLI で次のコマンドを実行します。
  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME
以下のように置き換えます。
- ENVIRONMENT_NAMEを環境の名前に置き換えます。
- LOCATIONは、環境が配置されているリージョン。
- DAG_NAMEは、削除する DAG の名前です。