DUICUO

オープンソースのデータ収集エンジンLogstashの説明と例

I. 概要

Logstashは、Elastic Stack(ELK Stack)の一部であるオープンソースのデータ収集およびログ処理ツールです。様々なデータソースからデータを収集、変換、転送し、大規模データの分析と可視化を支援します。Logstashは通常、ElasticsearchやKibanaと組み合わせてリアルタイムのログ分析と監視に使用されます。

Logstash の主な機能と特徴は次のとおりです。

  • データ収集: Logstash は、ログ ファイル、データ ファイル、メッセージ キュー、データベース、ネットワーク トラフィックなど、さまざまなデータ ソースからデータを収集できます。さまざまなデータ ソースのニーズに適応するために、複数の入力プラグインをサポートしています。
  • データ変換:Logstashは強力なデータ変換機能を備えており、収集したデータのフィルタリング、解析、変換、エンリッチメントを可能にします。フィルタリングプラグインを活用することで、正規表現解析、フィールド分割、データ匿名化、タイムスタンプ生成など、データに対する様々な操作を実行できます。
  • マルチチャネルデータ処理:Logstashでは、多様なニーズに合わせてデータを複数のチャネルにストリーミングできます。チャネルとしては、Elasticsearch、Kafka、RabbitMQなどを使用できます。また、カスタム出力プラグインを定義することもできます。
  • データフィルタリングとプラグイン:Logstashは、入力、フィルタリング、出力プラグインなど、豊富なプラグインエコシステムを誇ります。これらのプラグインは、特定のニーズに基づいて、様々なデータ処理タスクに合わせて設定・拡張できます。
  • リアルタイムデータ処理:Logstashはリアルタイムデータ処理機能を備えており、データをソースから宛先へリアルタイムまたはほぼリアルタイムで転送できます。これにより、ログ監視、セキュリティ分析、パフォーマンス監視などのリアルタイムアプリケーションに最適です。
  • スケーラビリティ:Logstashは複数のLogstashインスタンスをデプロイすることで、データの取得と処理の水平スケーリングを実現できます。これにより、大規模なデータ需要への対応が容易になります。
  • 設定が簡単:Logstashは、シンプルな設定ファイル(通常はYAML形式)を使用してデータストリームの処理を定義します。設定ファイルは非常に直感的で、理解しやすく、メンテナンスも容易です。
  • コミュニティとサポート: Logstash は、活発なコミュニティ サポートと豊富なドキュメント リソースを備えた、広く採用されているオープン ソース プロジェクトです。

LogstashはElastic Stackの主要コンポーネントであり、ElasticsearchおよびKibanaと連携して、強力なリアルタイムロギングおよびデータ分析ソリューションを構築します。大規模データの監視、分析、可視化のための堅牢なデータ収集・処理ツールを組織に提供します。

公式ドキュメント:

II. Logstashアーキテクチャ

写真

Logstash は、入力、フィルター、出力の 3 つの主要部分で構成されています。

Logstash のイベント処理パイプライン (Logstash ではデータ ストリーム内の各データをイベントと呼びます) は、入力 -> フィルター -> 出力という 3 つの主な役割によって処理されます。

  • `input-plugins`: 必須。イベント生成を担当します。一般的に使用される入力には、File、syslog、redis、kakfa、beats(例:Filebeats)などがあります。公式ドキュメント: https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html
  • フィルター:オプション。データの処理と変換(フィルターはデータを変更します)を担います。よく使用されるフィルターには、grok、mutate、drop、clone、geoipなどがあります。公式ドキュメント:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html
  • `outputs`: 必須。データ出力(出力を他の場所に送信)を担います。よく使用されるプラグインには、Elasticsearch、File、Graphite、Kakfa、Statsdなどがあります。公式ドキュメント: https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html

II. Elasticsearchのデプロイメント

ここでは、次の展開方法から選択できます。

  • Docker Compose を使用したデプロイ: Docker Compose を使用して Elasticsearch と Kibana を迅速にデプロイするためのステップバイステップのチュートリアル。
  • Kubernetesへのデプロイ:Kubernetes上のElasticSearch + Kibana - 解説と実践操作(バージョン7.17.3)

ここでは、docker-compose デプロイメント方法を選択します。

1) Dockerをデプロイする

# 安装yum-config-manager配置工具yum -y install yum-utils # 建议使用阿里云yum源:(推荐) #yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # 安装docker-ce版本yum install -y docker-ce # 启动并开机启动systemctl enable --now docker docker --version

2) docker-composeをデプロイする

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose chmod +x /usr/local/bin/docker-compose docker-compose --version

3) ネットワークを作る

# 创建docker network create bigdata # 查看docker network ls

4) Linux ハンドル数と最大スレッド数を変更します。

 #查看当前最大句柄数sysctl -a | grep vm.max_map_count #修改句柄数vi /etc/sysctl.conf vm.max_map_count=262144 #临时生效,修改后需要重启才能生效,不想重启可以设置临时生效sysctl -w vm.max_map_count=262144 #修改后需要重新登录生效vi /etc/security/limits.conf # 添加以下内容* soft nofile 65535 * hard nofile 65535 * soft nproc 4096 * hard nproc 4096 # 重启服务,-h 立刻重启,默认间隔一段时间才会开始重启reboot -h now

5) 展開パッケージをダウンロードし、展開を開始します。

 # 这里选择docker-compose 部署方式git clone https://gitee.com/hadoop-bigdata/docker-compose-es-kibana.git cd docker-compose-es-kibana chmod -R 777 es kibana docker-compose -f docker-compose.yaml up -d docker-compose ps

Ⅲ. Logstash 導入および構成ガイド

1) Logstash インストール パッケージをダウンロードします。

公式ウェブサイト https://www.elastic.co/downloads/logstash にアクセスし、対応するバージョンの zip ファイルをダウンロードします。

 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.1-linux-x86_64.tar.gz

2) インストールパッケージファイルを解凍する

tar -xf logstash-8.11.1-linux-x86_64.tar.gz

3) さまざまなシナリオでのテスト

1) テスト1: 標準入力と出力の使用

cd logstash-8.11.1 # 测试,采用标准的输入和输出,#codec=>rubydebug,解析转换类型:ruby # codec各类型讲解:https://www.elastic.co/guide/en/logstash/7.9/codec-plugins.html ./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}' # 输入: hello # 输出: { "event" => { "original" => "hello" }, "host" => { "hostname" => "local-168-182-110" }, "@version" => "1", "@timestamp" => 2023-11-19T02:31:02.485073839Z, "message" => "hello" }

写真

2) テスト2: 設定ファイル+標準入出力の使用

設定ファイル: config/logstash-1.conf

 input { stdin { } } output { stdout { codec => rubydebug } }

サービスを開始

./bin/logstash -f ./config/logstash-1.conf

3) テスト3: 設定ファイル + ファイル入力 + 標準画面出力

設定ファイル: ./config/logstash-2.conf

 input { file { path => "/var/log/messages" } } output { stdout { codec=>rubydebug } }

サービスを開始

./bin/logstash -f ./config/logstash-2.conf

写真

4) テスト4: 設定ファイル + ファイル入力 + Kafka 出力

Kafka のデプロイメントについては、次の記事を参照してください。

  • Kubernetes (クラウドネイティブ) への ZooKeeper + Kafka 環境のデプロイメント
  • [ミドルウェア] docker-compose を使って Kafka を素早くデプロイするためのステップバイステップのチュートリアル

設定ファイル: ./config/logstash-3.conf

 input { file { path => "/var/log/messages" } } output { kafka { bootstrap_servers => "192.168.182.110:9092" topic_id => "messages" } }

サービスを開始

./bin/logstash -f ./config/logstash-3.conf

Kafkaデータを利用する

docker exec -it kafka-node1 bash ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic messages --from-beginning

写真

5) テスト5: 設定ファイル + Filebeatポート入力 + 標準出力

Filebeat の展開については、次の記事を参照してください。

  • 軽量ログ収集コンポーネント「Filebeat」の解説と実践的な操作方法。
  • Kubernetes での Filebeat ログ収集の実践

サーバーはログを生成します(Filebeat) --> Logstashサーバー

設定ファイル: ./config/logstash-4.conf

 input { beats { port => 5044 } } output { stdout { codec => rubydebug } }

サービスを開始

./bin/logstash -f ./config/logstash-4.conf

起動後、ローカルマシンのポート5044が起動します。システム上で既に起動されているポートと競合しないよう注意してください。テストのため、Filebeatサーバーの設定ファイルを変更します。

Filebeat設定ファイルの内容: filebeat.yml

 filebeat.inputs: - type: log enabled: true paths: - /var/log/messages # ------------------------------ Logstash Output ------------------------------- output.logstash: hosts: ["192.168.182.110:5044"]

Filebeatを起動する

./filebeat -e -c filebeat.yml

6) テスト6: 設定ファイルとFilebeatポートのKafkaへの入出力

サーバーがログを生成 (Filebeat) ---> Logstash サーバー ---> Kafka サーバー

設定ファイル: ./config/logstash-5.conf

 input { beats { port => 5044 } } output { kafka { bootstrap_servers => "192.168.182.110:9092" topic_id => "messages" } }

サービスを開始

./bin/logstash -f ./config/logstash-5.conf

7) テスト7: Filebeatデータ収集 + Kafka入力読み取り + Logstash処理 + Elasticsearchへの出力

サーバーがログを生成 (Filebeat) ---> Kafka サーバー __ データ抽出 __-> Logstash サーバー __-> Elasticsearch

写真

Logstash 設定: ./config/logstash-6.conf

 input { kafka { bootstrap_servers => "10.82.192.110:9092" topics => ["messages"] } } output { elasticsearch { hosts => ["10.82.192.110:9200"] index => "messageslog-%{+YYYY-MM-dd}" } }

filebeat.yml 出力.kafka 構成:

 # ------------------------------ KAFKA Output ------------------------------- output.kafka: eanbled: true hosts: ["10.82.192.110:9092"] version: "2.0.1" topic: '%{[fields][log_topic]}' partition.round_robin: reachable_only: true worker: 2 required_acks: 1 compression: gzip max_message_bytes: 10000000

systemctlを使用してFilebeatを起動する

# vi /usr/lib/systemd/system/filebeat.service [Unit] Descriptinotallow=filebeat server daemon Documentatinotallow=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help Wants=network-online.target After=network-online.target [Service] User=root Group=root Envirnotallow="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml" ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS Restart=always [Install] WantedBy=multi-user.target

systemctl を使用して Logstash を起動する

# vi /usr/lib/systemd/system/logstash.service [Unit] Descriptinotallow=logstash [Service] User=root ExecStart=/opt/logstash-8.11.1/bin/logstash -f /opt/logstash-8.11.1/config/logstash-6.conf Restart=always [Install] WantedBy=multi-user.target

サービスを開始

systemctl start logstash systemctl status logstash

IV. よく使われるLogstashフィルタープラグイン

データ処理と変換(フィルターがデータを変更する)を担当します。一般的に使用されるフィルターには、grok、mutate、drop、clone、geoip などがあります。公式ドキュメント: https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html

1) grokの組み込み正規表現の例を使用する

Grokプラグイン:Grokは、非構造化ログデータを構造化されクエリ可能なコンテンツに解析する優れた方法です。その基本原理は、あらゆるテキスト形式に一致する正規表現に基づいています。

このツールは、syslog ログ、Apache およびその他の Web サーバー ログ、MySQL ログ、および一般にコンピューターではなく人間が使用するために作成されるあらゆるログ形式に最適です。

grok には 120 個の組み込みマッチング パターンが含まれており、独自のパターンを定義することもできます: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

ファイルビートの設定: filebeat.yml

 ## filebeat.inputs: - type: log enabled: true paths: - /var/log/messages output.logstash: #指定logstash监听的IP和端口hosts: ["192.168.182.110:5044"]

Logstash 設定: stdin-grok-stout.conf

 cat >> stdin-grok-stout.conf << EOF input { #监听的类型beats { #监听的本地端口port => 5044 } } filter{ grok{ #match => { "message" => "%{COMBINEDAPACHELOG}" } #上面的"COMBINEDAPACHELOG"变量官方github上已经废弃,建议使用下面的匹配模式#参考地址:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd match => { "message" => "%{HTTPD_COMBINEDLOG}" } } } output { stdout {} elasticsearch { #定义es集群的主机地址hosts => ["192.168.182.110:9200"] #定义索引名称index => "hqt-application-pro-%{+YYYY.MM.dd}" } } EOF

2) grokのカスタム正規表現の例を使用する

公式ウェブサイトを参照してください: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html

構成は次のとおりです。

 cat >> stdin-grok_custom_patterns-stdout.conf << EOF input { stdin {} } filter { grok { #指定模式匹配的目录,可以使用绝对路径#在./patterns目录下随便创建一文件,并写入以下匹配模式# ORDER_ID [\u4e00-\u9fa5]{10,11}:[0-9A-F]{10,11} patterns_dir => ["./patterns"] #匹配模式#测试数据为:app_name:gotone-payment-api,client_ip:,context:,docker_name:,env:dev,exception:,extend1:,level:INFO,line:-1,log_message:com.gotone.paycenter.controller.task.PayCenterJobHandler.queryPayOrderTask-request:[\\],log_time:2022-11-23 00:00:00.045,log_type:applicationlog,log_version:1.0.0,本次成交的订单编号为:BEF25A72965,parent_span_id:,product_line:,server_ip:,server_name:gotone-payment-api-c86658cb7-tc8k5,snooper:,span:0,span_id:,stack_message:,threadId:104,trace_id:,user_log_type: match => { "message" => "%{ORDER_ID:test_order_id}" } } } output { stdout {} } EOF

3) フィルタープラグインの一般的なフィールドの例(フィールド、タグの追加/削除)

既存のフィールド(nginxのJSON解析ログ)

構成は次のとおりです。

 cat >> stdin-remove_add_field-stout.conf << EOF input { beats { port => 5044 } } filter { mutate { #移除指定的字段,使用逗号分隔remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth" ] #添加指定的字段,使用逗号分隔#"%{clientip}"使用%可以将已有字段的值当作变量使用add_field => { "app_name" => "nginx" "test_clientip" => "clientip---->%{clientip}" } #添加tag add_tag => [ "linux","web","nginx","test" ] #移除tag remove_tag => [ "linux","test" ] } } output { stdout {} } EOF

4) 日付プラグインを使用してElasticsearchに書き込まれる時間を変更する例

テスト ログ: 以下は収集する JSON ログ エントリです。

 {"app_name":"gotone-payment-api","client_ip":"","context":"","docker_name":"","env":"dev","exception":"","extend1":"","level":"INFO","line":68,"log_message":"现代金控支付查询->调用入参[{}]","log_time":"2022-11-23 00:00:00.051","log_type":"applicationlog","log_version":"1.0.0","method_name":"com.gotone.paycenter.dao.third.impl.modernpay.ModernPayApiAbstract.getModernPayOrderInfo","parent_span_id":"","product_line":"","server_ip":"","server_name":"gotone-payment-api-c86658cb7-tc8k5","snooper":"","span":0,"span_id":"","stack_message":"","threadId":104,"trace_id":"gotone-payment-apib4a65777-ce6b-4bcc-8aef-71a7cfffaf2c","user_log_type":""}

構成は次のとおりです。

 cat >> stdin-date-es.conf << EOF input { file { #指定收集的路径path => "/var/log/messages" } } filter { json { #JSON解析器可以将json形式的数据转换为logstash实际的数据结构(根据key:value拆分成字段形式) source => "message" } date { #匹配时间字段并解析match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ] #将匹配到的时间字段解析后存储到目标字段,默认字段为"@timestamp" target => "@timestamp" timezone => "Asia/Shanghai" } } output { stdout {} elasticsearch { #定义es集群的主机地址hosts => ["192.168.182.110:9200"] #定义索引名称index => "hqt-application-pro-%{+YYYY.MM.dd}" } } EOF

5) 元のIPアドレスの位置情報のGeoIP分析

テスト データは、JSON 形式の nginx ログで構成されます。

 {"@timestamp":"2022-12-18T03:27:10+08:00","host":"10.0.24.2","clientip":"114.251.122.178","SendBytes":4833,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"43.143.242.47","uri":"/index.html","domain":"43.143.242.47","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","status":"200"}

構成は次のとおりです。

 cat >> beats-geoip-stdout.conf << EOF input { file { #指定收集的路径path => "/var/log/test.log" } } filter { json { #JSON解析器可以将json形式的数据转换为logstash实际的数据结构(根据key:value拆分成字段形式) source => "message" } geoip { #指定基于哪个字段分析IP地址source => "client_ip" #指定IP地址分析模块所使用的数据库,默认为GeoLite2-City.mmdb(这里必须再次指定以下,否则不会显示城市) database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb" #如果期望查看指定的字段,则可以在这里配置,若不配置,表示显示所有的查询字段#fields => ["city_name","country_name","ip"] #指定geoip的输出字段,当有多个IP地址需要分析时(例如源IP和目的IP),则该字段非常有效#target => "test-geoip-nginx" } } output { stdout {} } EOF

GeoLite2-City.mmdb ダウンロード: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data

写真

7) mutateコンポーネントの一般的な使用例

テストデータを変更する Python スクリプト:

 cat >> generate_log.py << EOF #!/usr/bin/env python # -*- coding: UTF-8 -*- # @author : oldboyedu-linux80 import datetime import random import logging import time import sys LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s " DATE_FORMAT = "%Y-%m-%d %H:%M:%S" # 配置root的logging.Logger实例的基本配置logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a',) actions = ["浏览⻚⾯", "评论商品", "加⼊收藏", "加⼊购物⻋", "提交订单", "使⽤优惠券", "领取优惠券", "搜索", "查看订单", "付款", "清空购物⻋"] while True: time.sleep(random.randint(1, 5)) user_id = random.randint(1, 10000) # 对⽣成的浮点数保留2位有效数字. price = round(random.uniform(15000, 30000),2) action = random.choice(actions) svip = random.choice([0,1]) logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price)) EOF # python generate_log.py /tmp/app.log

8) Logstashのマルチifブランチの例

構成は次のとおりです。

 cat >> homework-to-es.conf << EOF input { beats { type => "test-nginx-applogs" port => 5044 } file { type => "test-product-applogs" path => "/tmp/app.logs" } beats { type => "test-dw-applogs" port => 8888 } file { type => "test-payment-applogs" path => "/tmp/payment.log" } } filter { if [type] == "test-nginx-applogs"{ mutate { remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth","xff","referer","upstreamtime","upstreamhost","tcp_xff"] } geoip { source => "clientip" database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb" } useragent { source => "http_user_agent" } } if [type] == "test-product-applogs" { mutate { split => { "message" => "|" } } mutate { add_field => { "user_id" => "%{[message][1]}" "action" => "%{[message][2]}" "svip" => "%{[message][3]}" "price" => "%{[message][4]}" } } mutate { convert => { "user_id" => "integer" "svip" => "boolean" "price" => "float" } } } if [type] in [ "test-dw-applogs","test-payment-applogs" ] { json { source => "message" } date { match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ] target => "@timestamp" } } } output { stdout {} if [type] == "test-nginx-applogs" { elasticsearch { hosts => ["192.168.182.110:9200"] index => "test-nginx-logs-%{+YYYY.MM.dd}" } } if [type] == "test-product-applogs" { elasticsearch { hosts => ["192.168.182.110:9200"] index => "test-product-applogs-%{+YYYY.MM.dd}" } } if [type] in [ "test-dw-applogs","test-payment-applogs" ] { elasticsearch { hosts => ["192.168.182.110:9200"] index => "test-center-applogs-%{+YYYY.MM.dd}" } } } EOF