
KAFKA-17877: Only call once maybeSendResponseCallback for each marker #17619

Merged 3 commits into apache:trunk on Nov 5, 2024

Conversation

Contributor
@CalvinConfluent CalvinConfluent commented Oct 28, 2024

We should call maybeSendResponseCallback only once for each marker during WriteTxnMarkersRequest handling.
Consider the following 2 cases:

First
We have 2 markers to append: one for producer-0, one for producer-1.
When we first process producer-0, it appends a marker to __consumer_offsets.
The __consumer_offsets append finishes very fast because the group coordinator is no longer the leader, so the coordinator directly returns NOT_LEADER_OR_FOLLOWER. In its callback, it calls maybeComplete() for the first time, and because there is only one partition to append, it goes on to call maybeSendResponseCallback() and decrement numAppends.
Then it calls the replica manager append with nothing to append; in that callback, it calls maybeComplete() a second time, which decrements numAppends again.

Second
We have 2 markers to append: one for producer-0, one for producer-1.
When we first process producer-0, it appends a marker to __consumer_offsets and to a data topic foo.
The 2 appends are handled by the group coordinator and the replica manager asynchronously.
There is a race: both appends can finish together, fill markerResults at the same time, and then call maybeComplete. Because the partitionsWithCompatibleMessageFormat.size == markerResults.size condition is satisfied for both, both maybeComplete calls go on to decrement numAppends and cause a premature response.

Note: the problem only happens with the KIP-848 coordinator enabled.

https://issues.apache.org/jira/browse/KAFKA-17877
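The second case above can be reproduced with a minimal model of the buggy completion check (names like `simulate` and the topic-partition strings are illustrative, not the actual KafkaApis code): when both of producer-0's appends record their results before either callback runs the size-equality check, both callbacks see the check satisfied and both decrement the counter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of the second case: both of producer-0's appends land in the
// shared result map first, and only then does each callback run the buggy
// size-equality check, so both callbacks decrement numAppends.
public class PrematureResponseRace {

    public static int simulate() {
        ConcurrentHashMap<String, String> markerResults = new ConcurrentHashMap<>();
        int totalPartitions = 2;                         // __consumer_offsets + foo
        AtomicInteger numAppends = new AtomicInteger(2); // one per marker (producer-0, producer-1)
        int responsesSent = 0;

        // Interleaving where both appends finish together: results land first ...
        markerResults.put("__consumer_offsets-0", "NONE");
        markerResults.put("foo-0", "NONE");

        // ... and only then does each callback run the buggy maybeComplete check.
        for (int callback = 0; callback < 2; callback++) {
            if (markerResults.size() == totalPartitions && numAppends.decrementAndGet() == 0) {
                responsesSent++; // premature: producer-1's marker was never appended
            }
        }
        return responsesSent;
    }

    public static void main(String[] args) {
        System.out.println("responsesSent=" + simulate()); // prints responsesSent=1
    }
}
```

With the buggy check, the two callbacks for a single marker exhaust a counter that was supposed to account for two markers.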

@github-actions github-actions bot added core Kafka Broker small Small PRs labels Oct 28, 2024
@jolshan jolshan added transactions Transactions and EOS KIP-848 The Next Generation of the Consumer Rebalance Protocol labels Oct 28, 2024
Contributor
@jolshan jolshan left a comment


Thanks for finding this. LGTM

@CalvinConfluent CalvinConfluent changed the title KAFKA-17877: Skip calling replica manager append if nothing to append KAFKA-17877: Only call once maybeSendResponseCallback for each marker Oct 31, 2024
@CalvinConfluent
Contributor Author

@jolshan Can you help take a look? I updated the fix for the second case.

Contributor
@jolshan jolshan left a comment


LGTM as long as tests pass

core/src/main/scala/kafka/server/KafkaApis.scala (outdated review thread, resolved)
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (outdated review thread, resolved)
@@ -2431,8 +2431,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
def maybeComplete(): Unit = {
if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
val numPartitions = new AtomicInteger(partitionsWithCompatibleMessageFormat.size)
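The numPartitions counter introduced in this hunk turns the completion check into an atomic countdown. A minimal sketch of the pattern (simplified and renamed, not the actual KafkaApis code): each callback atomically takes one ticket, and only the caller that brings the counter to zero completes the marker.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the fixed pattern: instead of comparing collection sizes (which
// several callbacks can observe as equal at the same time), each callback
// decrements an atomic counter exactly once, and only the caller that takes
// the counter to zero completes the marker.
public class CountdownCompletion {

    public static int simulate(int partitions) {
        AtomicInteger numPartitions = new AtomicInteger(partitions);
        int completions = 0;
        // One callback per partition append, in any order or interleaving.
        for (int i = 0; i < partitions; i++) {
            if (numPartitions.decrementAndGet() == 0) {
                completions++; // exactly one callback wins, however the race goes
            }
        }
        return completions;
    }

    public static void main(String[] args) {
        System.out.println("completions=" + simulate(2)); // prints completions=1
    }
}
```

Because decrementAndGet returns the post-decrement value atomically, only one caller can ever observe zero, no matter how the callbacks interleave.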
Contributor


To understand the first case:

we call replicaManager#appendRecords (with empty records) while simultaneously appending via the new coordinator. This incorrectly decrements the number of appends counter and thus sends the premature response.

The similarity in both cases is that we prematurely decrement this numAppends counter.

Is my understanding correct?

Contributor Author


That is correct. It is common for the new coordinator to return an error for the partition (NOT_LEADER_OR_FOLLOWER) fast enough to get ahead of the replicaManager#appendRecords callback.
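The fast error path in the first case can be sketched like this (illustrative names and a simulated interleaving, not the actual code): the coordinator's immediate NOT_LEADER_OR_FOLLOWER callback and the no-op replica manager append each run the buggy check, so a single marker decrements numAppends twice.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of the first case: producer-0 has a single partition, the
// coordinator fails it immediately, and the replica manager append invoked
// with nothing to append still fires a second callback.
public class DoubleDecrement {

    // Returns the value of numAppends after producer-0 alone is processed.
    public static int simulate() {
        ConcurrentHashMap<String, String> markerResults = new ConcurrentHashMap<>();
        int totalPartitions = 1;                         // producer-0's only partition
        AtomicInteger numAppends = new AtomicInteger(2); // markers for producer-0 and producer-1

        // Callback 1: coordinator returns NOT_LEADER_OR_FOLLOWER right away.
        markerResults.put("__consumer_offsets-0", "NOT_LEADER_OR_FOLLOWER");
        if (markerResults.size() == totalPartitions) {
            numAppends.decrementAndGet(); // legitimate decrement for producer-0
        }

        // Callback 2: replica manager append called with nothing to append.
        if (markerResults.size() == totalPartitions) {
            numAppends.decrementAndGet(); // spurious second decrement
        }

        return numAppends.get(); // 0: response is sent before producer-1 is handled
    }

    public static void main(String[] args) {
        System.out.println("numAppends=" + simulate()); // prints numAppends=0
    }
}
```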

@jolshan
Contributor
jolshan commented Nov 4, 2024

@jeffkbkim @dajac Any further comments? Looks like the build is green.

Contributor
@jeffkbkim jeffkbkim left a comment


LGTM. thanks for the find and fix!

Contributor
@dajac dajac left a comment


LGTM, thanks for the fix.

@dajac dajac merged commit c91243a into apache:trunk Nov 5, 2024
8 checks passed
abhishekgiri23 pushed a commit to abhishekgiri23/kafka that referenced this pull request Nov 5, 2024
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, David Jacot <djacot@confluent.io>
Labels
core Kafka Broker KIP-848 The Next Generation of the Consumer Rebalance Protocol small Small PRs transactions Transactions and EOS