DUICUO

Netflix Conductor、オープンソースのマイクロサービスオーケストレーションフレームワーク

[[438055]]

この記事では、主に Netflix Conductor の基本的な概念と主な動作メカニズムを紹介します。

導入

Netflix Conductorは、マイクロサービスベースのアーキテクチャで使用される、Javaで記述されたオープンソースのワークフローエンジンです。以下の機能を備えています。

  • それぞれの独立したタスクがマイクロサービスによって実装される複雑なビジネス プロセスの作成が可能になります。
  • JSON DSL に基づいてワークフローを作成し、タスクの実行を調整します。
  • ワークフローは実行中に表示および追跡可能です。
  • 一時停止、再開、再起動などの複数の制御モデルを提供します。
  • マイクロサービスの再利用を最大限に高める簡単な方法を提供します。
  • 数百万の同時プロセスを実行できるように拡張できるサービス機能を備えています。
  • クライアントとサーバーの分離はキュー サービスを通じて実現されます。
  • HTTP またはその他の RPC プロトコル経由のデータ転送をサポートします。

2つの基本概念

1 タスク

タスクは実行の最小単位であり、HTTP リクエストの送信などの実行ロジックの一部を実行します。

  • システム タスク: これらのタスクはコンダクター サービスによって実行され、エンジンと同じ JVM 内で実行されます。
  • ワーカータスク:ワーカーサービスによって実行され、その実行はエンジンから分離されています。キューからタスクを取得した後、ワーカーはそれらを実行し、結果ステータスをエンジンに更新します。ワーカー実装はクロスランゲージであり、サーバーとの通信にはHTTPプロトコルを使用します。

conductor はいくつかの組み込み SystemTasks を提供します。

  • 機能タスク:
    • HTTP: HTTPリクエストの送信
    • JSON_JQ_TRANSFORM: 一般的な JSON 変換用の jq コマンドを実行します。詳細については、公式の jq ドキュメントを参照してください。
    • KAFKA_PUBLISH: Kafkaメッセージを公開する
  • プロセス制御タスク:
  • SWITCH (旧 Decision): コードの switch case に似た条件分岐。
  • FORK: 並列タスクをスケジュールするための並列ブランチを起動します。
  • JOIN: 並列タスクを要約するために使用される並列ブランチを要約します。
  • DO_WHILE: コードの do-while ループに似たループ。
  • WAIT: 外部イベントによってノード状態の更新がトリガーされるまで継続的に実行されます。外部操作を待機するために使用できます。
  • SUB_WORKFLOW: サブプロセス。他のプロセスを実行します。
  • TERMINATE: 出力を指定してプロセスを途中で終了します。SWITCHノードと組み合わせて使用​​することができ、コード内の早期リターンステートメントに似ています。

カスタムタスク:

  • システム タスクの場合、Conductor はカスタマイズおよび拡張可能な WorkflowSystemTask 抽象クラスを提供します。
  • ワーカー タスクの場合、コンダクターのクライアント ワーカー インターフェイスを実装することによって実行ロジックを実装できます。

2 ワークフロー

  • ワークフローは実行する必要のある一連のタスクで構成され、コンダクターは JSON を使用してタスクのフローを記述します。
  • 基本的なシーケンシャルフローに加えて、組み込みの SWITCH、FORK、JOIN、DO_WIHLE、および TERMINATE タスクを使用して、分岐、並列処理、ループ、早期終了などのフロー制御を実現することもできます。

3 入力と出力

タスクへの入力は、ワークフローのインスタンス化の一部、または他のタスクの出力として機能するマッピングです。これにより、ワークフローまたは他のタスクからの入出力を、後続のタスクの入力として使用できるようになります。

  • タスクには独自の入力と出力があり、どちらも JSONObject 型です。
  • タスクは、${taskxxx.output}という形式を使用して、他のタスクの入力と出力を参照できます。参照構文はjson-pathです。${taskxxx.output}の基本的な値解析に加えて、フィルタリングなどの複雑な操作もサポートしています。詳細はjson-path構文を参照してください。
  • ワークフローを開始するときに、プロセスの入力データを渡すことができ、タスクは ${workflow.input} を使用して参照できます。

タスクはアトミック操作とフロー制御を実装し、ワークフローはタスク間のフロー関係を定義および記述します。タスクはワークフロー、または他のタスクの入力と出力を参照します。これらのメカニズムを通じて、コンダクターはプロセスを記述するためのJSON DSLを実装します。

3in1アーキテクチャ

主にいくつかの部分に分かれています。

  • オーケストレーター: ワークフローのスケジュールと管理を担当します。
  • 管理/実行サービス: プロセスとタスクの管理および更新操作を提供します。
  • TaskQueues: Orchestrator によって解析されて実行されるタスクが配置されるタスク キュー。
  • ワーカー: タスク実行ワーカーは、TaskQueues からタスクを取得し、実行サービスを通じてタスクのステータスと結果データを更新します。
  • データベース: メタデータおよびランタイム データベース。ワークフローやタスクなどのランタイム ステータス情報、およびプロセスやタスクの定義などのメタデータを保存するために使用されます。
  • インデックス: 実行履歴を保存するために使用されるインデックス データベース。

4つの運用モデル

1. タスクの状態遷移

  • SCHEDULED: スケジュールが保留中。キューに配置されたが、まだ実行のためにポーリングされていないタスクの状態。
  • IN_PROGRESS: 実行中。実行のためにポーリングされたがまだ完了していないアプリケーションの状態。
  • 完了: 実行が完了しました
  • 失敗: 実行に失敗しました。
  • キャンセル済み: この状態はサービスが中断されたときに発生し、通常は次の 2 つの状況で発生します。
    • 1. プロセスが手動で停止されると、実行中のタスクはこの状態になります。
    • 2. 複数のフォーク ブランチでは、1 つのブランチのタスクが失敗すると、他のブランチで実行されているタスクも同じ状態になります。

2. タスクキュー

タスクの実行 (同期システム タスクを除く) は、まずタスク キューに追加されます。これは、典型的なプロデューサー - コンシューマー パターンです。

  • タスク キューは、遅延や優先度などの機能を備えたキューです。
  • 各タイプのタスクは個別のキューに格納されます。さらに、ドメインまたは分離グループが設定されている場合は、実行の分離を実現するために複数のキューに分割されます。
  • ディシダー サービスはプロデューサーであり、プロセス構成と現在の実行ステータスに基づいて実行可能なタスクを解析し、キューに追加します。
  • タスク エグゼキュータ (SystemTaskWorker、Worker) は、ロング ポーリングを使用してキューからタスクを取得して実行するコンシューマーです。

キュー インターフェイスはプラグ可能であり、コンダクターは Dynomite、MySQL、および PostgreSQL の実装を提供します。

3. コア機能実装メカニズム

コンダクター スケジューリングの中核は、プロセスの現在の状態に基づいて実行するタスクのリストを解析し、ワーカーによる実行のためにタスクをキューに入れるディサイダー サービスです。

主な決定プロセスは次のように簡略化されています。詳細なコードについては、WorkflowExecutor.java の decide メソッドを参照してください。

タスクをスケジュールするための簡略化されたワークフローは次のとおりです。詳細なコードについては、`WorkflowExecutor.java` の `scheduleTask` メソッドを参照してください。

いつトリガーするかを決める

最も重要なトリガー時間:

新しいプログラムが起動されると、決定操作がトリガーされます。

システム タスクが完了すると、決定操作がトリガーされます。

Worker タスクが ExecutionService を介してタスク ステータスを更新すると、決定操作がトリガーされます。

プロセス制御ノードの実装メカニズム

1) タスクとタスクマッパー

各タスクは、Task と TaskMapper の 2 つの部分で構成されます。

タスク: タスクの実行ロジック コード。その機能はタスクを実行することです。

TaskMapper: タスクマッピングロジックコード。タスク定義の設定、現在のインスタンスの実行ステータス、その他の情報に基づいて、実際に実行する必要があるタスクのリストを返します。

一般的なタスクの場合、TaskMapper はタスク自体と、実行インスタンスに関する追加のステータス情報を返します。ただし、制御ノードの場合はロジックが異なります。

2) 条件分岐の実装機構(SWITCH)

SWITCH は、条件に基づいて異なる分岐を実行するために使用されます。

実際には、このノードのタスクは何も処理しません。TaskMapperは分岐条件に基づいて分岐を決定し、対応する分岐の最初のタスクを返します。

SwitchTaskMapper.java の getMappedTasks メソッドのキー コード:

  1. // スケジュールされるタスクのリストと最終結果が返されます。
  2. リスト<タスク> タスクをスケジュール済み = 新しい LinkedList<>();
  3. // evalResult は分岐条件変数 ( case )の値です
  4. // decisionCases は、キーがブランチのケース値であり、値が対応するブランチのタスク定義のリストであるマップ構造です (ブランチ内に複数のタスク定義が存在する場合があります)。
  5. // ブランチ変数の実際の値に基づいて、対応するブランチのタスク定義リストを取得します。
  6. リスト<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
  7. //デフォルトのロジック: 対応するブランチが見つからないか、ブランチが空の場合は、デフォルトのブランチが使用されます。
  8. 選択されたタスクがnullの場合、選択されたタスクは空です。
  9. 選択されたタスク = taskToSchedule.getDefaultCase();
  10. }
  11. 選択されたタスクがnullの場合、選択されたタスクは空です。
  12. // ブランチの最初のタスク (インデックス 0) を取得し、スケジュールのためにディシダー サービスに返します (ディシダーはタスクをキューに追加し、実行のためにワーカーに渡します)。
  13. ワークフロータスク selectedTask = selectedTasks.get(0);
  14. // `deciderService` の `getTasksToBeScheduled` メソッドが呼び出され、`TaskMapper` が取得されて `getMappedTasks` が呼び出されます。これは再帰呼び出しアプローチを使用してネストされたタスクを解析します。
  15. リスト<タスク> caseTasks = taskMapperContext.getDeciderService()
  16. .getTasksToBeScheduled(ワークフローインスタンス、選択されたタスク、再試行回数、taskMapperContext.getRetryTaskId());
  17. スケジュールするタスクにすべてのケースタスクを追加します。
  18. switchTask.getInputData().put( "hasChildren" , "true" );
  19. }
  20. tasksToBeScheduledを返します

3) 並列(FORK)実装メカニズム

FORK は複数の並列ブランチを開始するために使用されます。

実際、このノードの Task は何も行いません。TaskMapper は、すべての並列ブランチの最初の Task を返します。

`ForkJoinTaskMapper.java` の `getMappedTasks` の主要なコード スニペット:

  1. // スケジュールされるタスクのリストと最終結果が返されます。
  2. リスト<タスク> タスクをスケジュール済み = 新しい LinkedList<>();
  3. // 構成内のすべてのフォークブランチ
  4. リスト<リスト<ワークフロータスク>> forkTasks = taskToSchedule.getForkTasks();
  5. for (List<WorkflowTask> wfts : forkTasks) {
  6. // 各ブランチから最初のタスクを取得します
  7. ワークフロータスク wft = wfts.get(0);
  8. // `deciderService` の `getTasksToBeScheduled` メソッドが呼び出され、`TaskMapper` が取得されて `getMappedTasks` が呼び出されます。これは再帰呼び出しアプローチを使用してネストされたタスクを解析します。
  9. リスト<タスク> タスク2 = taskMapperContext.getDeciderService()
  10. .getTasksToBeScheduled(ワークフローインスタンス、wft、再試行回数);
  11. スケジュールするタスクにすべてのタスクを追加します(tasks2);
  12. }
  13. tasksToBeScheduledを返します

通常、分岐ノード(SWITCH)と並列ノード(FORK)自体には実行ロジックがありません。これらのノードはTaskMapperを介して実際に実行されるタスクに戻り、その後、Decider Serviceに処理を引き渡します。

再試行実装メカニズム

再試行とその遅延設定は両方とも、タスク キュー機能を使用して実装されます。

再試行: タスクをタスク キューに再度追加します。

再試行遅延: タスクをタスクキューに追加する際、遅延時間を設定します。遅延時間が経過した後にのみ、タスクはキューからポーリングされ、実行されます。

5つの完全性保証メカニズム

スケジュール処理中に、マシンの再起動、ネットワーク異常、JVM のクラッシュなどの状況が発生する場合があり、決定プロセスが予期せず終了したり、プロセスが完全に実行されなかったり、プロセスが無期限に実行される (実際にはスケジュールされていない) などの異常な現象や、その他のステータス エラーが発生したりすることがあります。

1 ワークフローリコンサイラー

この問題に対処するため、コンダクターにはWorkflowReconcilerが搭載されており、定期的に実行中のすべてのプロセスを決定し、プロセス実行の一貫性を回復しようとします。さらに、プロセスのタイムアウトも検証します。

2 決定キュー

では、WorkflowReconciler は現在実行中のワークフローをどのようにして取得するのでしょうか? その答えは decideQueue です。

タスクキューと同様に、decideQueueも遅延機能を持つキューであり、現在実行中のプロセスのインスタンスIDを格納します。タスクが実行を開始すると(新規実行の開始、再試行、再開、再実行など)、インスタンスIDがdecideQueueにプッシュされます。実行が終了すると(成功または失敗)、インスタンスIDはdecideQueueから削除されます。

3 実行ロックサービス

WorkflowReconcilerは、タイムアウトチェックとプロセスの一貫性維持のため、実行中のすべてのプロセスを定期的に決定しようとします。ただし、プロセスの通常の実行自体も決定をトリガーします。同じ実行が同時に2つの決定をトリガーすると、状態の混乱、実行の停止、その他の問題が発生する可能性があります。

Conductor はロックを使用してこの問題を解決し、LocalOnlyLock (セマフォに基づく)、Redis 分散ロック (RedisSession に基づく)、ZooKeeper 分散ロックの 3 つの実装を提供します。

`decide` メソッドは最初にロックの取得を試みます。失敗した場合は、直ちに戻ります。ロックにより、`decide` が同じプロセスインスタンスで同時に実行されないことが保証されます。

  1. if (!executionLockService.acquireLock(ワークフローID)) {
  2.  
  3. 戻る 間違い;
  4.  
  5. }

ロックは設定可能であるため、単一のマシンに対してロックを設定する必要はない、という誤解が生じる可能性があります。実際には、WorkflowReconcilerとワークフローの通常の実行の間に競合が発生し、ワークフローの状態が不整合になる可能性があるため、単一のマシンに対してロックを設定する必要があります。

参照:

Github: https://github.com/Netflix/conductor

公式ドキュメント: https://netflix.github.io/conductor/

ワークフローリコンサイラー: https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java

ワークフローシステムタスク: https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java