KAFKA-1001 Update Scala version to resolve CVE-2022-36944

This change bumps the Scala version to 2.13.12 and cherry-picks a squashed combination of the following commits (only the Scala-related changes):
dfaae31 MINOR: Upgrade Scala for Java 20/21 support (apache#13840)
6ae08c4 KAFKA-14256: Upgrade from Scala 2.13.8 to 2.13.10 (apache#12675)
7c2d672 MINOR: Update library dependencies (Q1 2022) (apache#11306)
a8bd649 MINOR: Update Scala to 2.13.6 (apache#10711)
dd34e40 MINOR: Update Scala to 2.13.5 (apache#10169)
cbf8ad2 MINOR: Upgrade to Scala 2.13.4 (apache#9643)
7f90a58 MINOR: Update Scala to 2.13.3 (apache#8931)
ijuma authored and ychernysh committed Feb 16, 2024
1 parent 8927a49 commit 3c6ef14
Showing 91 changed files with 437 additions and 441 deletions.
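Most of the Scala source changes below follow one recurring pattern: newer Scala 2.13 compilers deprecate auto-application, i.e. calling a method that is declared with an empty parameter list without writing the parentheses, and under -Xfatal-warnings the resulting warnings can fail the build. A minimal sketch, using a hypothetical class that is not part of this commit:

class Server {
  def shutdown(): Unit = println("shutting down")
}

object AutoApplicationDemo extends App {
  val server = new Server
  // server.shutdown   // auto-application of an empty-paren method: deprecated in Scala 2.13
  server.shutdown()     // explicit application, the form used throughout this diff
}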
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
@@ -65,7 +65,7 @@ done
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.2
SCALA_VERSION=2.13.12
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.13.2
set SCALA_VERSION=2.13.12
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
8 changes: 6 additions & 2 deletions build.gradle
@@ -505,7 +505,6 @@ subprojects {
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-override",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
@@ -527,17 +526,21 @@
else
inlineFrom = ["-opt-inline-from:org.apache.kafka.**"]

if (versions.baseScala == '2.13')
scalaCompileOptions.additionalParameters += ["-Wconf:msg=@nowarn annotation does not suppress any warnings:s"] // See https://github.com/scala/scala/pull/9960

// Somewhat confusingly, `-opt:l:inline` enables all optimizations. `inlineFrom` configures what can be inlined.
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom

if (versions.baseScala != '2.12') {
scalaCompileOptions.additionalParameters += ["-opt-warnings"]
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
// Scala 2.13 has compiler warnings suppression
scalaCompileOptions.additionalParameters += ["-Wconf:cat=deprecation:s"]
scalaCompileOptions.additionalParameters += ["-Wconf:msg=legacy-binding:s"]
scalaCompileOptions.additionalParameters += ["-Wconf:cat=optimizer:ws"]
}

@@ -546,6 +549,7 @@ subprojects {
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:nullary-override",
"-Xlint:unsound-match"
]
}
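The -Wconf filters added above use the form <filter>:<action>: the action s silences matching warnings, w reports them, and ws prints only a summary count. Because -Xfatal-warnings escalates any remaining warning to a compile error, these filters are what keep the stricter compiler usable. A small illustrative sketch, assuming a hypothetical deprecated method rather than anything in this commit:

object WconfDemo {
  @deprecated("use renamed() instead", "1.0")
  def legacy(): Unit = ()

  def renamed(): Unit = ()

  // Calling a deprecated method emits a deprecation warning, which -Xfatal-warnings
  // would turn into an error; -Wconf:cat=deprecation:s (added above) downgrades the
  // whole category to silent, so compilation still succeeds.
  def caller(): Unit = legacy()
}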
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/Kafka.scala
@@ -77,7 +77,7 @@ object Kafka extends Logging {
}

// attach shutdown handler to catch terminating signals as well as normal termination
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -422,6 +422,7 @@ object ConfigCommand extends Config {
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
case ConfigType.User | ConfigType.Client =>
describeClientQuotasConfig(adminClient, entityTypes, entityNames)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
}

@@ -433,6 +434,7 @@
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.Broker | BrokerLoggerConfigType =>
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

entities.foreach { entity =>
@@ -472,6 +474,7 @@
if (!entityName.isEmpty)
validateBrokerId()
(ConfigResource.Type.BROKER_LOGGER, None)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}

val configSourceFilter = if (describeAll)
@@ -634,7 +637,7 @@
}
}

val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None))
val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next())) else None))
ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
}

@@ -718,12 +721,12 @@
(userDefaults, ConfigType.User),
(brokerDefaults, ConfigType.Broker))

private[admin] def entityTypes(): List[String] = {
private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
(entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2)
}

private[admin] def entityNames(): List[String] = {
private[admin] def entityNames: List[String] = {
val namesIterator = options.valuesOf(entityName).iterator
options.specs.asScala
.filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
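The catch-all cases added above pair with the -Xlint:strict-unsealed-patmat flag introduced in build.gradle: that lint also reports matches on unsealed types that the compiler cannot prove exhaustive, and -Xfatal-warnings turns the report into an error. A rough sketch of the idea with hypothetical types:

trait Entity
case object TopicEntity extends Entity
case object BrokerEntity extends Entity

object EntityDemo {
  def describe(entity: Entity): String = entity match {
    case TopicEntity  => "topic configs"
    case BrokerEntity => "broker configs"
    // Entity is not sealed, so the compiler cannot rule out other subtypes;
    // an explicit failure branch keeps the match total, mirroring the
    // IllegalArgumentException cases added in this file.
    case other => throw new IllegalArgumentException(s"Unexpected entity: $other")
  }
}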
16 changes: 9 additions & 7 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -161,22 +161,24 @@ object ConsumerGroupCommand extends Logging {
private[admin] case class CsvUtils() {
val mapper = new CsvMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
def readerFor[T <: CsvRecord: ClassTag] = {
def readerFor[T <: CsvRecord : ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.readerFor(clazz).`with`(schema)
}
def writerFor[T <: CsvRecord: ClassTag] = {
def writerFor[T <: CsvRecord : ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.writerFor(clazz).`with`(schema)
}
private def getSchema[T <: CsvRecord: ClassTag] = {
private def getSchema[T <: CsvRecord : ClassTag] = {
val clazz = implicitly[ClassTag[T]].runtimeClass
val fields = clazz match {
case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields
case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields
}

val fields =
if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields
else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields
else throw new IllegalStateException(s"Unhandled class $clazz")

val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
schema
}
@@ -178,7 +178,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
zkClient.getAllPartitions().map(_.topic)
zkClient.getAllPartitions.map(_.topic)
}

val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
@@ -190,7 +190,7 @@
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
(zkClient.getAllPartitions(), Set.empty)
(zkClient.getAllPartitions, Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)

@@ -477,8 +477,7 @@ object ReassignPartitionsCommand extends Logging {

private def topicDescriptionFutureToState(partition: Int,
future: KafkaFuture[TopicDescription],
targetReplicas: Seq[Int])
: PartitionReassignmentState = {
targetReplicas: Seq[Int]): PartitionReassignmentState = {
try {
val topicDescription = future.get()
if (topicDescription.partitions().size() < partition) {
@@ -492,7 +491,8 @@
case t: ExecutionException =>
t.getCause match {
case _: UnknownTopicOrPartitionException =>
new PartitionReassignmentState(Seq(), targetReplicas, true)
PartitionReassignmentState(Seq(), targetReplicas, true)
case e => throw e
}
}
}
@@ -1111,7 +1111,7 @@ object ReassignPartitionsCommand extends Logging {
// Check for the presence of the legacy partition reassignment ZNode. This actually
// won't detect all rebalances... only ones initiated by the legacy method.
// This is a limitation of the legacy ZK API.
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
if (reassignPartitionsInProgress) {
// Note: older versions of this tool would modify the broker quotas here (but not
// topic quotas, for some reason). This behavior wasn't documented in the --execute
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -176,16 +176,16 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def setAclIndividually(path: String): Unit = {
val setPromise = Promise[String]
val setPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
}
setAcl(path, setPromise)
}

private def setAclsRecursively(path: String): Unit = {
val setPromise = Promise[String]
val childrenPromise = Promise[String]
val setPromise = Promise[String]()
val childrenPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future
@@ -279,15 +279,15 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
future match {
case Some(a) =>
Await.result(a, 6000 millis)
futures.synchronized { futures.dequeue }
recurse
futures.synchronized { futures.dequeue() }
recurse()
case None =>
}
}
recurse()

} finally {
zkClient.close
zkClient.close()
}
}

@@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
}

class ChangeNotification {
def process(): Unit = processNotifications
def process(): Unit = processNotifications()
}

/**
@@ -143,17 +143,17 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = queue.take().process
override def doWork(): Unit = queue.take().process()
}

object ChangeNotificationHandler extends ZNodeChildChangeHandler {
override val path: String = seqNodeRoot
override def handleChildChange(): Unit = addChangeNotification
override def handleChildChange(): Unit = addChangeNotification()
}

object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
override def afterInitializingSession(): Unit = addChangeNotification
override def afterInitializingSession(): Unit = addChangeNotification()
}
}

8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -260,7 +260,7 @@ class KafkaController(val config: KafkaConfig,
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = () => tokenManager.expireTokens,
fun = () => tokenManager.expireTokens(),
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@@ -933,10 +933,10 @@ class KafkaController(val config: KafkaConfig,
* @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed
*/
private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
if (!zkClient.reassignPartitionsInProgress())
if (!zkClient.reassignPartitionsInProgress)
return

val reassigningPartitions = zkClient.getPartitionReassignment()
val reassigningPartitions = zkClient.getPartitionReassignment
val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
shouldRemoveReassignment(tp, replicas)
}
@@ -1542,7 +1542,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -174,7 +174,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
fun = () => cleanupGroupMetadata,
fun = () => cleanupGroupMetadata(),
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@@ -590,7 +590,7 @@ class GroupMetadataManager(brokerId: Int,

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

val memRecords = fetchDataInfo.records match {
val memRecords = (fetchDataInfo.records: @unchecked) match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
@@ -752,7 +752,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())

def removeGroupsAndOffsets(): Unit = {
var numOffsetsRemoved = 0
@@ -313,7 +313,7 @@ class TransactionStateManager(brokerId: Int,

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

val memRecords = fetchDataInfo.records match {
val memRecords = (fetchDataInfo.records: @unchecked) match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
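In the two records matches above, annotating the scrutinee with @unchecked is the other way to satisfy the stricter exhaustivity checks: it tells the compiler to skip the non-exhaustive-match warning for that one match instead of adding a catch-all case. A minimal sketch with hypothetical types:

abstract class Records
final class MemoryRecords extends Records
final class FileRecords extends Records

object UncheckedDemo {
  def describe(records: Records): String =
    (records: @unchecked) match {
      case _: MemoryRecords => "in-memory records"
      case _: FileRecords   => "file-backed records"
      // no catch-all needed: @unchecked suppresses the exhaustivity warning here
    }
}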
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -411,6 +411,7 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
searchEntity match {
case IndexSearchType.KEY => java.lang.Long.compare(indexEntry.indexKey, target)
case IndexSearchType.VALUE => java.lang.Long.compare(indexEntry.indexValue, target)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
}

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LazyIndex.scala
@@ -54,11 +54,11 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper:

def get: T = {
indexWrapper match {
case indexValue: IndexValue[T] => indexValue.index
case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T]
case _: IndexFile =>
inLock(lock) {
indexWrapper match {
case indexValue: IndexValue[T] => indexValue.index
case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T]
case indexFile: IndexFile =>
val indexValue = new IndexValue(loadIndex(indexFile.file))
indexWrapper = indexValue
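The LazyIndex change above is about type erasure: a pattern such as case indexValue: IndexValue[T] cannot actually test T at runtime, and the stricter 2.13 pattern matcher reports it as an unchecked type pattern. Matching on the wildcard IndexValue[_] and casting the result keeps the unavoidable cast explicit and local. A simplified sketch with a hypothetical container type:

class Box[A](val value: A)

object ErasureDemo {
  def unwrap[T](candidate: AnyRef): Option[T] = candidate match {
    // case box: Box[T] => ...             // unchecked: T is erased and cannot be tested
    case box: Box[_] => Some(box.value.asInstanceOf[T]) // explicit, localized cast
    case _           => None
  }
}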
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
@@ -780,7 +780,7 @@ class Log(@volatile private var _dir: File,
var truncated = false

while (unflushed.hasNext && !truncated) {
val segment = unflushed.next
val segment = unflushed.next()
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
@@ -2247,7 +2247,7 @@ class Log(@volatile private var _dir: File,

if (asyncDelete) {
info(s"Scheduling segments for deletion ${segments.mkString(",")}")
scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
} else {
deleteSegments()
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -324,7 +324,7 @@ class LogManager(logDirs: Seq[File],

var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
@@ -333,7 +333,7 @@

var logStartOffsets = Map[TopicPartition, Long]()
try {
logStartOffsets = this.logStartOffsetCheckpoints(dir).read
logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
@@ -1056,7 +1056,7 @@ class LogManager(logDirs: Seq[File],
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
log.flush()
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
