Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promtail: added action_on_failure support to timestamp stage #1123

Merged
merged 1 commit into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/clients/promtail/stages/timestamp.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ timestamp:

# IANA Timezone Database string.
[location: <string>]

# Which action should be taken in case the timestamp can't
# be extracted or parsed. Valid values are: [skip, fudge].
# Defaults to "fudge".
[action_on_failure: <string>]
```

The `format` field can be provided as an "example" of what timestamps look like
Expand Down Expand Up @@ -68,6 +73,15 @@ should be used in the custom format.
| Timezone offset | `-0700`, `-070000` (with seconds), `-07`, `07:00`, `-07:00:00` (with seconds) |
| Timezone ISO-8601 | `Z0700` (Z for UTC or time offset), `Z070000`, `Z07`, `Z07:00`, `Z07:00:00` |

The `action_on_failure` setting defines which action should be taken by the
stage in case the `source` field doesn't exist in the extracted data or the
timestamp parsing fails. The supported actions are:

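- `fudge` (default): change the timestamp to the last known timestamp plus
1 nanosecond (to guarantee log entry ordering). If no timestamp has been
successfully parsed yet for the entry's label set, the timestamp is left unchanged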
- `skip`: do not change the timestamp and keep the time at which the log entry
was scraped by Promtail

## Examples

```yaml
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/grpc-gateway v1.9.6 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/golang-lru v0.5.3
github.com/hpcloud/tail v1.0.0
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/json-iterator/go v1.1.7
Expand Down
10 changes: 4 additions & 6 deletions pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
}},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
"timestamp",
RFC3339Nano,
nil,
Source: "timestamp",
Format: RFC3339Nano,
}},
PipelineStage{
StageTypeOutput: OutputConfig{
Expand All @@ -51,9 +50,8 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
"time",
RFC3339Nano,
nil,
Source: "time",
Format: RFC3339Nano,
},
},
PipelineStage{
Expand Down
149 changes: 122 additions & 27 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/util"
lru "github.com/hashicorp/golang-lru"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/common/model"
)
Expand All @@ -17,17 +19,33 @@ const (
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampFormatRequired = "timestamp format is required"
ErrInvalidLocation = "invalid location specified: %v"
ErrInvalidActionOnFailure = "invalid action on failure (supported values are %v)"
ErrTimestampSourceMissing = "extracted data did not contain a timestamp"
ErrTimestampConversionFailed = "failed to convert extracted time to string"
ErrTimestampParsingFailed = "failed to parse time"

Unix = "Unix"
UnixMs = "UnixMs"
UnixNs = "UnixNs"

TimestampActionOnFailureSkip = "skip"
TimestampActionOnFailureFudge = "fudge"
TimestampActionOnFailureDefault = TimestampActionOnFailureFudge

// Maximum number of "streams" for which we keep the last known timestamp
maxLastKnownTimestampsCacheSize = 10000
)

// TimestampActionOnFailureOptions lists the values accepted by the
// action_on_failure setting of the timestamp stage.
var TimestampActionOnFailureOptions = []string{
	TimestampActionOnFailureSkip,
	TimestampActionOnFailureFudge,
}

// TimestampConfig configures timestamp extraction
type TimestampConfig struct {
Source string `mapstructure:"source"`
Format string `mapstructure:"format"`
Location *string `mapstructure:"location"`
Source string `mapstructure:"source"`
Format string `mapstructure:"format"`
Location *string `mapstructure:"location"`
ActionOnFailure *string `mapstructure:"action_on_failure"`
}

// parser can convert the time string into a time.Time value
Expand All @@ -52,8 +70,17 @@ func validateTimestampConfig(cfg *TimestampConfig) (parser, error) {
return nil, fmt.Errorf(ErrInvalidLocation, err)
}
}
return convertDateLayout(cfg.Format, loc), nil

// Validate the action on failure and enforce the default
if cfg.ActionOnFailure == nil {
cfg.ActionOnFailure = util.StringRef(TimestampActionOnFailureDefault)
} else {
if !util.StringSliceContains(TimestampActionOnFailureOptions, *cfg.ActionOnFailure) {
return nil, fmt.Errorf(ErrInvalidActionOnFailure, TimestampActionOnFailureOptions)
}
}

return convertDateLayout(cfg.Format, loc), nil
}

// newTimestampStage creates a new timestamp extraction pipeline stage.
Expand All @@ -67,49 +94,117 @@ func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage,
if err != nil {
return nil, err
}

var lastKnownTimestamps *lru.Cache
if *cfg.ActionOnFailure == TimestampActionOnFailureFudge {
lastKnownTimestamps, err = lru.New(maxLastKnownTimestampsCacheSize)
if err != nil {
return nil, err
}
}

return &timestampStage{
cfgs: cfg,
logger: logger,
parser: parser,
cfg: cfg,
logger: logger,
parser: parser,
lastKnownTimestamps: lastKnownTimestamps,
}, nil
}

// timestampStage will set the timestamp using extracted data
type timestampStage struct {
cfgs *TimestampConfig
cfg *TimestampConfig
logger log.Logger
parser parser

// Stores the last known timestamp for a given "stream id" (guessed, since at this stage
// there's no reliable way to know it).
lastKnownTimestamps *lru.Cache
}

// Name returns the stage type identifier of the timestamp stage, as required
// by the Stage interface.
func (ts *timestampStage) Name() string {
	return StageTypeTimestamp
}

// Process implements Stage
func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if ts.cfgs == nil {
if ts.cfg == nil {
return
}
if v, ok := extracted[ts.cfgs.Source]; ok {
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
}

parsedTs, err := ts.parseTimestampFromSource(extracted)
if err != nil {
ts.processActionOnFailure(labels, t)
return
}

// Update the log entry timestamp with the parsed one
*t = *parsedTs

// The timestamp has been correctly parsed, so we should store it in the map
// containing the last known timestamp used by the "fudge" action on failure.
if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge {
ts.lastKnownTimestamps.Add(labels.FastFingerprint(), *t)
}
}

func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interface{}) (*time.Time, error) {
// Ensure the extracted data contains the timestamp source
v, ok := extracted[ts.cfg.Source]
if !ok {
if Debug {
level.Debug(ts.logger).Log("msg", ErrTimestampSourceMissing)
}

parsedTs, err := ts.parser(s)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
}
} else {
*t = parsedTs
return nil, errors.New(ErrTimestampSourceMissing)
}

// Convert the timestamp source to string (if it's not a string yet)
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", ErrTimestampConversionFailed, "err", err, "type", reflect.TypeOf(v).String())
}
} else {

return nil, errors.New(ErrTimestampConversionFailed)
}

// Parse the timestamp source according to the configured format
parsedTs, err := ts.parser(s)
if err != nil {
if Debug {
level.Debug(ts.logger).Log("msg", "extracted data did not contain a timestamp")
level.Debug(ts.logger).Log("msg", ErrTimestampParsingFailed, "err", err, "format", ts.cfg.Format, "value", s)
}

return nil, errors.New(ErrTimestampParsingFailed)
}

return &parsedTs, nil
}

// Name implements Stage
func (ts *timestampStage) Name() string {
return StageTypeTimestamp
// processActionOnFailure applies the configured action_on_failure to the log
// entry timestamp t. It is meant to be called when the timestamp could not be
// obtained from the extracted data.
func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t *time.Time) {
	// The "skip" action deliberately leaves *t untouched, so "fudge" is the
	// only action that requires any work.
	if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge {
		ts.processActionOnFailureFudge(labels, t)
	}
}

// processActionOnFailureFudge replaces *t with the last known timestamp
// recorded for the fingerprint of labels, advanced by one nanosecond. When no
// timestamp has been recorded for that fingerprint, *t is left unchanged.
func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *time.Time) {
	fp := labels.FastFingerprint()

	// Without a previously recorded timestamp for this fingerprint there is
	// nothing to fudge from, so the current timestamp is kept as-is.
	cached, found := ts.lastKnownTimestamps.Get(fp)
	if !found {
		return
	}

	// Move one nanosecond past the last known timestamp.
	fudged := cached.(time.Time).Add(time.Nanosecond)
	*t = fudged

	// Remember the fudged value so that a subsequent failure for the same
	// fingerprint lands 1ns after this one.
	ts.lastKnownTimestamps.Add(fp, fudged)
}
Loading