InfluxDB 2.x: Task: Aggregation
Sometimes you want to aggregate a large amount of data, for example for a Grafana graph panel, but the query is very slow because of the number of metric points (millions or more).
In that case it is more efficient to create an InfluxDB Task that aggregates the data and saves the results back to a bucket, instead of calculating the result each time the query runs.
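As a rough illustration of the idea, a task of this kind might look like the following minimal sketch. The task name, bucket, measurement and field names here are hypothetical placeholders, and mean is used only as a stand-in aggregate; the storage example in this article uses its own names and a custom sum.

```flux
// Hypothetical task definition: the task runs once per hour.
option task = {name: "example-hourly-aggregate", every: 1h}

from(bucket: "example_bucket")
    // Only look at the data written since the previous run.
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example_measurement" and r._field == "example_field")
    // Reduce each series to one point per hour.
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    // Rename the measurement so the aggregated points can be told apart from the raw ones.
    |> set(key: "_measurement", value: "example_measurement_agg")
    |> to(bucket: "example_bucket")
```

A dashboard panel can then query the small aggregated measurement instead of scanning the raw points on every refresh.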
Example: Sum of all storage
What I want
- Create a graph panel that shows the sum of all used storage we have over time.
- Because the storage is shared, each node in the same cluster collects the same storage information. That is why I have to deduplicate the data (unique()).
- Because I write the aggregated data back to the same bucket, I rename the _measurement of the aggregated data (adding the suffix "_agg-sum") so that it can be distinguished from the source data.
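The deduplication step can be tried in isolation with a few hand-made rows. This is only a sketch: the node names, storage labels and values below are invented sample data, built with array.from() so that no bucket is needed.

```flux
import "array"

// Invented sample rows: two nodes report the same two shared storages.
data =
    array.from(
        rows: [
            {host: "node1", FileSystemLabel: "CSV1", _value: 100},
            {host: "node2", FileSystemLabel: "CSV1", _value: 100},
            {host: "node1", FileSystemLabel: "CSV2", _value: 50},
            {host: "node2", FileSystemLabel: "CSV2", _value: 50},
        ],
    )

data
    // The reporting node is irrelevant for the total.
    |> drop(columns: ["host"])
    // Keep only the first row for each storage label.
    |> unique(column: "FileSystemLabel")
    |> drop(columns: ["FileSystemLabel"])
    // Sum what is left: one row per storage.
    |> sum(column: "_value")
```

Without the unique() step every storage would be counted once per node, so the total would be doubled in this two-node sample.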
How can I achieve this
- Because I also want the historical data, I must first aggregate the existing data over a larger time range (by running the query manually).
- The sampling frequency in my bucket is one metric point per hour. That is why I also aggregate the data with a sampling frequency of one hour.
Some explanation about my data
telegraf_90d = is the source and destination bucket (bucket).
cluster_csv = is the original measurement of the data (_measurement).
cluster_csv_agg-sum = is the new measurement for the aggregated data (_measurement).
host = is the node that collects the storage information, in our case the Hyper-V node (tag).
FileSystemLabel = is the name of the storage (tag).
Manual query to aggregate the historical data
// Define variables
bucket = "telegraf_90d"
window = 1h

// Custom aggregate: drop the host tag, keep one record per storage, then sum
customsum = (tables=<-, column="_value") =>
    tables
        |> drop(columns: ["host"])
        |> unique(column: "FileSystemLabel")
        |> drop(columns: ["FileSystemLabel"])
        |> sum(column: column)

// v.timeRangeStart and v.timeRangeStop are filled in from the time range selected in the UI
from(bucket: bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "cluster_csv" and r._field == "SizeUsed" and exists r.FileSystemLabel)
    |> aggregateWindow(every: window, fn: customsum, createEmpty: true)
    |> fill(column: "_value", usePrevious: true)
    |> toInt()
    |> set(key: "_measurement", value: "cluster_csv_agg-sum")
    |> to(bucket: "telegraf_90d")