From 6b8ad2f10e9fe0bb0c571f0ef4d15a16c3347f67 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 17 Jul 2024 10:13:20 -0700 Subject: [PATCH 1/2] Wrap the error used in concurrentbatchprocessor --- .../batch_processor.go | 4 ++ .../batch_processor_test.go | 52 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index b00a0e45..4247b569 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -154,6 +154,10 @@ func (ce countedError) Error() string { return fmt.Sprintf("batch error: %s", ce.err.Error()) } +func (ce countedError) Unwrap() error { + return ce.err +} + var _ consumer.Traces = (*batchProcessor)(nil) var _ consumer.Metrics = (*batchProcessor)(nil) var _ consumer.Logs = (*batchProcessor)(nil) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 018f2535..d1310fe5 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -37,6 +37,21 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" ) +type testError struct{} + +func (testError) Error() string { + return "test" +} + +func TestErrorWrapping(t *testing.T) { + e := countedError{ + err: fmt.Errorf("oops: %w", testError{}), + } + require.Error(t, e) + require.Contains(t, e.Error(), "oops: test") + require.ErrorIs(t, e, testError{}) +} + func TestProcessorShutdown(t *testing.T) { factory := NewFactory() @@ -1697,3 +1712,40 @@ func TestBatchProcessorEmptyBatch(t *testing.T) { wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) } + +type errorSink struct { + err error +} + +var _ consumer.Logs = errorSink{} + +func (es errorSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (es errorSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return es.err +} + +func TestErrorPropagation(t *testing.T) { + for _, proto := range []error{ + testError{}, + fmt.Errorf("womp"), + } { + sink := errorSink{err: proto} + + creationSet := processortest.NewNopSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + cfg := createDefaultConfig().(*Config) + batcher, err := newBatchLogsProcessor(creationSet, sink, cfg) + + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + ld := testdata.GenerateLogs(1) + err = batcher.ConsumeLogs(context.Background(), ld) + assert.Error(t, err) + assert.ErrorIs(t, err, proto) + assert.Contains(t, err.Error(), proto.Error()) + } +} From 90c7d953f2b58527a84d99b024189511aa8843fc Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 17 Jul 2024 10:14:45 -0700 Subject: [PATCH 2/2] pr --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c7b7c08..9b691ec5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased +- Wrap concurrentbatchprocessor errors [#235](https://github.com/open-telemetry/otel-arrow/pull/235) - Update to OTel-Collector v0.105.0, which includes the OTel-Arrow components. [#233](https://github.com/open-telemetry/otel-arrow/pull/233) - Remove the primary exporter/receiver components, update references and documentation. [#230](https://github.com/open-telemetry/otel-arrow/pull/230) - Update to Arrow v17. [#231](https://github.com/open-telemetry/otel-arrow/pull/231)