Skip to content

Commit

Permalink
refactor: factor out prepareTransaction
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Oct 24, 2023
1 parent af56e60 commit c53c7e4
Showing 1 changed file with 37 additions and 31 deletions.
68 changes: 37 additions & 31 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,35 @@ func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
return nil
}

func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if sr.msg.Transaction == nil {
return nil
}

txn := (sr.msg.Transaction).(*transaction)
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send message" +
" by a non-open transaction.")
return newError(InvalidStatus, "Failed to send message by a non-open transaction.")
}

if err := txn.registerProducerTopic(p.topic); err != nil {
return err
}

if err := txn.registerSendOrAckOp(); err != nil {
return err
}

sr.transaction = txn
sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) {
runCallback(sr.callback, id, producerMessage, err)
txn.endSendOrAckOp(err)
}

return nil
}

func (p *partitionProducer) internalSendAsync(
ctx context.Context,
msg *ProducerMessage,
Expand All @@ -1145,37 +1174,9 @@ func (p *partitionProducer) internalSendAsync(
return
}

// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
txn = transactionImpl
if transactionImpl.state != TxnOpen {
p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
return
}

if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
runCallback(callback, nil, msg, err)
return
}
if err := transactionImpl.registerSendOrAckOp(); err != nil {
runCallback(callback, nil, msg, err)
return
}
newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
runCallback(callback, id, producerMessage, err)
transactionImpl.endSendOrAckOp(err)
}
} else {
newCallback = callback
}
if p.getProducerState() != producerReady {
// Producer is closing
runCallback(newCallback, nil, msg, errProducerClosed)
runCallback(callback, nil, msg, errProducerClosed)
return
}

Expand All @@ -1187,14 +1188,19 @@ func (p *partitionProducer) internalSendAsync(
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: newCallback,
callback: callback,
callbackOnce: callbackOnce,
flushImmediately: flushImmediately,
publishTime: time.Now(),
blockCh: bc,
closeBlockChOnce: &sync.Once{},
transaction: txn,
}

if err := p.prepareTransaction(sr); err != nil {
runCallback(sr.callback, nil, msg, err)
return
}

p.options.Interceptors.BeforeSend(p, msg)

p.dataChan <- sr
Expand Down

0 comments on commit c53c7e4

Please sign in to comment.