|
嬉しい驚きですね!以前書いた、Canal Adapterを使ってMySQLデータをES8に同期する方法について、18枚の画像を使ったステップバイステップガイドを保存することを強くお勧めします。Canal Adapterを使ってMySQLデータをES8に同期する方法が詳しく説明されています。 この記事では、公式の ES8 アダプター同期クラスは ES8 TLS 認証をサポートしていないため、ES8 クラスターをデプロイするときにこのセキュリティ機能を無効にする必要があることを説明しました。 しかし、技術者として機能が削減されるのを見るのは耐えられなかったので、ソースコードを取得して TLS 認証をサポートするように修正しました。 写真 変更を加えた後、コードをローカルに再パッケージ化し、機能を実装しました。共有するのは自分だけのものより悪いと思い、何気なくプルリクエスト(PR)を送信しました。ところが、なんと、このPRがメインブランチにマージされたことを最近になって発見しました! 写真 そして、まるで偶然にも、何万ものスターを持つ有名なオープンソースプロジェクトの貢献者になったのです。大物からPRの下に「ありがとう」と返信までありました。自分が大物にこんなに近づけるんだと、ふと気づきました。 写真 問題の説明問題を修正せずにCanalアダプタを起動した後、MySQLデータをES8に同期する際に以下のエラーが発生しました。このエラーは、ES8ではデフォルトでセキュリティ認証が有効になっており、署名用の証明書が組み込まれているためです。CanalアダプタはES8への適応時にこの機能をサポートしていないため、このエラーが発生しています。 2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR caotter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target]; java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target]; at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112) at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60) at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104) at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83) at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42) at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890) at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695) at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171) at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154) at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118) at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538) at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132) at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392) at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269) at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324) at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814) at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208) at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97) ... 12 common frames omitted Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257) at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244) at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75) at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692) ... 23 common frames omitted Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431) at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535) at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214) at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186) at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270) at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316) at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ... 1 common frames omitted Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem at sun.security.ssl.Alerts.getSSLException(Alerts.java:192) at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296) at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509) at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216) at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979) at sun.security.ssl.Handshaker$1.run(Handshaker.java:919) at sun.security.ssl.Handshaker$1.run(Handshaker.java:916) at java.security.AccessController.doPrivileged(Native Method) at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369) at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288) at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356) ... 9 common frames omitted Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387) at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292) at sun.security.validator.Validator.validate(Validator.java:260) at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324) at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281) at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496) ... 17 common frames omitted Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280) at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382) ... 23 common frames omitted 2024-04-13 20:55:39.370 [Thread-4] ERROR caotter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed! Error sync and rollback, execute times: 13 解決解決策は 2 つあります。 - Elasticsearch (ES) クラスターをデプロイおよび構築する際は、セキュリティ証明書機能を無効にしてください。ES 側の対応する設定は、`elasticsearch.yml` で `xpack.security.enabled` を `false` に設定することです。Docker でデプロイした ES の場合は、コンテナ内で変更するか、コンテナの起動時に設定する必要があります。
- 証明書の互換性を確保するために、canal アダプタのソース コードを変更します。
オプション 1 では ES8 セキュリティ機能を無効にする必要があるため、このセクションでは主にオプション 2 について説明します。これは推奨されません。 ES8 セキュリティ構成を確実にするためにソース コードを変更します。証明書のコピーDockerを使用してES8をインストールおよびデプロイすると、デフォルトで証明書が作成されます。この証明書をコンテナからコピーする必要があります。コマンドは以下のとおりです。 docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt . ここで、es01 はコンテナ名です。ご自身のコンテナ名に置き換えても構いません。また、コピーしたパスをご自身で置き換えることもできます。後ほど必要になるので、パスを覚えておいてください。 コードを変更するCanal アダプターのソース コードで、次の型を見つけます: com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection 写真 コンストラクターを次のように変更します。 public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException { String caPath = properties.get("security.ca.path"); if (StringUtils.isNotEmpty(caPath)) { connectEsWithCa(hosts, properties, caPath); } else { connectEsWithoutCa(hosts, properties); } } private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) { Path caCertificatePath = Paths.get(caPath); try (InputStream is = Files.newInputStream(caCertificatePath)) { CertificateFactory factory = CertificateFactory.getInstance("X.509"); Certificate trustedCa = factory.generateCertificate(is); KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null, null); trustStore.setCertificateEntry("ca", trustedCa); SSLContextBuilder sslContextBuilder = SSLContexts.custom() .loadTrustMaterial(trustStore, null); final SSLContext sslContext = sslContextBuilder.build(); HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); String nameAndPwd = properties.get("security.auth"); if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) { String[] nameAndPwdArr = nameAndPwd.split(":"); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1])); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpClientBuilder.setSSLContext(sslContext); }); } restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build(); } catch (Exception e) { throw new RuntimeException(e); } } private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) { HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); String nameAndPwd = properties.get("security.auth"); if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) { String[] nameAndPwdArr = nameAndPwd.split(":"); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1])); restClientBuilder.setHttpClientConfigCallback( httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true) .build(); } 簡単な説明 - connectEsWithoutCa メソッドは、元のコンストラクターの実装です。
- connectEsWithCa メソッドは、セキュリティ認証と互換性のあるコンストラクター実装です。
- これら 2 つの方法の使用は、security.ca.path プロパティが構成されているかどうかによって異なります。
- security.ca.path 構成は、ランチャーの outerAdapters の ES8 プロパティ内の security.auth と同じレベルにあります。
コードの修正は完了しました。使い方を見てみましょう。 再パッケージコードを変更した後、Maven を使用して再パッケージ化し、対応する ES8 JAR ファイルを作成します。 写真 コンパイルされパッケージ化された JAR ファイルを Canal アダプターのプラグイン ディレクトリにコピーし、対応する名前をダウンロードしたバージョン (以前にダウンロードしたバージョン 1.1.7 など) と一致するように変更します。 写真 ファイル client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 はダウンロードに付属していた元の JAR ファイルですが、client-adapter.es8x-1.1.7-jar-with-dependencies.jar は再パッケージ化してコンパイルした JAR ファイルです。 ランチャーの設定を変更する先ほどコードの互換性について説明した際に、「security.ca.path」という設定を使用しました。そのため、このプロパティには、先ほどコピーしたCA証明書へのパス、つまり「security.ca.path: /opt/canal/http_ca.crt」を設定する必要があります。 完全な構成を以下に示します。 server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: -1 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer # kafka.bootstrap.servers: 127.0.0.1:9092 # kafka.enable.auto.commit: false # kafka.auto.commit.interval.ms: 1000 # kafka.auto.offset.reset: latest # kafka.request.timeout.ms: 40000 # kafka.session.timeout.ms: 30000 # kafka.isolation.level: read_committed # kafka.max.poll.records: 1000 # rocketMQ consumer # rocketmq.namespace: # rocketmq.namesrv.addr: 127.0.0.1:9876 # rocketmq.batch.size: 1000 # rocketmq.enable.message.trace: false # rocketmq.customized.trace.topic: # rocketmq.access.channel: # rocketmq.subscribe.filter: # rabbitMQ consumer # rabbitmq.host: # rabbitmq.virtual.host: # rabbitmq.username: # rabbitmq.password: # rabbitmq.resource.ownerId: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true username: root password: 123456 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: es8 key: es-key hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode properties: mode: rest # transport or rest security.auth: elastic:password security.ca.path: /opt/canal/http_ca.crt cluster.name: docker-cluster - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # druid.stat.enable: false # druid.stat.slowSqlMillis: 1000 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase # - name: kudu # key: kudu # properties: # kudu.master.address: 127.0.0.1 # ',' split multi address # - name: phoenix # key: phoenix # properties: # jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver # jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db # jdbc.username: # jdbc.password: 証明書パスの設定後、正常に起動してデータを同期できます。詳細な手順については、対応するWeChat公式アカウントの記事「Canalアダプタを使用してMySQLデータをES8に同期する方法を18枚の写真で解説」(この記事を保存することをお勧めします!)を参照してください。ここではデモを繰り返すつもりはありません。 要約オープンソースプロジェクトに参加したいとずっと思っていましたが、今回小さな目標を達成できるとは思ってもいませんでした。実は全く予想外の驚きでした。Canalのデータ同期について学び、調査していたところ、この問題を発見しました。修正して、何気なくPRを送信したのですが、まさかマージされるとは思ってもいませんでした。今でもとても興奮しています。 これは、オープンソースプロジェクトに真に参加し、利用し、理解している限り、たとえほんの一部であっても、独自のコードを提供する機会が残っていることを示しています。それは、オープンソースプロジェクトへの小さな貢献に過ぎません。 最近、別のオープンソースプロジェクトでいくつか小さなバグを発見しました。後ほどプルリクエストを送信し、オープンソースへの道を歩み続けます。 |