Skip to content

Commit

Permalink
Support rate for unwrap expressions. (#3048)
Browse files Browse the repository at this point in the history
This allows to bytes throughput  if there were printed in a log line.

Examples :

```logql
sum by (device) (rate({app="network-dev"} |= "192.16.8.1" | json | unwrap recv_bytes [1m]))
```

Closes #3044

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Dec 8, 2020
1 parent 2406ea7 commit 4e7a123
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
3 changes: 2 additions & 1 deletion docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ We currently support the functions:

Supported function for operating over unwrapped ranges are:

- `rate(log-range)`: calculates per second rate of all values in the specified interval.
- `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
- `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
- `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.
Expand All @@ -431,7 +432,7 @@ Supported function for operating over unwrapped ranges are:
- `stddev_over_time(unwrapped-range)`: the population standard deviation of the values in the specified interval.
- `quantile_over_time(scalar,unwrapped-range)`: the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.

Except for `sum_over_time`, unwrapped range aggregations support grouping.
Except for `sum_over_time` and `rate` unwrapped range aggregations support grouping.

```logql
<aggr-op>([parameter,] <unwrapped-range>) [without|by (<label list>)]
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (e rangeAggregationExpr) validate() error {
}
if e.left.unwrap != nil {
switch e.operation {
case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
return nil
default:
return fmt.Errorf("invalid aggregation %s with unwrap", e.operation)
Expand Down
49 changes: 48 additions & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
[][]logproto.Series{
// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
{newSeries(testSize, offset(46, constantValue(2)), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
Expand Down Expand Up @@ -868,6 +879,25 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
{newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`), newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"|unwrap bar[1m])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "bar"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
},
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
},
},
},
{
`topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -1965,7 +1995,24 @@ func constant(t int64) generator {
Sample: logproto.Sample{
Timestamp: time.Unix(t, 0).UnixNano(),
Hash: uint64(i),
Value: 1.,
Value: 1.0,
},
}
}
}

// nolint
func constantValue(t int64) generator {
return func(i int64) logData {
return logData{
Entry: logproto.Entry{
Timestamp: time.Unix(i, 0),
Line: fmt.Sprintf("%d", i),
},
Sample: logproto.Sample{
Timestamp: time.Unix(i, 0).UnixNano(),
Hash: uint64(i),
Value: float64(t),
},
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/logql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (r rangeAggregationExpr) extractor(override *grouping) (log.SampleExtractor
func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) {
switch r.operation {
case OpRangeTypeRate:
return rateLogs(r.left.interval), nil
return rateLogs(r.left.interval, r.left.unwrap != nil), nil
case OpRangeTypeCount:
return countOverTime, nil
case OpRangeTypeBytesRate:
Expand All @@ -111,9 +111,16 @@ func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) {
}

// rateLogs calculates the per-second rate of log lines.
func rateLogs(selRange time.Duration) func(samples []promql.Point) float64 {
func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
return func(samples []promql.Point) float64 {
return float64(len(samples)) / selRange.Seconds()
if !computeValues {
return float64(len(samples)) / selRange.Seconds()
}
var total float64
for _, p := range samples {
total += p.V
}
return total / selRange.Seconds()
}
}

Expand Down

0 comments on commit 4e7a123

Please sign in to comment.