[DON'T MERGE] Try to replace all json4s with Jackson #37604

Closed. Wanted to merge 39 commits into master from the json4s-2-jackson branch.

Commits (39):
d189874  try replace json4s to jackson (LuciferYang, Aug 22, 2022)
9d28cc3  revet irrelevant (LuciferYang, Aug 22, 2022)
2ea9dcf  add a simple bench (LuciferYang, Aug 24, 2022)
e132a3a  fix compile (LuciferYang, Aug 24, 2022)
fff531d  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Aug 29, 2022)
ae3c149  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Aug 31, 2022)
3d785ba  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Sep 2, 2022)
89fdeb1  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Sep 6, 2022)
43a5b7e  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Sep 13, 2022)
f106e86  fix compile (LuciferYang, Sep 13, 2022)
98cc9ee  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Sep 21, 2022)
59ee54c  fix compile try uts (LuciferYang, Sep 26, 2022)
4717acf  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Sep 26, 2022)
34d6230  fix compile (LuciferYang, Sep 26, 2022)
446be1e  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Nov 1, 2022)
3cccb9e  fix test (LuciferYang, Nov 1, 2022)
c8e51bd  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Nov 16, 2022)
54a677f  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Nov 18, 2022)
c583bde  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Nov 21, 2022)
a4a1ba9  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Nov 25, 2022)
825468b  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Dec 1, 2022)
fef78aa  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Dec 5, 2022)
337d7ce  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Dec 9, 2022)
78ddd44  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Dec 27, 2022)
4480177  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Jan 19, 2023)
0b706a0  fix confilct (LuciferYang, Jan 19, 2023)
2c56119  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Jan 21, 2023)
3bc7aee  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Jan 23, 2023)
7ae97f7  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Mar 2, 2023)
ff49deb  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Mar 15, 2023)
91eff10  update (LuciferYang, Mar 16, 2023)
b2a590e  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Mar 21, 2023)
3e3ebeb  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Apr 17, 2023)
e59ee90  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Apr 19, 2023)
189c4d2  Merge branch 'apache:master' into json4s-2-jackson (LuciferYang, Apr 20, 2023)
5093488  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Apr 25, 2023)
c9d95a6  re format (LuciferYang, Apr 25, 2023)
e386fbf  fix compile (LuciferYang, Apr 25, 2023)
9fce83c  Merge branch 'upmaster' into json4s-2-jackson (LuciferYang, Jun 7, 2023)
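
Commit 2ea9dcf adds "a simple bench", which suggests the two serialization stacks were compared for speed; that benchmark is not part of this capture. Below is a minimal sketch of the kind of comparison it implies, using naive wall-clock timing rather than Spark's benchmark harness; every name in it is hypothetical:

// Hypothetical micro-benchmark; the PR's actual bench is not shown here.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object Json4sVsJacksonBench {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Crude timer; a real comparison would use JMH or Spark's Benchmark utility.
  private def time(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  }

  def main(args: Array[String]): Unit = {
    val n = 100000
    time("json4s compact(render(...))") {
      var i = 0
      while (i < n) {
        compact(render(("message" -> "ok") ~ ("isDataAvailable" -> true)))
        i += 1
      }
    }
    time("Jackson ObjectNode + writeValueAsString") {
      var i = 0
      while (i < n) {
        val node = mapper.createObjectNode()
        node.put("message", "ok")
        node.put("isDataAvailable", true)
        mapper.writeValueAsString(node)
        i += 1
      }
    }
  }
}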
Files changed:

StreamingQueryStatus.scala:

@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.streaming
 
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
+import com.fasterxml.jackson.databind.JsonNode
 
 import org.apache.spark.annotation.Evolving
+import org.apache.spark.util.JacksonUtils
 
 /**
  * Reports information about the instantaneous status of a streaming query.
@@ -47,10 +45,10 @@ class StreamingQueryStatus protected[sql] (
   // This is a copy of the same class in sql/core/.../streaming/StreamingQueryStatus.scala
 
   /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
+  def json: String = JacksonUtils.writeValueAsString(jsonNode)
 
   /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
+  def prettyJson: String = JacksonUtils.writeValuePrettyAsString(jsonNode)
 
   override def toString: String = prettyJson
 
@@ -64,9 +62,11 @@
     isTriggerActive = isTriggerActive)
   }
 
-  private[sql] def jsonValue: JValue = {
-    ("message" -> JString(message)) ~
-    ("isDataAvailable" -> JBool(isDataAvailable)) ~
-    ("isTriggerActive" -> JBool(isTriggerActive))
+  private[sql] def jsonNode: JsonNode = {
+    val node = JacksonUtils.createObjectNode
+    node.put("message", message)
+    node.put("isDataAvailable", isDataAvailable)
+    node.put("isTriggerActive", isTriggerActive)
+    node
   }
 }
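
A note on the helper: the new org.apache.spark.util.JacksonUtils object that these diffs call (createObjectNode, defaultNodeFactory, readTree, writeValueAsString, writeValuePrettyAsString) is not included in this capture. A minimal sketch of what such a helper could look like, assuming a single shared ObjectMapper with the Scala module registered; the PR's real implementation may differ:

// Hypothetical reconstruction of the JacksonUtils helper assumed by these diffs.
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JacksonUtils {
  // One shared, thread-safe mapper; DefaultScalaModule lets it serialize Scala
  // collections such as the Seq[String] passed in SparkConnectService below.
  private val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def defaultNodeFactory: JsonNodeFactory = JsonNodeFactory.instance

  def createObjectNode: ObjectNode = mapper.createObjectNode()

  def readTree(json: String): JsonNode = mapper.readTree(json)

  def writeValueAsString(value: Any): String = mapper.writeValueAsString(value)

  def writeValuePrettyAsString(value: Any): String =
    mapper.writerWithDefaultPrettyPrinter().writeValueAsString(value)
}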
progress.scala:

@@ -24,20 +24,17 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJsonNode, safeMapToJsonNode}
 import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 import org.apache.spark.sql.util.ToJsonUtil
+import org.apache.spark.util.JacksonUtils
 
 /**
  * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
@@ -58,11 +55,12 @@ class StateOperatorProgress private[spark] (
     val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
   extends Serializable {
 
+  private val factory = JacksonUtils.defaultNodeFactory
   /** The compact JSON representation of this progress. */
-  def json: String = compact(render(jsonValue))
+  def json: String = JacksonUtils.writeValueAsString(jsonNode)
 
   /** The pretty (i.e. indented) JSON representation of this progress. */
-  def prettyJson: String = pretty(render(jsonValue))
+  def prettyJson: String = JacksonUtils.writeValuePrettyAsString(jsonNode)
 
   private[sql] def copy(
       newNumRowsUpdated: Long,
@@ -81,26 +79,26 @@
       numStateStoreInstances = numStateStoreInstances,
       customMetrics = customMetrics)
 
-  private[sql] def jsonValue: JValue = {
-    ("operatorName" -> JString(operatorName)) ~
-    ("numRowsTotal" -> JInt(numRowsTotal)) ~
-    ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
-    ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
-    ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
-    ("commitTimeMs" -> JInt(commitTimeMs)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
-    ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
-    ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
-    ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
-    ("customMetrics" -> {
-      if (!customMetrics.isEmpty) {
-        val keys = customMetrics.keySet.asScala.toSeq.sorted
-        keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
-      } else {
-        JNothing
-      }
-    })
+  private[sql] def jsonNode: JsonNode = {
+    val obj = factory.objectNode()
+    obj.put("operatorName", operatorName)
+    obj.put("numRowsTotal", numRowsTotal)
+    obj.put("numRowsUpdated", numRowsUpdated)
+    obj.put("allUpdatesTimeMs", allUpdatesTimeMs)
+    obj.put("numRowsRemoved", numRowsRemoved)
+    obj.put("allRemovalsTimeMs", allRemovalsTimeMs)
+    obj.put("commitTimeMs", commitTimeMs)
+    obj.put("memoryUsedBytes", memoryUsedBytes)
+    obj.put("numRowsDroppedByWatermark", numRowsDroppedByWatermark)
+    obj.put("numShufflePartitions", numShufflePartitions)
+    obj.put("numStateStoreInstances", numStateStoreInstances)
+    if (!customMetrics.isEmpty) {
+      val metrics = factory.objectNode()
+      val keys = customMetrics.keySet.asScala.toSeq.sorted
+      keys.foreach(k => metrics.put(k, customMetrics.get(k).toLong))
+      obj.set[JsonNode]("customMetrics", metrics)
+    }
+    obj
   }
 
   override def toString: String = prettyJson
@@ -169,30 +167,59 @@ class StreamingQueryProgress private[spark] (
   def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
 
   /** The compact JSON representation of this progress. */
-  def json: String = compact(render(jsonValue))
+  def json: String = JacksonUtils.writeValueAsString(jsonNode)
 
   /** The pretty (i.e. indented) JSON representation of this progress. */
-  def prettyJson: String = pretty(render(jsonValue))
+  def prettyJson: String = JacksonUtils.writeValuePrettyAsString(jsonNode)
 
   override def toString: String = prettyJson
 
-  private[sql] def jsonValue: JValue = {
-    ("id" -> JString(id.toString)) ~
-    ("runId" -> JString(runId.toString)) ~
-    ("name" -> JString(name)) ~
-    ("timestamp" -> JString(timestamp)) ~
-    ("batchId" -> JInt(batchId)) ~
-    ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
-    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
-    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
-    ("sink" -> sink.jsonValue) ~
-    ("observedMetrics" -> safeMapToJValue[Row](
-      observedMetrics,
-      row => ToJsonUtil.jsonValue(row)))
+  private[sql] def jsonNode: JsonNode = {
+    val factory = JacksonUtils.defaultNodeFactory
+    val obj = factory.objectNode()
+    obj.put("id", id.toString)
+    obj.put("runId", runId.toString)
+    obj.put("name", name)
+    obj.put("timestamp", timestamp)
+    obj.put("batchId", batchId)
+    obj.put("numInputRows", numInputRows)
+    val inputRowsPerSecondNode = safeDoubleToJsonNode(inputRowsPerSecond)
+    if (!inputRowsPerSecondNode.isMissingNode) {
+      obj.set[JsonNode]("inputRowsPerSecond", inputRowsPerSecondNode)
+    }
+    val processedRowsPerSecondNode = safeDoubleToJsonNode(processedRowsPerSecond)
+    if (!processedRowsPerSecondNode.isMissingNode) {
+      obj.set[JsonNode]("processedRowsPerSecond", processedRowsPerSecondNode)
+    }
+
+    val durationMsNode = safeMapToJsonNode[JLong](durationMs, v => factory.numberNode(v.toLong))
+    if (!durationMsNode.isMissingNode) {
+      obj.set[JsonNode]("durationMs", durationMsNode)
+    }
+
+    val eventTimeNode = safeMapToJsonNode[String](eventTime, v => factory.textNode(v))
+    if (!eventTimeNode.isMissingNode) {
+      obj.set[JsonNode]("eventTime", eventTimeNode)
+    }
+
+    val stateOperatorsList = stateOperators.map(_.jsonNode).toList.asJava
+    val stateOperatorsArrayNode = factory.arrayNode(stateOperatorsList.size())
+    stateOperatorsArrayNode.addAll(stateOperatorsList)
+    obj.set[JsonNode]("stateOperators", stateOperatorsArrayNode)
+
+    val sourcesList = sources.map(_.jsonNode).toList.asJava
+    val sourcesArrayNode = factory.arrayNode(sourcesList.size)
+    sourcesArrayNode.addAll(sourcesList)
+    obj.set[JsonNode]("sources", sourcesArrayNode)
+
+    obj.set[JsonNode]("sink", sink.jsonNode)
+    val observedMetricsNode =
+      safeMapToJsonNode[Row](observedMetrics, row => ToJsonUtil.jsonNode(row))
+    if (!observedMetricsNode.isMissingNode) {
+      obj.set[JsonNode]("observedMetrics", observedMetricsNode)
+    }
+    obj
   }
 }
 
@@ -244,28 +271,40 @@ class SourceProgress protected[spark] (
   extends Serializable {
 
   /** The compact JSON representation of this progress. */
-  def json: String = compact(render(jsonValue))
+  def json: String = JacksonUtils.writeValueAsString(jsonNode)
 
   /** The pretty (i.e. indented) JSON representation of this progress. */
-  def prettyJson: String = pretty(render(jsonValue))
+  def prettyJson: String = JacksonUtils.writeValuePrettyAsString(jsonNode)
 
   override def toString: String = prettyJson
 
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("startOffset" -> tryParse(startOffset)) ~
-    ("endOffset" -> tryParse(endOffset)) ~
-    ("latestOffset" -> tryParse(latestOffset)) ~
-    ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  private[sql] def jsonNode: JsonNode = {
+    val factory = JacksonUtils.defaultNodeFactory
+    val node = JacksonUtils.createObjectNode
+    node.put("description", description)
+    node.set[JsonNode]("startOffset", tryParseNode(startOffset))
+    node.set[JsonNode]("endOffset", tryParseNode(endOffset))
+    node.set[JsonNode]("latestOffset", tryParseNode(latestOffset))
+    node.put("numInputRows", numInputRows)
+    val inputRowsPerSecondNode = safeDoubleToJsonNode(inputRowsPerSecond)
+    if (!inputRowsPerSecondNode.isMissingNode) {
+      node.set[JsonNode]("inputRowsPerSecond", inputRowsPerSecondNode)
+    }
+    val processedRowsPerSecondNode = safeDoubleToJsonNode(processedRowsPerSecond)
+    if (!processedRowsPerSecondNode.isMissingNode) {
+      node.set[JsonNode]("processedRowsPerSecond", processedRowsPerSecondNode)
+    }
+    val metricsNode = safeMapToJsonNode[String](metrics, s => factory.textNode(s))
+    if (!metricsNode.isMissingNode) {
+      node.set[JsonNode]("metrics", metricsNode)
+    }
+    node
   }
 
-  private def tryParse(json: String) = try {
-    parse(json)
+  private def tryParseNode(json: String): JsonNode = try {
+    JacksonUtils.readTree(json)
   } catch {
-    case NonFatal(e) => JString(json)
+    case NonFatal(_) => JacksonUtils.defaultNodeFactory.textNode(json)
   }
 }
 
@@ -293,17 +332,23 @@ class SinkProgress protected[spark] (
   }
 
   /** The compact JSON representation of this progress. */
-  def json: String = compact(render(jsonValue))
+  def json: String = JacksonUtils.writeValueAsString(jsonNode)
 
   /** The pretty (i.e. indented) JSON representation of this progress. */
-  def prettyJson: String = pretty(render(jsonValue))
+  def prettyJson: String = JacksonUtils.writeValuePrettyAsString(jsonNode)
 
   override def toString: String = prettyJson
 
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("numOutputRows" -> JInt(numOutputRows)) ~
-    ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  private[sql] def jsonNode: JsonNode = {
+    val factory = JacksonUtils.defaultNodeFactory
+    val obj = factory.objectNode()
+    obj.put("description", description)
+    obj.put("numOutputRows", numOutputRows)
+    val metricsNode = safeMapToJsonNode[String](metrics, s => factory.textNode(s))
+    if (!metricsNode.isMissingNode) {
+      obj.set[JsonNode]("metrics", metricsNode)
+    }
+    obj
  }
 }
 
@@ -318,14 +363,22 @@ private[sql] object SinkProgress {
 }
 
 private object SafeJsonSerializer {
-  def safeDoubleToJValue(value: Double): JValue = {
-    if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+  def safeDoubleToJsonNode(value: Double): JsonNode = {
+    val factory = JacksonUtils.defaultNodeFactory
+    if (value.isNaN || value.isInfinity) {
+      factory.missingNode()
+    } else {
+      factory.numberNode(value)
+    }
   }
 
   /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-    if (map.isEmpty) return JNothing
+  def safeMapToJsonNode[T](map: ju.Map[String, T], valueToJsonNode: T => JsonNode): JsonNode = {
+    val factory = JacksonUtils.defaultNodeFactory
+    if (map.isEmpty) return factory.missingNode()
     val keys = map.asScala.keySet.toSeq.sorted
-    keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
+    val node = factory.objectNode()
+    keys.foreach { k => node.set[JsonNode](k, valueToJsonNode(map.get(k))) }
+    node
   }
 }
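
One behavioral detail worth calling out: json4s's JNothing simply vanishes when a JObject is rendered, while Jackson has no sentinel that an ObjectNode drops automatically. That is why SafeJsonSerializer now returns missingNode() and every call site checks isMissingNode before attaching the field. A self-contained sketch of that pattern in plain Jackson (no Spark helpers; names are illustrative):

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory

object MissingNodeDemo {
  private val factory = JsonNodeFactory.instance

  // Mirrors safeDoubleToJsonNode: NaN/Infinity map to MissingNode,
  // which the caller must filter out explicitly.
  def safeDouble(value: Double): JsonNode =
    if (value.isNaN || value.isInfinity) factory.missingNode() else factory.numberNode(value)

  def main(args: Array[String]): Unit = {
    val obj = factory.objectNode()
    val rate = safeDouble(Double.NaN)
    if (!rate.isMissingNode) obj.set[JsonNode]("inputRowsPerSecond", rate)
    println(obj) // prints {} because the field was skipped, matching json4s's JNothing behavior
  }
}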
SparkConnectService.scala:

@@ -34,8 +34,6 @@ import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.{SparkEnv, SparkException, SparkThrowable}
 import org.apache.spark.api.python.PythonException
@@ -45,6 +43,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE, CONNECT_JVM_STACK_TRACE_MAX_SIZE}
 import org.apache.spark.sql.internal.SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED
+import org.apache.spark.util.JacksonUtils
 
 /**
  * The SparkConnectService implementation.
@@ -80,7 +79,9 @@ class SparkConnectService(debug: Boolean)
       .newBuilder()
      .setReason(st.getClass.getName)
       .setDomain("org.apache.spark")
-      .putMetadata("classes", compact(render(allClasses(st.getClass).map(_.getName))))
+      .putMetadata(
+        "classes",
+        JacksonUtils.writeValueAsString(allClasses(st.getClass).map(_.getName)))
 
     lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
     val withStackTrace = if (stackTraceEnabled && stackTrace.nonEmpty) {
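
A subtlety in this last hunk: the removed code rendered the List[String] through json4s's JsonDSL implicits, while JacksonUtils.writeValueAsString receives the Scala collection directly, so the underlying mapper must have jackson-module-scala registered to emit a JSON array instead of failing. A standalone sketch (the mapper setup mirrors the JacksonUtils assumption above):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object ScalaSeqJsonDemo {
  def main(args: Array[String]): Unit = {
    val classes = Seq("org.apache.spark.SparkException", "java.lang.RuntimeException")

    // Without DefaultScalaModule, Jackson treats a Scala Seq as an opaque bean
    // and throws InvalidDefinitionException; with it, we get a JSON array.
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    println(mapper.writeValueAsString(classes))
    // ["org.apache.spark.SparkException","java.lang.RuntimeException"]
  }
}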