From 7f3f042c7e4da26f45bc5aec980957f90dc2bf3c Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 1 Jul 2024 07:35:07 -0700 Subject: [PATCH] [otelarrowreceiver] Add a test for graceful shutdown (#33640) **Description:** Adds a test. **Link to tracking Issue:** #33581 **Testing:** The testbed expects a very fast graceful shutdown. This test shows it will work. **Documentation:** N/A --- receiver/otelarrowreceiver/otelarrow_test.go | 124 ++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/receiver/otelarrowreceiver/otelarrow_test.go b/receiver/otelarrowreceiver/otelarrow_test.go index 6d2130ca4123..a71f4c56714b 100644 --- a/receiver/otelarrowreceiver/otelarrow_test.go +++ b/receiver/otelarrowreceiver/otelarrow_test.go @@ -37,6 +37,9 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/mock/gomock" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -232,7 +235,7 @@ func newReceiver(t *testing.T, factory receiver.Factory, settings component.Tele type senderFunc func(td ptrace.Traces) -func TestShutdown(t *testing.T) { +func TestStandardShutdown(t *testing.T) { endpointGrpc := testutil.GetAvailableLocalAddress(t) nextSink := new(consumertest.TracesSink) @@ -295,6 +298,125 @@ func TestShutdown(t *testing.T) { assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount()) } +func TestOTelArrowShutdown(t *testing.T) { + // In the cooperative test, the client calls CloseSend() with no + // keepalive set. In the non-cooperative case, the keepalive ends + // the stream. + for _, cooperative := range []bool{true, false} { + t.Run(fmt.Sprint("cooperative=", cooperative), func(t *testing.T) { + ctx := context.Background() + + endpointGrpc := testutil.GetAvailableLocalAddress(t) + + nextSink := new(consumertest.TracesSink) + + // Create OTelArrow receiver + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{}, + } + // Note that keepalive parameters are set very high + if !cooperative { + cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAge = time.Second + cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace = 5 * time.Second + } + cfg.GRPC.NetAddr.Endpoint = endpointGrpc + set := receivertest.NewNopSettings() + core, obslogs := observer.New(zapcore.DebugLevel) + set.TelemetrySettings.Logger = zap.New(core) + + set.ID = testReceiverID + r, err := NewFactory().CreateTracesReceiver( + ctx, + set, + cfg, + nextSink) + require.NoError(t, err) + require.NotNil(t, r) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + + conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + doneSignalGrpc := make(chan bool) + + client := arrowpb.NewArrowTracesServiceClient(conn) + stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true)) + require.NoError(t, err) + producer := arrowRecord.NewProducer() + defer func() { + require.NoError(t, conn.Close()) + }() + + start := time.Now() + var once sync.Once + + // Send traces to the receiver until we signal via done channel, and then + // send one more trace after that. + go generateTraces(func(td ptrace.Traces) { + if time.Since(start) > 5*time.Second { + once.Do(func() { + if cooperative { + require.NoError(t, stream.CloseSend()) + } + }) + return + } + batch, batchErr := producer.BatchArrowRecordsFromTraces(td) + require.NoError(t, batchErr) + require.NoError(t, stream.Send(batch)) + }, doneSignalGrpc) + + // Wait until the receiver outputs anything to the sink. + assert.Eventually(t, func() bool { + return nextSink.SpanCount() > 0 + }, time.Second, 10*time.Millisecond) + + // Now shutdown the receiver, while continuing sending traces to it. + // Note that gRPC GracefulShutdown() does not actually use the context + // for cancelation. + err = r.Shutdown(context.Background()) + assert.NoError(t, err) + + // Remember how many spans the sink received. This number should not change after this + // point because after Shutdown() returns the component is not allowed to produce + // any more data. + sinkSpanCountAfterShutdown := nextSink.SpanCount() + + // Now signal to generateTraces to exit the main generation loop, then send + // one more trace and stop. + doneSignalGrpc <- true + + // Wait until all follow up traces are sent. + <-doneSignalGrpc + + // The last, additional trace should not be received by sink, so the number of spans in + // the sink should not change. + assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount()) + + shutdownCause := "" + scanLogs: + for _, log := range obslogs.All() { + if log.Message == "arrow stream shutdown" { + for _, f := range log.Context { + if f.Key == "message" { + shutdownCause = f.String + break scanLogs + } + } + } + } + if cooperative { + assert.Equal(t, "EOF", shutdownCause) + } else { + assert.Equal(t, "context canceled", shutdownCause) + } + }) + } +} + func generateTraces(senderFn senderFunc, doneSignal chan bool) { // Continuously generate spans until signaled to stop. loop: