DUICUO

オープンソース プロジェクト Cadence でポーリングを実装するにはどうすればよいですか?

このガイドは、Cadenceにおけるポーリングの仕組みを理解したいすべての開発者とエンジニアを対象としています。Cadenceは比較的新しい(そして完全にオープンソースの)フォールトトレラントなステータスコードプラットフォームで、元々はUberによって開発され(現在はInstaclustrを含む多くの企業によってサポートされています)、現在も利用されています。

ケイデンスの利点 

多数のユースケースは、単一のリクエスト・レスポンス、複雑な状態追跡、非同期イベントレスポンスなど多岐にわたり、信頼性の低い外部依存関係との通信を伴います。このようなアプリケーションを構築する一般的なアプローチは、ステートレスサービス、データベース、スケジュールされたタスク、キューイングシステムを寄せ集めのように統合することです。

しかし、コードの大部分がパイプラインに使用され、膨大な低レベルの詳細の背後にある実際のビジネスロジックが不明瞭になるため、開発者に悪影響を与える可能性があります。Cadenceは完全にオープンソースのオーケストレーションフレームワークであり、開発者が高度なフォールトトレランスを備え、長時間実行されるアプリケーション(ワークフローと呼ばれることが多い)を開発するのに役立ちます。

本質的には、特定のプロセスに依存しない仮想メモリを提供し、関数スタックやローカル変数など、様々なホストおよびソフトウェア障害に対応可能なアプリケーション状態を完全に保持します。これにより、開発者はプログラミング言語の機能を最大限に活用してコードを記述でき、Cadenceはアプリケーションの永続性、可用性、およびスケーラビリティを担います。ビジーウェイトは通常、不要なCPUサイクルを大量に消費するため、ポーリングは可能な限り回避する必要があります。代わりに、以下の2つの場合を除き、イベントトリガー割り込みを使用する必要があります。

  • 必要なポーリング時間はわずかです。
  • 投票プロセス中に合理的な待ち時間を受け入れることができる

コンピュータにとって、これは長距離移動中に5分ごとに目的地までの距離を尋ねることに相当します。しかし、多くの場合、これが唯一の選択肢です。Cadenceは、永続タイマー、長時間実行アクティビティ、無制限の再試行を強力にサポートしており、このような機能の実装に最適です。

Cadenceを使用して外部サービスをポーリングする 

ポーリングメカニズムを実装する方法は数多くあります。この記事では、主に外部サービスのポーリング実装について説明し、Cadenceからそのメリットを分析します。まず、Cadenceの概念について簡単に説明しましょう。Cadenceの核となる考え方は、フォールトフリーでステートフルなワークフローです。これは、ローカル変数やそれが作成するスレッドを含むワークフローコードの状態が、プロセスまたはCadenceサービスの障害の影響を受けないことを意味します。これは、状態、スレッド処理、永続タイマー、イベントハンドラーをカプセル化するため、非常に強力な概念です。決定論的な実行の要件を満たすために、ワークフローは外部APIへの直接呼び出しを許可しません。代わりに、アクティビティの実行をスケジュールする役割を担います。アクティビティとは、サービスの呼び出しやメディアファイルのトランスコードなど、ビジネスレベルの機能を実装するために使用されるアプリケーションロジックです。アクティビティが失敗した場合、Cadenceはその実行状態を復元しません。したがって、アクティビティ関数には、制限なく任意のコードを含めることができます。

投票の実施 

コード自体は非常にシンプルです。その機能を行ごとに説明します。

状態 polledState = externalServiceActivities.getState(); while(!expectedState.equals(polledState)) {
ワークフロー.スリープ(持続時間.秒数(30));
ポーリングされた状態 = 外部サービスアクティビティ.getState();
}

左右にスワイプしてコード全体を表示します

まずアクティビティ(この場合はREST APIなどの外部サービス)を呼び出します。次に条件チェックを実行する必要があります。必要な状態が満たされていない場合は、10秒間待機します。

これは通常の意味での待機ではなく、永続的なタイマーです。この場合、ポーリングは定期的な待機を実行しますが、これは非常に長い時間がかかる可能性があり、失敗した場合、その期間全体を無駄にすることは絶対に望ましくありません。Cadenceは、タイマーをイベントとして永続化し、完了時に適切なジョブサービス(つまり、ワークフローとアクティビティの実装を管理するサービス)に通知することで、この問題を解決します。

これらのタイマーは、秒から分、時間、日、さらには月や年まで、幅広い時間間隔を管理できます。最後に、外部サービスを再度呼び出すことで状態が更新されます。先に進む前に、潜在的な問題を防ぐためにCadenceがバックグラウンドで行っている処理を簡単に理解しておきましょう。

重要なお知らせ:ケイデンス履歴記録とポーリングに関する考慮事項 

ケイデンスはどのようにして障害のないステートフルなワークフローを実現しているのでしょうか?その鍵は、ケイデンスがワークフロー実行にいかに忠実に従っているかにあります。ワークフローの状態回復はイベントソーシングを活用しており、これによりコードの記述方法に一定の制約が課せられます。イベントソーシングは、絶えず変化する一連のイベントを永続的な状態に変換します。

ワークフローの状態が変化するたびに、ワークフローのイベント履歴に新しいイベントが追加されます。Cadenceは、この履歴を使用して操作を再生し、ワークフローの現在の状態を再構築します。そのため、外部環境との通信はすべてアクティビティを介して行う必要があり、現在の時刻の取得、待機、新しいスレッドの作成にはCadence APIを使用する必要があります。

1. ポーリングは慎重に使用してください。

ポーリングでは、特定の条件に基づいて継続的にループする必要があります。各アクティビティ呼び出しとタイマーイベントは永続的であるため、ポーリング間隔が短くても、許容できないほどの時間の消費につながる可能性があります。では、ポーリングセグメントの履歴がどのように表示されるかを見てみましょう。

  • まず、外部サービスをポーリングするために必要なアクティビティを確立します。
  • このイベントは作業サービスによって開始されました。
  • アクティビティが完了したら結果が返されます。
  • 条件がまだ満たされていない場合は、タイマーを開始します。
  • タイムアウトが発生すると、ワークフローを起動するイベントがトリガーされます。
  • 条件が満たされるまで、上記の 5 つの手順を繰り返します。
  • 最終的なポーリング確認条件が満たされます (タイマーを設定する必要はありません)。
  • ワークフローは完了としてマークされます。


Cadenceのポーリングコードスニペットのイベント履歴

ワークフローが途中で失敗し、その履歴を再生する必要がある場合、多数のイベントが実行される可能性があります。このような操作が制御不能になるのを防ぐには、短いポーリングサイクルの使用を避け、ワークフローに適切なタイムアウトを設定し、ポーリング回数を制限するなどの方法があります。

すべての操作は永続的であり、人間による再実行が必要になる場合があることに注意してください。

2. アクティビティの再試行回数を設定します。

何らかの理由で外部サービスに障害が発生した場合はどうでしょうか?何度も何度も試行する必要があります。Cadence には、アクティビティの結果を記録し、ワークフローの状態を完全に復元するメカニズムがあり、再試行ロジックなどの追加機能もサポートしています。以下は、再試行オプションを有効にしたアクティビティ設定の例です。

プライベート最終ExternalServiceActivities externalServiceActivities = Workflow.newActivityStub(ExternalServiceActivities.class、新しいActivityOptions.Builder()
.setRetryOptions(新しいRetryOptions.Builder() .setInitialInterval(Duration.ofSeconds(10)) .setMaximumAttempts(3)
。建てる())
.setScheduleToCloseTimeout(Duration.ofMinutes(5)) .build());

完全なコードを表示するには、左または右にスワイプします。

これにより、ExternalServiceActivities 内の操作は最大 3 回まで再試行でき、各再試行間隔は 10 秒であることを Cadence に通知します。これにより、外部サービスアクティビティへのすべての呼び出しで再試行ロジックを記述することなく、簡単に再試行機能を実装できます。

使用例: Instafood と MegaBurgers  

このパターンの実際的な効果を示すために、仮想的なポーリング メカニズムをサンプル プロジェクトに統合します。

1. Instafoodの紹介

Instafoodはオンラインベースのフードデリバリーサービスです。お客様はInstafoodモバイルアプリを通じて、お気に入りの地元のレストランから料理を注文できます。注文は店頭受け取りまたは配達で受け付けています。

お客様がデリバリーを選択した場合、Instafoodは配達ドライバーに連絡し、レストランから料理をピックアップしてお客様に配達します。Instafoodは各レストランに、Instafoodとレストラン間のコミュニケーション用にディスプレイ画面またはタブレットを提供します。お客様が注文すると、Instafoodはレストランに通知し、レストランは注文を承諾したり、予想配達時間を提示したり、注文を完了としてマークしたりすることができます。デリバリー注文の場合、Instafoodは配達ドライバーと調整を行い、予想配達時間に基づいて料理をピックアップします。

2. アンケート「メガバーガー」

MegaBurgersは、大手多国籍ファストフード・バーガーチェーンです。独自のモバイルアプリとウェブサイトを運営し、注文処理のバックエンドとしてREST APIを使用しています。InstafoodとMegaBurgersは、Instafoodの顧客がMegaBurgersのInstafoodアプリから注文し、持ち帰りと配達のオプションを選択できるようにする契約を締結しました。一般的なソリューションとは異なり、MegaBurgersは全店舗にInstafoodのディスプレイ画面を設置するのではなく、Instafoodの注文システムを自社のRESTベースの注文システムに統合し、注文の受付と更新処理を行うことに合意しました。

MegaBurgerの注文ステートマシン

MegaBurger の REST API にはプッシュ メカニズム (WebSocket、WebHooks など) がないため、注文ステータスの更新を受信できません。

代わりに、注文ステータスを確認するために GET リクエストを定期的に送信する必要があり、これらのポーリングにより、Instafood 側で注文ワークフローが繰り返し実行される可能性があります (例: 配達ドライバーが食品をピックアップするスケジュールを設定する)。

Instafoodプロジェクトを作成する 

サンプルプロジェクトを実行するには、Cadence クラスターを構成する必要があります。この例では、Instaclustr プラットフォームを使用して操作を実行します。

ステップ1: Instaclustr管理対象クラスターを作成する

Cadence クラスターは、永続化レイヤーとして Apache Cassandra クラスターに接続する必要があります。Cadence クラスターと Cassandra クラスターを正しく設定するには、「Cadence クラスターの作成」ドキュメントの手順に従います。

  • 次の操作は、手動による介入なしに自動的に初期化されます。
  • Cassandra クラスター上の Cadence ノードに対してファイアウォール ルールが自動的に生成されます。
  • Cadence と Cassandra 間の認証 (クライアント側の暗号化を含む) は自動的に構成されます。
  • Cadence のデフォルト構成とキースペースの可視性は、Cassandra で自動的に作成されます。
  • Cadence クラスターが停止する前に Cassandra クラスターが誤って削除されないように、2 つのクラスター間に関係が確立されます。
  • ロードバランサーが作成されます。ロードバランサーのアドレスを介してクラスターに接続し、アクセスすることをお勧めします。

ステップ2: Cadenceドメインを構成する

Cadenceのバックエンドはマルチテナントサービスを採用しており、分離されたユニットはドメインと呼ばれます。Instafoodアプリケーションを実行するには、まずドメインに登録する必要があります。

1. Cadence クラスターと対話するには、コマンドライン インターフェイス クライアントをインストールする必要があります。

macOS

macOS を使用している場合は、以下に示すように、Homebrew 経由で Cadence CLI をインストールできます。

 brew を使用して Cadence Workflow をインストールします。
# コマンドラインクライアントを実行する
リズム <コマンド> <引数>

その他のオペレーティングシステム

CLI は、Docker Hub イメージ リポジトリ ubercadence/cli 経由で実行および使用できます。

 # コマンドラインクライアントを実行する
docker run --network=host --rm ubercadence/cli:master <コマンド> <引数>

左右にスワイプしてコード全体を表示します

次の手順では、Cadence を使用してクライアントを参照します。

2. 接続の安定性を確保するため、ロードバランサのアドレスを介してクラスタに接続し、アクセスすることをお勧めします。ロードバランサのアドレスは、「接続情報」タブの上部に表示されます(下図参照)。

 「ab-cd12ef23-45gh-4baf-ad99-df4xy-azba45bc0c8da111.elb.us-east 1.amazonaws.com」

左右にスワイプしてコード全体を表示します

これを <cadence_host> と呼びます。

3. 現在のドメインを一覧表示して接続をテストできます。

 cadence --ad <cadence_host>:7933 管理ドメインリスト

4. instafoodドメインを追加します。

 cadence --ad <cadence_host>:7933 --do instafood ドメイン登録 --global_domain=false

完全なコードを表示するには左右にスワイプします。

5. 登録されているか確認します。

 cadence --ad <cadence_host>:7933 --do instafood domain describe

ステップ3: Instafoodサンプルプロジェクトを実行する

1. Instafood プロジェクトの Git リポジトリから Gradle プロジェクトをクローンします。

2. instafood/src/main/resources/instafood.properties にある構成ファイルを開き、cadenceHost の値をロードバランサーのアドレスに置き換えます。

 cadenceHost=<cadence_host>

3. 次の方法でアプリケーションを実行します。

 cadence-cookbooks-instafood/instafood$ ./gradlew 実行

あるいは、IDE 内から InstafoodApplication のメイン クラスを実行します。

4. ターミナル出力をチェックして、正常に実行されていることを確認します。

MegaBurger の API を理解する 

Instafood が MegaBurger とどのように統合されるかを知る前に、その API を簡単に見てみましょう。

1. MegaBurger サービスを実行します。

まずはサービスを実行してみましょう。以下のコマンドでサービスを開始します。

 cadence-cookbooks-instafood/megaburger$ ./gradlew 実行

または、IDEでMegaburgerRestApplicationを実行してください。これは、メモリを永続化レイヤーとして使用するSpring Boot REST APIのデモ例です。アプリケーションを閉じると、すべてのデータが失われます。

2. MegaBurgerの注文API

MegaBurger は、各食品の注文のステータスを追跡および更新するための Orders API をリリースしました。

POST /注文

注文を作成し、その ID を返します。

リクエスト:

 curl -X POST localhost:8080/orders -H "Content-Type: application/json" --data '{"meal": "ビーガンバーガー", "quantity": 1}'

左右にスワイプしてコード全体を表示します

応答:

 {
「id」: 1,
「食事」:「ビーガンバーガー」
「数量」: 1,
「ステータス」:「保留中」
「eta_minutes」: null
}

GET /orders

すべての注文情報を含むリストを返します。

リクエスト:

 curl -X GET localhost:8080/orders

応答:

 [
{
「id」: 0,
「食事」:「ビーガンバーガー」
「数量」: 1,
「ステータス」:「保留中」
「eta_minutes」: null
},
{
「id」: 1,
「食事」:「オニオンリング」
「数量」: 2,
「ステータス」:「保留中」
「eta_minutes」: null
}
]


GET /orders / {orderId}

ID が orderId と一致する注文を返します。

リクエスト:

 curl -X GET ローカルホスト:8080/orders/1

応答:

 {
「id」: 1,
「食事」:「オニオンリング」
「数量」: 2,
「ステータス」:「保留中」
「eta_minutes」: null
}

/orders/{orderId} にパッチを適用

orderId と一致する ID を持つ注文を更新する

リクエスト:

 curl -X PATCH localhost:8080/orders/1 -H "Content-Type: application/ json" --data '{"status":"ACCEPTED"}'

完全なコードを表示するには左右にスワイプします。

応答:

 {
「id」: 1,
「食事」:「オニオンリング」
「数量」: 2,
「ステータス」: 「承認済み」
「eta_minutes」: null
}

  MegaBurger ポーリング統合プロジェクトのレビュー 

すべての構成の初期化が完了したので、Instafood と MegaBurger の統合が実際にどのように機能するかを見てみましょう。

1. 投票ワークフロー

まず、新しいワークフローを定義します: MegaBurgerOrderWorkflow

パブリックインターフェースMegaBurgerOrderWorkflow {
@ワークフローメソッド
void orderFood(FoodOrder 注文);
// ...
}

このワークフローには、MegaBurger との統合を通じて対応する FoodOrder を送信および追跡する orderFood メソッドがあります。

それでは、それがどのように実装されているかを見てみましょう。

パブリッククラスMegaBurgerOrderWorkflowImplはMegaBurgerOrderWorkフローを実装します{
// ...
@オーバーライド
パブリック void orderFood(FoodOrder 注文) {
注文ワークフローの親注文ワークフロー = getParentOrderWorkflow();
整数 orderId = megaBurgerOrderActivities.createOrder(mapMega BurgerFoodOrder(order));
注文ステータスを更新します(親注文ワークフロー、注文ステータス.PENDING);
// 注文が承認/拒否されるまでポーリングする
注文ステータスを更新します(親注文ワークフロー、注文ステータス遷移をポーリングします(注文 ID、注文ステータス。
保留中));
OrderStatus.REJECTED.equals(currentStatus) の場合 {
throw new RuntimeException(“ID “ + orderId + “ の注文は拒否されました”);
}
// 親ワークフローに ETA を送信する
parentOrderWorkflow.updateEta(getOrderEta(orderId)); // 注文が処理されるまでポーリングする
updateOrderStatus(parentOrderWorkflow, pollOrderStatusTransition(orderId, OrderStatus.ACCEPTED)); // 注文の準備ができるまでポーリングする
updateOrderStatus(parentOrderWorkflow, pollOrderStatusTransition(orderId, OrderStatus.COOKING)); // 注文が配達されるまでポーリングする
updateOrderStatus(親OrderWorkflow、
pollOrderStatusTransition(orderId, OrderStatus.READY)); }
// ...
}

左右にスワイプしてコード全体を表示します

このワークフローはまず親ワークフローを取得します。MegaBurgerOrderWorkflow はMegaBurgerとの連携のみを処理し、別のワークフローで管理されているクライアントに注文を配送します。つまり、サブワークフローを使用していることになります。次に、アクティビティを介して注文が作成され、注文IDが取得されます。

このアクティビティは、/orders への POST リクエストの送信を担当する API クライアントの単なるデコレータです。注文が作成されると、親ワークフローは注文が PENDING 状態になったことを示すシグナルを受け取ります(これは、ワークフロー外部から送信された非同期リクエストです)。

次に、注文がPENDINGからACCEPTEDまたはREJECTEDに変わるのを待つ必要があります。ここでポーリングが役立ちます。pollOrderStatusTransition関数が何をするのか見てみましょう。

プライベートOrderStatus pollOrderStatusTransition(Integer orderId, OrderStatus orderStatus) { OrderStatus polledStatus =
megaBurgerOrderActivities.getOrderById(orderId).getStatus(); while (orderStatus.equals(polledStatus)) {
ワークフロー.スリープ(持続時間.秒数(30));
polledStatus = megaBurgerOrderActivities。
getOrderById(注文ID).getStatus();
}
polledStatus を返します。
}

左右にスワイプしてコード全体を表示します

これは、この記事で説明した他のポーリングループと非常によく似ています。唯一の違いは、注文ステータスが変化するまで待機状態を特定のポーリング状態に置き換えることです。同様に、IDで注文を取得するために使用される実際のAPI呼び出しは、再試行を可能にするアクティビティの背後に隠されています。注文が拒否された場合は、ランタイム例外が発生し、ワークフローは失敗します。注文が承認された場合は、メガバーガーの予想完了時間が親ワークフローに返されます(親ワークフローは、この予想完了時間を使用して配達をスケジュールします)。最終的に、注文が配達済みとマークされるまで、図3に示す状態が遷移します。

2. 通常の動作シナリオ

最後に、完全な注文シナリオを完成させましょう。

このシナリオは、サンプルプロジェクトのテストスイートの一部です。必要なのは、InstafoodサーバーとMegaBurgerサーバーの両方を同時に実行し、上記の手順に従うことだけです。

このテストケースでは、クライアントが Instafood を通じて MegaBurger の新しいビーガンバーガーを注文し、店舗で受け取る場合について説明します。

 cadence-cookbooks-instafood/instafood$ ./gradlew テスト

または、IDE で InstafoodApplicationTest を実行します。

クラス InstafoodApplicationTest {
// ...
@テスト
公共の空白
givenAnOrderItShouldBeSentToMegaBurgerAndBeDeliveredAccordingly() { FoodOrder order = new FoodOrder(Restaurant.MEGABURGER, “Vegan Burger”, 2, “+54 11 2343-2324”, “Díaz velez 433, La lucila”, true);
// クライアントが食べ物を注文する
ワークフロー実行 workflowExecution= WorkflowClient start(orderWorkflow::orderFood, order);
// 注文がメガバーガーの承諾待ちになるまで待機します。await().until(() -> OrderStatus.PENDING.equals(orderWorkflow.getStatus()));
// メガバーガーは注文を受け付け、到着予定時刻を送信します
megaBurgerOrdersApiClient.updateStatusAndEta(getLastOrderId(), “承認済み”, 15);
OrderStatus.ACCEPTED が orderWorkflow.getStatus() に等しいまで待機します。
// メガバーガーが調理注文を開始
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “調理中”);
OrderStatus.COOKING.equals(orderWorkflow.getStatus()) が返されるまで待機します。
// メガバーガーは注文が準備できたことを知らせます
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “READY”);
OrderStatus.READY が orderWorkflow.getStatus() と等しいまで待機します。
// メガバーガーは注文が受け取られたことを知らせる
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “RESTAURANT_DELIVERED”);
await().until(() -> OrderStatus.RESTAURANT_DELIVERED.
equals(orderWorkflow.getStatus()));
ワークフロー履歴がイベントを処理するまで待機します ()。(() -> workflowHistoryHasEvent(workflowClient、workflowExecution、EventType.WorkflowExecutionCompleted)): }
}

左右にスワイプしてコード全体を表示します

このシナリオには、Instafood、MegaBurger、クライアントの 3 つの参加者がいます。

1. クライアントが Instafood に注文を送信します。

2. 注文が MegaBurger に到着すると (注文ステータスは PENDING)、MegaBurger はそれを ACCEPTED としてマークし、完了予定時刻を返します。

3. 次に、状態更新シーケンス全体を見てみましょう。

  • MegaBurger は注文を COOKING としてマークします。
  • MegaBurger は注文を「READY」(配達または受け取りの準備ができていることを意味します)としてマークします。
  • MegaBurger は注文を RESTAURANT_DELIVERED としてマークします。

4. 注文はピックアップ形式で配送されるため、顧客が配送を完了するとワークフロー全体が終了します。

要約 

この記事では、Cadenceを使用してポーリングを実装する方法を学びました。Instaclustrプラットフォーム上でCadenceクラスターを実行する方法と、アプリケーションから簡単に接続できることを実証しました。参考リンク:https://dzone.com/articles/how-to-use-open-source-cadence-for-polling

翻訳者紹介

Qiu Kaiは51CTOのコミュニティエディターであり、現在は北京ZJS Express Co., Ltd.で情報セキュリティエンジニアとして勤務しています。主な業務は、同社の情報セキュリティ計画と構築(レベル保護スキーム、ISO27001)で、日々の業務は主にセキュリティソリューションの開発と実装、社内セキュリティ監査とリスク評価、そして管理です。