InfluxDB: Flux - Multiple aggregations (min, max, avg)

Aus Wiki-WebPerfect
Wechseln zu: Navigation, Suche

01-grafana multiple aggregations.png

If you want to show multiple aggregations of the same data, for example the following:

  • Last
  • Average
  • Maximum

you have to use the union() and pivot() functions.

The following example aggregates the storage write latency and shows the Cluster Shared Volumes with the top n average latency:

// Template variables: replace both placeholders before running the query.
bucket = "<YOUR_BUCKET>"
top = <TOP_n_VALUES>

// Base stream: CSV write latency within the dashboard time range,
// regrouped so that there is exactly one table per "instance".
data = from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "cluster_csv_filesystem" and
    r._field == "Write_Latency"
  )
  |> group(columns: ["instance"])

// Each aggregation overwrites _field with its own label; pivot() below
// turns these labels into the output column names.
mean_latency = data
  |> mean()
  |> set(key: "_field", value: "mean_latency")

last_latency = data
  |> last()
  |> set(key: "_field", value: "last_latency")

max_latency = data
  |> max()
  |> set(key: "_field", value: "max_latency")

// Merge the three single-row-per-instance streams and spread them into columns.
union(tables: [mean_latency, last_latency, max_latency])
  // Pivot on "instance" (the group key) instead of "_start": mean() is an
  // aggregate and keeps only group-key columns, so the mean rows carry no
  // "_start", whereas every stream carries "instance".
  |> pivot(rowKey: ["instance"], columnKey: ["_field"], valueColumn: "_value")
  // Keep the n instances with the highest average latency.
  |> highestCurrent(n: top, column: "mean_latency", groupColumns: ["instance"])
  |> keep(columns: ["instance", "mean_latency", "last_latency", "max_latency"])


Old query -> not recommended

// Template variables: replace both placeholders before running the query.
bucket = "<YOUR_BUCKET>"
top = <TOP_n_VALUES>
// Columns that are not needed in the result and are removed from every stream.
unneeded_columns = ["_field", "objectname", "_measurement", "_start", "_stop", "_time", "host"]

// Shared pipeline step: keep only the CSV write latency and build one table per instance.
write_latency_by_instance = (tables=<-) => tables
  |> filter(fn: (r) => r._measurement == "cluster_csv_filesystem" and r._field == "Write_Latency")
  |> group(columns: ["instance"])

// Average latency over the dashboard time range.
mean_latency = from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> write_latency_by_instance()
  |> mean()
  |> drop(columns: unneeded_columns)

// Most recent latency; the last hour is searched instead of the whole dashboard time range.
last_latency = from(bucket: bucket)
  |> range(start: -1h)
  |> write_latency_by_instance()
  |> last()
  |> drop(columns: unneeded_columns)

// Peak latency over the dashboard time range.
max_latency = from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> write_latency_by_instance()
  |> max()
  |> drop(columns: unneeded_columns)

// First join: mean and last latency per instance.
// The value columns come out as "_value_mean" and "_value_last".
mean_and_last = join(tables: {mean: mean_latency, last: last_latency}, on: ["instance"])

// Second join adds the max latency. Workaround: a join accepts at most two
// streams, so the third stream is joined with the result of the first join.
join(tables: {max: max_latency, table4: mean_and_last}, on: ["instance"])
  |> highestCurrent(n: top, column: "_value_mean", groupColumns: ["instance"])