Skip to content

Commit

Permalink
[otelarrowreceiver] Add a test for graceful shutdown (#33640)
Browse files Browse the repository at this point in the history
**Description:** Adds a test.

**Link to tracking Issue:** #33581 

**Testing:** The testbed expects a very fast graceful shutdown. This
test shows it will work.

**Documentation:** N/A
  • Loading branch information
jmacd committed Jul 1, 2024
1 parent daacb6f commit 7f3f042
Showing 1 changed file with 123 additions and 1 deletion.
124 changes: 123 additions & 1 deletion receiver/otelarrowreceiver/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -232,7 +235,7 @@ func newReceiver(t *testing.T, factory receiver.Factory, settings component.Tele

type senderFunc func(td ptrace.Traces)

func TestShutdown(t *testing.T) {
func TestStandardShutdown(t *testing.T) {
endpointGrpc := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)
Expand Down Expand Up @@ -295,6 +298,125 @@ func TestShutdown(t *testing.T) {
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount())
}

func TestOTelArrowShutdown(t *testing.T) {
// In the cooperative test, the client calls CloseSend() with no
// keepalive set. In the non-cooperative case, the keepalive ends
// the stream.
for _, cooperative := range []bool{true, false} {
t.Run(fmt.Sprint("cooperative=", cooperative), func(t *testing.T) {
ctx := context.Background()

endpointGrpc := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTelArrow receiver
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{},
}
// Note that keepalive parameters are set very high
if !cooperative {
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAge = time.Second
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace = 5 * time.Second
}
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
set := receivertest.NewNopSettings()
core, obslogs := observer.New(zapcore.DebugLevel)
set.TelemetrySettings.Logger = zap.New(core)

set.ID = testReceiverID
r, err := NewFactory().CreateTracesReceiver(
ctx,
set,
cfg,
nextSink)
require.NoError(t, err)
require.NotNil(t, r)
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)

client := arrowpb.NewArrowTracesServiceClient(conn)
stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true))
require.NoError(t, err)
producer := arrowRecord.NewProducer()
defer func() {
require.NoError(t, conn.Close())
}()

start := time.Now()
var once sync.Once

// Send traces to the receiver until we signal via done channel, and then
// send one more trace after that.
go generateTraces(func(td ptrace.Traces) {
if time.Since(start) > 5*time.Second {
once.Do(func() {
if cooperative {
require.NoError(t, stream.CloseSend())
}
})
return
}
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
require.NoError(t, batchErr)
require.NoError(t, stream.Send(batch))
}, doneSignalGrpc)

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpanCount() > 0
}, time.Second, 10*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
// Note that gRPC GracefulShutdown() does not actually use the context
// for cancelation.
err = r.Shutdown(context.Background())
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpanCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc

// The last, additional trace should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount())

shutdownCause := ""
scanLogs:
for _, log := range obslogs.All() {
if log.Message == "arrow stream shutdown" {
for _, f := range log.Context {
if f.Key == "message" {
shutdownCause = f.String
break scanLogs
}
}
}
}
if cooperative {
assert.Equal(t, "EOF", shutdownCause)
} else {
assert.Equal(t, "context canceled", shutdownCause)
}
})
}
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
Expand Down

0 comments on commit 7f3f042

Please sign in to comment.