DUICUO

時系列データとオープンソース ツールを使用してエッジ プロジェクトを強化します。

時間の経過とともに収集されるデータは時系列データと呼ばれます。今日では、時系列データはあらゆる産業やエコシステムの一部となっています。成長を続けるモノのインターネット(IoT)業界の主要な構成要素であり、人々の日常生活の重要な一部となるでしょう。しかし、時系列データとその需要は処理が困難です。これは、時系列データを処理するために特別に設計されたツールが存在しないためです。この記事では、これらの問題を詳しく説明し、InfluxDataが過去10年間でどのように解決してきたかを説明します。

流入データ

InfluxDataはオープンソースの時系列データベースプラットフォームです。InfluxDBを通じてInfluxDataをご存知かもしれませんが、時系列データベース開発に特化していることはご存知ないかもしれません。時系列データを管理する際には、ストレージライフサイクルとクエリという2つの問題に対処する必要があるため、この点は重要です。

ストレージライフサイクルにおいて、開発者は通常、非常に詳細なデータの収集と分析から始めます。しかし、その後、過剰なストレージ容量を消費することなく傾向を説明するために、より小規模でサンプリングレートの低いデータセットを保存したいと考える場合があります。

データベースをクエリする際、IDではなく時間範囲に基づいてデータを検索する必要があります。時系列データの最も一般的な用途の一つは、一定期間にわたるデータの要約です。このようなクエリは、行と列を用いて異なるデータポイント間の関係性を記述する一般的なリレーショナルデータベースにデータを保存する場合、処理速度が低下します。時系列データ専用に設計されたデータベースは、このようなクエリをはるかに高速に処理できます。InfluxDBには、時系列データセットのクエリ用に特別に構築された独自のクエリ言語であるFluxが組み込まれています。

Telegraf の動作を示す画像。

データ収集

データの取得と処理の両方に優れたツールがいくつかあります。InfluxDataには12以上のクライアントライブラリがあり、独自のプログラミング言語でデータの書き込みやクエリを実行できます。使い方をカスタマイズするのに最適なツールです。オープンソースの取り込みエージェントTelegrafには、300以上の入出力プラグインが含まれています。開発者であれば、独自のプラグインを提供することもできます。

InfluxDB では、小規模な履歴データセットを CSV ファイルとしてアップロードしたり、大規模なデータセットを一括インポートしたりすることもできます。

インポート数学
bicycles3 = from (バケット: "smartcity" )
| >範囲(開始: 2021-03-01 T00 : 00 : 00 z 終了: 2021-04-01 T00 : 00 : 00z )
|>フィルター( fn : ( r ) => r . _measurement == "city_IoT" )
|>フィルター( fn : ( r ) => r . _field == "カウンター" )
|>フィルター( fn : ( r ) => r .ソース== "自転車" )
|>フィルター( fn : ( r ) => r . neighbor_id == "3" )
|> aggregateWindow ( every : 1 hfn : meancreateEmpty : false )
bicycles4 = from (バケット: "smartcity" )
| >範囲(開始: 2021-03-01 T00 : 00 : 00 z 終了: 2021-04-01 T00 : 00 : 00z )
|>フィルター( fn : ( r ) => r . _measurement == "city_IoT" )
|>フィルター( fn : ( r ) => r . _field == "カウンター" )
|>フィルター( fn : ( r ) => r .ソース== "自転車" )
|>フィルター( fn : ( r ) => r . neighbor_id == "4" )
|> aggregateWindow ( every : 1 h , fn : mean , createEmpty : false ) join ( tables : { neighbor_3 : bicycles3 , neighbor_4 : bicycles4 }, on [ "_time" ], method : "inner" )
|> keep ( columns : [ "_time" , "_value_neighborhood_3" , "_value_neighborhood_4" ])
|>マップ( fn :( r ) => ({
r
差異値: math.abs (x : (r._value_neighborhood_3 - r._value_neighborhood_4 ) )
}))

フラックス

Flux は、時系列データの処理用にゼロから構築された社内クエリ言語です。タスク、アラート、通知など、一部のツールの基盤となるソースでもあります。上記の Flux クエリを分析するには、いくつか定義する必要があります。まず、「バケット」とは、いわゆるデータベースのことです。バケットを設定し、そこにデータストリームを追加することができます。このクエリは、特定の日(正確には24時間)の範囲を持つ​smartcity​バケットを呼び出します。バケットからすべてのデータを取得することもできますが、ほとんどのユーザーは特定の範囲のデータを含めます。これは、実行できる最も基本的な Flux クエリです。

次に、データをより正確で扱いやすいレベルに導くためにフィルターを追加しました。例えば、ID 3のコミュニティに割り当てられた自転車の数をフィルターしました。そこから、 ​aggregateWindow​を使って1時間ごとの平均値を取得しました。つまり、1時間ごとに1列、合計24列のテーブルが返されるはずです。ID 4のコミュニティに対しても同じクエリを実行しました。最後に、これら2つのテーブルを重ね合わせて、2つのコミュニティ間の自転車利用状況の違いを取得しました。

トラフィックのピークを知りたい場合は、これが良い選択肢です。もちろん、これはFluxのクエリ機能のほんの一例に過ぎませんが、Fluxに付属するツールのいくつかを使った良い例です。他にも多くのデータ分析機能と統計機能があります。詳しくは、Fluxのドキュメントを確認することをお勧めします。

 「influxdata/influxdb/tasks」をインポートする
オプションタスク= {名前: PB_downsample間隔: 1時間オフセット: 10}
(バケット: "plantbuddy" )から
|>範囲(開始:タスク.最終成功(または時間: -タスク.))
|>フィルター( fn : ( r ) => r [ "_measurement" ] == "sensor_data" )
|> aggregateWindow ( every : 10 mfn : lastcreateEmpty : false )
|>生成(名前: "last" )
|> to (バケット: "ダウンサンプリング" )

タスク

InfluxDBタスクは、入力データストリームを受け取り、何らかの方法で変更または分析する、スケジュール設定されたFluxスクリプトです。その後、変更されたデータを新しいバケットに保存するか、その他の操作を実行します。小さなデータセットを新しいバケットに保存する処理は「ダウンサンプリング」と呼ばれ、データベースの中核機能であり、時系列データのライフサイクルにおいて重要な部分を占めています。

現在のタスク例でわかるように、データをダウンサンプリングしています。10分ごとに最後の値を取得し、その値をダウンサンプリングバケットに保存します。元のデータセットでは10分間に数千のデータポイントが含まれていたかもしれませんが、ダウンサンプリングバケットには60個の新しい値しか含まれていません。注目すべき点は、範囲内で​lastSuccess​関数も使用していることです。これは、InfluxDB に、過去2時間以内にタスクが失敗した場合に備えて、最後に成功した実行時刻からこのタスクの実行を開始するように指示します。失敗した場合は、過去3時間以内に最後に成功した実行を遡ることができます。これは、組み込みのエラー処理に非常に役立ちます。

検査・警報通知システムの画像

検査と警報

InfluxDB には、アラートまたはチェックと通知が含まれています。

多くの人が通知を設定することを選択します。そのためには、通知エンドポイントを定義する必要があります。例えば、チャットアプリケーションは通知を受信するためにHTTP呼び出しを行うとします。次に、通知を受信するタイミングを定義します。例えば、1時間ごとにチェックを実行したり、24時間ごとに通知を実行したりできます。通知応答値を変更することもできます。例えば、「WARN」を「CRITICAL」に変更したり、「CRITICAL」であっても「OK」を「WARN」に変更したりできます。これは高度にカスタマイズ可能なシステムです。このシステムから作成されたFluxコードも編集可能です。

新しいEdge機能の画像

最後に、最近リリースされた特別な新機能を含む、すべてのコア機能をまとめてご紹介します。「Edge to cloud」は、オープンソースのInfluxDBを実行し、接続に問題が発生した場合にデータをローカルに保存できる非常に強力なツールです。接続が回復すると、データはInfluxDataクラウドプラットフォームにストリーミングされます。

これはエッジデバイスや重要なデータにとって極めて重要です。データ損失は甚大な被害をもたらすからです。クラウドに複製するバケットを定義し、そのバケットにはローカルデータストレージ用のディスクバックアップキューを用意します。次に、クラウドバケットに複製するデータを定義します。データはクラウドに接続するまでローカルに保存されます。

InfluxDB と IoT Edge

例えば、自宅の植物にIoTセンサーを接続して、その健康状態を監視するプロジェクトがあるとします。このプロジェクトでは、ノートパソコンをエッジデバイスとして設定します。ノートパソコンが閉じられているか電源がオフになっている場合、データはローカルに保存され、再接続時にクラウドストレージバケットにストリーミングされます。

画像はPlant Buddyの動作を示しています。

注意すべき点は、ローカルデバイス上のデータがレプリケーションバケットに保存される前にダウンサンプリングされることです。植物センサーは1秒あたり1つのデータポイントを提供しますが、データは1分間の平均値に圧縮されるため、保存されるデータ量は少なくなります。クラウドアカウントでは、植物の水分レベルが一定値を下回り水やりが必要になった際に通知やアラートを追加できます。また、ウェブサイトで視覚的な情報を表示して、植物の健康状態をユーザーに伝えることもできます。

データベースは多くのアプリケーションの基盤です。InfluxDBのような時系列データベースプラットフォームでタイムスタンプ付きデータを使用することで、開発者の時間を節約し、幅広いツールやサービスへのアクセスが可能になります。InfluxDBのメンテナーは、オープンソースコミュニティで人々が作り上げているものを見るのが大好きです。ぜひ私たちとつながり、プロジェクトやコードを他の人と共有してください!