Skip to content

Commit

Permalink
Patches for MR3
Browse files Browse the repository at this point in the history
  • Loading branch information
Sungwoo Park committed Mar 16, 2021
1 parent 043ba81 commit 979a377
Show file tree
Hide file tree
Showing 362 changed files with 12,177 additions and 3,261 deletions.
Expand Up @@ -33,12 +33,10 @@ private JsonParserFactory() {
* @return the appropriate JsonParser to print a JSONObject into outputStream.
*/
public static JsonParser getParser(HiveConf conf) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
if (engine.equals("mr3") || engine.equals("tez")) {
return new TezJsonParser();
}
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
return new SparkJsonParser();
}
return null;
}
}
Expand Up @@ -190,14 +190,10 @@ public static boolean canRenderInPlace(HiveConf conf) {
String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
boolean inPlaceUpdates = false;

if (engine.equals("tez")) {
if (engine.equals("mr3") || engine.equals("tez")) {
inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
}

if (engine.equals("spark")) {
inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
}

return inPlaceUpdates && isUnixTerminal();
}

Expand Down
142 changes: 136 additions & 6 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Expand Up @@ -95,6 +95,7 @@ public class HiveConf extends Configuration {

private Pattern modWhiteListPattern = null;
private volatile boolean isSparkConfigUpdated = false;
private volatile boolean isMr3ConfigUpdated = false;
private static final int LOG_PREFIX_LENGTH = 64;

public boolean getSparkConfigUpdated() {
Expand All @@ -105,6 +106,14 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
this.isSparkConfigUpdated = isSparkConfigUpdated;
}

/**
 * Returns the current value of the {@code isMr3ConfigUpdated} flag.
 *
 * <p>The backing field is declared {@code volatile} and starts out as {@code false}.
 * NOTE(review): presumably the MR3 counterpart of {@code getSparkConfigUpdated()}, i.e. it
 * signals that an MR3-related setting has changed -- confirm against the code that sets it.
 *
 * @return the current value of {@code isMr3ConfigUpdated}
 */
public boolean getMr3ConfigUpdated() {
return isMr3ConfigUpdated;
}

/**
 * Overwrites the {@code isMr3ConfigUpdated} flag of this configuration object.
 *
 * <p>The backing field is declared {@code volatile}.
 * NOTE(review): presumably callers set this to {@code true} when an MR3-related property
 * changes and reset it once the change has been consumed -- confirm against callers.
 *
 * @param isMr3ConfigUpdated the new value of the flag
 */
public void setMr3ConfigUpdated(boolean isMr3ConfigUpdated) {
this.isMr3ConfigUpdated = isMr3ConfigUpdated;
}

public interface EncoderDecoder<K, V> {
V encode(K key);
K decode(V value);
Expand Down Expand Up @@ -3002,7 +3011,7 @@ public static enum ConfVars {
HIVE_SSL_PROTOCOL_BLACKLIST("hive.ssl.protocol.blacklist", "SSLv2,SSLv3",
"SSL Versions to disable for all Hive Servers"),

HIVE_PRIVILEGE_SYNCHRONIZER("hive.privilege.synchronizer", true,
HIVE_PRIVILEGE_SYNCHRONIZER("hive.privilege.synchronizer", false,
"Whether to synchronize privileges from external authorizer periodically in HS2"),
HIVE_PRIVILEGE_SYNCHRONIZER_INTERVAL("hive.privilege.synchronizer.interval",
"1800s", new TimeValidator(TimeUnit.SECONDS),
Expand Down Expand Up @@ -3505,10 +3514,11 @@ public static enum ConfVars {
HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false,
"Whether to show the unquoted partition names in query results."),

HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet(true, "mr", "tez", "spark"),
"Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR\n" +
// do not remove 'tez' which might be necessary, e.g., when connecting from Hue
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr3", new StringSet(true, "mr3", "tez"),
"Chooses execution engine. Options are: mr3 or tez. While MR\n" +
"remains the default engine for historical reasons, it is itself a historical engine\n" +
"and is deprecated in Hive 2 line. It may be removed without further warning."),
"and is deprecated in Hive 2 line. It may be removed without further warning. tez and spark are not supported."),

HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
"Chooses whether query fragments will run in container or in llap"),
Expand Down Expand Up @@ -3717,7 +3727,7 @@ public static enum ConfVars {
"Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes\n" +
"and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as\n" +
"necessary."),
TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR("hive.tez.llap.min.reducer.per.executor", 0.95f,
TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR("hive.tez.llap.min.reducer.per.executor", 0.2f,
"If above 0, the min number of reducers for auto-parallelism for LLAP scheduling will\n" +
"be set to this fraction of the number of executors."),
TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f,
Expand Down Expand Up @@ -4458,7 +4468,127 @@ public static enum ConfVars {
"This parameter enables a number of optimizations when running on blobstores:\n" +
"(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
"This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
"See HIVE-15121 for details.");
"See HIVE-15121 for details."),

MR3_CLIENT_CONNECT_TIMEOUT("hive.mr3.client.connect.timeout",
"60000ms", new TimeValidator(TimeUnit.MILLISECONDS),
"Timeout for Hive to establish connection to MR3 Application Master."),
// ContainerWorker
// MR3_CONTAINER_MAX_JAVA_HEAP_FRACTION is not passed to ContainerWorker. Rather it is written to
// MR3Conf which is passed to DAGAppMaster and ContainerWorkers. That is, it is a part of mr3-conf.pb
// which is shared by both DAGAppMaster and ContainerWorkers as a LocalResource.
// It is fixed per MR3Session, i.e., at the time of creating a new MR3Session.
MR3_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.mr3.container.max.java.heap.fraction", 0.8f,
"Fraction of task memory to be used as Java heap. Fixed at the time of creating each MR3Session."),
// for ContainerGroup (in DAG)
// These configurations are used only when creating ContainerGroup.
// Hence, they do not affect MR3Conf (mr3-conf.pb) passed to DAGAppMaster and ContainerWorkers.
MR3_CONTAINERGROUP_SCHEME("hive.mr3.containergroup.scheme", "all-in-one",
new StringSet("all-in-one", "per-map-reduce", "per-vertex"),
"Scheme for assigning Vertexes to ContainerGroups"),
MR3_CONTAINER_ENV("hive.mr3.container.env", null,
"Environment string for ContainerGroups"),
MR3_CONTAINER_JAVA_OPTS("hive.mr3.container.java.opts", null,
"Java options for ContainerGroups"),
MR3_CONTAINER_COMBINE_TASKATTEMPTS("hive.mr3.container.combine.taskattempts", true,
"Allow multiple concurrent tasks in the same container"),
MR3_CONTAINER_REUSE("hive.mr3.container.reuse", true,
"Allow container reuse for running different tasks"),
MR3_CONTAINER_MIX_TASKATTEMPTS("hive.mr3.container.mix.taskattempts", true,
"Allow concurrent tasks from different DAGs in the same container"),
MR3_CONTAINER_USE_PER_QUERY_CACHE("hive.mr3.container.use.per.query.cache", true,
"Use per-query cache shared by all tasks in the same container"),
// for DAG
// This configuration is used only when creating DAG.
// Hence, it does not affect MR3Conf (mr3-conf.pb) passed to DAGAppMaster and ContainerWorkers.
MR3_CONTAINER_STOP_CROSS_DAG_REUSE("hive.mr3.container.stop.cross.dag.reuse", false,
"Stop cross-DAG container reuse for ContainerGroups"),
// common to Vertex, ContainerGroup, LLAP Daemon
MR3_RESOURCE_VCORES_DIVISOR("hive.mr3.resource.vcores.divisor", 1,
"Divisor for CPU cores, between 1 and 1000"),
// Vertex
MR3_MAP_TASK_MEMORY_MB("hive.mr3.map.task.memory.mb", 1024,
"Memory allocated to each mapper, in MB"),
MR3_REDUCE_TASK_MEMORY_MB("hive.mr3.reduce.task.memory.mb", 1024,
"Memory allocated to each reducer, in MB"),
MR3_MAP_TASK_VCORES("hive.mr3.map.task.vcores", 1,
"CPU cores allocated to each mapper"),
MR3_REDUCE_TASK_VCORES("hive.mr3.reduce.task.vcores", 1,
"CPU cores allocated to each reducer"),
// ContainerGroup -- All-in-One
MR3_ALLINONE_CONTAINERGROUP_MEMORY_MB("hive.mr3.all-in-one.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for All-in-One, in MB"),
MR3_ALLINONE_CONTAINERGROUP_VCORES("hive.mr3.all-in-one.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for All-in-One"),
// ContainerGroup -- Per-Map-Reduce and Per-Vertex
// Map/Reduce ContainerGroup size can be different from Vertex.taskResource, e.g.,
// when 'combine TaskAttempts' is enabled
MR3_MAP_CONTAINERGROUP_MEMORY_MB("hive.mr3.map.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for mappers, in MB"),
MR3_REDUCE_CONTAINERGROUP_MEMORY_MB("hive.mr3.reduce.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for reducers, in MB"),
MR3_MAP_CONTAINERGROUP_VCORES("hive.mr3.map.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for mappers"),
MR3_REDUCE_CONTAINERGROUP_VCORES("hive.mr3.reduce.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for reducers"),
// use LLAP IO for All-in-One and Per-Map-Reduce schemes when LLAP_IO_ENABLED = true
MR3_LLAP_HEADROOM_MB("hive.mr3.llap.headroom.mb", 1024,
"Memory allocated to JVM headroom when LLAP/IO is enabled"),
MR3_LLAP_DAEMON_TASK_MEMORY_MB("hive.mr3.llap.daemon.task.memory.mb", 0,
"Memory allocated to a DaemonTaskAttempt for LLAP/IO, in MB"),
MR3_LLAP_DAEMON_TASK_VCORES("hive.mr3.llap.daemon.task.vcores", 0,
"CPU cores allocated to a DaemonTaskAttempt for LLAP I/O"),
MR3_LLAP_ORC_MEMORY_PER_THREAD_MB("hive.mr3.llap.orc.memory.per.thread.mb", 1024,
"Memory allocated to each ORC manager in low-level LLAP I/O threads, in MB"),
// EXEC
MR3_EXEC_SUMMARY("hive.mr3.exec.print.summary", false,
"Display breakdown of execution steps, for every query executed by the shell"),
MR3_EXEC_INPLACE_PROGRESS("hive.mr3.exec.inplace.progress", true,
"Update job execution progress in-place in the terminal"),
// daemon ShuffleHandler
MR3_USE_DAEMON_SHUFFLEHANDLER("hive.mr3.use.daemon.shufflehandler", 0,
"Number of daemon ShuffleHandlers in every non-local ContainerWorker"),
// HiveServer2
HIVE_SERVER2_MR3_SHARE_SESSION("hive.server2.mr3.share.session", false,
"Use a common MR3Session to be shared by all HiveSessions"),
// for internal use only
// -1: not stored in HiveConf yet
HIVE_QUERY_ESTIMATE_REDUCE_NUM_TASKS("hive.query.estimate.reducer.num.tasks.internal", -1,
"Estimate number of reducer tasks based on MR3SessionManagerImpl.getEstimateNumTasks() for each query"),
MR3_BUCKET_MAPJOIN_ESTIMATE_NUM_NODES("hive.mr3.bucket.mapjoin.estimate.num.nodes", -1,
"Estimate number of nodes for converting to bucket mapjoin"),

// runtime
MR3_MAPJOIN_INTERRUPT_CHECK_INTERVAL("hive.mr3.mapjoin.interrupt.check.interval", 100000L,
"Interval at which HashTableLoader checks the interrupt state"),
MR3_DAG_ADDITIONAL_CREDENTIALS_SOURCE("hive.mr3.dag.additional.credentials.source", "",
"Comma separated list of additional paths for obtaining DAG Credentials"),

// fault tolerance
MR3_AM_TASK_MAX_FAILED_ATTEMPTS("hive.mr3.am.task.max.failed.attempts", 3,
"Max number of attempts for each Task"),

// speculative execution
MR3_AM_TASK_CONCURRENT_RUN_THRESHOLD_PERCENT("hive.mr3.am.task.concurrent.run.threshold.percent", 100,
"Percentage of TaskAttempts that complete before starting speculative execution. " +
"Can be set to an integer between 1 and 100. " +
"If set to 100, speculative execution of TaskAttempts is disabled."),

// deleting Vertex-local directories (boolean flag, false by default)
MR3_DAG_DELETE_VERTEX_LOCAL_DIRECTORY("hive.mr3.delete.vertex.local.directory", false,
"Delete Vertex-local directories in ContainerWorker when all destination Vertexes complete"),

// high availability
MR3_ZOOKEEPER_APPID_NAMESPACE("hive.mr3.zookeeper.appid.namespace", "mr3AppId",
"ZooKeeper namespace for sharing Application ID"),

// Kubernetes
HIVE_MR3_LOCALIZE_SESSION_JARS("hive.mr3.localize.session.jars", true,
"Localize session jars"),

// Compaction using MR3
HIVE_MR3_COMPACTION_USING_MR3("hive.mr3.compaction.using.mr3", false,
"Enable compaction using mr3. High Availability needs to be enabled.");

public final String varname;
public final String altName;
Expand Down
7 changes: 7 additions & 0 deletions common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
Expand Up @@ -73,6 +73,13 @@ public class PerfLogger {
public static final String TEZ_GET_SESSION = "TezGetSession";
public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";

// String constants for MR3, declared next to the Tez and Spark constants of this class.
// NOTE(review): presumably used as PerfLogger keys for timing the individual steps of
// building, submitting and running an MR3 DAG -- confirm against the callers.
public static final String MR3_SUBMIT_TO_RUNNING = "MR3SubmitToRunningDag";
public static final String MR3_BUILD_DAG = "MR3BuildDag";
public static final String MR3_SUBMIT_DAG = "MR3SubmitDag";
public static final String MR3_RUN_DAG = "MR3RunDag";
public static final String MR3_CREATE_VERTEX = "MR3CreateVertex";
public static final String MR3_RUN_VERTEX = "MR3RunVertex";

public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
Expand Down
10 changes: 10 additions & 0 deletions data/conf/hive-site.xml
Expand Up @@ -339,4 +339,14 @@
<value>false</value>
</property>

<property>
<name>mr3.container.runtime.auto.start.input</name>
<value>true</value>
</property>

<property>
<name>mr3.container.localize.python.working.dir.unsafe</name>
<value>true</value>
</property>

</configuration>
124 changes: 123 additions & 1 deletion data/conf/llap/hive-site.xml
Expand Up @@ -225,7 +225,7 @@

<property>
<name>hive.execution.engine</name>
<value>tez</value>
<value>mr3</value>
<description>Execution engine to use (mr3 or tez)</description>
</property>

Expand Down Expand Up @@ -358,4 +358,126 @@
<value>1024</value>
</property>

<!-- MR3 -->

<property>
<name>hive.llap.execution.mode</name>
<value>all</value>
</property>

<property>
<name>hive.llap.io.enabled</name>
<value>true</value>
</property>

<property>
<name>hive.llap.io.memory.size</name>
<value>4Gb</value>
</property>

<property>
<name>hive.mr3.llap.headroom.mb</name>
<value>0</value>
</property>

<property>
<name>hive.llap.io.threadpool.size</name>
<value>2</value>
</property>

<property>
<name>hive.mr3.container.combine.taskattempts</name>
<value>true</value>
</property>

<property>
<name>hive.mr3.container.reuse</name>
<value>true</value>
</property>

<property>
<name>hive.mr3.containergroup.scheme</name>
<value>all-in-one</value>
</property>

<property>
<name>hive.mr3.container.max.java.heap.fraction</name>
<value>0.8f</value>
</property>

<property>
<name>hive.mr3.map.task.memory.mb</name>
<value>2048</value>
</property>

<property>
<name>hive.mr3.map.task.vcores</name>
<value>1</value>
</property>

<property>
<name>hive.mr3.reduce.task.memory.mb</name>
<value>2048</value>
</property>

<property>
<name>hive.mr3.reduce.task.vcores</name>
<value>1</value>
</property>

<property>
<name>hive.mr3.all-in-one.containergroup.memory.mb</name>
<value>12288</value>
</property>

<property>
<name>hive.mr3.all-in-one.containergroup.vcores</name>
<value>6</value>
</property>

<property>
<name>mr3.runtime</name>
<value>tez</value>
</property>

<property>
<name>mr3.master.mode</name>
<value>local-thread</value>
</property>

<property>
<name>mr3.am.worker.mode</name>
<value>local</value>
</property>

<property>
<name>mr3.am.resource.memory.mb</name>
<value>18432</value>
</property>

<property>
<name>mr3.am.local.resourcescheduler.max.memory.mb</name>
<value>16384</value>
</property>

<property>
<name>mr3.am.local.resourcescheduler.max.cpu.cores</name>
<value>128</value>
</property>

<property>
<name>mr3.container.localize.python.working.dir.unsafe</name>
<value>true</value>
</property>

<property>
<name>mr3.container.runtime.auto.start.input</name>
<value>true</value>
</property>

</configuration>

0 comments on commit 979a377

Please sign in to comment.