[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17877: Only call once maybeSendResponseCallback for each marker #17619

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
KAFKA-17877: Skip calling replica manager append if nothing to append
  • Loading branch information
CalvinConfluent committed Oct 28, 2024
commit 6b4dfcc79e6e79781b7d71d26c041d1fba3f01dc
31 changes: 18 additions & 13 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2476,20 +2476,25 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
requiredAcks = -1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
responseCallback = errors => {
errors.foreachEntry { (tp, partitionResponse) =>
markerResults.put(tp, partitionResponse.error)
// If every marker for this producer ID targets a consumer offsets partition handled by the new group
// coordinator, there are no control records left to append, so we must skip replicaManager.appendRecords.
// Otherwise its callback would invoke maybeComplete() more times than there are markers, and the response
// would be sent prematurely.
if (!controlRecords.isEmpty) {
CalvinConfluent marked this conversation as resolved.
Show resolved Hide resolved
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
requiredAcks = -1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
responseCallback = errors => {
errors.foreachEntry { (tp, partitionResponse) =>
markerResults.put(tp, partitionResponse.error)
}
maybeComplete()
}
maybeComplete()
}
)
)
}
}
}

Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3068,6 +3068,44 @@ class KafkaApisTest extends Logging {
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
}

/**
 * Sends a WriteTxnMarkers request with two marker entries that both target partition 0 of the
 * group metadata topic, and checks that the response reports a result for each of them.
 *
 * This verifies the response will not be sent prematurely because of calling replicaManager append
 * with no records.
 */
@Test
def WriteTxnMarkersShouldAllBeIncludedInTheResponse(): Unit = {
  val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
  // Two COMMIT markers that differ only in their first argument (1 and 2) and share the same partition.
  val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
    asList(
      new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, asList(topicPartition)),
      new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, asList(topicPartition)),
    )).build()
  val request = buildRequest(writeTxnMarkersRequest)
  val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])

  // Every partition is reported as using the V2 record format.
  when(replicaManager.getMagic(any()))
    .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
  // The group coordinator is stubbed to report itself as the new group coordinator.
  when(groupCoordinator.isNewGroupCoordinator())
    .thenReturn(true)
  // completeTransaction is stubbed to return an already-completed, successful future.
  when(groupCoordinator.completeTransaction(
    ArgumentMatchers.eq(topicPartition),
    any(),
    ArgumentMatchers.eq(1.toShort),
    ArgumentMatchers.eq(0),
    ArgumentMatchers.eq(TransactionResult.COMMIT),
    any()
  )).thenReturn(CompletableFuture.completedFuture[Void](null))

  kafkaApis = createKafkaApis()
  kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)

  // Exactly one response must be sent for the request; capture it for inspection.
  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    capturedResponse.capture(),
    ArgumentMatchers.eq(None)
  )
  val markersResponse = capturedResponse.getValue
  // One entry is expected per marker in the request (presumably keyed by the producer IDs 1 and 2).
  assertEquals(2, markersResponse.errorsByProducerId.size())
}

@Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
Expand Down