Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebalance listener correctness #122

Merged
merged 72 commits into from
Apr 14, 2021
Merged

Conversation

nikitapecasa
Copy link
Contributor

@nikitapecasa nikitapecasa commented Apr 8, 2021

This PR:

  • adds a new way of describing rebalance listener (RebalanceListener1) via new subscribe method in Consumer
  • which gives a correct behaviour of consumer methods executed from within RebalanceListener
def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]): F[Unit]

RebalanceListener1 callbacks are executed on current thread where consumer.poll/close/unsubscribe is running in a blocking fashion

example of new API usage:

class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1[F] {
  import RebalanceCallback._
  def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
    for {
      // read the offsets from an external store using some custom code not described here
      offsets <- lift(readOffsetsFromExternalStore[F](partitions))
      a       <- offsets.toList.foldMapM { case (partition, offset) => seek(partition, offset) }
    } yield a
  def onPartitionsRevoked(partitions: Nes[TopicPartition]) =
    for {
      positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) {
        case (offsets, partition) =>
          for {
            position <- position(partition)
          } yield offsets + (partition -> position)
      }
      // save the offsets in an external store using some custom code not described here
      a <- lift(saveOffsetsInExternalStore[F](positions))
    } yield a
  // do not need to save the offsets since these partitions are probably owned by other consumers already
  def onPartitionsLost(partitions: Nes[TopicPartition]) = empty
}

List of allowed KafkaConsumer methods available via RebalanceCallback (as agreed with @t3hnar):

- assign
+ assignment
+ beginningOffsets
- close
- commitAsync
+ commitSync
+ committed
+ endOffsets
+ groupMetadata
+ listTopics
- metrics
+ offsetsForTimes
+ partitionsFor
- pause
+ paused
- poll
+ position
- resume
+ seek
+ seekToBeginning
+ seekToEnd
- subscribe
+ subscription
- unsubscribe
- wakeup

nikitapecasa and others added 30 commits March 30, 2021 19:40
…nsumer.position when used from within rebalance listener
…ee toTry.get execution on current thread in RebalanceListener
… guarantee toTry.get execution on current thread in RebalanceListener"

This reverts commit 206a3b7
…asses, no deadlock on consumer.position"

This reverts commit d9278bb
…(topics: Nes[Topic], listener: ConsumerRebalanceListener[F])
…d in ConsumerLogging, add TODO to implement it later
…outside of consumer.poll has no effect on test result
…ction with kafka java consumer from within poll method
@nikitapecasa nikitapecasa changed the title WIP: rebalance listener correctness Rebalance listener correctness Apr 14, 2021
@nikitapecasa nikitapecasa merged commit e75cb9c into master Apr 14, 2021
@dfakhritdinov dfakhritdinov deleted the rebalance-listener-correctness branch February 1, 2024 16:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants