diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 0dad93f14a46..735ae462192c 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1434,9 +1434,6 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { msg.Ack() atomic.AddInt32(&numAcked, 1) }) - if sub.enableOrdering != enableMessageOrdering { - t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering) - } // If the messages were received on a subscription with the EnableMessageOrdering=true, // total processing would exceed the timeout and only one message would be processed. if numAcked < 2 { diff --git a/pubsub/iterator.go b/pubsub/iterator.go index ca3069a0cd89..8e3155dca883 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -96,6 +96,12 @@ type messageIterator struct { eoMu sync.RWMutex enableExactlyOnceDelivery bool sendNewAckDeadline bool + + orderingMu sync.RWMutex + // enableOrdering determines if messages should be processed in order. This is populated + // by the response in StreamingPull and can change mid Receive. Must be accessed + // with the lock held. + enableOrdering bool } // newMessageIterator starts and returns a new messageIterator. @@ -352,12 +358,30 @@ func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) { if err != nil { return nil, err } - it.eoMu.Lock() - if got := res.GetSubscriptionProperties().GetExactlyOnceDeliveryEnabled(); got != it.enableExactlyOnceDelivery { + + // If the new exactly once settings are different than the current settings, update it. + it.eoMu.RLock() + enableEOD := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() + + subProp := res.GetSubscriptionProperties() + if got := subProp.GetExactlyOnceDeliveryEnabled(); got != enableEOD { + it.eoMu.Lock() it.sendNewAckDeadline = true it.enableExactlyOnceDelivery = got + it.eoMu.Unlock() + } + + // Also update the subscriber's ordering setting if stale. + it.orderingMu.RLock() + enableOrdering := it.enableOrdering + it.orderingMu.RUnlock() + + if got := subProp.GetMessageOrderingEnabled(); got != enableOrdering { + it.orderingMu.Lock() + it.enableOrdering = got + it.orderingMu.Unlock() } - it.eoMu.Unlock() return res.ReceivedMessages, nil } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 64097febe5d4..15e72e55a44b 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -50,8 +50,6 @@ type Subscription struct { mu sync.Mutex receiveActive bool - - enableOrdering bool } // Subscription creates a reference to a subscription. @@ -1238,8 +1236,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() - s.checkOrdering(ctx) - // TODO(hongalex): move settings check to a helper function to make it more testable maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { @@ -1392,11 +1388,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes iter.eoMu.RUnlock() wg.Add(1) - // Make sure the subscription has ordering enabled before adding to scheduler. + // Only schedule messages in order if an ordering key is present and the subscriber client + // received the ordering flag from a Streaming Pull response. var key string - if s.enableOrdering { + iter.orderingMu.RLock() + if iter.enableOrdering { key = msg.OrderingKey } + iter.orderingMu.RUnlock() msgLen := len(msg.Data) if err := sched.Add(key, msg, func(msg interface{}) { defer wg.Done() @@ -1436,20 +1435,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return group.Wait() } -// checkOrdering calls Config to check theEnableMessageOrdering field. -// If this call fails (e.g. because the service account doesn't have -// the roles/viewer or roles/pubsub.viewer role) we will assume -// EnableMessageOrdering to be true. -// See: https://github.com/googleapis/google-cloud-go/issues/3884 -func (s *Subscription) checkOrdering(ctx context.Context) { - cfg, err := s.Config(ctx) - if err != nil { - s.enableOrdering = true - } else { - s.enableOrdering = cfg.EnableMessageOrdering - } -} - type pullOptions struct { maxExtension time.Duration // the maximum time to extend a message's ack deadline in total maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc