|
分散トレースを導入する際に前述したように、非同期呼び出しではトレース情報が失われる可能性があります。最終的な解決策は、以下のように、対応するラッパークラスを使用してトレース情報を再ラップすることです。 - 実行可能ラッパー
- 呼び出し可能ラッパー
- サプライヤーラッパー
前述の通り、OpenFeignでは非同期リクエストが失われるという問題もあります。結局のところ、これらすべての問題はThreadLocalによって引き起こされます。 ThreadLocal は現在のスレッドに関する情報のみを保存できるため、親スレッドと子スレッド間の継承を実装することはできません。 これは InheritableThreadLocal を思い起こさせます。これは確かに親スレッドと子スレッド間でのローカル変数の受け渡しを可能にしますが... ただし、プログラムでスレッドプールを使用している場合、スレッドは再利用されます。この場合、スレッドプール内のスレッドは使用されるたびに作成されるわけではないため、親スレッドと子スレッド間でスレッドを受け渡すことができない可能性があります。InheritableThreadLocal は、スレッドの初期化時に intertableThreadLocals=true に設定されている場合にのみコピーされ、受け渡されます。 そのため、今回使用する子スレッドが既にプールされているスレッドである場合は、初期化処理を経ずにスレッドプールから取り出され、オフラインで使用されるため、親スレッドと子スレッドのローカル変数はコピーされません。 日常的なアプリケーションでは、スレッド プールは効果的なリソース管理のための最も一般的な方法です。 今日は、Alibaba の ThasmittableThreadLocal がスレッド プール内の親スレッドと子スレッド間でローカル変数を渡す問題をどのように解決するかについて説明します。 InheritableThreadLocalの問題ThasmittableThreadLocal を紹介する前に、次のコードに示すように、スレッド プールの InheritableThreadLocal に関する問題を見てみましょう。 @テスト パブリックvoidテスト( )は例外をスローします{ //シングルスレッドプール ExecutorService executorService = Executors .newSingleThreadExecutor ( ) ; //継承可能なスレッドローカルストレージ InheritableThreadLocal <文字列> username = new InheritableThreadLocal <> ( ) ; ( int i = 0 ; i < 10 ; i ++ )の場合{ username.set ( "WeChat公式アカウント:コードモンキーテクノロジーコラム—" + i ) ; スレッド.スリープ( 3000 ) ; CompletableFuture .runAsync ( ( ) -> System .out .println ( username .get ( ) ) , executorService ) ; } } 上記のコードは、単一のスレッドプールを作成し、ループ内で非同期呼び出しを行い、ユーザー名を出力します。コアスレッド数は1なので、スレッドの再利用は避けられません。 印刷された情報は次のとおりです。 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 わかりますか? 親スレッドと子スレッド間の変数の受け渡しはここでは実装されていません。これは InheritableThreadLocal の制限です。 TransmittableThreadLocalはTransmittableThreadLocal(TTL): スレッドプールやその他のプールされて再利用されるスレッド実行コンポーネントを使用する場合、非同期実行時のコンテキスト受け渡しの問題を解決するために ThreadLocal 値を渡す機能を提供します。 TransmittableThreadLocal ライブラリ全体のコア機能には、ユーザー API、フレームワーク/ミドルウェアとの統合 API、スレッド プール ExecutorService/ForkJoinPool/TimerTask とそのスレッド ファクトリ ラッパーが含まれます。 要件シナリオ:- 分散トレースシステムまたはエンドツーエンドの負荷テスト(リンクタグ付けなど)
- ログ収集記録システムコンテキスト
公式サイト: https://github.com/alibaba/transmittable-thread-local 上記の例を TransmittableThreadLocal で変更して、その効果を確認してみましょう。 まず、次のように対応する依存関係を含める必要があります。 <依存関係> <グループID> com.alibaba </グループID> <artifactId> 送信可能-スレッド- ローカル</artifactId> </依存関係> 変更されたコードは次のとおりです。 @テスト パブリックvoidテスト( )は例外をスローします{ //シングルスレッドプール ExecutorService executorService = Executors .newSingleThreadExecutor ( ) ; //スレッド プールは TtlExecutors を使用してラップする必要があります。 executorService = TtlExecutors .getTtlExecutorService ( executorService ) ; // TransmittableThreadLocal が作成されました TransmittableThreadLocal <文字列> username = new TransmittableThreadLocal <> ( ) ; ( int i = 0 ; i < 10 ; i ++ )の場合{ username.set ( "WeChat公式アカウント:コードモンキーテクノロジーコラム—" + i ) ; スレッド.スリープ( 3000 ) ; CompletableFuture .runAsync ( ( ) -> System .out .println ( username .get ( ) ) , executorService ) ; } } 以下のコードに示すように、スレッド プールは TtlExecutors を使用してラップする必要があることに注意することが重要です。 executorService = TtlExecutors .getTtlExecutorService ( executorService ) ; 実行結果は次のとおりです。 WeChat公式アカウント:コードモンキーテクノロジーコラム—0 WeChat公式アカウント:コードモンキーテクノロジーコラム - 1 WeChat公式アカウント:コードモンキーテクノロジーコラム - 2 WeChat公式アカウント:コードモンキーテクノロジーコラム - 3 WeChat公式アカウント:コードモンキーテクノロジーコラム—4 WeChat公式アカウント:コードモンキーテクノロジーコラム—5 WeChat公式アカウント:コードモンキーテクノロジーコラム—6 WeChat公式アカウント:コードモンキーテクノロジーコラム—7 WeChat公式アカウント:コードモンキーテクノロジーコラム—8 WeChat公式アカウント:コードモンキーテクノロジーコラム—9 ご覧のとおり、スレッド プール内の親スレッドと子スレッド間のデータ転送が正常に実装されています。 タスクが呼び出されるたびに、メインスレッドの現在のTTLデータが子スレッドにコピーされ、実行後にクリアされます。一方、子スレッドで行われた変更は、メインスレッドに返された時点では実際には有効になりません。これにより、各タスクの実行は独立して行われます。 シンプルなアプリケーションSpring Security では、ビジネス メソッドでいつでもユーザー情報を取得できるように、詳細なユーザー ログイン情報を保存する必要があることがよくあります。 実際、Request 内の情報の保存も ThreadLocal を介して行われます。非同期実行の場合は、再度転送する必要があり、コードが複雑になります。 TransmittableThreadLocal を理解したら、以下に示すように、これを使用してユーザーのログイン情報を保存できます。
パブリッククラス SecurityContextHolder {
// TTLを使用してID情報を保存します プライベート静的最終 TransmittableThreadLocal < LoginVal > THREAD_LOCAL =新しい TransmittableThreadLocal <> ( ) ;
パブリック静的void セット(ログイン値ログイン値) { THREAD_LOCAL .set ( loginVal ) ; }
パブリック静的LoginVal get ( ) { THREAD_LOCAL.get ( )を返します。 }
パブリック静的void削除( ) { THREAD_LOCAL.remove ( ) ; }
} MVC の 1 つのリクエストは 1 つのスレッドに対応するため、次のコードに示すように、インターセプターの TransmittableThreadLocal の情報を設定および削除するだけで済みます。
@成分 パブリッククラスAuthInterceptorはAsyncHandlerInterceptorを実装します{ @オーバーライド public boolean preHandle ( HttpServletRequest リクエスト、 HttpServletResponse レスポンス、オブジェクトハンドラー) { if ( ! (ハンドラーインスタンスHandlerMethod ) ) trueを返します。 //リクエストヘッダーから暗号化されたユーザー情報を取得する 文字列トークン= request .getHeader ( OAuthConstant .TOKEN_NAME ) ; if ( StrUtil .isBlank (トークン) ) trueを返します。 //復号化 文字列 json = Base64 .decodeStr (トークン) ; // JSONをLoginValに解析する ログイン値 loginVal = TokenUtils .parseJsonToLoginVal ( json ) ; //データをThreadLocalにカプセル化する SecurityContextHolder .set ( loginVal ) ; trueを返します。 }
@オーバーライド public void afterCompletion ( HttpServletRequest リクエスト、 HttpServletResponse レスポンス、オブジェクトハンドラー、例外 ex ) { SecurityContextHolder .remove ( ) ; } } 原理定義上、TransimittableThreadLocal は InheritableThreadLocal を継承し、コピーメソッドを1つだけ含む TtlCopier インターフェースを実装します。したがって、TransimittableThreadLocal は主に InheritableThreadLocal の拡張です。 パブリッククラス TransmittableThreadLocal < T >は InheritableThreadLocal < T >を拡張し、TtlCopier < T >を実装します。 TransimittableThreadLocal にホルダー属性を追加します。この属性は、スレッド渡し対象としてマークされたオブジェクトをこのオブジェクトに追加します。 クラスをマークする最も簡単な方法は、クラスに「Type」フィールドを追加することです。別の方法としては、その型のすべてのオブジェクトを静的グローバルコレクションに追加する方法があります。こうすることで、後で使用する際に、このコレクション内のすべての値にこのマーカーが付与されます。 // 1.ホルダー自体は InheritableThreadLocal オブジェクトです。 // 2.このホルダーオブジェクトの値は WeakHashMap < TransmittableThreadLocal <Object> , ? >です // 2.1 WeekHashMap の値は常に null であり、使用できません。 // 2.2 WeekHasshMap は value = nullをサポートします プライベート静的 InheritableThreadLocal < WeakHashMap < TransmittableThreadLocal < Object > 、 ? >>ホルダー=新しい InheritableThreadLocal < WeakHashMap < TransmittableThreadLocal < Object > 、 ? >> ( ) { @オーバーライド 保護された WeakHashMap < TransmittableThreadLocal < Object > 、 ? > initialValue ( ) { 新しい WeakHashMap < TransmittableThreadLocal < Object > 、 Object > ( )を返します。 }
@オーバーライド 保護された WeakHashMap < TransmittableThreadLocal < Object > 、 ? >子値( WeakHashMap < TransmittableThreadLocal < Object > 、 ? >親値) { 新しい WeakHashMap < TransmittableThreadLocal < Object > 、 Object > ( parentValue )を返します。 } } ; アプリケーションコードは、`TtlExecutors`ユーティリティクラスを使用してスレッドプールオブジェクトをラップします。このユーティリティクラスは、入力スレッドプールが既にラップされているかどうかを確認し、非nullチェックなどを実行し、ラッパークラス`ExecutorServiceTtlWrapper`を返します。ラッパークラスは、スレッドプールの種類によって異なります。 @Null可能 パブリック静的ExecutorService getTtlExecutorService ( @Nullable ExecutorService executorService ) { if ( TtlAgent .isTtlAgentLoaded ( ) || executorService == null || executorService instanceof TtlEnhanced ) { executorService を返します。 } 新しい ExecutorServiceTtlWrapper ( executorService )を返します。 } ラッパー クラス ExecutorServiceTtlWrapper に入ると、ExecutorServiceTtlWrapper#submit メソッドと ExecutorTtlWrapper#execute メソッドの両方がスレッド オブジェクトを TtlCallable または TtlRunnable にラップし、実際の実行メソッドが実行される前にビジネス ロジックを実行できることがわかります。
@非ヌル @オーバーライド パブリック< T > Future < T >送信( @NonNull 呼び出し可能< T >タスク) { executorService .submit ( TtlCallable .get ( task ) )を返します。 }
@オーバーライド パブリック void 実行( @NonNull 実行可能なコマンド) { executor .execute ( TtlRunnable .get (コマンド) ) ; } したがって、コアロジックは `TtlCallable#call()` または `TtlRunnable#run()` に配置する必要があります。以下の例では `TtlCallable` を使用していますが、同じロジックが `TtlRunnable` にも適用されます。`call()` メソッドを分析する前に、`Transmitter` というクラスを見てみましょう。 t クリア( ) { 最終的なHashMap < TransmittableThreadLocal < Object > 、 Object > ttl2Value = 新しい HashMap < TransmittableThreadLocal < Object > 、 Object > ( ) ;
最終的なHashMap < ThreadLocal < Object > 、 Object > threadLocal2Value = 新しい HashMap < ThreadLocal < Object > 、 Object > ( ) ; for ( Map .Entry < ThreadLocal < Object > 、 TtlCopier < Object >> entry : threadLocalHolder .entrySet ( ) ) { 最終的なThreadLocal <Object> threadLocal = entry .getKey ( ) ; threadLocal2Value を .put ( threadLocal 、 threadLocalClearMark ) します。 } replay (新しいスナップショット( ttl2Value 、 threadLocal2Value ) )を返します。 }
パブリック静的void復元( @NonNullオブジェクトのバックアップ) { 最終スナップショットのバックアップSnapshot = (スナップショット) backup ; restoreTtlValues (バックアップスナップショット.ttl2Value ) ; スレッドローカル値を復元します(バックアップスナップショット.threadLocal2Value ) 。 }
プライベート静的void restoreTtlValues ( @NonNull HashMap < TransmittableThreadLocal < Object > 、 Object >バックアップ) { //拡張ポイント、TTLのafterExecuteを呼び出す doExecuteCallback ( false ) ;
for (最終的な Iterator < TransmittableThreadLocal < Object >> iterator = holder.get ( ). keySet ( ) . iterator ( ) ; iterator.hasNext ( ) ; ) { TransmittableThreadLocal <オブジェクト> threadLocal = iterator .next ( ) ;
if ( ! backup .containsKey ( threadLocal ) ) { イテレータ.削除( ) ; スレッドローカル.superRemove ( ) ; } }
//ローカル変数をバックアップバージョンに復元する setTtlValuesTo (バックアップ) ; }
プライベート静的void setTtlValuesTo ( @NonNullハッシュマップ< TransmittableThreadLocal <オブジェクト> 、オブジェクト> ttlValues ) { for ( Map .Entry < TransmittableThreadLocal < Object > 、 Object > entry : ttlValues .entrySet ( ) ) { TransmittableThreadLocal <オブジェクト> threadLocal = entry.getKey ( ) ; threadLocal .set (エントリ.getValue ( ) ) ; } }
プライベート静的void restoreThreadLocalValues ( @NonNull HashMap < ThreadLocal < Object > 、 Object >バックアップ) { for ( Map .Entry < ThreadLocal < Object > 、 Object > entry : backup .entrySet ( ) ) { 最終的なThreadLocal <Object> threadLocal = entry .getKey ( ) ; threadLocal .set (エントリ.getValue ( ) ) ; } }
プライベート静的クラススナップショット{ 最終的な HashMap < TransmittableThreadLocal < Object > 、 Object > ttl2Value ; 最終的な HashMap < ThreadLocal < Object > 、 Object > threadLocal2Value ;
プライベートスナップショット( HashMap < TransmittableThreadLocal < Object > 、 Object > ttl2Value 、 ハッシュマップ< ThreadLocal <オブジェクト> 、オブジェクト> threadLocal2Value ) { this .ttl2Value = ttl2Value ; this .threadLocal2Value = threadLocal2Value ; } } TtlCallable#call() メソッドを入力します。 ここまでで、スレッドプールアプローチを用いてローカル変数を渡すためのコアコードの基本的な部分を説明しました。要約すると、TtlCallableオブジェクトが作成されると、capture()メソッドが呼び出され、呼び出し元のローカルスレッド変数が取得されます。call()の実行中、取得されたスレッド変数は、スレッドプールから取得した対応するスレッドのローカル変数と置き換えられます。実行後、ローカル変数は呼び出し前の状態に復元されます。 要約この記事では、AlibabaのオープンソースであるTransmittableThreadLocalを用いて、親スレッドと子スレッド間のデータ転送をエレガントに実装する方法を紹介します。TransmittableThreadLocalは様々な応用シナリオがあり、企業で広く利用されています。 |