Skip to content

Commit

Permalink
promtail: added action_on_failure support to timestamp stage
Browse files Browse the repository at this point in the history
  • Loading branch information
pracucci committed Oct 10, 2019
1 parent bf4530a commit 7645b12
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 33 deletions.
14 changes: 14 additions & 0 deletions docs/clients/promtail/stages/timestamp.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ timestamp:

# IANA Timezone Database string.
[location: <string>]

# Which action should be taken in case the timestamp can't
# be extracted or parsed. Valid values are: [skip, fudge].
# Defaults to "fudge".
[action_on_failure: <string>]
```
The `format` field can be provided as an "example" of what timestamps look like
Expand Down Expand Up @@ -68,6 +73,15 @@ should be used in the custom format.
| Timezone offset | `-0700`, `-070000` (with seconds), `-07`, `07:00`, `-07:00:00` (with seconds) |
| Timezone ISO-8601 | `Z0700` (Z for UTC or time offset), `Z070000`, `Z07`, `Z07:00`, `Z07:00:00` |

The `action_on_failure` setting defines which action should be taken by the
stage in case the `source` field doesn't exist in the extracted data or the
timestamp parsing fails. The supported actions are:

- `fudge` (default): set the timestamp to the last known timestamp plus
1 nanosecond (to guarantee log entry ordering). If no timestamp has been
successfully parsed yet, the current timestamp is kept
- `skip`: do not change the timestamp; keep the time at which the log entry
was scraped by Promtail

## Examples

```yaml
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/grpc-gateway v1.9.6 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/golang-lru v0.5.3
github.com/hpcloud/tail v1.0.0
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/json-iterator/go v1.1.7
Expand Down
10 changes: 4 additions & 6 deletions pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
}},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
"timestamp",
RFC3339Nano,
nil,
Source: "timestamp",
Format: RFC3339Nano,
}},
PipelineStage{
StageTypeOutput: OutputConfig{
Expand All @@ -51,9 +50,8 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
"time",
RFC3339Nano,
nil,
Source: "time",
Format: RFC3339Nano,
},
},
PipelineStage{
Expand Down
149 changes: 122 additions & 27 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/util"
lru "github.com/hashicorp/golang-lru"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/common/model"
)
Expand All @@ -17,17 +19,33 @@ const (
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampFormatRequired = "timestamp format is required"
ErrInvalidLocation = "invalid location specified: %v"
ErrInvalidActionOnFailure = "invalid action on failure (supported values are %v)"
ErrTimestampSourceMissing = "extracted data did not contain a timestamp"
ErrTimestampConversionFailed = "failed to convert extracted time to string"
ErrTimestampParsingFailed = "failed to parse time"

Unix = "Unix"
UnixMs = "UnixMs"
UnixNs = "UnixNs"

TimestampActionOnFailureSkip = "skip"
TimestampActionOnFailureFudge = "fudge"
TimestampActionOnFailureDefault = TimestampActionOnFailureFudge

// Maximum number of "streams" for which we keep the last known timestamp
maxLastKnownTimestampsCacheSize = 10000
)

var (
// TimestampActionOnFailureOptions lists the values accepted for the
// action_on_failure setting; config validation rejects any other value.
TimestampActionOnFailureOptions = []string{TimestampActionOnFailureSkip, TimestampActionOnFailureFudge}
)

// TimestampConfig configures timestamp extraction
type TimestampConfig struct {
Source string `mapstructure:"source"`
Format string `mapstructure:"format"`
Location *string `mapstructure:"location"`
Source string `mapstructure:"source"`
Format string `mapstructure:"format"`
Location *string `mapstructure:"location"`
ActionOnFailure *string `mapstructure:"action_on_failure"`
}

// parser can convert the time string into a time.Time value
Expand All @@ -52,8 +70,17 @@ func validateTimestampConfig(cfg *TimestampConfig) (parser, error) {
return nil, fmt.Errorf(ErrInvalidLocation, err)
}
}
return convertDateLayout(cfg.Format, loc), nil

// Validate the action on failure and enforce the default
if cfg.ActionOnFailure == nil {
cfg.ActionOnFailure = util.StringRef(TimestampActionOnFailureDefault)
} else {
if !util.StringSliceContains(TimestampActionOnFailureOptions, *cfg.ActionOnFailure) {
return nil, fmt.Errorf(ErrInvalidActionOnFailure, TimestampActionOnFailureOptions)
}
}

return convertDateLayout(cfg.Format, loc), nil
}

// newTimestampStage creates a new timestamp extraction pipeline stage.
Expand All @@ -67,49 +94,117 @@ func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage,
if err != nil {
return nil, err
}

var lastKnownTimestamps *lru.Cache
if *cfg.ActionOnFailure == TimestampActionOnFailureFudge {
lastKnownTimestamps, err = lru.New(maxLastKnownTimestampsCacheSize)
if err != nil {
return nil, err
}
}

return &timestampStage{
cfgs: cfg,
logger: logger,
parser: parser,
cfg: cfg,
logger: logger,
parser: parser,
lastKnownTimestamps: lastKnownTimestamps,
}, nil
}

// timestampStage will set the timestamp using extracted data
type timestampStage struct {
cfgs *TimestampConfig
cfg *TimestampConfig
logger log.Logger
parser parser

// Stores the last known timestamp for a given "stream id" (guessed, since at this stage
// there's no reliable way to know it).
lastKnownTimestamps *lru.Cache
}

// Name implements Stage. It returns the stage type identifier of the
// timestamp stage (StageTypeTimestamp).
func (ts *timestampStage) Name() string {
return StageTypeTimestamp
}

// Process implements Stage
func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if ts.cfgs == nil {
if ts.cfg == nil {
return
}
if v, ok := extracted[ts.cfgs.Source]; ok {
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
}

parsedTs, err := ts.parseTimestampFromSource(extracted)
if err != nil {
ts.processActionOnFailure(labels, t)
return
}

// Update the log entry timestamp with the parsed one
*t = *parsedTs

// The timestamp has been correctly parsed, so we should store it in the map
// containing the last known timestamp used by the "fudge" action on failure.
if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge {
ts.lastKnownTimestamps.Add(labels.FastFingerprint(), *t)
}
}

func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interface{}) (*time.Time, error) {
// Ensure the extracted data contains the timestamp source
v, ok := extracted[ts.cfg.Source]
if !ok {
if Debug {
level.Debug(ts.logger).Log("msg", ErrTimestampSourceMissing)
}

parsedTs, err := ts.parser(s)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
}
} else {
*t = parsedTs
return nil, errors.New(ErrTimestampSourceMissing)
}

// Convert the timestamp source to string (if it's not a string yet)
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", ErrTimestampConversionFailed, "err", err, "type", reflect.TypeOf(v).String())
}
} else {

return nil, errors.New(ErrTimestampConversionFailed)
}

// Parse the timestamp source according to the configured format
parsedTs, err := ts.parser(s)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "extracted data did not contain a timestamp")
level.Debug(ts.logger).Log("msg", ErrTimestampParsingFailed, "err", err, "format", ts.cfg.Format, "value", s)
}

return nil, errors.New(ErrTimestampParsingFailed)
}

return &parsedTs, nil
}

// Name implements Stage
func (ts *timestampStage) Name() string {
return StageTypeTimestamp
// processActionOnFailure applies the configured action_on_failure after the
// timestamp could not be obtained from the extracted data. Only the "fudge"
// action modifies *t; for "skip" the timestamp is left as it is.
func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t *time.Time) {
	if *ts.cfg.ActionOnFailure != TimestampActionOnFailureFudge {
		// Nothing to do: the entry keeps its current timestamp.
		return
	}

	ts.processActionOnFailureFudge(labels, t)
}

// processActionOnFailureFudge replaces *t with the last known timestamp cached
// for the fingerprint of labels, advanced by one nanosecond, and stores the
// result back in the cache so that consecutive fudged entries keep increasing.
// When nothing is cached for that fingerprint, *t is left unchanged.
func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *time.Time) {
	streamKey := labels.FastFingerprint()

	// A cache miss means no timestamp has been successfully parsed yet for
	// this fingerprint, so there is no reference point to fudge from.
	if cached, found := ts.lastKnownTimestamps.Get(streamKey); found {
		fudged := cached.(time.Time).Add(time.Nanosecond)
		*t = fudged

		// Remember the fudged value: the next failure lands 1ns after it.
		ts.lastKnownTimestamps.Add(streamKey, fudged)
	}
}
Loading

0 comments on commit 7645b12

Please sign in to comment.