KAFKA-17877: Only call maybeSendResponseCallback once for each marker #17619
Conversation
Thanks for finding this. LGTM
@jolshan Can you help take a look? I updated the fix for the second case.
LGTM as long as tests pass
@@ -2431,8 +2431,12 @@ class KafkaApis(val requestChannel: RequestChannel,
        }

        val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
        def maybeComplete(): Unit = {
          if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
        val numPartitions = new AtomicInteger(partitionsWithCompatibleMessageFormat.size)
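For context, a minimal self-contained sketch of the counting pattern the numPartitions line introduces: each append result is recorded and decrements an atomic counter exactly once, and the response is sent only when the counter reaches zero. Placeholder String types stand in for Kafka's TopicPartition and Errors, and the helper name is illustrative; this is not the actual KafkaApis code.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the "decrement exactly once per result" pattern;
// String stands in for Kafka's TopicPartition and Errors types.
object MarkerCompletionSketch extends App {
  val partitions = Seq("__consumer_offsets-0", "foo-0")
  val markerResults = new ConcurrentHashMap[String, String]()
  val numPartitions = new AtomicInteger(partitions.size)

  def sendResponse(results: ConcurrentHashMap[String, String]): Unit =
    println(s"sending response exactly once: $results")

  // Every append callback funnels through here once per partition,
  // so sendResponse can fire at most once, when the last result arrives.
  def addResultAndMaybeComplete(partition: String, error: String): Unit = {
    markerResults.put(partition, error)
    if (numPartitions.decrementAndGet() == 0) {
      sendResponse(markerResults)
    }
  }

  addResultAndMaybeComplete("__consumer_offsets-0", "NOT_LEADER_OR_FOLLOWER")
  addResultAndMaybeComplete("foo-0", "NONE")
}
```

Because each partition funnels through the callback exactly once, there is no size-based completion check that can remain true after the first completion.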
To understand the first case:
we call replicaManager#appendRecords (with empty records) while simultaneously appending via the new coordinator. This incorrectly decrements the numAppends counter and thus sends a premature response.
What both cases have in common is that we prematurely decrement this numAppends counter.
Is my understanding correct?
That is correct. It commonly happens when the new coordinator returns an error for the partition (NOT_LEADER_OR_FOLLOWER) quickly enough to get ahead of the replicaManager#appendRecords callback.
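To make the double completion concrete, here is a simplified sketch of the pre-fix flow described above (placeholder types, hard-coded values; the names mirror the snippet in the diff rather than the real KafkaApis code): once markerResults is full, the size check stays true, so the early coordinator callback and the later callback from the empty replicaManager#appendRecords call both pass it and both decrement numAppends.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Simplified sketch of the pre-fix logic; String stands in for TopicPartition/Errors.
object FirstCaseSketch extends App {
  // Only one partition needs a marker append for producer-0.
  val partitionsWithCompatibleMessageFormat = Seq("__consumer_offsets-0")
  val markerResults = new ConcurrentHashMap[String, String]()
  val numAppends = new AtomicInteger(2) // two markers outstanding: producer-0 and producer-1

  def maybeComplete(): Unit = {
    // Once markerResults is full, this condition stays true,
    // so every later call "completes" the marker again.
    if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
      println(s"maybeSendResponseCallback would run; numAppends is now ${numAppends.decrementAndGet()}")
    }
  }

  // The coordinator callback returns NOT_LEADER_OR_FOLLOWER immediately.
  markerResults.put("__consumer_offsets-0", "NOT_LEADER_OR_FOLLOWER")
  maybeComplete() // first decrement: numAppends 2 -> 1

  // replicaManager#appendRecords is still invoked with empty records;
  // its callback later calls maybeComplete() again.
  maybeComplete() // second decrement: numAppends 1 -> 0, premature response
}
```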
@jeffkbkim @dajac Any further comments? Looks like the build is green.
LGTM. Thanks for the find and fix!
LGTM, thanks for the fix.
…apache#17619)

We should only call `maybeSendResponseCallback` once for each marker while handling the WriteTxnMarkersRequest. Consider the following two cases:

First: we have two markers to append, one for producer-0 and one for producer-1. When we first process producer-0, it appends a marker to __consumer_offsets. The __consumer_offsets append finishes very quickly because the group coordinator is no longer the leader, so the coordinator directly returns NOT_LEADER_OR_FOLLOWER. In its callback it calls maybeComplete() for the first time, and because there is only one partition to append, it goes on to call maybeSendResponseCallback() and decrement numAppends. It then calls the replica manager append with nothing to append; in that callback it calls maybeComplete() a second time, which decrements numAppends again.

Second: we have two markers to append, one for producer-0 and one for producer-1. When we first process producer-0, it appends a marker to __consumer_offsets and to a data topic foo. The two appends are handled asynchronously by the group coordinator and the replica manager. There is a race where both appends finish together: they can fill `markerResults` at the same time and then each call `maybeComplete`. Because the `partitionsWithCompatibleMessageFormat.size == markerResults.size` condition is satisfied, both `maybeComplete` calls go through to decrement `numAppends`, causing a premature response.

Note: the problem only happens with the KIP-848 coordinator enabled.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, David Jacot <djacot@confluent.io>
https://issues.apache.org/jira/browse/KAFKA-17877
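For the second case, a hedged sketch of the interleaving (placeholder types; a latch forces the "both appends finish together" timing instead of relying on real scheduling): both append callbacks insert their result before either checks the map size, so both see the size condition satisfied and both decrement the counter.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical reproduction of the second race; not Kafka code, just the interleaving.
object SecondCaseSketch extends App {
  val expected = 2 // one __consumer_offsets partition plus the data topic "foo"
  val markerResults = new ConcurrentHashMap[String, String]()
  val numAppends = new AtomicInteger(2) // two markers outstanding
  val bothInserted = new CountDownLatch(2)

  def appendCallback(partition: String): Unit = {
    markerResults.put(partition, "NONE")
    bothInserted.countDown()
    bothInserted.await() // force both results to be recorded before either size check
    if (markerResults.size == expected) { // both threads see the condition satisfied
      println(s"premature complete; numAppends is now ${numAppends.decrementAndGet()}")
    }
  }

  val coordinatorAppend = new Thread(() => appendCallback("__consumer_offsets-0"))
  val replicaManagerAppend = new Thread(() => appendCallback("foo-0"))
  coordinatorAppend.start(); replicaManagerAppend.start()
  coordinatorAppend.join(); replicaManagerAppend.join()
}
```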