[DON'T MERGE] Try to replace all json4s with Jackson #37604

Closed
wants to merge 39 commits from json4s-2-jackson into apache:master
39 commits, all by LuciferYang:

d189874 try replace json4s to jackson (Aug 22, 2022)
9d28cc3 revet irrelevant (Aug 22, 2022)
2ea9dcf add a simple bench (Aug 24, 2022)
e132a3a fix compile (Aug 24, 2022)
fff531d Merge branch 'upmaster' into json4s-2-jackson (Aug 29, 2022)
ae3c149 Merge branch 'upmaster' into json4s-2-jackson (Aug 31, 2022)
3d785ba Merge branch 'upmaster' into json4s-2-jackson (Sep 2, 2022)
89fdeb1 Merge branch 'upmaster' into json4s-2-jackson (Sep 6, 2022)
43a5b7e Merge branch 'upmaster' into json4s-2-jackson (Sep 13, 2022)
f106e86 fix compile (Sep 13, 2022)
98cc9ee Merge branch 'upmaster' into json4s-2-jackson (Sep 21, 2022)
59ee54c fix compile try uts (Sep 26, 2022)
4717acf Merge branch 'upmaster' into json4s-2-jackson (Sep 26, 2022)
34d6230 fix compile (Sep 26, 2022)
446be1e Merge branch 'upmaster' into json4s-2-jackson (Nov 1, 2022)
3cccb9e fix test (Nov 1, 2022)
c8e51bd Merge branch 'upmaster' into json4s-2-jackson (Nov 16, 2022)
54a677f Merge branch 'apache:master' into json4s-2-jackson (Nov 18, 2022)
c583bde Merge branch 'apache:master' into json4s-2-jackson (Nov 21, 2022)
a4a1ba9 Merge branch 'apache:master' into json4s-2-jackson (Nov 25, 2022)
825468b Merge branch 'apache:master' into json4s-2-jackson (Dec 1, 2022)
fef78aa Merge branch 'apache:master' into json4s-2-jackson (Dec 5, 2022)
337d7ce Merge branch 'upmaster' into json4s-2-jackson (Dec 9, 2022)
78ddd44 Merge branch 'upmaster' into json4s-2-jackson (Dec 27, 2022)
4480177 Merge branch 'upmaster' into json4s-2-jackson (Jan 19, 2023)
0b706a0 fix confilct (Jan 19, 2023)
2c56119 Merge branch 'apache:master' into json4s-2-jackson (Jan 21, 2023)
3bc7aee Merge branch 'apache:master' into json4s-2-jackson (Jan 23, 2023)
7ae97f7 Merge branch 'upmaster' into json4s-2-jackson (Mar 2, 2023)
ff49deb Merge branch 'upmaster' into json4s-2-jackson (Mar 15, 2023)
91eff10 update (Mar 16, 2023)
b2a590e Merge branch 'apache:master' into json4s-2-jackson (Mar 21, 2023)
3e3ebeb Merge branch 'upmaster' into json4s-2-jackson (Apr 17, 2023)
e59ee90 Merge branch 'upmaster' into json4s-2-jackson (Apr 19, 2023)
189c4d2 Merge branch 'apache:master' into json4s-2-jackson (Apr 20, 2023)
5093488 Merge branch 'upmaster' into json4s-2-jackson (Apr 25, 2023)
c9d95a6 re format (Apr 25, 2023)
e386fbf fix compile (Apr 25, 2023)
9fce83c Merge branch 'upmaster' into json4s-2-jackson (Jun 7, 2023)
@@ -21,27 +21,27 @@ import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.kafka.common.TopicPartition
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.util.JacksonUtils

/**
* Utilities for converting Kafka related objects to and from json.
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)

/**
* Read TopicPartitions from json string
*/
def partitions(str: String): Array[TopicPartition] = {
try {
Serialization.read[Map[String, Seq[Int]]](str).flatMap { case (topic, parts) =>
JacksonUtils.readValue[Map[String, Seq[Int]]](str)
.flatMap { case (topic, parts) =>
parts.map { part =>
new TopicPartition(topic, part)
}
}.toArray
}.toArray
} catch {
case NonFatal(x) =>
case NonFatal(_) =>
throw new IllegalArgumentException(
s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
}
@@ -56,35 +56,37 @@ private object JsonUtils {
val parts: List[Int] = result.getOrElse(tp.topic, Nil)
result += tp.topic -> (tp.partition::parts)
}
Serialization.write(result)
JacksonUtils.writeValueAsString(result)
}

/**
* Read per-TopicPartition offsets from json string
*/
def partitionOffsets(str: String): Map[TopicPartition, Long] = {
try {
Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partOffsets) =>
JacksonUtils.readValue[Map[String, Map[Int, Long]]](str)
.flatMap { case (topic, partOffsets) =>
partOffsets.map { case (part, offset) =>
new TopicPartition(topic, part) -> offset
new TopicPartition(topic, part) -> offset
}
}
}
} catch {
case NonFatal(x) =>
case NonFatal(_) =>
throw new IllegalArgumentException(
s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
}
}

def partitionTimestamps(str: String): Map[TopicPartition, Long] = {
try {
Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partTimestamps) =>
partTimestamps.map { case (part, timestamp) =>
new TopicPartition(topic, part) -> timestamp
JacksonUtils.readValue[Map[String, Map[Int, Long]]](str)
.flatMap { case (topic, partTimestamps) =>
partTimestamps.map { case (part, timestamp) =>
new TopicPartition(topic, part) -> timestamp
}
}
}
} catch {
case NonFatal(x) =>
case NonFatal(_) =>
throw new IllegalArgumentException(
s"""Expected e.g. {"topicA": {"0": 123456789, "1": 123456789},
|"topicB": {"0": 123456789, "1": 123456789}}, got $str""".stripMargin)
@@ -103,12 +105,12 @@ private object JsonUtils {
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
}
Serialization.write(result)
JacksonUtils.writeValueAsString(result)
}

def partitionTimestamps(topicTimestamps: Map[TopicPartition, Long]): String = {
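The hunks above route every (de)serialization call through a new `org.apache.spark.util.JacksonUtils` helper whose implementation is not part of this excerpt. The sketch below shows what such a helper might look like; it is a hedged reconstruction, not the PR's code. Only the method names `readValue`, `writeValueAsString`, and `readTree` come from the call sites above; the `ScalaObjectMapper` mixin is one common way to get `Manifest`-based generic reads (newer jackson-module-scala releases deprecate it in favor of `ClassTagExtensions`).

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

// Hypothetical sketch of the JacksonUtils helper used in the hunks above.
object JacksonUtils {
  // A single shared mapper is thread-safe once fully configured.
  private val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  // Deserialize a JSON string into a Scala type, e.g. Map[String, Seq[Int]].
  def readValue[T: Manifest](json: String): T = mapper.readValue[T](json)

  // Serialize Scala maps, seqs, and case classes to a compact JSON string.
  def writeValueAsString(value: Any): String = mapper.writeValueAsString(value)

  // Parse a character stream into a JsonNode tree for ad-hoc navigation.
  def readTree(reader: java.io.Reader): JsonNode = mapper.readTree(reader)
}
```

With a helper like this, `JacksonUtils.readValue[Map[String, Seq[Int]]]("""{"topicA":[0,1]}""")` is a drop-in replacement for the `Serialization.read` call it displaces, and no implicit `Formats` value is needed.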
1 change: 1 addition & 0 deletions core/pom.xml
@@ -274,6 +274,7 @@
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -50,8 +50,6 @@ import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -462,13 +460,6 @@ private[spark] object TestUtils {
result.toArray
}

/** Creates a temp JSON file that contains the input JSON record. */
def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
val file = File.createTempFile(prefix, ".json", dir)
JavaFiles.write(file.toPath, compact(render(jsonValue)).getBytes())
file.getPath
}

/** Creates a temp bash script that prints the given output. */
def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = {
val file = File.createTempFile(prefix, ".sh", dir)
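The deleted `createTempJsonFile` was the only json4s-typed API in `TestUtils` (its parameter was a `JValue`). If any caller still needed it after the migration, a Jackson-based equivalent is a few lines. This is an illustrative sketch under the assumption that callers switch to `JsonNode`; it is not code from the PR.

```scala
import java.io.File
import java.nio.file.{Files => JavaFiles}

import com.fasterxml.jackson.databind.JsonNode

object TempJsonFiles {
  /** Creates a temp JSON file that contains the input JSON record (hypothetical sketch). */
  def createTempJsonFile(dir: File, prefix: String, jsonValue: JsonNode): String = {
    val file = File.createTempFile(prefix, ".json", dir)
    // JsonNode.toString emits compact JSON, mirroring json4s's compact(render(_)).
    JavaFiles.write(file.toPath, jsonValue.toString.getBytes())
    file.getPath
  }
}
```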
@@ -30,8 +30,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.{config, Logging}
@@ -340,7 +338,6 @@ private object FaultToleranceTest extends App with Logging {
private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
var state: RecoveryState.Value = _
var liveWorkerIPs: List[String] = _
var numLiveApps = 0
@@ -349,22 +346,25 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile

def readState(): Unit = {
try {
import org.apache.spark.util.JacksonUtils
import scala.collection.JavaConverters._
val masterStream = new InputStreamReader(
new URL("http://%s:8080/json".format(ip)).openStream, StandardCharsets.UTF_8)
val json = JsonMethods.parse(masterStream)
val jsonNode = JacksonUtils.readTree(masterStream)

val workers = json \ "workers"
val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
val workersNode = jsonNode.get("workers")
val liveWorkerNodes = workersNode.elements().asScala
.filter(n => n.get("state").asText() == "ALIVE")
// Extract the worker IP from "webuiaddress" (rather than "host") because the host name
// on containers is a weird hash instead of the actual IP address.
liveWorkerIPs = liveWorkers.map {
w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
}
liveWorkerIPs = liveWorkerNodes.map {
w => w.get("webuiaddress").asText().stripPrefix("http://").stripSuffix(":8081")
}.toList

numLiveApps = (json \ "activeapps").children.size
numLiveApps = jsonNode.get("activeapps").size()

val status = json \\ "status"
val stateString = status.extract[String]
val status = jsonNode.get("status")
val stateString = status.asText()
state = RecoveryState.values.filter(state => state.toString == stateString).head
} catch {
case e: Exception =>
@@ -383,8 +383,6 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats

logDebug("Created worker: " + this)

def kill(): Unit = { Docker.kill(dockerId) }
Expand Down