背景ClickHouseは、世界中の開発者に広く利用されているオープンソースのOLAPエンジンであり、ByteDanceの様々なアプリケーションシナリオにも採用されています。高いパフォーマンスと分散型の性質を持つClickHouseは、大規模なデータ分析とクエリのニーズを満たすことができます。そこで、ByteDanceの研究開発チームは、オープンソースのClickHouseエンジンを基盤として構築されたクラウドネイティブなデータウェアハウス、ByteHouseを開発しました。 開発者は日々の業務において、ビジネスチェーンが長すぎるとプロセスの安定性とデータの一貫性を保証しにくくなるという問題にしばしば遭遇します。これは、分散型のクロスサービスシナリオではさらに顕著になります。この記事では、この問題の解決策として、Volcano Engine ByteHouse内に軽量なプロセスエンジンを構築し、データ一貫性の問題に対処する方法を提案します。 軽量ワークフローエンジンを使用することで、複雑なビジネスリンクのオーケストレーション問題を統一された標準を用いて解決できます。ビジネスコードの可読性と再利用性が向上するだけでなく、コアビジネスロジックの開発に集中できるようになり、プロセス全体の標準化と規制の強化につながります。 要約すると、プロセス エンジンを使用すると次の利点があります。 - 軽量、接続が簡単、メモリベースの操作、パフォーマンスが保証されています。
- 保守が容易で、プロセス構成はビジネス ロジックから分離されており、ホット アップデートをサポートします。
- 実行戦略と演算子の豊富なサポートを備え、拡張性が非常に高いです。
一般的な考え方写真 上の画像は、ByteHouse Enterprise Edition管理プラットフォームの機能アーキテクチャを示しています。このアーキテクチャからわかるように、ByteHouseのコア機能はClickHouseクラスターに依存しています。多数のクラスターノードと大規模なデータ計算量を伴うビジネスシナリオでは、ノードの状態の不整合が容易に発生する可能性があります。そのため、ClickHouseクラスター全体で状態の一貫性を確保することが、私たちの中核的な要件です。 写真 データの一貫性を確保するために、ByteHouse は次の機能を提供します。 - イベントエンジン: イベント処理センター
- ワークフローエンジン: 軽量ワークフローエンジン
- 和解システム
データの一貫性を確保する最も簡単な方法は、ステート マシンを使用してプロセスの実行を監視することです。 - まず、すべてのタスクリクエストがイベントエンジンに送信され、イベントエンジンはタスクを対応するハンドラーに分配して実行します。イベントエンジンは、送信されたすべてのタスクのライフサイクルを一元管理し、非同期リトライやロールバック補償などの機能を提供します。イベントエンジンへのトラフィックを統合することで、サービスのその後のビジネス拡張が容易になります。
- 次に、より複雑なタスクリクエストについては、ワークフローエンジンに送信して実行できます。ワークフローエンジンは、インスタンスの生成、タスクキューのオーケストレーション、プロセス実行インスタンスのライフサイクル管理、そして失敗時のロールバックと再試行を統合します。
- 最後に、調整サービスは、サービスが利用できないなどの特別なシナリオで生成されたダーティ データをカバーします。
写真 建築デザインプロセス監視のアーキテクチャ設計には、主に次のものが含まれます。 - プロセス管理層: 主にプロセス構成の解析と初期化、およびオーケストレーション戦略の完了を担当します。
- 戦略/動作レイヤー: 実行ノードを調整し、実行タスクをエグゼキュータに配布します。
- 実行者: 実行ノードの実行を管理する
- 実行ノード: ビジネス ロジックの特定の実装を担当します。
写真 実施計画実行ノード写真 ワークフローエンジンの中核は「責任の連鎖」であり、これはチェーン内のノードの順序に従ってすべてのタスクを順番に実行します。したがって、必要な3つの基本ユニットは次のとおりです。 - リクエスト: 入力パラメータ
- processlist: プロセス実行ノードのリスト
- レスポンス: 出力パラメータ
私たちの研究開発業務では、次のような問題に頻繁に遭遇します。 - 同時に問題が発生した場合、node1、node2、node3 間のデータのやり取りはどのように実現できますか?
- node1 の入力パラメータと出力パラメータが node2 および node3 と異なる場合、どのように処理すればよいでしょうか?
- 異なるパラメータタイプを持つノードを均一にスケジュールするにはどうすればよいですか?
最もシンプルなアプローチは、すべてのノードで同じコンテキスト情報を使用するようにし、実行ノード全体をテンプレート化することです。すべての実行ノードに同じ Delegation インターフェースを実装し、実行メソッドの入力パラメータとして同じ executionContext を統一的に使用します。 プロセス内のリクエストとレスポンスをexecutionContextに格納することで、各実行ノードがコンテキストを通じてレスポンスを操作できるようになります。 // Delegation - type Delegation interface { Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError TryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError ConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError CancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError Code() string Type() value.DelegationType } 実行戦略最小の実行ノードを決定したら、ビジネスシナリオが必ずしもノードを順番に実行して結果を返すとは限らないことを考慮する必要があります。ジャンプ、ループ、並行実行は、プロセス実行中によく行われる操作です。異なるビジネスシナリオでの再利用性を考慮し、実行ノードの上に実行戦略レイヤーを追加し、動作戦略を用いて実行ノードをトリガーするタスクを再配置しました。 - 下の図では、プロセスを behavior1 と behavior2 に分割しており、それぞれが異なる戦略に対応しています。
- 戦略の簡単な例としては、順次実行、同時実行、ループ実行、条件付きジャンプ実行などがあります。
- 実際のビジネスニーズに合わせてカスタマイズできます。後ほど例を紹介します。
写真 // ActivityBehavior - type ActivityBehavior interface { Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError Execute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError Leave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError Code() value.ActivityBehaviorCode }
ポリシー動作には、Enter、Execute、Leave の 3 つのインターフェースが用意されています。Enter は実行ノードのタスクインスタンスの生成、Execute は実行タスクインスタンス操作のオーケストレーションとトリガー、Leave は次の動作へのジャンプを担当します。 動作戦略のジャンプメソッドはリンクリストに似ており、次のメソッドを継続的に実行していることがわかります。そのため、コーディングプロセスではスタックオーバーフローを防ぐために無限ループを避けるように注意する必要があります。 執行者Executorの主な機能は、実行戦略と実行ノードを接続することです。戦略/動作は実行すべきコマンドをExecutorに送信し、Executorは実行ノードで操作をトリガーします。ここでは、実行ノードの種類に基づいて、TCC(Track-Compulsory Control)を含む3つの実行方法のいずれかにマッピングされます。TCCは、一度実行して複数回再試行します。 // DelegationExecutor - type DelegationExecutor interface { execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError postExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError } func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError { delegationCode := executionContext.GetExecutionInstance().GetDelegationCode() if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil { logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode)) delegationCode = string(value.DefaultDelegation) executionContext.GetExecutionInstance().SetDelegationCode(delegationCode) } return de.dumpExecute(ctx, executionContext, delegationCode) } func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError { FireEvent(ctx, executionContext, value.ExecutionStart) var err apperror.AppError delegation := de.DelegationMap[delegationCode] switch delegation.Type() { case value.TccDelegation: err = tccExecute(ctx, executionContext, delegation) case value.SingleDelegation: err = singleExecute(ctx, executionContext, delegation) case value.RetryDelegation: err = retryExecute(ctx, executionContext, delegation) } if err != nil { logger.Error(ctx, "delegation.Execute_err", zap.Error(err)) return apperror.Trace(err) } FireEvent(ctx, executionContext, value.ExecutionEnd) return nil } 実行コンテキストExecutionContext は、次のようなプロセス実行の詳細をすべて記録するために使用されます。 - ProcessEngineConfigurationInterface: プロセス定義情報
- ExecutionInstanceInterface: 実行ノードインスタンス
- ActivityInstanceInterface: 実行戦略インスタンス
- ProcessInstanceInterface: プロセスインスタンス
- リクエスト: 入力パラメータ
- レスポンス: 戻り値
プロセス全体の安定性を確保するため、レスポンス以外のインスタンスパラメータのインターフェースを公開することは推奨されません。レスポンスは、プロセスインスタンスの実行中に生成される変数情報を保存するために使用できます。 `ProcessEngineConfiguration` を使用してプロセス全体を定義するには、最も簡単な方法、つまり設定情報をデータベース内の JSON 文字列にマッピングする方法を選択できます。あるいは、読みやすくデータが失われない限り、設定ファイルから読み取る方法も選択できます。 // ExecutionContextInterface - type ExecutionContextInterface interface { GetProcessEngineConfiguration() ProcessEngineConfigurationInterface SetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface) GetExecutionInstance() instance.ExecutionInstanceInterface SetExecutionInstance(executionInstance instance.ExecutionInstanceInterface) GetActivityInstance() instance.ActivityInstanceInterface SetActivityInstance(activityInstance instance.ActivityInstanceInterface) GetProcessInstance() instance.ProcessInstanceInterface SetProcessInstance(processInstance instance.ProcessInstanceInterface) SetNeedPause(needPause bool) IsNeedPause() bool SetActivityIndex(activityIndex int) GetActivityIndex() int SetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode) GetActivityBehaviorCode() value.ActivityBehaviorCode SetBizUniqueKey(bizUniqueKey string) GetBizUniqueKey() string GetRequest() map[string]interface{} SetRequest(request map[string]interface{}) GetResponse() map[string]string SetResponse(response map[string]string) AtomicAddResponse(key string, value string) } リスナーリスナーの主な機能は、プロセス実行中に重要なパラメータを監視することです。上記のエグゼキュータインターフェースに示されているように、`fireEvent` はイベントメッセージを送信し、リスナーが対応するイベントタイプを検出してカスタマイズされたアクションを実行できるようにします。 アスペクト指向プログラミングと同様に、ログ記録、ノード実行時間の監視、プロセスで生成された応答情報の永続化、トレースの追加など、実行ノードの前後にカスタマイズされたロジックを追加できます。 API写真 最後に、上記のコンテンツを組み合わせて、3 つの主要なインターフェースを提供します。 - 開始: 起動プロセス
- シグナル: 一時停止または異常終了の後にプロセスの実行を再開します。
- 中止: プロセスを強制的に中断する
process start(){ //1.get and create ProcessEngineConfigurationInterface 解析流程定义//2.create processInstance 创建流程实例//3.create ExecutionContext 创建执行上下文//4. lockstrategy trylock //5. invoke process start processinstance.start() //6. persist processInstance and return //7. lockstrategy unlock } processinstance start(){ // get behavior // behavior enter behavior.Enter(ctx, executionContext) //behavior execute behavior.Execute(ctx, executionContext) //behavior leave behavior.Leave(ctx, executionContext) }
start と比較すると、signal は実行の詳細を読み取り、以前に失敗した実行ノードの場所を見つけ、それをコンテキストにロードしてから、実行を続行する必要があります。 障害が発生したノードに関する情報を保持するには、2つの方法があります。1つ目は、プロセス実行の終了時に保持する方法です。2つ目は、リスナーを使用して各実行ノードで保持する方法です。どちらの方法を選択するかは、具体的なビジネスシナリオにおけるパフォーマンスとデータの一貫性の要件に基づいて決定する必要があります。 同時実行シナリオ- 動作戦略には、カスタマイズ、並行性、そして異なるシナリオにおける複数の実行ノードの処理が不可欠です。同時に変更を加えると、必然的にデータ破損が発生します。シンプルなアプローチとしては、変更可能な情報(レスポンス)をロックされたコンテナに保存する方法があります。ここでは、github.com/bytedance/gopkg パッケージにカプセル化された skipmap パッケージを使用しています。
- LockStrategyを使用すると、ビジネスシナリオに最適な戦略を定義できます。最もシンプルなソリューションはRedisのロックですが、システム異常終了時のリカバリも考慮する必要があります。特殊なケースにおけるロック例外の解決方法については、Redisの公式ドキュメント(https://redis.io/commands/setnx/)をご参照ください。
フォローアップ作業軽量ワークフローエンジンの基本機能が実装されました。今後の拡張と最適化では、以下の方向性に焦点を当てます。 - グラフィカル インターフェイスでプロセスの実行ステータスを表示できます。
- さまざまなビジネスシナリオに適応するために戦略行動の次元を拡張する
- サブプロセス ディメンションを追加すると、元の実行ロジックを再利用できます。
デモ例以下はプロセス構成に関する簡単な設定情報です。ここでは、DefaultBehavior、つまり同期シーケンシャル実行戦略を使用しています。 { "ProcessContentList":[ { "Behavior":"DefaultBehavior", "DelegationList":[ { "Code":"sample1" }, { "Code":"sample2" }, { "Code":"sample3" } ] }, { "Behavior":"DefaultBehavior", "DelegationList":[ { "Code":"sample4" }, { "Code":"sample5" } ] } ] } 写真 リスナーにログを追加すると、実行プロセス全体を追跡し、実行ステータスをより適切に監視できるようになります。 実用ClickHouse クラスターのスケールダウンを例に挙げます。 写真 { "ProcessContentList":[ // 查询所有需要重分布的table { "Behavior":"DefaultBehavior",// 顺序执行"DelegationList":[ { "Code":"hor_reshard_table_loop" } ] }, // 遍历所有table进行数据的重分布{ "LoopKey":"reshard_table_loop_key", "Behavior":"NonBlockLoopBehavior",// 非阻塞循环处理"DelegationList":[ { "Code":"hor_reshard_table" } ] }, // 进行删除节点操作{ "Behavior":"DefaultBehavior", "DelegationList":[ { "Code":"hor_start_remove_node" }, { "Code":"hor_prepare_node_vcloud", "PostCode":"hor_rollback_remove_node_vcloud"// 统一失败回滚处理}, { "Code":"hor_update_config_vcloud", "PostCode":"hor_rollback_remove_node_vcloud" }, { "Code":"hor_set_cluster_running", "PostCode":"hor_rollback_remove_node_vcloud" }, { "Code":"hor_release_node" }, { "Code":"hor_callback_bill" } ] } ] }
要約複雑なソリューション設計を受け入れない限り、単一のワークフローエンジンであらゆるビジネスシナリオに対応することはほぼ不可能です。また、サードパーティ製のワークフローエンジンは、日常的なビジネス開発には扱いにくいものです。一方、軽量のワークフローエンジンは、統合プロセスを簡素化し、過剰なHTTPリクエストによるパフォーマンスのオーバーヘッドを軽減し、柔軟性と適応性が高く、トラブルシューティングも容易になります。 ByteHouseにワークフローエンジンを追加することで、企業はロールバックを繰り返すことなく、より低コストでより多くの再試行が可能になります。これは特に時間のかかるタスクに有効であり、ユーザーエクスペリエンスの向上につながります。さらに、ワークフローエンジンはビジネスプロセスをテンプレート化できるため、APIサービスの再利用性が向上し、ビジネスコードの可読性と拡張性が向上し、将来のメンテナンスが容易になります。 Volcano Engineを基盤とするクラウドネイティブ・データウェアハウスであるByteHouseは、リアルタイムデータ分析と大規模オフラインデータ分析をサポートする高速分析エクスペリエンスを提供します。また、利便性の高い柔軟なスケーリング機能、卓越した分析性能、そして豊富なエンタープライズレベルの機能を備え、お客様のデジタルトランスフォーメーションを支援します。 |