InfluxDB 2.x: Task: Aggregation
Current version as of 3 February 2022, 08:52
Sometimes you need to aggregate a large amount of data, for example for a Grafana graph panel, but the query is very slow because of the sheer number of metric points (in my scenario there are over 6 million metric points to aggregate).
In that case it is more efficient to create an InfluxDB task that aggregates the data and writes the results back to a bucket, instead of calculating the result each time the query runs.
Example: Sum of all storage
What I want
- Create a graph panel that shows the sum of all used storage we have over time.
- Because the storage is shared, each node in the same cluster collects the same storage information, so I have to deduplicate the data (unique()).
- Because I write the aggregated data back to the same bucket, I rename the _measurement of the aggregated data (adding the suffix "_agg-sum") to be able to distinguish it from the raw data.
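The dedup step can be sketched in plain Python (with hypothetical hosts and values; the real data comes from Telegraf). Because every node in the cluster reports the same shared volumes, a naive sum double-counts each volume, while keeping one row per FileSystemLabel, as unique() does, yields the true total:

```python
# Hypothetical sample: two cluster nodes report the same shared volumes.
points = [
    {"host": "hv-node1", "FileSystemLabel": "vol-a", "SizeUsed": 100},
    {"host": "hv-node1", "FileSystemLabel": "vol-b", "SizeUsed": 250},
    {"host": "hv-node2", "FileSystemLabel": "vol-a", "SizeUsed": 100},  # duplicate of vol-a
    {"host": "hv-node2", "FileSystemLabel": "vol-b", "SizeUsed": 250},  # duplicate of vol-b
]

naive_sum = sum(p["SizeUsed"] for p in points)  # 700 -- double-counted

# Keep the first point per FileSystemLabel (what unique() does), then sum.
seen, dedup = set(), []
for p in points:
    if p["FileSystemLabel"] not in seen:
        seen.add(p["FileSystemLabel"])
        dedup.append(p)

dedup_sum = sum(p["SizeUsed"] for p in dedup)  # 350 -- correct total
```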
How can I achieve this?
- Because I also want the historical data, I must first aggregate the data over a bigger time window (by running the query manually once).
- The sampling frequency in my bucket is one metric point per hour, which is why I also aggregate the data with a window of one hour.
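The hourly windowing can be sketched like this (hypothetical timestamps and values; inside InfluxDB, aggregateWindow does this for you). Points are bucketed by the hour they fall into and the aggregate function runs once per bucket; note the sketch labels buckets by window start for simplicity, whereas aggregateWindow stamps results with the window stop time by default:

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical points at irregular times: (timestamp, value).
points = [
    (datetime(2022, 2, 3, 8, 5, tzinfo=timezone.utc), 10),
    (datetime(2022, 2, 3, 8, 50, tzinfo=timezone.utc), 20),
    (datetime(2022, 2, 3, 9, 10, tzinfo=timezone.utc), 30),
]

# Bucket each point into its hour, like aggregateWindow(every: 1h, fn: sum).
windows = defaultdict(list)
for ts, value in points:
    windows[ts.replace(minute=0, second=0, microsecond=0)].append(value)

hourly = {start: sum(vals) for start, vals in windows.items()}
# 08:00 window -> 10 + 20 = 30; 09:00 window -> 30
```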
Some explanation about my data
- telegraf_90d = the source and destination bucket, with a retention of 90 days (bucket).
- cluster_csv = the original measurement of the source data (_measurement).
- cluster_csv_agg-sum = the new measurement for the aggregated data (_measurement).
- host = the node that collects the storage information, in our case the Hyper-V node (tag).
- FileSystemLabel = the name of the storage volume (tag).
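Putting these names together, a single raw point as collected by Telegraf would look roughly like this in InfluxDB line protocol (the host name, label, value, and timestamp are hypothetical):

```
cluster_csv,host=hv-node1,FileSystemLabel=CSV01 SizeUsed=123456789 1643875200000000000
```

Here host and FileSystemLabel are tags, SizeUsed is the field, and the trailing number is the nanosecond timestamp (2022-02-03 08:00 UTC in this example).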
Manual query to aggregate the historical data
<pre>
// define variables
bucket = "telegraf_90d"
window = 1h

// custom aggregate: keep one row per FileSystemLabel (the same shared
// storage is reported by every cluster node), then sum the remaining values
customsum = (tables=<-, column="_value") =>
    (tables
        |> drop(columns: ["host"])
        |> unique(column: "FileSystemLabel")
        |> drop(columns: ["FileSystemLabel"])
        |> sum(column: column))

from(bucket: bucket)
    |> range(start: -90d, stop: v.timeRangeStop)
    |> filter(fn: (r) =>
        r._measurement == "cluster_csv" and
        r._field == "SizeUsed" and
        exists r.FileSystemLabel
    )
    |> aggregateWindow(every: window, fn: customsum, createEmpty: true)
    |> fill(column: "_value", usePrevious: true)
    |> toInt()
    |> set(key: "_measurement", value: "cluster_csv_agg-sum")
    |> to(bucket: "telegraf_90d")
</pre>
InfluxDB Task (runs scheduled)
<pre>
option task = {name: "task_telegraf_90d-cluster_csv_agg-sum", every: 1h}

// custom aggregate: keep one row per FileSystemLabel, then sum
customsum = (tables=<-, column="_value") =>
    (tables
        |> drop(columns: ["host"])
        |> unique(column: "FileSystemLabel")
        |> drop(columns: ["FileSystemLabel"])
        |> sum(column: column))

// look back twice the task interval so late-arriving points are re-aggregated
data = from(bucket: "telegraf_90d")
    |> range(start: -duration(v: int(v: task.every) * 2))
    |> filter(fn: (r) =>
        (r._measurement == "cluster_csv" and r._field == "SizeUsed" and exists r.FileSystemLabel))

data
    |> aggregateWindow(every: 1h, fn: customsum, createEmpty: true)
    |> fill(column: "_value", usePrevious: true)
    |> toInt()
    |> set(key: "_measurement", value: "cluster_csv_agg-sum")
    |> to(bucket: "telegraf_90d")
</pre>
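The range(start: -duration(v: int(v: task.every) * 2)) line makes each run re-read the last two task intervals: converting a Flux duration to int yields nanoseconds, so for every: 1h the lookback is 2h. Since to() overwrites points that have the same series and timestamp, rewriting a window is idempotent, and late-arriving data in the previous hour is corrected on the next run. A minimal sketch of that behavior, with hypothetical aggregate values:

```python
# Simulated store: (measurement, window_hour) -> aggregated value.
# to() semantics: writing the same series + timestamp overwrites the point.
store = {}

def run_task(now_hour, aggregates):
    """One task run: recompute and write the last two hourly windows."""
    for hour in (now_hour - 1, now_hour):
        if hour in aggregates:
            store[("cluster_csv_agg-sum", hour)] = aggregates[hour]

run_task(9, {8: 350, 9: 340})    # first run writes hours 8 and 9
run_task(10, {9: 345, 10: 360})  # next run rewrites hour 9 (late data arrived) and writes hour 10
```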