Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Producer] Refactor internalSend() and resouce managment #1071

Closed
wants to merge 11 commits into from
52 changes: 40 additions & 12 deletions pulsar/message_chunking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"net/http"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -178,9 +179,16 @@ func TestMaxPendingChunkMessages(t *testing.T) {
defer c.Close()
pc := c.(*consumer).consumers[0]

sendSingleChunk(producer, "0", 0, 2)
callbackOnce0 := &sync.Once{}
cr0 := newChunkRecorder()
msg0 := "chunk-0-0|chunk-0-1|"
callbackOnce1 := &sync.Once{}
cr1 := newChunkRecorder()
msg1 := "chunk-1-0|chunk-1-1|"

sendSingleChunk(producer, "0", 0, 2, msg0, callbackOnce0, cr0)
// MaxPendingChunkedMessage is 1, the chunked message with uuid 0 will be discarded
sendSingleChunk(producer, "1", 0, 2)
sendSingleChunk(producer, "1", 0, 2, msg1, callbackOnce1, cr1)

// chunkedMsgCtx with uuid 0 should be discarded
retryAssert(t, 3, 200, func() {}, func(t assert.TestingT) bool {
Expand All @@ -189,7 +197,7 @@ func TestMaxPendingChunkMessages(t *testing.T) {
return assert.Equal(t, 1, len(pc.chunkedMsgCtxMap.chunkedMsgCtxs))
})

sendSingleChunk(producer, "1", 1, 2)
sendSingleChunk(producer, "1", 1, 2, msg1, callbackOnce1, cr1)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
msg, err := c.Receive(ctx)
Expand All @@ -199,7 +207,7 @@ func TestMaxPendingChunkMessages(t *testing.T) {
assert.Equal(t, "chunk-1-0|chunk-1-1|", string(msg.Payload()))

// Ensure that the chunked message of uuid 0 is discarded.
sendSingleChunk(producer, "0", 1, 2)
sendSingleChunk(producer, "0", 1, 2, msg0, callbackOnce0, cr0)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = c.Receive(ctx)
cancel()
Expand Down Expand Up @@ -548,30 +556,50 @@ func createTestMessagePayload(size int) []byte {
}

//nolint:all
func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int, wholePayload string, callbackOnce *sync.Once, cr *chunkRecorder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the parameterswholePayload callbackOnce and chunkRecorder ? I think it's more appropriate to make them inside thesendSingleChunk .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in the current implemention, all the chunking sendRequest should share the whole message payload, callbackOnce, chunkRecorder, see internalSend() and addRequestToBatch(), I add these params just want to make the send procedure work correctly, I think it better to keep them.

msg := &ProducerMessage{
Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
}
producerImpl := p.(*producer).producers[0].(*partitionProducer)
mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now())
mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload)))
mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload)))
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)

doneCh := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this doneCh. The goal of this UT is to verify whether consumer can discard the oldest chunk message. It has no impact whether the callback of sendRequest is called or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I have deleted these line in a new commit.

producerImpl.internalSingleSend(
mm,
msg.Payload,
&sendRequest{
producer: producerImpl,
ctx: context.Background(),
msg: msg,
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
close(doneCh)
},
msg: msg,
callbackOnce: callbackOnce,
flushImmediately: true,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: nil,
memLimit: nil,
reservedMem: 0,
semaphore: nil,
reservedSemaphore: 0,
sendAsBatch: false,
schema: nil,
schemaVersion: nil,
uncompressedPayload: []byte(wholePayload),
uncompressedSize: int64(len(wholePayload)),
compressedPayload: []byte(wholePayload),
compressedSize: len(wholePayload),
payloadChunkSize: internal.MaxMessageSize - proto.Size(mm),
mm: mm,
deliverAt: time.Now(),
maxMessageSize: internal.MaxMessageSize,
},
uint32(internal.MaxMessageSize),
)

<-doneCh
}
Loading