Remove concurrent batch processor in-flight bytes #247

Merged · 6 commits · Sep 6, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -9,8 +9,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## Unreleased

- Concurrent batch processor: tracing improvements. [#238](https://github.com/open-telemetry/otel-arrow/pull/238), [#241](https://github.com/open-telemetry/otel-arrow/pull/241)
- Concurrent batch processor: support disabling in-flight limits. [#243](https://github.com/open-telemetry/otel-arrow/pull/243)
- Update to latest OTel-Collector & OTel-Go dependencies. Remove collector packages now included in collector-contrib/internal/otelarrow. [#245](https://github.com/open-telemetry/otel-arrow/pull/245)
- Concurrent batch processor: remove support for in-flight limits. [#247](https://github.com/open-telemetry/otel-arrow/pull/247)

## [0.25.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.25.0) - 2024-07-17

7 changes: 1 addition & 6 deletions collector/processor/concurrentbatchprocessor/README.md
@@ -7,11 +7,7 @@ The differences in this component, relative to that component are:

1. Synchronous pipeline support: this component blocks each producer
until the request returns with success or an error status code.
2. Maximum in-flight-bytes setting. This component measures the
in-memory size of each request it admits to the pipeline and
otherwise stalls requests until they timeout. This function is
disabled by `max_in_flight_size_mib: 0`.
3. Unlimited concurrency: this component will start as many goroutines
2. Unlimited concurrency: this component will start as many goroutines
as needed to send batches through the pipeline.

Here is an example configuration:
@@ -22,7 +18,6 @@ Here is an example configuration:
send_batch_max_size: 1500
send_batch_size: 1000
timeout: 1s
max_in_flight_size_mib: 128
```

In this configuration, the component will admit up to 128MiB of
22 changes: 0 additions & 22 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
@@ -18,7 +18,6 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
@@ -70,10 +69,6 @@ type batchProcessor struct {
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

// in-flight bytes limit mechanism
limitBytes int64
sem *semaphore.Weighted

tracer trace.TracerProvider
}

@@ -171,8 +166,6 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
}
sort.Strings(mks)

limitBytes := int64(cfg.MaxInFlightSizeMiB) << 20

tp := set.TelemetrySettings.TracerProvider
if tp == nil {
tp = otel.GetTracerProvider()
@@ -188,14 +181,9 @@
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
limitBytes: limitBytes,
tracer: tp,
}

if limitBytes != 0 {
bp.sem = semaphore.NewWeighted(limitBytes)
}

if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
} else {
@@ -459,9 +447,6 @@ func allSame(x []context.Context) bool {

func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
var err error
if bp.sem != nil {
err = bp.sem.Acquire(ctx, bytes)
}
if err == nil && bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
}
@@ -472,9 +457,6 @@ func (bp *batchProcessor) countRelease(bytes int64) {
if bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption)
}
if bp.sem != nil {
bp.sem.Release(bytes)
}
}

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
@@ -502,10 +484,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}
bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
return fmt.Errorf("request size exceeds max-in-flight bytes: %d", bytes)
}

err := b.processor.countAcquire(ctx, bytes)
if err != nil {
return err
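For context on what this diff deletes: the removed mechanism paired a weighted semaphore (`golang.org/x/sync/semaphore`) with the per-request size check in `consumeAndWait`, so producers stalled whenever admitted bytes would exceed the limit. Below is a minimal, self-contained sketch of that idea using only the standard library (`sync.Cond` standing in for `semaphore.Weighted`); the type and function names are illustrative, not the collector's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlightLimiter caps the total bytes admitted to the pipeline,
// mirroring the semaphore-based mechanism this PR removes.
type inFlightLimiter struct {
	mu    sync.Mutex
	cond  *sync.Cond
	inUse int64
	limit int64
}

func newInFlightLimiter(limit int64) *inFlightLimiter {
	l := &inFlightLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks the caller until n bytes fit under the limit.
// A request larger than the limit fails outright, mirroring the
// removed check in consumeAndWait.
func (l *inFlightLimiter) acquire(n int64) error {
	if n > l.limit {
		return fmt.Errorf("request size exceeds max-in-flight bytes: %d", n)
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.inUse+n > l.limit {
		l.cond.Wait() // stall this producer until bytes are released
	}
	l.inUse += n
	return nil
}

// release returns n bytes and wakes any stalled producers.
func (l *inFlightLimiter) release(n int64) {
	l.mu.Lock()
	l.inUse -= n
	l.mu.Unlock()
	l.cond.Broadcast()
}

func main() {
	limit := int64(128) << 20 // e.g. max_in_flight_size_mib: 128
	l := newInFlightLimiter(limit)

	if err := l.acquire(64 << 20); err != nil {
		panic(err)
	}
	fmt.Println("admitted 64 MiB")
	l.release(64 << 20)

	if err := l.acquire(limit + 1); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

After this PR, admission control is left to other components (the processor starts as many goroutines as needed), so both the semaphore and the oversize-request error path disappear.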