Skip to content

Commit

Permalink
[fix][txn] Fix the transaction acknowledgement and send logic for chu…
Browse files Browse the repository at this point in the history
…nk message (#1069)

Master #1060
### Motivation
1. For the chunk message, we only register the send operation once but end the send operation multiple times when receiving the send response. It will make the transaction can be committed before all the operations are completed.
2. When we use transaction ack for chunk messages, the provided transaction is ignored, resulting in the chunk message actually being acknowledged using the non-transactional ack method.
### Modifications
1. Only end the send operation when receive the last chunk message.
2. Add the check for the transaction when the massage is a chunk message.
  • Loading branch information
liangyepianzhou authored Jul 26, 2023
1 parent 4bfd4aa commit 59ef32b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
17 changes: 15 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
}

if cmid, ok := msgID.(*chunkMessageID); ok {
return pc.unAckChunksTracker.ack(cmid)
if txn == nil {
return pc.unAckChunksTracker.ack(cmid)
}
return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}

trackingID := toTrackingMessageID(msgID)
Expand Down Expand Up @@ -2212,9 +2215,19 @@ func (u *unAckChunksTracker) remove(cmid *chunkMessageID) {
}

func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
return u.ackWithTxn(cmid, nil)
}

func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) error {
ids := u.get(cmid)
for _, id := range ids {
if err := u.pc.AckID(id); err != nil {
var err error
if txn == nil {
err = u.pc.AckID(id)
} else {
err = u.pc.AckIDWithTxn(id, txn)
}
if err != nil {
return err
}
}
Expand Down
101 changes: 99 additions & 2 deletions pulsar/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
// Abort the transaction.
_ = txn.Abort(context.Background())

consumerShouldNotReceiveMessage(t, consumer)

// Clean up: Close the consumer and producer instances.
consumer.Close()
producer.Close()
}

func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
// Expectation: The consumer should not receive any messages.
done := make(chan struct{})
go func() {
Expand All @@ -438,8 +446,97 @@ func TestTransactionAbort(t *testing.T) {
require.Fail(t, "The consumer should not receive any messages")
case <-time.After(time.Second):
}
}

// Clean up: Close the consumer and producer instances.
func TestAckChunkMessage(t *testing.T) {
topic := newTopicName()
sub := "my-sub"

// Prepare: Create PulsarClient and initialize the transaction coordinator client.
_, client := createTcClient(t)

// Create transaction and register the send operation.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
txn.(*transaction).registerSendOrAckOp()

// Create a producer with chunking enabled to send a large message that will be split into chunks.
producer, err := client.CreateProducer(ProducerOptions{
Name: "test",
Topic: topic,
EnableChunking: true,
DisableBatching: true,
})
require.NoError(t, err)
require.NotNil(t, producer)
defer producer.Close()

// Subscribe to the consumer.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.NoError(t, err)
defer consumer.Close()

// Send a large message that will be split into chunks.
msgID, err := producer.Send(context.Background(), &ProducerMessage{
Transaction: txn,
Payload: createTestMessagePayload(_brokerMaxMessageSize),
})
require.NoError(t, err)
_, ok := msgID.(*chunkMessageID)
require.True(t, ok)

err = txn.Commit(context.Background())
require.Nil(t, err)

// Receive the message using a new transaction and ack it.
txn2, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
message, err := consumer.Receive(context.Background())
require.Nil(t, err)

err = consumer.AckWithTxn(message, txn2)
require.Nil(t, err)

txn2.Abort(context.Background())

// Close the consumer to simulate reconnection and receive the same message again.
consumer.Close()
producer.Close()

// Subscribe to the consumer again.
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.Nil(t, err)
message, err = consumer.Receive(context.Background())
require.Nil(t, err)
require.NotNil(t, message)

// Create a new transaction and ack the message again.
txn3, err := client.NewTransaction(time.Hour)
require.Nil(t, err)

err = consumer.AckWithTxn(message, txn3)
require.Nil(t, err)

// Commit the third transaction.
err = txn3.Commit(context.Background())
require.Nil(t, err)

// Close the consumer again.
consumer.Close()

// Subscribe to the consumer again and verify that no message is received.
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.Nil(t, err)
consumerShouldNotReceiveMessage(t, consumer)
}

0 comments on commit 59ef32b

Please sign in to comment.