DUICUO

ZKクライアントキュレーターの使い方の詳細な説明

ZooKeeperは高可用性を目的として設計されたものではありませんが、ZABプロトコルを用いて極めて高い一貫性を実現するCP(Consistency and Availability:一貫性と可用性)システムです。そのため、レジストリセンター、構成センター、分散ロックなどのシナリオでよく採用されています。

[[323374]]

パフォーマンスは非常に制限されており、API もあまりユーザーフレンドリーではありません。xjjdog は、Raft プロトコルに基づいており、より軽量な Etcd または Consul の使用を好みます。

CuratorはNetflixが提供するオープンソースのZooKeeperクライアントであり、現在はApacheのトップレベルプロジェクトです。ネイティブZooKeeperクライアントと比較して、Curatorは抽象度が高く、ZooKeeperクライアント開発を簡素化します。Curatorは、接続の再接続、繰り返しのウェイター登録、NodeExistsException例外処理など、ZooKeeperクライアントの低レベル開発における多くの問題を解決します。

Curatorは一連のモジュールで構成されています。一般的な開発者にとって最もよく使用されるのはcurator-frameworkとcurator-recipesです。これらについては以下で順に紹介します。

1. Mavenの依存関係

Curator の最新バージョン 4.3.0 は、ZooKeeper 3.4.x および 3.5 をサポートしています。ただし、Curator に渡す依存関係は、実際のサーバーで使用されているバージョンと一致している必要があることに注意してください。例えば、現在 ZooKeeper 3.4.6 を使用しています。

  1. <依存関係>
  2. <グループID>org.apache.curator</グループID>
  3. <artifactId>キュレーターフレームワーク</artifactId>
  4. <バージョン>4.3.0</バージョン>
  5. <除外>
  6. <除外>
  7. <グループID>org.apache.zookeeper</グループID>
  8. <artifactId>動物園の飼育員</artifactId>
  9. </除外>
  10. </除外>
  11. </依存関係>
  12. <依存関係>
  13. <グループID>org.apache.curator</グループID>
  14. <artifactId>キュレーターレシピ</artifactId>
  15. <バージョン>4.3.0</バージョン>
  16. <除外>
  17. <除外>
  18. <グループID>org.apache.zookeeper</グループID>
  19. <artifactId>動物園の飼育員</artifactId>
  20. </除外>
  21. </除外>
  22. </依存関係>
  23. <依存関係>
  24. <グループID>org.apache.zookeeper</グループID>
  25. <artifactId>動物園の飼育員</artifactId>
  26. <バージョン>3.4.6</バージョン>
  27. </依存関係>

2.キュレーターフレームワーク

以下は、zk 関連の操作のうち一般的なものの一部です。

  1. 公共 静的キュレーターフレームワーク getClient() {
  2. CuratorFrameworkFactory.builder()を返す
  3. .connectString( "127.0.0.1:2181" )
  4. .retryPolicy(新しいExponentialBackoffRetry(1000, 3))
  5. .connectionTimeoutMs(15 * 1000) // 接続タイムアウト、デフォルトは15秒
  6. .sessionTimeoutMs(60 * 1000) // セッションタイムアウト、デフォルトは60秒
  7. .namespace( "arch" ) // 名前空間を設定する
  8. 。建てる();
  9. }
  10.   
  11. 公共  static void create (final CuratorFramework client, final String path, final byte[] payload) throws Exception {
  12. client.create ().creatingParentsIfNeeded().forPath(パス、ペイロード);
  13. }
  14.   
  15. 公共  static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
  16. client.create ().withMode(CreateMode.EPHEMERAL).forPath(パス、ペイロード);
  17. }
  18.   
  19. 公共  static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
  20. client.create (). withProtection ().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);を返します
  21. }
  22.   
  23. 公共  static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
  24. client.setData().forPath(パス、ペイロード);
  25. }
  26.   
  27. 公共  static void delete (final CuratorFramework client, final String path) 例外をスローします {
  28. client.delete() .deletingChildrenIfNeeded ().forPath(path);
  29. }
  30.   
  31. 公共  static void guaranteedDelete(final CuratorFramework client, final String path) 例外をスローします {
  32. client.delete ().guaranteed().forPath(path);
  33. }
  34.   
  35. 公共  static String getData(final CuratorFramework client, final String path) は例外をスローします {
  36. 新しい文字列を返します(client.getData().forPath(path));
  37. }
  38.   
  39. 公共  static List<String> getChildren(final CuratorFramework client, final String path) throws Exception {
  40. client.getChildren().forPath(path)を返します
  41. }

3.キュレーターレシピキュレーターレシピ

このセクションでは、ZooKeeper の典型的なユースケースをいくつか紹介します。次のセクションでは、主に開発でよく使用されるコンポーネントを紹介します。

イベントリスナー

ZooKeeper はウォッチャーを登録することでイベント リスニングをネイティブにサポートしていますが、特に使い勝手が良いわけではなく、開発者がウォッチャーを繰り返し登録する必要があり、非常に面倒です。

Curatorは、ZooKeeperサーバー上のトランザクションを監視するためのCacheを導入します。CacheはCuratorのイベントリスナーのラッパーであり、イベント監視はローカルのキャッシュビューとリモートのZooKeeperビューの比較と近似できます。さらに、Curatorは開発者のためにリスナーの繰り返し登録を自動的に処理するため、ネイティブAPI開発の煩雑なプロセスを大幅に簡素化します。

1) ノードキャッシュ

  1. 公共 静的void nodeCache()は例外をスローします{
  2. 最終的な文字列パス = "/nodeCache" ;
  3. 最終的な CuratorFramework クライアント = getClient();
  4. クライアントを起動します。
  5.   
  6. 削除(クライアント、パス);
  7. 作成(クライアント、パス、 「キャッシュ」 .getBytes());
  8.   
  9. 最終的な NodeCache nodeCache = 新しい NodeCache(client, path);
  10. nodeCache.start( true );
  11. nodeCache.getListenable()
  12. .addListener(() -> System.out .println ( "ノードデータが変更されました。新しいデータは " + new String(nodeCache.getCurrentData().getData())));
  13.   
  14. setData(クライアント、パス、 「cache1」 .getBytes());
  15. setData(クライアント、パス、 「cache2」 .getBytes());
  16.   
  17. スレッド.スリープ(1000);
  18.   
  19. クライアントを閉じます();
  20. }

NodeCache は指定されたノードをリッスンできます。リスナーを登録すると、ノードに変更があった場合、対応するリスナーに通知されます。

2) パスキャッシュ

パスキャッシュは、ZNode の子ノードのイベント(追加、更新、削除など)をリッスンするために使用されます。パスキャッシュは子ノードの状態を同期し、生成されたイベントは登録された PathChildrenCacheListener に渡されます。

  1. 公共 静的void pathChildrenCache()は例外をスローします{
  2. 最終的な文字列パス = "/pathChildrenCache" ;
  3. 最終的な CuratorFramework クライアント = getClient();
  4. クライアントを起動します。
  5.   
  6. 最終的な PathChildrenCache キャッシュ = 新しい PathChildrenCache(client, path, true );
  7. キャッシュを開始します(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
  8. cache.getListenable().addListener((client1, イベント) -> {
  9. スイッチ (event.getType()) {
  10. CHILD_ADDEDの場合:
  11. システム.out.println ( "CHILD_ADDED:" + event.getData().getPath());
  12. 壊す
  13. CHILD_REMOVEDの場合:
  14. システム.out.println ( "CHILD_REMOVED:" + event.getData().getPath());
  15. 壊す
  16. CHILD_UPDATEDの場合:
  17. システム.out.println ( "CHILD_UPDATED:" + event.getData().getPath());
  18. 壊す
  19. CONNECTION_LOSTの場合:
  20. システム.out.println ( "CONNECTION_LOST:" + event.getData().getPath());
  21. 壊す
  22. CONNECTION_RECONNECTEDの場合:
  23. システム.out.println ( "CONNECTION_RECONNECTED:" + event.getData().getPath());
  24. 壊す
  25. CONNECTION_SUSPENDEDの場合:
  26. システム.out.println ( "CONNECTION_SUSPENDED:" + event.getData().getPath());
  27. 壊す
  28. ケース初期化済み:
  29. システム.out.println ( "初期化されました:" + event.getData().getPath());
  30. 壊す
  31. デフォルト
  32. 壊す
  33. }
  34. });
  35.   
  36. // client.create ().withMode(CreateMode.PERSISTENT).forPath(path);
  37. スレッド.スリープ(1000);
  38.   
  39. client.create ().withMode(CreateMode.PERSISTENT).forPath(パス + "/c1" );
  40. スレッド.スリープ(1000);
  41.   
  42. client.delete ().forPath(パス + "/c1" );
  43. スレッド.スリープ(1000);
  44.   
  45. client.delete ().forPath(path); // ノード自体の変更を監視しても通知はトリガーされません
  46. スレッド.スリープ(1000);
  47.   
  48. クライアントを閉じます();
  49. }

3) ツリーキャッシュ

パス キャッシュとノード キャッシュを組み合わせたもので、パスの下の作成、更新、削除イベントを監視し、パスの下のすべての子ノードのデータをキャッシュします。

  1. 公共  static void treeCache() は例外をスローします {
  2. 最終的な文字列パス = "/treeChildrenCache" ;
  3. 最終的な CuratorFramework クライアント = getClient();
  4. クライアントを起動します。
  5.   
  6. 最終的な TreeCache キャッシュ = 新しい TreeCache(クライアント、パス);
  7. キャッシュを開始します。
  8.   
  9. cache.getListenable().addListener((client1, イベント) -> {
  10. スイッチ (event.getType()){
  11. NODE_ADDEDの場合:
  12. システム.out.println ( "NODE_ADDED:" + event.getData().getPath());
  13. 壊す
  14. NODE_REMOVEDの場合:
  15. システム.out.println ( "NODE_REMOVED:" + event.getData().getPath());
  16. 壊す
  17. NODE_UPDATEDの場合:
  18. システム.out.println ( "NODE_UPDATED:" + event.getData().getPath());
  19. 壊す
  20. CONNECTION_LOSTの場合:
  21. システム.out.println ( "CONNECTION_LOST:" + event.getData().getPath());
  22. 壊す
  23. CONNECTION_RECONNECTEDの場合:
  24. システム.out.println ( "CONNECTION_RECONNECTED:" + event.getData().getPath());
  25. 壊す
  26. CONNECTION_SUSPENDEDの場合:
  27. システム.out.println ( "CONNECTION_SUSPENDED:" + event.getData().getPath());
  28. 壊す
  29. ケース初期化済み:
  30. システム.out.println ( "初期化されました:" + event.getData().getPath());
  31. 壊す
  32. デフォルト
  33. 壊す
  34. }
  35. });
  36.   
  37. client.create ().withMode(CreateMode.PERSISTENT).forPath(path);
  38. スレッド.スリープ(1000);
  39.   
  40. client.create ().withMode(CreateMode.PERSISTENT).forPath(パス + "/c1" );
  41. スレッド.スリープ(1000);
  42.   
  43. setData(クライアント、パス、 「テスト」 .getBytes());
  44. スレッド.スリープ(1000);
  45.   
  46. client.delete ().forPath(パス + "/c1" );
  47. スレッド.スリープ(1000);
  48.   
  49. client.delete ().forPath(パス);
  50. スレッド.スリープ(1000);
  51.   
  52. クライアントを閉じます();
  53. }

選挙

キュレーターは、リーダー ラッチとリーダー選出の 2 つの方法を提供します。

1) リーダーラッチ

リーダーは候補者の中からランダムに選出されます。一度選出されると、`close()` を呼び出してリーダーを解放しない限り、他の候補者がリーダーになることはできません。

  1. パブリッククラス LeaderLatchTest {
  2.   
  3. プライベート静的最終文字列 PATH = "/demo/leader" ;
  4.   
  5. 公共 静的void main(String[] args) {
  6. リスト<LeaderLatch> latchList = 新しいArrayList<>();
  7. リスト<CuratorFramework> クライアント = 新しい ArrayList<>();
  8. 試す {
  9. ( int i = 0; i < 10; i++)の場合{
  10. CuratorFramework クライアント = getClient();
  11. クライアントを起動します。
  12. clients.add (クライアント);
  13.   
  14. 最終的なリーダーラッチ leaderLatch = 新しいリーダーラッチ(クライアント、PATH、 "クライアント#" + i);
  15. リーダーラッチ.addListener(新しいリーダーラッチリスナー() {
  16. @オーバーライド
  17. パブリックvoid isLeader() {
  18. System.out.println (leaderLatch.getId() + ":私はリーダーです。仕事をしています!" );
  19. }
  20.   
  21. @オーバーライド
  22. パブリックvoid notLeader() {
  23. System.out.println (leaderLatch.getId() + ":私はリーダーではありません。何もしません!" );
  24. }
  25. });
  26. ラッチリストにリーダーラッチを追加します。
  27. リーダーラッチの開始();
  28. }
  29. スレッド.スリープ(1000 * 60);
  30. } キャッチ (例外 e) {
  31. e.printStackTrace();
  32. ついに {
  33. CuratorFrameworkクライアントの場合:クライアント{
  34. CloseableUtils.closeQuietly(クライアント);
  35. }
  36.   
  37. (リーダーラッチ リーダーラッチ: ラッチリスト)の場合{
  38. CloseableUtils.closeQuietly(リーダーラッチ);
  39. }
  40. }
  41. }
  42.   
  43. 公共 静的キュレーターフレームワーク getClient() {
  44. CuratorFrameworkFactory.builder()を返す
  45. .connectString( "127.0.0.1:2181" )
  46. .retryPolicy(新しいExponentialBackoffRetry(1000, 3))
  47. .connectionTimeoutMs(15 * 1000) // 接続タイムアウト、デフォルトは15秒
  48. .sessionTimeoutMs(60 * 1000) // セッションタイムアウト、デフォルトは60秒
  49. .namespace( "arch" ) // 名前空間を設定する
  50. 。建てる();
  51. }
  52.   
  53. }

2) リーダー選挙

LeaderSelectorListener はリーダーシップの制御を可能にし、適切なタイミングでリーダーシップを解放することで、各ノードにリーダーシップを獲得する機会を与えます。一方、LeaderLatch はリーダーシップを無期限に保持し、close メソッドが呼び出されない限り、リーダーシップを放棄しません。

  1. パブリッククラス LeaderSelectorTest {
  2. プライベート静的最終文字列 PATH = "/demo/leader" ;
  3.   
  4. 公共 静的void main(String[] args) {
  5. List<LeaderSelector> セレクター = 新しい ArrayList<>();
  6. リスト<CuratorFramework> クライアント = 新しい ArrayList<>();
  7. 試す {
  8. ( int i = 0; i < 10; i++)の場合{
  9. CuratorFramework クライアント = getClient();
  10. クライアントを起動します。
  11. clients.add (クライアント);
  12.   
  13. 最終的な文字列= "client#" + i;
  14. LeaderSelector leaderSelector = 新しいLeaderSelector(client, PATH, 新しいLeaderSelectorListenerAdapter() {
  15. @オーバーライド
  16. public void takeLeadership(CuratorFramework client) は例外をスローします {
  17. システム.out.println ( name + ":私はリーダーです。" );
  18. スレッド.sleep(2000);
  19. }
  20. });
  21.   
  22. リーダーセレクターの自動再キュー();
  23. リーダーセレクターの開始();
  24. セレクターを追加します(リーダーセレクター)。
  25. }
  26. Thread.sleep(整数.MAX_VALUE);
  27. } キャッチ (例外 e) {
  28. e.printStackTrace();
  29. ついに {
  30. CuratorFrameworkクライアントの場合:クライアント{
  31. CloseableUtils.closeQuietly(クライアント);
  32. }
  33.   
  34. for (LeaderSelector セレクター: セレクター) {
  35. CloseableUtils.closeQuietly(セレクター);
  36. }
  37.   
  38. }
  39. }
  40.   
  41. 公共 静的キュレーターフレームワーク getClient() {
  42. CuratorFrameworkFactory.builder()を返す
  43. .connectString( "127.0.0.1:2181" )
  44. .retryPolicy(新しいExponentialBackoffRetry(1000, 3))
  45. .connectionTimeoutMs(15 * 1000) // 接続タイムアウト、デフォルトは15秒
  46. .sessionTimeoutMs(60 * 1000) // セッションタイムアウト、デフォルトは60秒
  47. .namespace( "arch" ) // 名前空間を設定する
  48. 。建てる();
  49. }
  50.   
  51. }

分散ロック

1) 共有再入ロック

Shared は、ロックがグローバルに参照可能であり、どのクライアントでもロックを要求できることを意味します。Reentrant は JDK の ReentrantLock に似ており、単一のクライアントがロックを保持したまま、ブロックされることなく複数回ロックを取得できることを意味します。これは InterProcessMutex クラスによって実装されています。そのコンストラクタは次のとおりです。

  1. パブリックInterProcessMutex(CuratorFramework クライアント、文字列パス)

acquire を使用してロックを取得し、タイムアウト メカニズムを提供します。

  1. /**
  2. * ミューテックスを取得する - 利用可能になるまでブロックする。注意: 同じスレッドが acquire を呼び出すこともできる。
  3. * 再入可能。各acquire () 呼び出しはrelease ()呼び出しバランスをとる必要があります。
  4. /
  5. パブリックvoid 取得();
  6.   
  7. /**
  8. * ミューテックスを取得します - ミューテックスが利用可能になるか、指定された時間が経過するまでブロックします。注: 同じスレッドが
  9. * acquireを再呼び出します。   trueはrelease()呼び出しによってバランスをとる必要がある
  10. * パラメータ:
  11. *時間-時間 待つ
  12. * 単位 -時間単位
  13. *返品:
  14. *ミューテックスが取得された場合はtrueそうでない場合はfalse  
  15. /
  16. public boolean acquire(long time , TimeUnit unit);

`release()` メソッドを使用してロックを解除します。`InterProcessMutex` インスタンスは再利用できます。ZooKeeper レシピ wiki では、ネゴシエート可能な取り消しメカニズムが定義されています。ミューテックスを取り消すには、次のメソッドを呼び出します。

  1. /**
  2. * ロックを取り消せるようにします。別のプロセスまたはスレッドがロックの解除を要求したときに、リスナーが呼び出されます。
  3. * パラメータ:
  4. * リスナー - リスナー
  5. /
  6. パブリックvoid makeRevocable(RevocationListener<T> リスナー)

2) 非再入可能共有ロック

InterProcessSemaphoreMutex を使用する場合、呼び出し方法は同様ですが、このロックは再入不可能である点が異なります。つまり、同じスレッド内で再入することはできません。

3) 共有再入可能読み取り書き込みロック

JDKのReentrantReadWriteLockと同様に、読み取り/書き込みロックは関連する2つのロックを管理します。1つは読み取り操作を処理し、もう1つは書き込み操作を処理します。書き込みロックが使用されていない間は、複数のプロセスが同時に読み取り操作を使用できますが、書き込みロックが使用されている間は読み取りは許可されません(ブロッキングされます)。このロックは再入可能です。書き込みロックを保持しているスレッドは読み取りロックに再入できますが、読み取りロックは書き込みロックに再入できません。これはまた、書き込みロックを読み取りロックにダウングレードできることも意味します。例えば、書き込みロックを要求 -> ロックを読み取り -> 書き込みロックを解放する、といった具合です。読み取りロックから書き込みロックへのアップグレードは不可能です。これは主に2つのクラスによって実装されます。

  1. プロセス間読み取り書き込みロック
  2. インタープロセスロック

4) 共有セマフォ

カウンティングセマフォはJDKセマフォに似ています。JDKセマフォは許可セットを保持しますが、Cubatorではこれをリースと呼びます。すべてのインスタンスで同じ`numberOfLeases`値を使用する必要があることに注意してください。`acquire`を呼び出すとリースオブジェクトが返されます。クライアントは`finally`ブロックでこれらのリースオブジェクトを閉じる必要があります。閉じないとリースは失われます。ただし、クラッシュなどの何らかの理由でクライアントのセッションが失われた場合、そのクライアントが保持していたリースは自動的に閉じられ、他のクライアントが引き続き使用できるようになります。リースは、以下の方法でも返されます。

  1. パブリックvoid returnAll(コレクション<リース> リース)
  2. public void returnLease(リース リース)

複数のリースを一度に要求できることに注意してください。セマフォに十分なリースがない場合、要求スレッドはブロックされます。タイムアウトオーバーロードメソッドも提供されています。

  1. パブリックリース取得()
  2. パブリックコレクション<リース> 取得( int数量)
  3. 公的リース取得(長期 TimeUnit単位)
  4. パブリックコレクション<リース> 取得( int数量, long時間, TimeUnit 単位)

主なカテゴリは次のとおりです。

  1. インタープロセスセマフォV2
  2. リース
  3. 共有カウントリーダー

5) マルチ共有ロック

マルチ共有ロックはロックのコンテナです。`acquire` が呼び出されると、すべてのロックが取得されます。取得に失敗した場合は、すべてのロックが解放されます。同様に、`release` が呼び出されると、すべてのロックが解放されます(失敗は無視されます)。基本的に、これはロックのグループを表します。このロックに対するリクエストまたは解放操作は、その中のすべてのロックに渡されます。主に2つのクラスが関係します。

  1. インタープロセスマルチロック
  2. インタープロセスロック

そのコンストラクターには、ロックのセットまたは ZooKeeper パスのセットのいずれかが必要です。

  1. パブリックInterProcessMultiLock(List<InterProcessLock> ロック)
  2. パブリックInterProcessMultiLock(CuratorFramework クライアント、List<String> パス)

フェンス

DistributedBarrierコンストラクタでは、barrierPathパラメータを使用してフェンスを識別します。同じbarrierPathパラメータ(同じパス)を持つフェンスは、同一のフェンスとみなされます。通常、フェンスは以下のように使用されます。

1. 主導クライアントがフェンスを設置します。

2. 他のクライアントは、バリアが削除されるまで待機するために waitOnBarrier() を呼び出し、プログラムの処理スレッドをブロックします。

3. 先頭のクライアントがバリアを取り除くと、他のクライアントのハンドラーは同時に実行を続けます。

DistributedBarrier クラスの主なメソッドは次のとおりです。

setBarrier() - バリアを設定する

waitOnBarrier() - バリアが削除されるまで待機します

removeBarrier() - バリアを削除します

2) 二重の障壁

二重バリアは、クライアントが計算の開始時と終了時に同期することを可能にします。プロセスは、十分な数のプロセスが二重バリアに加わると計算を開始し、計算が完了するとバリアを離れます。二重バリアクラスは「DistributedDoubleBarrier」です。このクラスは二重バリア機能を実装します。そのコンストラクタは次のとおりです。

  1. // クライアント - クライアント
  2. // barrierPath -使用するパス
  3. // memberQty -バリア内のメンバー
  4. パブリック分散ダブルバリア(CuratorFrameworkクライアント、文字列バリアパス、 intメンバー数量)

`memberQty` はメンバー数です。`enter` メソッドが呼び出されると、すべてのメンバーが `enter` を呼び出すまでメンバーはブロックされます。`leave` メソッドが呼び出されると、すべてのメンバーが `leave` を呼び出すまで呼び出しスレッドもブロックされます。

注: パラメータ「memberQty」の値は閾値であり、制限ではありません。待機中のフェンスの数がこの値以上になると、フェンスが開きます。

DistributedBarrierと同様に、二重フェンスのbarrierPathパラメータも、それらが同じフェンスであるかどうかを判断するために使用されます。二重フェンスの使用方法は次のとおりです。

1. 複数のクライアントから同じパスに 2 つのバリア (DistributedDoubleBarrier) を作成し、enter() メソッドを呼び出して、バリアに入る前にバリアの数が memberQty に達するまで待機します。

2. バリアの数が memberQty に達すると、複数のクライアントが同時にブロックを停止し、leave() メソッドが実行されるまで実行を継続し、leave() メソッドで memberQty バリアがブロックされるのを待機します。

3. 複数の membersQty フェンスが leave() メソッドで同時にブロックされた場合、複数のクライアントの leave() メソッドはブロックを停止し、実行を継続します。

DistributedDoubleBarrier クラスの主なメソッドは次のとおりです: enter()、enter(long maxWait、TimeUnit unit) - バリアへの同時エントリを待機します。

`leave()` と `leave(long maxWait, TimeUnit unit)` - フェンスが同時に開くのを待ちます。

例外処理: DistributedDoubleBarrier は接続状態を監視し、接続が切断されると enter() メソッドと leave メソッドが例外をスローします。

カウンタ

ZooKeeper を用いてカウンターを実装し、クラスター共有カウンターを作成できます。同じパスを使用している限り、最新のカウンター値を取得できます。これは ZooKeeper の一貫性によって保証されます。カウンターには 2 つのカウンターがあり、1 つは整数型、もう 1 つは長整数型です。

1) 共有カウント

このクラスはカウントに int 型を使用します。主に3つのクラスが関係します。

  1. * 共有数
  2. * 共有カウントリーダー
  3. * 共有カウントリスナー

SharedCountはカウンタを表します。これにSharedCountListenerを追加できます。カウンタの値が変更されると、このListenerは変更イベントをリッスンし、SharedCountReaderはリテラル値とバージョン情報を含むversionedValue値を含む最新の値を読み込みます。

2) 分散アトミックロング

SharedCountよりも広いカウント範囲を持つことに加え、まず楽観的ロックを用いてカウンタの設定を試みます。これが失敗した場合(例えば、カウンタが既に別のクライアントによって更新されている場合)、InterProcessMutexを用いてカウンタ値を更新します。このカウンタは、以下の一連の操作から構成されます。

  • get(): 現在の値を取得する
  • increment(): 1ずつ増加します
  • decrement(): 1ずつ減算する
  • add(): 特定の値を追加します。
  • 減算(): 特定の値を減算する
  • trySet(): カウンター値の設定を試みます
  • forceSet(): カウンター値を強制的に設定。

結果によって返される `succeeded()` 関数を確認する必要があります。この関数は、操作が成功したかどうかを示します。操作が成功した場合、`preValue()` は操作前の値を表し、`postValue()` は操作後の値を表します。

終わり

Curator は多くの複雑な ZooKeeper 操作を抽象化・簡素化するため、ZooKeeper ユーザーにとって大きなメリットとなります。しかし、真の喜びは Curator を使う必要がなくなることです。

他の人がZooKeeperをどのような位置付けにしているかは分かりませんが、Paxosプロトコルに出会ってからというもの、ZooKeeperに強い関心を抱くことが難しくなりました。私の技術選択リストでは、たいてい最下位に位置していて、ソースコード内の難解なロジックを理解することさえできません。

しかし、エンジニアリングプロジェクトは私たちの好みによって評価されることは一度もありません。常にそうでした。

著者について:「Little Sister's Flavor」(xjjdog)は、プログラマーが陥りやすい落とし穴を回避するためのWeChat公式アカウントです。インフラとLinuxに特化しています。10年間のアーキテクチャ経験を持ち、毎日数十億件ものリクエストを処理しながら、高度な並列処理の世界を探求し、独自の視点を提供しています。