Airflow2.2.3 + Celery + MySQL 8 は堅牢な分散スケジューリング クラスタを構築します

Airflow2.2.3 + Celery + MySQL 8 は堅牢な分散スケジューリング クラスタを構築します

先ほど、Airflow インフラストラクチャと、コンテナに Airflow をデプロイする方法について説明しました。今日は、Airflow と Celery を使用して堅牢な分散スケジューリング クラスターを構築する方法を見ていきます。

1 クラスタ環境

Ubuntu 20.04.3 LTS マシンにも Airflow クラスターをインストールします。今回はテスト用に同じ構成のサーバーを3台用意します。前回の記事[1]では、Bigdata1サーバーにairflowのすべてのコンポーネントをインストールしました。まだ読んでいない方は、リンクをクリックして、まずは前回の記事を読んでみてください。ここで、他の 2 つのノードにワーカー コンポーネントをインストールするだけです。

ビッグデータ1(A) ビッグデータ2(B) ビッグデータ3(C)
ウェブサーバー
スケジューラ
ワーカー

前回の記事の docker-compose.yml では、デプロイメント ファイルとデータ ディレクトリが分離されておらず、後からの管理に不便です。したがって、サービスを停止し、データベースとデータ ディレクトリをデプロイメント ファイルから分離することができます。

  • デプロイメントファイル: docker-compose.yaml/.env は /apps/airflow ディレクトリに保存されます
  • MySQLと設定ファイル: /data/mysql に配置
  • エアフローデータディレクトリ: /data/airflow に配置

この分割により、後で統一された管理が容易になります。

2. ワーカーサービスをデプロイする

事前準備

  1. mkdir /data/airflow/{dags,plugins} -pv
  2. mkdir -pv /apps/airflow
  3. mkdir -pv /logs/エアフロー

ワーカーデプロイメントファイル:

  1. ---  
  2. バージョン: '3'  
  3. x-エアフロー共通:
  4. エアフロー共通
  5.  注文  カスタム依存関係を追加したりプロバイダー パッケージをアップグレードしたりすると、拡張イメージを使用できるようになります。
  6. # イメージ行をコメントアウトし、 docker-compose.yaml を配置したディレクトリDockerfileを配置します。
  7. # 以下の「build」のコメントを解除し ` docker-compose build` を実行してイメージをビルドします
  8. イメージ: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  9. # 建てる: 。
  10. 環境:
  11. &エアフロー共通環境
  12. AIRFLOW__CORE__EXECUTOR: セロリエグゼキューター
  13. AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow #MySQLに対応するアカウントとパスワードを変更します
  14. AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow #MySQLに対応するアカウントとパスワードを変更します
  15. AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0 #Redisのパスワードを変更する
  16. AIRFLOW__CORE__FERNET_KEY: ''  
  17. AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'  
  18. AIRFLOW__CORE__LOAD_EXAMPLES: 'true'  
  19. AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'  
  20. _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  21. ボリューム:
  22. - /data/airflow/dags:/opt/airflow/dags
  23. - /logs/エアフロー:/opt/エアフロー/ログ
  24. -/data/airflow/plugins:/opt/airflow/plugins
  25. - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  26. ユーザー: "${AIRFLOW_UID:-50000}:0"  
  27.  
  28. サービス:
  29. エアフローワーカー:
  30. <<: *エアフロー共通
  31. コマンド: セロリワーカー
  32. ヘルスチェック:
  33. テスト:
  34. - 「CMD-SHELL」  
  35. - 'celery --app airflow.executors.celery_executor.app 検査 ping -d "celery@$${HOSTNAME}"'  
  36. 間隔: 10秒
  37. タイムアウト: 10秒
  38. 再試行: 5
  39. 環境:
  40. <<: *エアフロー共通環境
  41. #セロリワーカーウォームシャットダウンを適切に処理するために必要
  42. # https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation を参照してください
  43. DUMB_INIT_SETSID: "0"  
  44. 再起動: 常に
  45. ホスト名: bigdata-20-194 # ここでコンテナのホスト名を設定すると、どのワーカーであるかを簡単に確認できます
  46. 依存:
  47. エアフロー初期化:
  48. 条件: サービスが正常に完了しました
  49.  
  50. エアフロー初期化:
  51. <<: *エアフロー共通
  52. エントリポイント: /bin/bash
  53. #yamllintルールを無効にする:line-length
  54. 指示:
  55. - -c
  56. - |
  57. 関数ver() {
  58. printf "%04d%04d%04d%04d" $${1//./ }
  59. }
  60. airflow_version=$$(gosu エアフロー エアフロー バージョン)
  61. airflow_version_comparable=$$(バージョン $${airflow_version})
  62. 最小エアフローバージョン=2.2.0
  63. min_airflow_version_comparable=$$(バージョン $${min_airflow_version})
  64. 比較可能なエアフローバージョンが最小エアフローバージョンの場合、それから 
  65. エコー
  66. echo -e "\033[1;31mエラー!!!: Airflowのバージョンが古すぎます $${airflow_version}!\e[0m"  
  67. echo "サポートされている最小の Airflow バージョン: $${min_airflow_version}。これ以降のバージョンのみを使用してください!"  
  68. エコー
  69. 出口1
  70. フィ
  71. [[ -z "${AIRFLOW_UID}" ]]の場合;それから 
  72. エコー
  73. echo -e "\033[1;33m警告!!!: AIRFLOW_UIDが設定されていません!\e[0m"  
  74. echo "Linux を使用している場合は、以下の手順に従って設定する必要があります"  
  75. echo "AIRFLOW_UID 環境変数。そうでない場合、ファイルは root によって所有されます。"  
  76. echo "他のオペレーティング システムの場合は、手動で作成した .env ファイルで警告を取り除くことができます:"  
  77. echo "参照: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"  
  78. エコー
  79. フィ
  80. 1048576 メガバイト
  81. mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
  82. cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
  83. disk_available=$$(df / | tail -1 | awk '{print $$4}' )
  84. warning_resources = "偽"  
  85. mem_available が 4000 未満の場合それから 
  86. エコー
  87. echo -e "\033[1;33m警告!!!: Dockerに使用できるメモリが不足しています。\e[0m"  
  88. echo "少なくとも 4GB のメモリが必要です。$$(numfmt --to iec $$((mem_available * one_meg))) があります"  
  89. エコー
  90. warning_resources= "true"  
  91. フィ
  92. 利用可能なCPU数が2以下の場合それから 
  93. エコー
  94. echo -e "\033[1;33m警告!!!: Dockerに使用できるCPUが足りません。\e[0m"  
  95. echo "少なくとも 2 つの CPU を推奨します。$${cpus_available} があります"  
  96. エコー
  97. warning_resources= "true"  
  98. フィ
  99. ディスクの空き容量が 1 メガバイト未満の場合、それから 
  100. エコー
  101. echo -e "\033[1;33m警告!!!: Docker に使用できるディスク容量が足りません。\e[0m"  
  102. echo "少なくとも 10 GB を推奨します。$$(numfmt --to iec $$((disk_available * 1024 ))) があります"  
  103. エコー
  104. warning_resources= "true"  
  105. フィ
  106. [[ $${warning_resources} == "true" ]] の場合;それから 
  107. エコー
  108. echo -e "\033[1;33m警告!!!: Airflow を実行するためのリソースが不足しています (上記を参照)!\e[0m"  
  109. echo "利用可能なリソースの量を増やすには、次の手順に従ってください:"  
  110. 「https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin」をエコーし​​ます 
  111. エコー
  112. フィ
  113. mkdir -p /sources/logs /sources/dags /sources/plugins
  114. chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
  115. exec /entrypoint エアフローバージョン
  116. #yamllintルールを有効にする:line-length
  117. 環境:
  118. <<: *エアフロー共通環境
  119. _AIRFLOW_DB_UPGRADE: 'true'  
  120. _AIRFLOW_WWW_USER_CREATE: 'true'  
  121. _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
  122. _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
  123. ユーザー: "0:0"  
  124. ボリューム:
  125. - .:/ソース
  126.  
  127. エアフロー CLI:
  128. <<: *エアフロー共通
  129. プロフィール:
  130. - デバッグ
  131. 環境:
  132. <<: *エアフロー共通環境
  133. CONNECTION_CHECK_MAX_COUNT: "0"  
  134. # エントリポイントの問題に対する回避策。参照: https://github.com/apache/airflow/issues/16252
  135. 指示:
  136. -バッシュ
  137. - -c
  138. - 気流

テストを初期化し、環境が次の要件を満たしているかどうかを確認します。

  1. cd /apps/ariflow/
  2. echo -e "AIRFLOW_UID=$(id -u)" > .env # AIRFLOW_UID は通常のユーザーの UID である必要があり、これらの永続的なディレクトリを作成する権限を持っている必要があることに注意してください。
  3. docker-compose を起動し、airflow-init を実行します。

データベースがすでに存在する場合、初期化テストは既存のデータベースに影響しません。次に、airflow-worker サービスを実行します。

  1. docker-compose を起動 -d

次に、同じ方法でbigdata3ノードにairflow-workerサービスをインストールします。デプロイメントが完了したら、flower を通じてブローカーのステータスを確認できます。

3永続的な設定ファイル

ほとんどの場合、複数の Airflow ワーカー ノードを持つクラスターを使用する場合は、Airflow 構成ファイルを永続化し、すべてのノードに Airflow を同期する必要があります。そのため、docker-compose.yaml 内の x-airflow-common のボリュームを変更し、airflow.cfg をボリュームとしてコンテナにマウントする必要があります。構成ファイルをコンテナにコピーして変更することができます。

使用の初期段階では、docker-compose ファイル内のいくつかの環境変数の値を airflow.cfg ファイルに書き込む必要があります。たとえば、次の情報です。

  1. [コア]
  2. dags_folder = /opt/airflow/dags
  3. ホスト名_callable = socket.getfqdn
  4. default_timezone = Asia/Shanghai # タイムゾーンを変更する
  5. 実行者 = CeleryExecutor
  6. sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
  7. sql_engine_encoding = utf-8
  8. sql_alchemy_pool_enabled =有効 
  9. sql_alchemy_pool_size = 5
  10. sql_alchemy_max_overflow = 10
  11. sql_alchemy_pool_recycle = 1800
  12. sql_alchemy_pool_pre_ping = 
  13. sql_alchemy_schema =
  14. 並列度 = 32
  15. 1日あたりの最大アクティブタスク数 = 16
  16. dags_are_paused_at_creation = 
  17. 1日あたりの最大アクティブ実行数 = 16
  18. load_examples = True  
  19. load_default_connections = True  
  20. プラグインフォルダ = /opt/airflow/plugins
  21. execute_tasks_new_python_interpreter = False  
  22. fernet_key =
  23. donot_pickle = 
  24. dagbag_import_timeout = 30.0
  25. dagbag_import_error_tracebacks = True  
  26. dagbag_import_error_traceback_depth = 2
  27. dag_file_processor_timeout = 50
  28. task_runner = 標準タスクランナー
  29. デフォルト偽装 =
  30. セキュリティ =
  31. ユニットテストモード = False  
  32. enable_xcom_pickle = False です 
  33. 強制終了したタスクのクリーンアップ時間 = 60
  34. dag_run_conf_overrides_params = True  
  35. dag_discovery_safe_mode = 
  36. デフォルトタスク再試行 = 0
  37. default_task_weight_rule = ダウンストリーム
  38. 最小シリアル化DAG更新間隔 = 30
  39. 最小シリアル化DAGフェッチ間隔 = 10
  40. タスクあたりのレンダリングされた ti フィールドの最大数 = 30
  41. check_slas = 
  42. xcom_backend = airflow.models.xcom.BaseXCom
  43. lazy_load_plugins = True  
  44. lazy_discover_providers = True  
  45. 最大データベース再試行回数 = 3
  46. 機密性の高いvar_conn_fieldsを非表示にする = True  
  47. 機密変数接続名 =
  48. デフォルト_プール_タスク_スロット数 = 128
  49. [ログ]
  50. ベースログフォルダ = /opt/airflow/logs
  51. リモートログ = False  
  52. リモートログ接続ID =
  53. google_key_path =
  54. リモートベースログフォルダー =
  55. encrypt_s3_logs = False  
  56. ログレベル = INFO
  57. fab_logging_level = 警告
  58. ログ記録構成クラス =
  59. 色付きコンソールログ = True  
  60. coloured_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
  61. 色付きフォーマッタクラス = airflow.utils.log.colored_log.CustomTTYColoredFormatter
  62. log_format = [%%(asctime)s] {%%(ファイル名)s:%%(行番号)d} %%(レベル名)s - %%(メッセージ)s
  63. simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
  64. タスクログプレフィックステンプレート =
  65. log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
  66. log_processor_filename_template = {{ ファイル名 }}.log
  67. dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
  68. task_log_reader = タスク
  69. 追加のロガー名 =
  70. ワーカーログサーバーポート = 8793
  71. [メトリクス]
  72. statsd_on = 
  73. statsd_host = ローカルホスト
  74. 統計d_ポート = 8125
  75. statsd_prefix = エアフロー
  76. 統計d_allow_list =
  77. 統計名ハンドラ =
  78. statsd_datadog_enabled = False  
  79. 統計データドッグタグ =
  80. [秘密]
  81. バックエンド =
  82. バックエンド_kwargs =
  83. [クリ]
  84. api_client = airflow.api.client.local_client
  85. エンドポイントURL = http://localhost:8080
  86. [デバッグ]
  87. fail_fast = False  
  88. [API]
  89. enable_experimental_api = False  
  90. auth_backend = airflow.api.auth.backend.deny_all
  91. 最大ページ制限 = 100
  92. フォールバックページ制限 = 100
  93. google_oauth2_オーディエンス =
  94. google_key_path =
  95. アクセス制御許可ヘッダー =
  96. アクセス制御許可メソッド =
  97. アクセス制御許可の起源 =
  98. [系統]
  99. バックエンド =
  100. [アトラス]
  101. sasl_enabled = False  
  102. ホスト =
  103. ポート = 21000
  104. ユーザー名 =
  105. パスワード=
  106. [オペレーター]
  107. default_owner = エアフロー
  108. デフォルトCPU = 1
  109. デフォルト_ram = 512
  110. デフォルトディスク = 512
  111. デフォルト_gpus = 0
  112. default_queue =デフォルト 
  113. allow_illegal_arguments = False  
  114. [ハイブ]
  115. デフォルト_hive_mapred_queue =
  116. [ウェブサーバー]
  117. base_url = https://devopsman.cn/airflow #Airflowドメイン名をカスタマイズ
  118. default_ui_timezone = Asia/Shanghai # デフォルトのタイムゾーンを設定する
  119. ウェブサーバーホスト = 0.0.0.0
  120. ウェブサーバーポート = 8080
  121. web_server_ssl_cert =
  122. ウェブサーバーのSSLキー =
  123. ウェブサーバーマスタータイムアウト = 120
  124. ウェブサーバーワーカータイムアウト = 120
  125. ワーカー_リフレッシュ_バッチ_サイズ = 1
  126. ワーカー更新間隔 = 6000
  127. reload_on_plugin_change = False  
  128. シークレットキー = emEfndkf3QWZ5zVLE1kVMg==
  129. 労働者 = 4
  130. ワーカークラス = 同期
  131. アクセスログファイル = -
  132. エラーログファイル = -
  133. アクセスログフォーマット =
  134. expose_config = False  
  135. 公開ホスト名 = True  
  136. expose_stacktrace = True  
  137. dag_default_view = ツリー
  138. dag_orientation = LR
  139. ログフェッチタイムアウト秒 = 5
  140. ログフェッチ遅延秒 = 2
  141. ログ自動テーリングオフセット = 30
  142. ログアニメーション速度 = 1000
  143. デフォルトでは非表示の一時停止ダグス = False  
  144. ページサイズ = 100
  145. navbar_color = #fff
  146. デフォルトの dag 実行表示数 = 25
  147. enable_proxy_fix = False  
  148. プロキシ_fix_x_for = 1
  149. プロキシ_fix_x_proto = 1
  150. プロキシ_fix_x_host = 1
  151. プロキシ_fix_x_port = 1
  152. プロキシ_fix_x_プレフィックス = 1
  153. クッキーセキュア = False  
  154. cookie_samesite = 緩い
  155. default_wrap = False  
  156. x_frame_enabled =有効 
  157. show_recent_stats_for_completed_runs = True  
  158. update_fab_perms = 
  159. セッションの存続時間 = 43200
  160. 自動更新間隔 = 3
  161. [メールアドレス]
  162. email_backend = airflow.utils.email.send_email_smtp
  163. email_conn_id = SMTP_デフォルト
  164. デフォルトのメール再試行 = True  
  165. デフォルトのメール失敗時 = True  
  166. [smtp] # メール設定
  167. smtp_host = ローカルホスト
  168. smtp_starttls = 
  169. smtp_ssl = 
  170. SMTP_ポート = 25
  171. smtp_mail_from = [email protected]
  172. SMTP_タイムアウト = 30
  173. SMTP_再試行制限 = 5
  174. [歩哨]
  175. 監視オン = 
  176. セントリー_dsn =
  177. [セロリ_kubernetes_executor]
  178. kubernetes_queue = kubernetes
  179. [セロリ]
  180. celery_app_name = airflow.executors.celery_executor
  181. ワーカー同時実行数 = 16
  182. ワーカー_umask = 0o077
  183. broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
  184. result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
  185. フラワーホスト = 0.0.0.0
  186. 花のURLプレフィックス =
  187. フラワーポート = 5555
  188. フラワー基本認証 =
  189. 同期並列性 = 0
  190. celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
  191. ssl_active = 
  192. SSLキー =
  193. ssl_cert =
  194. ssl_cacert =
  195. プール = プリフォーク
  196. オペレーションタイムアウト = 1.0
  197. タスク追跡開始 = True  
  198. タスク採用タイムアウト = 600
  199. タスク公開の最大再試行回数 = 3
  200. ワーカー事前チェック = False  
  201. [セロリブローカートランスポートオプション]
  202. [ダスク]
  203. クラスターアドレス = 127.0.0.1:8786
  204. tls_ca =
  205. tls_cert =
  206. TLS_キー =
  207. [スケジューラ]
  208. ジョブハートビート秒 = 5
  209. スケジューラハートビート秒 = 5
  210. 実行回数 = -1
  211. スケジューラ_アイドル_スリープ時間 = 1
  212. 最小ファイル処理間隔 = 30
  213. dag_dir_list_interval = 300
  214. 印刷統計間隔 = 30
  215. プールメトリック間隔 = 5.0
  216. スケジューラヘルスチェックしきい値 = 30
  217. 孤立したタスクのチェック間隔 = 300.0
  218. child_process_log_directory = /opt/airflow/logs/scheduler
  219. スケジューラ_ゾンビ_タスク_しきい値 = 300
  220. catchup_by_default = True  
  221. クエリあたりの最大数 = 512
  222. 行レベルロックの使用 = True  
  223. ループごとに作成する最大ダグルン数 = 10
  224. スケジュールごとのループの最大ダグル数 = 20
  225. タスク実行後のスケジュール = True  
  226. 解析プロセス = 2
  227. ファイル解析ソートモード = 変更日時
  228. use_job_schedule = True  
  229. allow_trigger_in_future = False  
  230. 依存関係検出 = airflow.serialization.serialized_objects.DependencyDetector
  231. トリガータイムアウトチェック間隔 = 15
  232. [トリガー]
  233. デフォルト容量 = 1000
  234. [ケルベロス]
  235. ccache = /tmp/airflow_krb5_ccache
  236. 主 = 空気の流れ
  237. 再初期化頻度 = 3600
  238. kinit_path = kinit
  239. キータブ = airflow.keytab
  240. 転送可能 = True  
  241. include_ip = 
  242. [github_エンタープライズ]
  243. API_rev = v3 です
  244. [エラスティックサーチ]
  245. ホスト =
  246. log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
  247. end_of_log_mark = ログ終了
  248. フロントエンド =
  249. write_stdout = False  
  250. json_format = False  
  251. json_fields = asctime、ファイル名、行番号、レベル名、メッセージ
  252. host_field = ホスト
  253. offset_field = オフセット
  254. [elasticsearch_configs]
  255. use_ssl = False  
  256. verify_certs = True  
  257. [Kubernetes]
  258. ポッドテンプレートファイル =
  259. ワーカーコンテナリポジトリ =
  260. ワーカーコンテナタグ =
  261. 名前空間 =デフォルト 
  262. ワーカーポッドを削除する = True  
  263. 失敗時にワーカーポッドを削除する = False  
  264. ワーカーポッド作成バッチサイズ = 1
  265. マルチネームスペースモード = False  
  266. in_cluster = True  
  267. kube_client_request_args=
  268. 削除オプション_kwargs =
  269. enable_tcp_keepalive =有効 
  270. tcp_keep_idle = 120
  271. tcp_keep_intvl = 30
  272. tcp_keep_cnt = 6
  273. verify_ssl = 
  274. ワーカーポッドの保留タイムアウト = 300
  275. ワーカーポッドの保留タイムアウトチェック間隔 = 120
  276. ワーカーポッドのキューチェック間隔 = 60
  277. ワーカーポッドの保留タイムアウトバッチサイズ = 100
  278. [スマートセンサー]
  279. スマートセンサーの使用 = False  
  280. シャードコード上限 = 10000
  281. 破片 = 5
  282. sensors_enabled = 名前付きHiveパーティションセンサー

変更が完了したら、サービスを再起動します。

  1. docker-compose の再起動

4. データ同期

Airflow は 3 つのワーカーノードを使用するため、各ノードが構成を変更すると、他のノードが同期する必要があります。同時に、DAGS ディレクトリとプラグイン ディレクトリもリアルタイムで同期する必要があります。スケジューラが情報をノードにスケジュールした後、対応する DAGS ファイルが見つからない場合は、エラーが報告されます。したがって、リアルタイムのデータ同期には lsyncd を使用します。

  1. apt-get install lsyncd -y

公開鍵経由で接続するようにノードを構成する

  1. ssh-keygen -t rsa -C "airflow-sync" -b 4096 #airflow-syncという名前のキーのペアを生成します
  2. ip100 200場合、ssh-copy-id -i ~/.ssh/airflow-sync.pub ${USERNAME}@192.168.0.$ip -P12022 を実行します。完了

その後、秘密鍵を介して他のノードにアクセスできるようになります。

同期設定ファイルを編集し、lsyncd設定パラメータについて詳しく知るには、公式ドキュメント[2]に直接アクセスしてください。

  1. 設定 {
  2. logfile = "/var/log/lsyncd.log"、#ログファイル
  3. statusFile = "/var/log/lsyncd.status" 、# 同期ステータス情報
  4. pidfile = "/var/run/lsyncd.pid"
  5. ステータス間隔 = 1、
  6. nodaemon = false 、# デーモンプロセス
  7. inotifyMode = "CloseWrite"
  8. 最大プロセス数 = 1、
  9. 最大遅延 = 1、
  10. }
  11. 同期{
  12. デフォルトの.rsync、
  13. ソース = "/data/airflow"
  14. ターゲット = "192.168.0.100:/data/airflow"
  15.  
  16. rsync = {
  17. バイナリ= "/usr/bin/rsync"
  18. 圧縮 = false
  19. アーカイブ = true
  20. 所有者 = true
  21. 権限 = true
  22. --delete = 真、  
  23. whole_file = false
  24. rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"  
  25. },
  26. }
  27. 同期{
  28. デフォルトの.rsync、
  29. ソース = "/data/airflow"
  30. ターゲット = "192.168.0.200:/data/airflow"
  31.  
  32. rsync = {
  33. バイナリ= "/usr/bin/rsync"
  34. 圧縮 = false
  35. アーカイブ = true
  36. 所有者 = true
  37. 権限 = true
  38. --delete = 真、  
  39. whole_file = false
  40. rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"  
  41. },
  42. }

上記のパラメータの意味については、公式 Web サイトをご覧ください。ここでは、rsync の rsh を通じて ssh コマンドが定義されており、秘密鍵やカスタム ポートなどのセキュリティ対策が使用されるシナリオを解決できます。もちろん、パスワードなしのアクセスを構成して、構成に default.rsync または default.rsyncssh を使用することもできます。

lsyncd サービスホスティングを構成する

  1. cat << EOF > /etc/systemd/system/lsyncd.service
  2. [ユニット]
  3. 説明=lsyncd
  4. 条件ファイルは実行可能=/usr/bin/lsyncd
  5.  
  6. =network-online.targetの後
  7. 欲しいもの=ネットワークオンライン.ターゲット
  8.  
  9. [サービス]
  10. 開始リミットバースト=10
  11. ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
  12. 再起動=失敗
  13. 再起動秒数=120
  14. 環境ファイル=-/etc/sysconfig/aliyun
  15. キルモード=プロセス
  16. [インストール]
  17. WantedBy =マルチユーザー.ターゲット
  18. 終了
  19.  
  20. systemctlデーモンリロード
  21. systemctl enable --now lsyncd.service #サービスを開始し、自動的に開始するように設定します 

これにより、データ (dag、プラグイン、airflow.cfg) の同期が完了します。後で CICD シナリオを使用する場合は、DAG ファイルを Bigdata1 ノードに直接アップロードすると、他の 2 つのノードが自動的に同期されます。問題が発生した場合は、ログを表示してデバッグすることができます。

  1. lsyncd -logすべての/etc/lsyncd.conf
  2. 末尾 -f /var/log/lsyncd.log

5リバースプロキシ[3]

https://lab.mycompany.com/myorg/airflow/ などのリバース プロキシの背後に Airflow を配置する必要がある場合は、次の構成でこれを行うことができます。

airflow.cfgでbase_urlを設定する

  1. ベースURL = http://my_host/myorg/airflow
  2. enable_proxy_fix =有効 

nginx の設定

  1. サーバー{
  2. 聞く 80;
  3. サーバー名 lab.mycompany.com;
  4.  
  5. 場所 /myorg/airflow/ {
  6. proxy_pass http://localhost:8080;
  7. proxy_set_header ホスト $http_host;
  8. proxy_redirectオフ;
  9. プロキシ_http_バージョン 1.1;
  10. proxy_set_header アップグレード $http_upgrade;
  11. proxy_set_header接続  「アップグレード」 ;
  12. }
  13. }

この時点で、Airflow 分散スケジューリング クラスターのインストールは基本的に完了です。具体的な効果は以下をご覧ください。

これをご覧になっているということは、あなたも Airflow を使用しているか、または Airflow に興味を持っているということです。ちなみに、Airflow の学習教材をいくつかお渡しします。

https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1

参考文献

[1]Airflow 2.2.3 + MySQL8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ

[2]lsyncd設定ファイル: https://lsyncd.github.io/lsyncd/manual/config/file/

[3] プロキシの背後にあるエアフロー: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html

<<:  iResearchの「2021年クラウドネイティブインテリジェントマーケティング調査レポート」がリリース、Mobvista、Adobe、Alimamaが代表例として選出

>>:  HUAWEI Cloud GaussDB(openGauss用):業界の悩みを解決し、Huaweiの消費者向けクラウドデータベースを分散型クラウドへと変革する支援

推薦する

別のクラウドを構築して、ビジョンをつなげましょう。 Kedaがビデオハイブリッドクラウドを正式に開始

9月22日、「新しいクラウド、完全に接続されたビジョンの構築」をテーマにした2017 Keda Vi...

第4回ウェブマスターSEOコンテストを観戦した経験について語る

今日、ウェブマスターネットワークのホームページのインタビューを通じて、Bobayou-第4回ウェブマ...

ハイブリッドクラウドインフラストラクチャを効果的に管理するための5つのステップ

ハイブリッド クラウドは、大小、新旧を問わず、あらゆるテクノロジー企業の間で話題になっています。いく...

草の根は成長の節目であるSEO戦略計画策定を経なければならない

時間が経つにつれて、あなたのノートは美しい芸術作品になります。すべての戦略がくしゃくしゃに丸められて...

K3s をベースにしたクラウドネイティブ エッジ インフラストラクチャを構築するにはどうすればよいでしょうか?

Kubernetes は、データセンターを通じてクラウドからエッジまでのパスを見つけています。以前、...

国営クラウドの登場で、新創クラウドが最大の勝者になるかもしれない

2020年9月、国務院国有資産監督管理委員会は「国有企業のデジタル化転換の加速に関する通知」を発行し...

有名なウェブサイトもユーザーエクスペリエンスを向上させるために新機能を追加している。

今日、パソコンの電源を入れたら暇だったので、ずっとインターネットをブラウズしていました。突然、新しい...

テンセント広告とバイトダンスの競争の転換点

今日は、テンセント広告とバイトダンスの転換点についてお話ししましょう。過去5年間、バイトダンスは止め...

PostgreSQLのいくつかの分散アーキテクチャの比較

強力な機能と優れたスケーラビリティにより、Postgresql をベースにした分散アーキテクチャが数...

パーフェクトダイアリーのストップロスの瞬間

パーフェクトダイアリーの計画はそれほど順調には進みませんでした。閉幕したばかりの中国国際輸入博覧会で...

Hosteons VPSはどうですか? Dallas AMD Ryzen+NVMe シリーズのレビュー

ホストオンはどうですか?ホストは良いですか? Hosteons の AMD Ryzen+NVMe に...

versaweb-月額$59/L5630/72gメモリ/240g SSD/50Tトラフィック/Gポート/ラスベガス

Versawebが公式サイトで突如サーバーのキャンペーンを開始しました。まだ知らない方も多いと思いま...

ブログランキングを最適化する5つのステップ

現在、ますます多くの SEO 担当者が、ウェブサイトのメンテナンスにブログを使用するようになっていま...

cmivpsはどうですか?香港CN2+BGPシリーズVPSの広帯域化を簡単に評価

cmivps の香港 VPS のデフォルト設定は中国本土最適化ですが、私が使用した多くのネットワーク...