Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Producer] Refactor internalSend() and resouce managment #1071

Closed
wants to merge 11 commits into from

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Jul 25, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #1043

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #1043, #1055 #1059 #1060 #1067, #1068

Motivation

  1. As discussed in [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema #1055, we need to calculate how many pending items and how many memory are required before appending the sendRequest to the dataChan, but currently, we do schema-encoding/compressing in internalSend(), this may lead to inaccurate memory limit cotrolling, and as described in [Improve][Producer]Simplify the MaxPendingMessages controlling #1043, make the code complicated and difficult to maintain, we need to simplify the send logic;
  2. In JAVA client, schema-encoding/compressing are done in application thread, it better to make it work like JAVA client;
  3. As discussed in [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema #1055 and described in [Improve][Producer]Simplify the MaxPendingMessages controlling #1043, resource(memory and semaphore) acquiring/releasing logic are written across the whole file, we need to simplify the resource management logic, encapsulate them into functions, call them when necessary, make it 'Low Coupling, High Cohesion';
  4. As discussed in [Bug][Producer]Inaccurate transaction endSendOrAckOp for chunked message #1060, transaction is not correctly ended for chunked message, it better to encapsulate the transaction ending logic into one func which will be called when sendRequest is done.

Modifications

  1. Move shema encoding from internalSend() to internalSendAsync();
  2. Move compressing from internalSend() to internalSendAsync();
  3. Calculate total chunks before entering the dataChan;
  4. Reserve required semaphore and memory before entering the dataChan;
  5. sendRequest store the semaphore and memory it holds;
  6. Encapsualte relative code blocks into individual funcs to make the skeleton of internalSendAsync() clearer;
  7. Add sendRequest.done() to release the resources it holds;
  8. When a sendRequest is done, call sendRequest.done() ;
  9. In sendRequest.done() run callback, update metrics, end transaction, run interceptors callback...

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@gunli
Copy link
Contributor Author

gunli commented Jul 25, 2023

@Gleiphir2769 @RobertIndie @graysonzeng @shibd Would you please review this PR ?

Copy link
Contributor

@Gleiphir2769 Gleiphir2769 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @gunli, I leaved some comments.


func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
err := p.validateMsg(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make it inline?

Copy link
Contributor Author

@gunli gunli Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline will make internalSendAsync a BIG func, about 200 lines, it hard to read, spilt into small funcs will be more clear and readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, maybe my description is not clear. Could we make L1191-L1192 as a One Liner if...else Statements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that is OK. inlining can reduce the code line number, non-inlining are better for debugging.


p.dataChan <- sr
err = p.updateSchema(sr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make it inline?

Copy link
Contributor Author

@gunli gunli Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline will make internalSendAsync a BIG func, about 200 lines, it hard to read, spilt into small funcs will be more clear and readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above


func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
// read payload from message
sr.uncompressedPayload = sr.msg.Payload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems msg.Payload is cloned to sr.uncompressedPayload and it will take up unnecessary memory. I think the type of uncompressedPayload is *[]byte may be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in go, []byte assignment just copy the address

return nil
}

func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo updateUncompressPayload -> updateUncompressedPayload

Copy link
Contributor Author

@gunli gunli Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I have renamed it.

checkSize = int64(sr.compressedSize)
}

sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p._getConn().GetMaxMessageSize() makes an rpc call to broker. This breaks the semantics of async.
For example, when user invokes the method producer.SendAsync, he must wait for an rpc to return.

What do you think?

Copy link
Contributor Author

@gunli gunli Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxMessageSize is cached in the conn when a conn is ready, you can check the code of connection and connectionPool, this is OK.

Actually, p.getOrCreateSchema() will trigger a block RPC call. What make SendAsync not a real asyn func is the fixed length PendingQueue, if pengding queue can be expanded at runtime, shema-encoding/compress/getOrCreateSchema can be done in internalSend(), that can make it a real asyn func.

I don't think memLimit and fixed length queue is neccessary in a language with GC like JAVA and Go, 'cause we have semaphore and dataChan to control how many messages can be pending. As I mentioned in #1043 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxMessageSize is cached in the conn when a conn is ready, you can check the code of connection and connectionPool, this is OK.

You're right. This point is good to me.

Actually, p.getOrCreateSchema() will trigger a block RPC call. What make SendAsync not a real asyn func is the fixed length PendingQueue

But the size of PendingQueue is decided by user. And if the size of dataChan is set as MaxPendingMessages, it will not be the async limit.

I don't think memLimit and fixed length queue is neccessary in a language with GC like JAVA and Go

This is not only related to memory, but also related to broker. Here is the interface description of Java client.

Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker

I think we should find a way to solve the p.getOrCreateSchema() problem. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the size of PendingQueue is decided by user. And if the size of dataChan is set as MaxPendingMessages, it will not be the async limit.

dataChan and semaphore can work together to make it async, when there is enough semaphore, add to the queue, or wait until the response back from the broker and release a semaphore.

I think we should find a way to solve the p.getOrCreateSchema() problem.

Since the schema can be set by the user at runtime, we can only solve it by moving the blocking logic to `internalSend()', but at the same time, we have to reserve memory and peding queue spaces before adding a message to dataChan, so if we want to solve it, we must drop memLimit and the fixed length pending queue.

Copy link
Contributor

@Gleiphir2769 Gleiphir2769 Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when there is enough semaphore, add to the queue, or wait until the response back from the broker and release a semaphore.

I support the idea which reserve resource before internalSend. And if we reserve the semaphore firstly, the block of dataChan will not happen if it's made with capacity MaxPendingMessages.

we must drop memLimit and the fixed length pending queue

Sorry, I am not get the point why we should drop the memLimit? It's a useful feature for the users who is lack of resources.
And I don't think the fixed length pending queue will become a problem to SendAsync. Why should we make it flexible?

Copy link
Contributor Author

@gunli gunli Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I support the idea which reserve resource before internalSend. And if we reserve the semaphore firstly, the block of dataChan will not happen if it's made with capacity MaxPendingMessages.

We do not block on dataChan, just block on semaphore, we just treat dataChan as a channel between the main goroutine(user's goroutine) and the partitionProducer's goroutine(IO goroutine), semaphore represents the available resource(pendingItem), When aquire semaphore succeed, we can add it to dataChan, otherwise, block until one semaphore has been released(one message has been done) .

Sorry, I am not get the point why we should drop the memLimit? It's a useful feature for the users who is lack of resources.
And I don't think the fixed length pending queue will become a problem to SendAsync. Why should we make it flexible?

Because we limit the memory and pending queue, we have to reserve memory and pending queue before adding a message to dataChan, which make we have to do schema encoding and compressing first, or we have no idea about how much memory and how many pending items we need, these are bloking logic. When these blocking jobs are done in the user's goroutine, they block the user's logic, which make it a non-async method.

@gunli
Copy link
Contributor Author

gunli commented Aug 7, 2023

@RobertIndie Would you please take a look at this PR, it pending for a long time.

@gunli
Copy link
Contributor Author

gunli commented Aug 31, 2023

@tisonkun @RobertIndie I think there is a bug in reserveSemaphore(), I will update it now.

@gunli
Copy link
Contributor Author

gunli commented Aug 31, 2023

@tisonkun @RobertIndie I think there is a bug in reserveSemaphore(), I will update it now.

done.

@RobertIndie
Copy link
Member

I noticed that you did a lot of refactoring work in this PR, including changes to the critical path for publishing messages. I’m concerned that these changes may impact publishing performance.

While there are many modifications, it’s difficult for the reviewer to see the relationship between these modifications and the bug fixes. Could you provide more detail in the PR description, particularly regarding why you’re making these changes to fix bugs?

I recommend separating the refactoring work from the bug fixes. Otherwise, it will be challenging for us to cherry-pick the bug fixes to other release branches.

@gunli
Copy link
Contributor Author

gunli commented Aug 31, 2023

I noticed that you did a lot of refactoring work in this PR, including changes to the critical path for publishing messages. I’m concerned that these changes may impact publishing performance.

While there are many modifications, it’s difficult for the reviewer to see the relationship between these modifications and the bug fixes. Could you provide more detail in the PR description, particularly regarding why you’re making these changes to fix bugs?

I recommend separating the refactoring work from the bug fixes. Otherwise, it will be challenging for us to cherry-pick the bug fixes to other release branches.

@RobertIndie The details we have disccussed in #1059 #1043 #1060, the bug fix just fixed the bug in the PR itself, not the exist code.
As we disscussed in #1055 #1059, we have to move some logic from internalSend() to internalSendAsync(), these logic work together, so I can't seperate them into multiple PRs. It is better to review this PR with some compare tools like beyond compare.

@RobertIndie
Copy link
Member

@gunli Could you summarize them and add the detailed explanation to the PR description? They are very important context for this PR. A well-written PR description not only helps with review but also enables other developers to learn about the context of this PR.

@gunli
Copy link
Contributor Author

gunli commented Aug 31, 2023

@gunli Could you summarize them and add the detailed explanation to the PR description? They are very important context for this PR. A well-written PR description not only helps with review but also enables other developers to learn about the context of this PR.

@RobertIndie I have updated the motivation.

@Gleiphir2769
Copy link
Contributor

Gleiphir2769 commented Sep 4, 2023

Hi @gunli . Looks like it does not pass the CI test now. You can check the CI log an fix it. Seems like it is related to Chunking.

image

@gunli
Copy link
Contributor Author

gunli commented Sep 6, 2023

@Gleiphir2769 Thank you, I am busy these days, I will fix it ASAP.

@gunli
Copy link
Contributor Author

gunli commented Sep 6, 2023

@Gleiphir2769 Thank you, I am busy these days, I will fix it ASAP.

@Gleiphir2769 I have fix the test case, and it can run PASS now, but since only the last chunk will trigger the callback, I commented the wait logic in sendSingleChunk(), I am not sure whether it is right to do that, please help to review it.

Copy link
Contributor

@Gleiphir2769 Gleiphir2769 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @gunli. I leaved some comments about sendSingleChunk .

mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)

doneCh := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this doneCh. The goal of this UT is to verify whether consumer can discard the oldest chunk message. It has no impact whether the callback of sendRequest is called or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I have deleted these line in a new commit.

@@ -548,30 +556,57 @@ func createTestMessagePayload(size int) []byte {
}

//nolint:all
func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int, wholePayload string, callbackOnce *sync.Once, cr *chunkRecorder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the parameterswholePayload callbackOnce and chunkRecorder ? I think it's more appropriate to make them inside thesendSingleChunk .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in the current implemention, all the chunking sendRequest should share the whole message payload, callbackOnce, chunkRecorder, see internalSend() and addRequestToBatch(), I add these params just want to make the send procedure work correctly, I think it better to keep them.

@gunli
Copy link
Contributor Author

gunli commented Sep 7, 2023

@RobertIndie @tisonkun Would you please review this PR again?

@gunli
Copy link
Contributor Author

gunli commented Oct 18, 2023

message_chunking_test-ac9c1a6399336461d2d3ce1cdd31cac6debd5ed5.txt
message_chunking_test-PR.txt
producer_partition-ac9c1a6399336461d2d3ce1cdd31cac6debd5ed5.txt
producer_partition-PR.txt
@RobertIndie @tisonkun I have uploaded the files that have been changed, would you please review this PR with some compareing tool line BeyondCompare?

@nodece
Copy link
Member

nodece commented Oct 23, 2023

This PR looks complex, could you split this PR?

@gunli
Copy link
Contributor Author

gunli commented Oct 24, 2023

This PR looks complex, could you split this PR?

@nodece OK, I will do it today.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improve][Producer]Simplify the MaxPendingMessages controlling
4 participants