Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/exceptions] Add support for exemplars in exceptionsconnector #31819

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/exceptionsconnector_add_exemplars.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exceptionsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for exemplars in exceptionsconnector

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24409]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 3 additions & 0 deletions connector/exceptionsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ The following settings can be optionally configured:

The provided default config includes `exception.type` and `exception.message` as additional dimensions.

- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars.

## Examples

The following is a simple example usage of the `exceptions` connector.
Expand Down
6 changes: 6 additions & 0 deletions connector/exceptionsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type Dimension struct {
Default *string `mapstructure:"default"`
}

type ExemplarsConfig struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might not need the suffix Config here?

Enabled bool `mapstructure:"enabled"`
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
}

// Config defines the configuration options for exceptionsconnector
type Config struct {
// Dimensions defines the list of additional dimensions on top of the provided:
Expand All @@ -24,6 +28,8 @@ type Config struct {
// The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []Dimension `mapstructure:"dimensions"`
// Exemplars defines the configuration for exemplars.
Exemplars ExemplarsConfig `mapstructure:"exemplars"`
}

var _ component.ConfigValidator = (*Config)(nil)
Expand Down
3 changes: 3 additions & 0 deletions connector/exceptionsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func TestLoadConfig(t *testing.T) {
{Name: exceptionTypeKey},
{Name: exceptionMessageKey},
},
Exemplars: ExemplarsConfig{
Enabled: false,
},
},
},
}
Expand Down
57 changes: 38 additions & 19 deletions connector/exceptionsconnector/connector_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@
component.StartFunc
component.ShutdownFunc

exceptions map[string]*excVal
exceptions map[string]*exception

logger *zap.Logger

// The starting time of the data points.
startTimestamp pcommon.Timestamp
}

type excVal struct {
count int
attrs pcommon.Map
type exception struct {
count int
attrs pcommon.Map
exemplars pmetric.ExemplarSlice
}

func newMetricsConnector(logger *zap.Logger, config component.Config) *metricsConnector {
Expand All @@ -59,7 +60,7 @@
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
exceptions: make(map[string]*excVal),
exceptions: make(map[string]*exception),
}
}

Expand Down Expand Up @@ -95,7 +96,8 @@
key := c.keyBuf.String()

attrs := buildDimensionKVs(c.dimensions, serviceName, span, eventAttrs)
c.addException(key, attrs)
exc := c.addException(key, attrs)
c.addExemplar(exc, span.TraceID(), span.SpanID())
}
}
}
Expand Down Expand Up @@ -132,28 +134,45 @@
dps := mCalls.Sum().DataPoints()
dps.EnsureCapacity(len(c.exceptions))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for _, val := range c.exceptions {
dpCalls := dps.AppendEmpty()
dpCalls.SetStartTimestamp(c.startTimestamp)
dpCalls.SetTimestamp(timestamp)

dpCalls.SetIntValue(int64(val.count))

val.attrs.CopyTo(dpCalls.Attributes())
for _, exc := range c.exceptions {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(c.startTimestamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(exc.count))
for i := 0; i < exc.exemplars.Len(); i++ {
exc.exemplars.At(i).SetTimestamp(timestamp)
}
exc.exemplars.CopyTo(dp.Exemplars())
marctc marked this conversation as resolved.
Show resolved Hide resolved
exc.attrs.CopyTo(dp.Attributes())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marctc - Under heavy load conditions or in case of an application gone into some sort of crash loop, could exc.exemplars possibly have too many entries to copy?
The situation may be worse if the end users add custom dimensions that can possibly have large number of unique measurements (cardinality)
So, I wonder if it could probably overwhelm the system (or in worst case, panic) in the CopyTo operation.

I see that spans-to-metrics connector has some safeguard mechanisms like max_per_data_point (along with dimensions_cache_size and resource_metrics_cache_size)

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/connector/spanmetricsconnector/config.go#L93

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think for the scope of this PR is necessary or should we address it in a follow up PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do a follow up PR but I got a bit curious and tried to run an overnight stress test after patching this change in my local setup. The collectors crashed multiple times. My recommendation would be to restrict just one exemplar per data point for now.

panic: runtime error: index out of range [340043] with length 340043

goroutine 70 [running]:
go.opentelemetry.io/collector/pdata/pmetric.ExemplarSlice.CopyTo({0xc02cf84f90?, 0xc02cf3e72c?}, {0xc14ea5f7d8?, 0xc05ab969b0?})
	go.opentelemetry.io/collector/pdata@v1.4.0/pmetric/generated_exemplarslice.go:134 +0x12d
github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector.(*metricsConnector).collectExceptions(0xc000c62900, {0xc1363f6690?, 0xc05ab969b0?})
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector@v0.96.0/connector_metrics.go:161 +0x3cb
github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector.(*metricsConnector).exportMetrics(0xc000c62900, {0x3530320, 0xc001171da0})
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector@v0.96.0/connector_metrics.go:131 +0x25f
github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector.(*metricsConnector).ConsumeTraces(0xc000c62900, {0x3530320, 0xc001171da0}, {0xc026bbd308?, 0xc002c47498?})
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector@v0.96.0/connector_metrics.go:122 +0x14b
go.opentelemetry.io/collector/internal/fanoutconsumer.(*tracesConsumer).ConsumeTraces(0xc001171cb0?, {0x3530320, 0xc001171da0}, {0xc026bbd308?, 0xc002c47498?})

}
return nil
}

func (c *metricsConnector) addException(excKey string, attrs pcommon.Map) {
func (c *metricsConnector) addException(excKey string, attrs pcommon.Map) *exception {
exc, ok := c.exceptions[excKey]
if !ok {
c.exceptions[excKey] = &excVal{
count: 1,
attrs: attrs,
c.exceptions[excKey] = &exception{
count: 1,
attrs: attrs,
exemplars: pmetric.NewExemplarSlice(),
}
return
return c.exceptions[excKey]
}
exc.count++
return exc
}

func (c *metricsConnector) addExemplar(exc *exception, traceID pcommon.TraceID, spanID pcommon.SpanID) {
if !c.config.Exemplars.Enabled {
return
}

Check warning on line 168 in connector/exceptionsconnector/connector_metrics.go

View check run for this annotation

Codecov / codecov/patch

connector/exceptionsconnector/connector_metrics.go#L167-L168

Added lines #L167 - L168 were not covered by tests
if traceID.IsEmpty() {
return
}

Check warning on line 171 in connector/exceptionsconnector/connector_metrics.go

View check run for this annotation

Codecov / codecov/patch

connector/exceptionsconnector/connector_metrics.go#L170-L171

Added lines #L170 - L171 were not covered by tests
e := exc.exemplars.AppendEmpty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this Append result in an unbounded growth and that in fact might be the root cause for the panic I noticed in my overnight test.
So, an alternate approach could be to reset exc.exemplars to a new slice after exc.exemplars.CopyTo(dp.Exemplars())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could just add that count mechanism that spansmetricsconnector is using, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Marc - That should be good IMO. However, what do you think of resetting exc.exemplars back to pmetric.NewExemplarSlice() after doing CopyTo
I guess I do not understand the need to hang on to older exemplars post that CopyTo operation. Can you please help me understand?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are totally right, what about now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me Marc. Thank you.

e.SetTraceID(traceID)
e.SetSpanID(spanID)
e.SetDoubleValue(float64(exc.count))
}

func buildDimensionKVs(dimensions []dimension, serviceName string, span ptrace.Span, eventAttrs pcommon.Map) pcommon.Map {
Expand Down
10 changes: 10 additions & 0 deletions connector/exceptionsconnector/connector_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func newTestMetricsConnector(mcon consumer.Metrics, defaultNullValue *string, lo
{exceptionTypeKey, nil},
{exceptionMessageKey, nil},
},
Exemplars: ExemplarsConfig{
Enabled: true,
},
}
c := newMetricsConnector(logger, cfg)
c.metricsConsumer = mcon
Expand Down Expand Up @@ -174,6 +177,13 @@ func verifyConsumeMetricsInput(t testing.TB, input pmetric.Metrics, numCumulativ
assert.NotZero(t, dp.StartTimestamp(), "StartTimestamp should be set")
assert.NotZero(t, dp.Timestamp(), "Timestamp should be set")
verifyMetricLabels(dp, t, seenMetricIDs)

assert.Equal(t, numCumulativeConsumptions, dp.Exemplars().Len())
exemplar := dp.Exemplars().At(0)
assert.NotZero(t, exemplar.Timestamp())
assert.NotZero(t, exemplar.TraceID())
assert.NotZero(t, exemplar.SpanID())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a good idea to add a benchmark test ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's already a benchmark (BenchmarkConnectorConsumeTraces). Do you have anything in mind exactly to benchmark?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking, digging Exception events in the real world workload, sort of requires deep-packet inspection and may be we add a test that essentially find 1 in 10 traces with baggage containing exception (say, as one of the 5 span event entries) and see how fast can we generate metrics and/or log events.
But I am goo with current change. I think its functionally correct and pretty awesome work. Thank You for doing this Marc.

}
return true
}
Expand Down
Loading