diff --git a/.github/ISSUE_TEMPLATE/---document-issue-.md b/.github/ISSUE_TEMPLATE/---document-issue-.md
deleted file mode 100644
index ffc2fcd7817b6..0000000000000
--- a/.github/ISSUE_TEMPLATE/---document-issue-.md
+++ /dev/null
@@ -1,59 +0,0 @@
----
-name: Documentation (Document Issue)
-about: You can ask documentation-related questions here. You could use this template for reporting a document issue.
-
----
-
-Thank you very much for filing an issue about the PaddlePaddle documentation. We will consider your feedback carefully and make improvements.
-
-To help us resolve the problem quickly, please provide the following information when creating the issue:
-- Title: please include the keyword "XXX document issue", e.g. "add document issue" or "paddle.add document issue"
-- Documentation version: please provide the version number of the affected document, e.g. develop, 1.8, 2.0RC;
-
-### Describe the documentation problem:
-
-#### Is the API description clear?
-E.g. the description is hard to understand and it is unclear how to use the API; a formula in the doc is wrong;
-
-#### Are the parameter descriptions clear?
-E.g. a parameter is not explained clearly, including its usage, scenarios, default value, etc.
-
-#### Are the return/shape descriptions clear?
-E.g. the description of the API's return value or data shape is wrong or unclear
-
-#### Is the sample code valid?
-E.g. there is no sample code; the sample code is not instructive; the sample code does not run; the sample code is badly formatted; the sample code has no comments;
-
-#### Are the Chinese and English contents consistent?
-E.g. the Chinese and English API descriptions are inconsistent; the Chinese and English API parameters are inconsistent;
-
-#### Other
-E.g. the doc page does not open; a doc is missing; there are dead links in the doc;
-
-
-Thanks for opening a document issue. We will listen to your opinions carefully and make improvements.
-
-In order to quickly solve your problem, when creating an issue, please provide the following information:
-**Document Information**
-- Title: Please include the keyword "XXX document issue", such as "add document issue" or "paddle.add document issue"
-- Doc Version: Please provide the version of the document, such as develop, 1.8, 2.0RC;
-
-### Describe the problem:
-
-#### Is the document description clear?
-For example: I don't understand this document, or I don't know how to use this API; the formula in this doc is unclear;
-
-#### Are the parameter descriptions clear?
-For example: the parameters are confusing, including usage, scenarios, default values, etc.
-
-#### Is the return/shape description clear?
-For example: the return value described in this doc is wrong, or the returned shape is unclear.
-
-#### Is the sample code valid?
-For example: no sample code; the sample code is not helpful; the sample code does not run; the format of the sample is not reasonable; the sample code has no comments.
-
-#### Are the Chinese and English contents consistent?
-For example: the Chinese API in this doc is inconsistent with the English API, including params, description, sample code, formula, etc.
-
-#### Other
-For example: the doc link is broken; the doc page is missing; there are dead links in the docs.
diff --git a/.github/ISSUE_TEMPLATE/---feature-request-.md b/.github/ISSUE_TEMPLATE/---feature-request-.md
deleted file mode 100644
index 7af1f7daeefbb..0000000000000
--- a/.github/ISSUE_TEMPLATE/---feature-request-.md
+++ /dev/null
@@ -1,29 +0,0 @@
----
-name: Suggestion (Feature request)
-about: You can make suggestions here. You could use this template for reporting a suggestion issue.
-
----
-
-We welcome your suggestions for PaddlePaddle, and thank you very much for your contribution!
-When leaving your suggestion, please also provide the following information:
-- Version and environment information
-1) PaddlePaddle version: please provide your PaddlePaddle version number, e.g. 1.1
-2) CPU/GPU: whether you train on GPU; if so, please provide your CUDA and cuDNN version numbers
-3) System environment: please describe the OS type and version, e.g. Mac OS 10.14
-Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Suggestion description: please describe in detail the feature you think should be improved
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues in case a similar issue was submitted or resolved before.
-Please make sure that this is a feature request.
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/cuDNN version
-- OS platform (e.g. Mac OS 10.14)
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe the feature and the current behavior/state.**
-**Any other info.**
diff --git a/.github/ISSUE_TEMPLATE/---inference-issue-.md b/.github/ISSUE_TEMPLATE/---inference-issue-.md
deleted file mode 100644
index ceb8b12d80572..0000000000000
--- a/.github/ISSUE_TEMPLATE/---inference-issue-.md
+++ /dev/null
@@ -1,42 +0,0 @@
----
-name: Inference (Inference Issue)
-about: You can report errors and usage problems encountered during inference. You could use this template for reporting an inference issue.
-
----
-
-To get your problem resolved quickly, before creating the issue please search for similar problems via: [search issue keywords] [filter by labels] [official documentation]
-
-If you did not find a similar problem, please provide the following details when creating the issue so we can resolve it quickly:
-- Title: describe your problem concisely and precisely, e.g. "Where are the API docs for the latest inference library?"
-- Version and environment information:
- 1) PaddlePaddle version: please provide your PaddlePaddle version number (e.g. 1.1) or CommitID
- 2) CPU: if inference runs on CPU, please provide the CPU model and the math library in use (MKL/OpenBlas/MKLDNN/etc.)
- 3) GPU: if inference runs on GPU, please provide the GPU model and the CUDA and CUDNN version numbers
- 4) System environment: please describe the OS type and version (e.g. Mac OS 10.14) and the Python version
-Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Inference information
- 1) C++ inference: please provide the version information of the inference library package, including its version.txt file
- 2) The full CMake command including the include paths
- 3) API information (please provide it if APIs are called)
- 4) Source of the inference library: official download / special environment (e.g. built with BCLOUD)
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Problem description: please describe your problem in detail and paste the error message and key log/code snippets
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues in case a similar issue was submitted or resolved before.
-If there is no solution, please make sure that this is an inference issue including the following details:
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/CUDNN version
-- OS platform (e.g. Mac OS 10.14)
-- Python version
-- CMake orders
-- C++ version.txt
-- API information
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe your current behavior**
-**Code to reproduce the issue**
-**Other info / logs**
diff --git a/.github/ISSUE_TEMPLATE/---installation-issue-.md b/.github/ISSUE_TEMPLATE/---installation-issue-.md
deleted file mode 100644
index 5e761a6605a76..0000000000000
--- a/.github/ISSUE_TEMPLATE/---installation-issue-.md
+++ /dev/null
@@ -1,43 +0,0 @@
----
-name: Installation (Installation Issue)
-about: You can ask about errors in installation or compilation. You could use this template for reporting an installation
-  issue.
-
----
-
-To get your problem resolved quickly, before creating the issue please search for similar problems via: [search issue keywords] [filter by labels] [official documentation]
-
-When creating the issue, please provide the following information so we can resolve it quickly:
-- Title: please include the keyword "installation error"/"compilation error", e.g. "Mac compilation error"
-- Version and environment information:
- 1) PaddlePaddle version: please provide your PaddlePaddle version number (e.g. 1.1) or CommitID
- 2) CPU: please provide the CPU model and the math library in use (MKL/OpenBlas/MKLDNN/etc.)
- 3) GPU: please provide the GPU model and the CUDA and CUDNN version numbers
- 4) System environment: please state the OS type and version (e.g. Mac OS 10.14) and the Python version
-Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Installation method information:
-1) pip install / docker install
-2) local build: please provide the cmake command and the build command
-3) docker build: please provide the docker image and the build command
- Please note any special environment, e.g. offline installation
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Problem description: please describe your problem in detail and paste the error message and key log/code snippets
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues in case a similar issue was submitted or resolved before.
-If there is no solution, please make sure that this is an installation issue including the following details:
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/CUDNN version
-- OS platform (e.g. Mac OS 10.14)
-- Python version
-- Install method: pip install/install with docker/build from source(without docker)/build within docker
-- Other special cases that you think may be related to this problem, e.g. offline install, special internet condition
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe your current behavior**
-**Code to reproduce the issue**
-**Other info / logs**
diff --git a/.github/ISSUE_TEMPLATE/---model-issue-.md b/.github/ISSUE_TEMPLATE/---model-issue-.md
deleted file mode 100644
index 1e7c2e9c3e9ce..0000000000000
--- a/.github/ISSUE_TEMPLATE/---model-issue-.md
+++ /dev/null
@@ -1,38 +0,0 @@
----
-name: Model (Model Issue)
-about: You can ask about errors in using models, algorithms, or datasets. You could use this template for reporting a model/
-  algorithm/dataset issue.
-
----
-
-To get your problem resolved quickly, before creating the issue please search for similar problems via: [search issue keywords] [filter by labels] [official documentation]
-
-When creating the issue, please provide the following information so we can resolve it quickly:
-- Title: describe your problem concisely and precisely, e.g. "error in the LSTM in front of the ssd model"
-- Version and environment information:
- 1) PaddlePaddle version: please provide the PaddlePaddle version number, e.g. 1.1, or CommitID
- 2) CPU: please provide the CPU model and the math library in use (MKL/OpenBlas/MKLDNN/etc.)
- 3) GPU: please provide the GPU model and the CUDA and CUDNN version numbers
- 4) System environment: please state the OS type and version (e.g. Mac OS 10.14) and the Python version
- Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Model information
- 1) model name 2) dataset name 3) algorithm name 4) model link
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Problem description: please describe your problem in detail and paste the error message and key log/code snippets
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues; probably a similar issue was submitted or resolved before.
-If there is no solution, please make sure that this is a model issue including the following details:
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/CUDNN version
-- OS platform (e.g. Mac OS 10.14)
-- Python version
-- Name of models & dataset / details of operator
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe your current behavior**
-**Code to reproduce the issue**
-**Other info / logs**
diff --git a/.github/ISSUE_TEMPLATE/---others-.md b/.github/ISSUE_TEMPLATE/---others-.md
deleted file mode 100644
index ebab9023a6353..0000000000000
--- a/.github/ISSUE_TEMPLATE/---others-.md
+++ /dev/null
@@ -1,35 +0,0 @@
----
-name: Others
-about: If the categories above do not cover your problem, you can raise it here. You could use this template for reporting other issues
-
----
-
-To get your problem resolved quickly, before creating the issue please search for similar problems via: [search issue keywords] [filter by labels] [official documentation]
-
-If you did not find a similar problem, please provide the following details when creating the issue so we can resolve it quickly:
-- Title: summarize your problem concisely and precisely
-- Version and environment information:
- 1) PaddlePaddle version: please provide your PaddlePaddle version number, e.g. 1.1, or CommitID
- 2) CPU/GPU: if you train on GPU, please provide the GPU driver version and the CUDA and cuDNN version numbers
- 3) System environment: please describe the OS type and version, e.g. Mac OS 10.14
- 4) Python version number
- 5) GPU memory information
- Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Problem description: please describe your problem in detail and paste the error message and key log/code snippets
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues in case a similar issue was submitted or resolved before.
-If there is no solution, please provide us with the following details:
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/cuDNN version
-- OS platform and distribution (e.g. Mac OS 10.14)
-- Python version
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe your current behavior**
-**Code to reproduce the issue**
-**Other info / logs**
diff --git a/.github/ISSUE_TEMPLATE/---training-issue-.md b/.github/ISSUE_TEMPLATE/---training-issue-.md
deleted file mode 100644
index 15aa077619dc1..0000000000000
--- a/.github/ISSUE_TEMPLATE/---training-issue-.md
+++ /dev/null
@@ -1,40 +0,0 @@
----
-name: Training (Training issue)
-about: You can ask about errors, usage problems, and core dumps during training. You could use this template for reporting a training
-  issue.
-
----
-
-To get your problem resolved quickly, before creating the issue please search for similar problems via: [search issue keywords] [filter by labels] [official documentation]
-
-If you did not find a similar problem, please provide the following details when creating the issue so we can resolve it quickly:
-- Title: summarize your problem concisely and precisely, e.g. "Insufficient Memory xxx"
-- Version and environment information:
- 1) PaddlePaddle version: please provide your PaddlePaddle version number, e.g. 1.1, or CommitID
- 2) CPU: if training runs on CPU, please provide the CPU model and the math library in use (MKL/OpenBlas/MKLDNN/etc.)
- 3) GPU: if training runs on GPU, please provide the GPU model and the CUDA and CUDNN version numbers
- 4) System environment: please describe the OS type and version, e.g. Mac OS 10.14, and the Python version
- Note: you can get the information above by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-- Training information
- 1) single/multiple machines, single/multiple GPUs
- 2) GPU memory information
- 3) Operator information
-- Reproduction information: for an error, please give the environment and steps to reproduce it
-- Problem description: please describe your problem in detail and paste the error message, logs, and a reproducible code snippet
-
-Thank you for contributing to PaddlePaddle.
-Before submitting the issue, you could search the existing GitHub issues in case a similar issue was submitted or resolved before.
-If there is no solution, please make sure that this is a training issue including the following details:
-**System information**
-- PaddlePaddle version (e.g. 1.1) or CommitID
-- CPU: including MKL/OpenBlas/MKLDNN version
-- GPU: including CUDA/CUDNN version
-- OS platform (e.g. Mac OS 10.14)
-- Other information: distributed training / operator information /
-GPU memory
-Note: You can get most of the information by running [summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py).
-**To Reproduce**
-Steps to reproduce the behavior
-**Describe your current behavior**
-**Code to reproduce the issue**
-**Other info / logs**
diff --git a/.github/ISSUE_TEMPLATE/1_bug-report.yml b/.github/ISSUE_TEMPLATE/1_bug-report.yml
new file mode 100644
index 0000000000000..058589232fe1e
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/1_bug-report.yml
@@ -0,0 +1,66 @@
+
+name: 🐛 报BUG Bug Report
+description: 报告一个可复现的BUG帮助我们修复框架。 Report a bug to help us reproduce and fix it.
+labels: [type/bug-report, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 在向Paddle报bug之前,请先查询[历史issue](https://github.com/PaddlePaddle/Paddle/issues)是否报过同样的bug。
+
+ #### Before submitting a bug, please make sure the issue hasn't been already addressed by searching through [the existing and past issues](https://github.com/PaddlePaddle/Paddle/issues).
+
+- type: textarea
+ id: code
+ attributes:
+ label: bug描述 Describe the Bug
+ description: |
+ Please describe the bug clearly and concisely, ideally including the environment in which it reproduces, the reproduction steps, and a minimal code sample, so that we can reproduce the error by running the code. Keep the snippet as small as possible and take some time to strip irrelevant code so we can debug efficiently. We want to reproduce your result by copying and running the code, so please avoid external data, unrelated imports, and so on. For example:
+ ```python
+ # 导入所有必要的库。 All necessary imports at the beginning.
+ # paddlepaddle <= 2.1.2
+ import paddle
+
+ # 一个简洁的片段,能够定位到bug。 A succinct reproducing example trimmed down to the essential parts.
+ a = paddle.rand(shape=[1,4])
+ b = paddle.rand(shape=[1,4])
+ a.stop_gradient = False
+ b.stop_gradient = False
+
+ c = paddle.zeros((4, 4))
+ c[0, :] = a/b
+
+ print('Is c requires grad: ', not c.stop_gradient) # Note: the bug appears here; requires_grad=True is expected
+ ```
+ If the code is too long, please put the runnable code in an [AIStudio](https://aistudio.baidu.com/aistudio/index) project and make it public (or put it in a GitHub gist), describe the reproduction steps clearly in the project, and state the expected and actual results in the issue.
+ If you are reporting an error message, please paste the error with the full traceback here and use a ```triple-backtick block``` to present it.
+
+
+ placeholder: |
+ 请清晰简洁的描述这个bug。A clear and concise description of what the bug is.
+
+ ```python
+ # 最小可复现代码。 Sample code to reproduce the problem.
+ ```
+
+ ```shell
+ 带有完整回溯的报错信息。 The error message you got, with the full traceback.
+ ```
+ validations:
+ required: true
+
+- type: textarea
+ id: others
+ attributes:
+ label: 其他补充信息 Additional Supplementary Information
+ description: |
+ 如果你还有其他需要补充的内容,请写在这里。
+ If you have anything else to add, please write it here.
+ validations:
+ required: false
+
+- type: markdown
+ attributes:
+ value: >
+ 感谢你的贡献 🎉!Thanks for your contribution 🎉!
diff --git a/.github/ISSUE_TEMPLATE/2_feature-request.yml b/.github/ISSUE_TEMPLATE/2_feature-request.yml
new file mode 100644
index 0000000000000..e9dd3465d1758
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/2_feature-request.yml
@@ -0,0 +1,37 @@
+name: 🚀 新需求 Feature Request
+description: 提交一个你对Paddle的新需求。 Submit a request for a new Paddle feature.
+labels: [type/feature-request, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 你可以在这里提出你对Paddle框架的新需求,包括但不限于:功能或模型缺失、功能不全或无法使用、精度/性能不符合预期等。
+
+ #### You could submit a request for a new Paddle feature here, including but not limited to: new features or models, incomplete or unusable features, accuracy/performance not as expected, etc.
+
+- type: textarea
+ id: description
+ attributes:
+ label: 需求描述 Feature Description
+ description: |
+ 请尽可能包含任务目标、需求场景、功能描述等信息,全面的信息有利于我们准确评估你的需求。
+ Please include as much information as possible, such as the task objective, requirement scenario, and functional description. Comprehensive information will help us accurately assess your feature request.
+ value: "任务目标(请描述你正在做的项目是什么,如模型、论文、项目是什么?);
需求场景(请描述你的项目中为什么需要用此功能);
功能描述(请简单描述或设计这个功能)"
+ validations:
+ required: true
+
+- type: textarea
+ id: alternatives
+ attributes:
+ label: 替代实现 Alternatives
+ description: |
+ 如果你考虑过的任何替代解决方案或功能,请简要描述下,我们会综合评估。
+ A description of any alternative solutions or features you've considered, if any.
+ validations:
+ required: false
+
+- type: markdown
+ attributes:
+ value: >
+ 感谢你的贡献 🎉!Thanks for your contribution 🎉!
diff --git a/.github/ISSUE_TEMPLATE/3_build-installation-issue.yml b/.github/ISSUE_TEMPLATE/3_build-installation-issue.yml
new file mode 100644
index 0000000000000..2786175af6dc8
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/3_build-installation-issue.yml
@@ -0,0 +1,65 @@
+name: 🗂 安装 Build/Installation Issue
+description: 报告一个安装问题。 Report an issue related to build or install Paddle.
+labels: [type/build, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 安装请参考[官网文档](https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/develop/install/pip/linux-pip.html),若未能解决你的问题,你可以在这里提issue。
+
+ #### Before submitting a Build/Installation Issue, please make sure you have visited the [official website](https://www.paddlepaddle.org.cn/documentation/docs/en/install/index_en.html).
+
+- type: textarea
+ id: error
+ attributes:
+ label: 问题描述 Issue Description
+ description: |
+ 请详细描述你的问题,同步贴出报错信息、日志/代码关键片段、复现步骤,以便我们快速排查问题。
+ Please describe your problem in detail, and include the error message, key log/code snippets, and reproduction steps alongside, so that we can troubleshoot the problem quickly.
+ validations:
+ required: true
+
+- type: textarea
+ id: environment
+ attributes:
+ label: 版本&环境信息 Version & Environment Information
+ description: |
+ 请参考以下命令运行脚本[summary_env.py](https://github.com/PaddlePaddle/Paddle/blob/develop/tools/summary_env.py)获取版本&环境信息,并将输出拷贝在这里。
+ Please run the following and paste the output below.
+ ```shell
+ wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/tools/summary_env.py
+ python3 -m pip install distro
+ python3 summary_env.py
+ ```
+ If running the script fails, please say so in the issue and provide the following information:
+ 1. PaddlePaddle version: please provide your PaddlePaddle version number (e.g. 2.0.0) or CommitID.
+ 2. CPU (optional): please provide the CPU model, which math library is used (MKL/OpenBlas/MKLDNN/etc.), and whether the AVX instruction set is supported.
+ 3. GPU: please provide the GPU model and the CUDA (e.g. cuda10.2) and CUDNN (e.g. cudnn7.6.5) version numbers.
+ 4. System environment: please state the OS type and version (e.g. Mac OS 10.14).
+ 5. Python version (e.g. python 3.7).
+ 6. (Optional) If the problem occurred during installation, please provide the installation method (pip/conda/docker/build from source) and the corresponding install command.
+ 7. (Optional) If you cannot use the GPU with paddle, please run `nvidia-smi` and `nvcc -V` on the command line and provide screenshots of their output.
+ 8. (Optional) If you use special hardware, please note it separately.
+
+
+ placeholder: |
+ ****************************************
+ Paddle version:
+ Paddle With CUDA:
+
+ OS:
+ Python version:
+
+ CUDA version:
+ cuDNN version:
+ Nvidia driver version:
+ ****************************************
+ validations:
+ required: true
+
+
+- type: markdown
+ attributes:
+ value: >
+ 感谢你的贡献 🎉!Thanks for your contribution 🎉!
diff --git a/.github/ISSUE_TEMPLATE/4_documentation-issue.yml b/.github/ISSUE_TEMPLATE/4_documentation-issue.yml
new file mode 100644
index 0000000000000..936ed3d92e8c4
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/4_documentation-issue.yml
@@ -0,0 +1,39 @@
+name: 📚 文档 Documentation Issue
+description: 反馈一个官网文档错误。 Report an issue related to https://www.paddlepaddle.org.cn/.
+labels: [type/docs, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 请确认反馈的问题来自PaddlePaddle官网文档:https://www.paddlepaddle.org.cn/ 。
+
+ #### Before submitting a Documentation Issue, please make sure that the issue is related to https://www.paddlepaddle.org.cn/.
+
+- type: textarea
+ id: link
+ attributes:
+ label: 文档链接&描述 Document Links & Description
+ description: |
+ 请说明有问题的文档链接以及该文档存在的问题。
+ Please fill in the link to the document and describe the question.
+ validations:
+ required: true
+
+
+- type: textarea
+ id: error
+ attributes:
+ label: 请提出你的建议 Please give your suggestion
+ description: |
+ 请告诉我们,你希望如何改进这个文档。或者你可以提个PR修复这个问题。[教程参考](https://github.com/PaddlePaddle/docs/wiki#%E8%B4%A1%E7%8C%AE%E6%96%87%E6%A1%A3)
+ Please tell us how you would like to improve this document. Or you can submit a PR to fix this problem.
+
+ validations:
+ required: false
+
+- type: markdown
+ attributes:
+ value: >
+ 感谢你的贡献 🎉!Thanks for your contribution 🎉!
+
diff --git a/.github/ISSUE_TEMPLATE/5_ask-a-question.yml b/.github/ISSUE_TEMPLATE/5_ask-a-question.yml
new file mode 100644
index 0000000000000..158918946f3fd
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/5_ask-a-question.yml
@@ -0,0 +1,33 @@
+name: 🙋🏼♀️🙋🏻♂️提问 Ask a Question
+description: 提出一个使用/咨询问题。 Ask a usage or consultation question.
+labels: [type/question, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 你可以在这里提出一个使用/咨询问题,提问之前请确保:
+
+ - 1)已经百度/谷歌搜索过你的问题,但是没有找到解答;
+
+ - 2)已经在官网查询过[API文档](https://www.paddlepaddle.org.cn/documentation/docs/zh/api/index_cn.html)与[FAQ](https://www.paddlepaddle.org.cn/documentation/docs/zh/faq/index_cn.html),但是没有找到解答;
+
+ - 3)已经在[历史issue](https://github.com/PaddlePaddle/Paddle/issues)中搜索过,没有找到同类issue或issue未被解答。
+
+
+ #### You could ask a usage or consultation question here, before your start, please make sure:
+
+ - 1) You have searched your question on Baidu/Google, but found no answer;
+
+ - 2) You have checked the [API documentation](https://www.paddlepaddle.org.cn/documentation/docs/en/api/index_en.html), but found no answer;
+
+ - 3) You have searched [the existing and past issues](https://github.com/PaddlePaddle/Paddle/issues), but found no similar issue or the issue has not been answered.
+
+
+
+- type: textarea
+ id: question
+ attributes:
+ label: 请提出你的问题 Please ask your question
+ validations:
+ required: true
diff --git a/.github/ISSUE_TEMPLATE/6_others.yml b/.github/ISSUE_TEMPLATE/6_others.yml
new file mode 100644
index 0000000000000..e8f4a9c232918
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/6_others.yml
@@ -0,0 +1,26 @@
+name: 🧩 其他 Others
+description: 提出其他问题。 Report any other non-support related issues.
+labels: [type/others, status/new-issue]
+
+body:
+- type: markdown
+ attributes:
+ value: >
+ #### 你可以在这里提出任何前面几类模板不适用的问题,包括但不限于:优化性建议、框架使用体验反馈、版本兼容性问题、报错信息不清楚等。
+
+ #### You can report any issues that are not applicable to the previous types of templates, including but not limited to: enhancement suggestions, feedback on the use of the framework, version compatibility issues, unclear error information, etc.
+
+- type: textarea
+ id: others
+ attributes:
+ label: 问题描述 Please describe your issue
+ validations:
+ required: true
+
+- type: markdown
+ attributes:
+ value: >
+ 感谢你的贡献 🎉! Thanks for your contribution 🎉!
+
+
+
diff --git a/cmake/external/libmct.cmake b/cmake/external/libmct.cmake
index 92c3165fbaa90..a166e43c7b95e 100644
--- a/cmake/external/libmct.cmake
+++ b/cmake/external/libmct.cmake
@@ -45,7 +45,7 @@ ExternalProject_Add(
PREFIX ${LIBMCT_PREFIX_DIR}
DOWNLOAD_DIR ${LIBMCT_DOWNLOAD_DIR}
DOWNLOAD_COMMAND wget --no-check-certificate ${LIBMCT_URL} -c -q -O ${LIBMCT_NAME}.tar.gz
- && tar zxvf ${LIBMCT_NAME}.tar.gz
+ && tar --no-same-owner -zxvf ${LIBMCT_NAME}.tar.gz
DOWNLOAD_NO_PROGRESS 1
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${LIBMCT_INSTALL_ROOT}
diff --git a/cmake/external/xpu.cmake b/cmake/external/xpu.cmake
index 0d340ab638b1a..e83bdef327891 100644
--- a/cmake/external/xpu.cmake
+++ b/cmake/external/xpu.cmake
@@ -36,7 +36,7 @@ ENDIF()
if(NOT DEFINED XPU_BASE_URL)
SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
- SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220327")
+ SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220402")
else()
SET(XPU_BASE_URL "${XPU_BASE_URL}")
endif()
diff --git a/cmake/flags.cmake b/cmake/flags.cmake
index f90b71f9e60a8..5742a6b602ff3 100644
--- a/cmake/flags.cmake
+++ b/cmake/flags.cmake
@@ -244,3 +244,7 @@ if(WITH_ROCM)
string (REPLACE "-Werror" "-Wno-error" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
endif()
+if(WITH_PSCORE OR WITH_PSLIB)
+ string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
+ string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
+endif()
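For readers unfamiliar with the warning being downgraded here: with PS-Core or PSLIB enabled, the build pulls in parameter-server headers that define polymorphic base classes without virtual destructors, which `-Wnon-virtual-dtor` turns into hard errors under `-Werror`. The snippet below is a hypothetical minimal reproduction of that diagnostic, not code from this patch:

```cpp
// Compile with: g++ -Wnon-virtual-dtor -Werror repro.cc
// Hypothetical pattern that the relaxed flag tolerates.
struct Base {
  virtual void Run() {}  // polymorphic, but no virtual destructor
};

struct Derived : Base {
  ~Derived() {}
};

int main() {
  Base* b = new Derived();
  delete b;  // UB without virtual ~Base(); -Wnon-virtual-dtor flags Base
  return 0;
}
```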
diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake
index cafd1406b256f..e3e6e1cced2aa 100644
--- a/cmake/inference_lib.cmake
+++ b/cmake/inference_lib.cmake
@@ -199,13 +199,6 @@ IF(WITH_XPU)
DSTS ${dst_dir} ${dst_dir})
ENDIF()
-IF(WITH_IPU)
- set(dst_dir "${PADDLE_INFERENCE_INSTALL_DIR}/third_party/install/ipu")
- copy(inference_lib_dist
- SRCS ${CMAKE_BINARY_DIR}/paddle/fluid/platform/device/ipu/libpaddle_ipu.so
- DSTS ${dst_dir})
-ENDIF()
-
# CMakeCache Info
copy(inference_lib_dist
SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt
diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt
index 17432a0c043f2..06b0583eddf24 100644
--- a/paddle/fluid/distributed/CMakeLists.txt
+++ b/paddle/fluid/distributed/CMakeLists.txt
@@ -1,9 +1,6 @@
add_subdirectory(collective)
add_subdirectory(store)
if(NOT WITH_PSCORE)
- if(WITH_HETERPS)
- add_subdirectory(ps)
- endif()
add_subdirectory(fleet_executor)
return()
endif()
diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt
index 6fb805a72e4de..6d736d5543ce4 100644
--- a/paddle/fluid/distributed/collective/CMakeLists.txt
+++ b/paddle/fluid/distributed/collective/CMakeLists.txt
@@ -7,14 +7,14 @@ endif()
if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
- if (WITH_DISTRIBUTE)
+ if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
if(WITH_ASCEND_CL)
cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
- if (WITH_DISTRIBUTE)
+ if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
diff --git a/paddle/fluid/distributed/collective/ProcessGroup.cc b/paddle/fluid/distributed/collective/ProcessGroup.cc
index ab118dadd5d88..6da83a888683b 100644
--- a/paddle/fluid/distributed/collective/ProcessGroup.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroup.cc
@@ -35,10 +35,10 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {
void ProcessGroup::Task::Synchronize() {}
ProcessGroup::ProcessGroup(int rank, int size, int gid)
- : rank_(rank), size_(size) {
+ : rank_(rank), size_(size), gid_(gid) {
if (gid != IGNORE_ID) {
auto map = ProcessGroupMapFromGid::getInstance();
- map->insert(gid, this);
+ map->insert(gid_, this);
}
}
diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h
index 36a00a7d31758..17d021852671e 100644
--- a/paddle/fluid/distributed/collective/ProcessGroup.h
+++ b/paddle/fluid/distributed/collective/ProcessGroup.h
@@ -93,8 +93,8 @@ class ProcessGroup {
}
virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) {
- PADDLE_THROW(platform::errors::InvalidArgument(
- "ProcessGroup%s does not support broadcast for static",
+ PADDLE_THROW(platform::errors::Fatal(
+ "ProcessGroup%s does not support broadcast for static mode runtime",
GetBackendName()));
}
@@ -148,6 +148,7 @@ class ProcessGroup {
protected:
const int rank_;
const int size_;
+ const int gid_;
};
class ProcessGroupMapFromGid {
@@ -158,16 +159,20 @@ class ProcessGroupMapFromGid {
}
void insert(int gid, ProcessGroup* pg) {
- PADDLE_ENFORCE_EQ(has(gid), false,
- platform::errors::PreconditionNotMet(
- "The process group with id %d doesnot exist.", gid));
+ // TODO(sandyhouse): address ut and uncomment the following codes
+ // PADDLE_ENFORCE_EQ(has(gid), false,
+ // platform::errors::PreconditionNotMet(
+ // "The process group with id %d already exists.",
+ // gid));
map_[gid] = pg;
}
ProcessGroup* get(int gid) {
- PADDLE_ENFORCE_EQ(has(gid), false,
- platform::errors::PreconditionNotMet(
- "The process group with id %d doesnot exist.", gid));
+ // TODO(sandyhouse): address ut and uncomment the following codes
+ // PADDLE_ENFORCE_EQ(has(gid), true,
+ // platform::errors::PreconditionNotMet(
+ // "The process group with id %d does not exist.",
+ // gid));
return map_.find(gid)->second;
}
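To see why the constructor change above stores `gid_` and keys the global registry by it, here is a standalone sketch with simplified, hypothetical types (not the patch's actual classes): several process groups can coexist in one process with identical ranks, so only the group id distinguishes them in a `ProcessGroupMapFromGid`-style lookup.

```cpp
#include <cassert>
#include <unordered_map>

// Simplified stand-ins for ProcessGroup and its gid-keyed registry.
struct Group {
  int rank, size, gid;
};

std::unordered_map<int, Group*>& Registry() {
  static std::unordered_map<int, Group*> map;  // map_[gid] = pg
  return map;
}

int main() {
  Group g0{0, 8, 0}, g1{0, 8, 1};  // same rank and size, different group ids
  Registry()[g0.gid] = &g0;        // insert(gid, pg)
  Registry()[g1.gid] = &g1;
  assert(Registry().find(1)->second == &g1);  // get(gid) picks the right group
  return 0;
}
```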
diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
index b21155e09d06e..55945b5e0e396 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
@@ -30,12 +30,6 @@ constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
-// bool CheckTensorsInNPUPlace(const std::vector<Tensor>& tensors) {
-// return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) {
-// return t.place() == platform::DeviceType::NPU;
-// });
-// }
-
void SyncDefaultStream(
 const std::vector<Place>& places,
 std::vector<NPUEventManager>& hcclEvents, // NOLINT
diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc
index ffd653042494d..b3c9ddde50116 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc
@@ -56,7 +56,8 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
- with_switch_(with_switch) {
+ with_switch_(with_switch),
+ switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
 inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
IGNORE_ID);
@@ -64,14 +65,10 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
 inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
IGNORE_ID);
#else
- PADDLE_THROW(platform::errors::InvalidArgument(
+ PADDLE_THROW(platform::errors::Fatal(
"ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
- if (with_switch_) {
- // TODO(sandyhouse) starts a client to connect the cloud switch module
- // std::shared_ptr<HeterClient> client_ =
- // HeterClient::GetInstance({switch_endpoint}, {}, 0);
- } else if (local_rank_ == 0) {
+ if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
 inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
@@ -79,6 +76,15 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
}
}
+template <typename T>
+static void _do_add(T* dst, T* src, size_t size) {
+ for (size_t i = 0; i < size; i++) {
+ *dst += *src;
+ dst++;
+ src++;
+ }
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
     std::vector<Tensor>& tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
@@ -93,33 +99,92 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
// Step2: copy tensors to CPU
if (local_rank_ == 0) {
- std::vector<Tensor> cpu_tensors(tensors.size());
+ std::vector<Tensor> cpu_tensors;
+ cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
 std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
- auto dense_cpu_tensor =
- std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
- dense_cpu_tensor->Resize(tensors[i].dims());
+ phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+ dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
+ std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+ std::make_shared<phi::DenseTensor>(
+ std::make_unique<paddle::experimental::DefaultAllocator>(
+ paddle::platform::CPUPlace())
+ .get(),
+ meta);
+ dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
+ cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
// Step3: do inter cluster allreduce
if (with_switch_) {
- // TODO(sandyhouse) send to and recv from switch, and do add
+ if (local_rank_ == 0) {
+ HeterClient* client_ =
+ HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
+ auto dense_cpu_tensor =
+ std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
+ std::vector<int64_t> send_size;
+ send_size.push_back(dense_cpu_tensor->numel());
+ int ret = client_->Send(
+ gid_, {dense_cpu_tensor->name()}, send_size,
+ dense_cpu_tensor->data(),
+ dense_cpu_tensor->numel() *
+ framework::DataTypeSize(dense_cpu_tensor->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+ "Send to the switch module error."));
+ phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+ dense_cpu_tensor->dtype(), dense_cpu_tensor->dims());
+ std::shared_ptr<phi::DenseTensor> dense_cpu_tensor2 =
+ std::make_shared<phi::DenseTensor>(
+ std::make_unique<paddle::experimental::DefaultAllocator>(
+ paddle::platform::CPUPlace())
+ .get(),
+ meta);
+ dense_cpu_tensor2->ResizeAndAllocate(dense_cpu_tensor->dims());
+ Tensor cpu_tensor_temp =
+ paddle::experimental::Tensor(dense_cpu_tensor2);
+ ret = client_->Recv(
+ gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor2->data(),
+ dense_cpu_tensor2->numel() *
+ framework::DataTypeSize(dense_cpu_tensor2->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+ "Recv from the switch module error."));
+
+ switch (dense_cpu_tensor->dtype()) {
+ case DataType::FLOAT32:
+ _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor->data()),
+ reinterpret_cast<float*>(dense_cpu_tensor2->data()),
+ dense_cpu_tensor->numel());
+ break;
+ case DataType::FLOAT64:
+ _do_add<double>(
+ reinterpret_cast<double*>(dense_cpu_tensor->data()),
+ reinterpret_cast<double*>(dense_cpu_tensor2->data()),
+ dense_cpu_tensor->numel());
+ break;
+ case DataType::INT32:
+ _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor->data()),
+ reinterpret_cast<int*>(dense_cpu_tensor2->data()),
+ dense_cpu_tensor->numel());
+ break;
+ default:
+ PADDLE_THROW(platform::errors::PreconditionNotMet(
+ "Unsupported data type (%s) to do add.",
+ framework::DataType2String(dense_cpu_tensor->dtype())));
+ }
+ }
} else {
auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts);
gloo_task->Wait();
}
// Step4: copy cpu tensors to gpu
- // TODO(sandyhouse)
// copy cpu tensors to gpu
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
 std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
 auto dense_cpu_tensor =
 std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
- // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
- // dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
@@ -147,18 +212,57 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
inner_pg_->Broadcast(tensors, b_opts);
if (local_rank_ == 0) {
- std::vector<Tensor> cpu_tensors(tensors.size());
+ std::vector<Tensor> cpu_tensors;
+ cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
 std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
- auto dense_cpu_tensor =
- std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
- dense_cpu_tensor->Resize(tensors[i].dims());
+ phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+ dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
+ std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+ std::make_shared<phi::DenseTensor>(
+ std::make_unique<paddle::experimental::DefaultAllocator>(
+ paddle::platform::CPUPlace())
+ .get(),
+ meta);
+ dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
+ cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
if (with_switch_) {
- // TODO(sandyhouse) send to and recv
+ if (local_rank_ == 0) {
+ HeterClient* client_ =
+ HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
+ auto dense_cpu_tensor =
+ std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
+ if (gloo_rank_ == 0) {
+ std::vector<int64_t> send_size;
+ send_size.push_back(dense_cpu_tensor->numel());
+ int ret = client_->Send(
+ gid_, {dense_cpu_tensor->name()}, send_size,
+ dense_cpu_tensor->data(),
+ dense_cpu_tensor->numel() *
+ framework::DataTypeSize(dense_cpu_tensor->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+ "Send to the switch module error."));
+ } else {
+ int ret = client_->Recv(
+ gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
+ dense_cpu_tensor->numel() *
+ framework::DataTypeSize(dense_cpu_tensor->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0,
+ platform::errors::PreconditionNotMet(
+ "Receive from the switch module error."));
+ ret = client_->Recv(
+ gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
+ dense_cpu_tensor->numel() *
+ framework::DataTypeSize(dense_cpu_tensor->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0,
+ platform::errors::PreconditionNotMet(
+ "Receive from the switch module error."));
+ }
+ }
} else {
auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts);
gloo_task->Wait();
@@ -168,8 +272,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
 std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
 auto dense_cpu_tensor =
 std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
- // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
- // dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
@@ -185,22 +287,44 @@ void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in,
inner_pg_->Broadcast(in, out);
if (local_rank_ == 0) {
- Tensor cpu_tensor;
- auto dense_cpu_tensor =
- std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensor.impl());
- dense_cpu_tensor->Resize(in->dims());
+ phi::DenseTensorMeta meta = phi::DenseTensorMeta(in->dtype(), in->dims());
+ std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+ std::make_shared<phi::DenseTensor>(
+ std::make_unique<paddle::experimental::DefaultAllocator>(
+ paddle::platform::CPUPlace())
+ .get(),
+ meta);
+ dense_cpu_tensor->ResizeAndAllocate(in->dims());
+ Tensor cpu_tensor = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*in, platform::CPUPlace(),
dense_cpu_tensor.get());
if (with_switch_) {
- // TODO(sandyhouse) send to and recv
+ if (local_rank_ == 0) {
+ HeterClient* client_ =
+ HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
+ if (gloo_rank_ == 0) {
+ std::vector<int64_t> send_size;
+ send_size.push_back(in->numel());
+ int ret = client_->Send(
+ gid_, {in->name()}, send_size, dense_cpu_tensor->data(),
+ in->numel() * framework::DataTypeSize(in->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+ "Send to the switch module error."));
+ } else {
+ int ret =
+ client_->Recv(gid_, {in->name()}, dense_cpu_tensor->data(),
+ in->numel() * framework::DataTypeSize(in->dtype()));
+ PADDLE_ENFORCE_EQ(ret, 0,
+ platform::errors::PreconditionNotMet(
+ "Receive from the switch module error."));
+ }
+ }
} else {
 std::vector<Tensor> cpu_tensors = {cpu_tensor};
- // auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
- // gloo_task->Wait();
- inter_pg_->Broadcast(cpu_tensors);
+ auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
+ gloo_task->Wait();
}
- framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
- out);
+ framework::TensorCopySync(*dense_cpu_tensor, out->place(), out);
}
inner_pg_->Broadcast(out, out);
}
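As a standalone illustration of the inter-cluster step the switch path above performs — send the local partial sum, receive the peer cluster's partial sum, combine element-wise — here is a minimal CPU-only sketch. The two arrays stand in for the buffers exchanged via `client_->Send`/`client_->Recv`; this is an assumption-laden toy, not the patch's code path:

```cpp
#include <cstdio>

// Same element-wise accumulation as the patch's _do_add<T>.
template <typename T>
static void do_add(T* dst, const T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    dst[i] += src[i];
  }
}

int main() {
  float local[4] = {1.f, 2.f, 3.f, 4.f};       // result of the inner allreduce
  float remote[4] = {10.f, 20.f, 30.f, 40.f};  // filled by Recv from the switch
  do_add(local, remote, 4);                    // final inter-cluster sum
  for (float v : local) std::printf("%g ", v);  // prints: 11 22 33 44
  std::printf("\n");
  return 0;
}
```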
diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h
index 8a26adbea4d78..892dbb9369e8d 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupHeter.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.h
@@ -23,7 +23,6 @@
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
-// #include "paddle/fluid/distributed/ps/service/heter_client.h"
#include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_GLOO
@@ -48,6 +47,11 @@
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
+#include "paddle/fluid/distributed/ps/service/heter_client.h"
+#endif
+
#include "paddle/fluid/distributed/collective/Common.h"
constexpr const char* HETER_BACKEND_NAME = "HETER_BACKEND";
@@ -108,6 +112,7 @@ class ProcessGroupHeter : public ProcessGroup {
int gloo_rank_;
int gloo_size_;
bool with_switch_;
+ std::string switch_endpoint_;
};
} // namespace distributed
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
index 7c0752b5f367c..b1d892e2521a3 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -110,7 +110,8 @@ void ProcessGroupNCCL::BroadcastUniqueNCCLID(
 std::vector<ncclUniqueId>& nccl_ids) { // NOLINT
if (rank_ == 0) {
for (size_t i = 0; i < nccl_ids.size(); i++) {
- auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(i);
+ auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(gid_) + "/" +
+ std::to_string(i);
 auto nccl_id = std::vector<uint8_t>(
 reinterpret_cast<uint8_t*>(&nccl_ids[i]),
 reinterpret_cast<uint8_t*>(&nccl_ids[i]) + NCCL_UNIQUE_ID_BYTES);
@@ -118,7 +119,8 @@ void ProcessGroupNCCL::BroadcastUniqueNCCLID(
}
} else {
for (size_t i = 0; i < nccl_ids.size(); i++) {
- auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(i);
+ auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(gid_) + "/" +
+ std::to_string(i);
auto ret = store_->get(key);
std::memcpy(&nccl_ids[i], ret.data(), ret.size());
}
@@ -226,6 +228,43 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
return task;
}
+template <typename Fn>
+void ProcessGroupNCCL::Collective(const phi::DenseTensor* in,
+ phi::DenseTensor* out, Fn fn,
+ CommType op_type) {
+ std::vector<Place> places;
+ places.push_back(in->place());
+ const auto key = GetKeyFromPlaces(places);
+
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
+ CreateNCCLManagerCache(key, places);
+ }
+ }
+
+ auto& nccl_comms = places_to_ncclcomm_[key];
+
+ SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
+
+ // construct uninitialize guard for device
+ platform::CUDADeviceGuard cuda_guard;
+
+ if (FLAGS_use_stream_safe_cuda_allocator) {
+ cuda_guard.SetDevice(places[0]);
+ memory::RecordStream(in->Holder(), places_to_ctx_[key][0]->stream());
+ }
+
+ {
+ platform::NCCLGroupGuard nccl_guard;
+ cuda_guard.SetDevice(places[0]);
+ const auto& nccl_stream = places_to_ctx_[key][0]->stream();
+ fn(in, out, nccl_comms[0]->GetNcclComm(), nccl_stream);
+ }
+
+ cuda_guard.SetDevice(places[0]);
+}
+
 template <typename Fn>
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
 std::vector<Tensor>& tensors, Fn fn, int dst_rank, CommType op_type) {
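The store-key change in `BroadcastUniqueNCCLID` above is easiest to see in isolation: before this patch, two concurrent process groups with the same ranks would read and write the same `ProcessGroupNCCL/nccl_ids/<i>` store entry and could pick up each other's NCCL unique ids. A small sketch of the gid-qualified key (mirroring the string construction only):

```cpp
#include <iostream>
#include <string>

// Mirrors the key construction in BroadcastUniqueNCCLID after this patch.
std::string NcclIdKey(int gid, size_t i) {
  return "ProcessGroupNCCL/nccl_ids/" + std::to_string(gid) + "/" +
         std::to_string(i);
}

int main() {
  std::cout << NcclIdKey(0, 0) << "\n";  // ProcessGroupNCCL/nccl_ids/0/0
  std::cout << NcclIdKey(1, 0) << "\n";  // ProcessGroupNCCL/nccl_ids/1/0
  return 0;                              // distinct keys, no collision
}
```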
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
index 4ab5374dacaf4..fa73ed195b0c1 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -146,6 +146,10 @@ class ProcessGroupNCCL : public ProcessGroup {
 std::vector<Tensor>& outputs, // NOLINT
Fn fn, CommType op_type);
+ template <typename Fn>
+ void Collective(const phi::DenseTensor*, phi::DenseTensor*, Fn fn,
+ CommType op_type);
+
 template <typename Fn>
 std::shared_ptr<ProcessGroup::Task> PointToPoint(
 std::vector<Tensor>& tensors, // NOLINT
diff --git a/paddle/fluid/distributed/collective/reducer.cc b/paddle/fluid/distributed/collective/reducer.cc
index ec02406efc818..71741515c90d5 100644
--- a/paddle/fluid/distributed/collective/reducer.cc
+++ b/paddle/fluid/distributed/collective/reducer.cc
@@ -360,6 +360,7 @@ void EagerReducer::InitializeGroups(
is_sparse_gradient_[tensor_indices_.front()]) {
// process the sparse gradient. one sparse, one group
group.dtype_ = first_var.dtype();
+ group.is_sparse_ = true;
} else {
// process the dense gradient.
InitializeDenseGroups(tensor_indices_, &group);
@@ -391,6 +392,12 @@ void EagerReducer::InitializeDenseGroups(
auto &tensor = tensors_[tensor_index];
auto &tensor_name = tensor.name();
+ PADDLE_ENFORCE_EQ(is_sparse_gradient_[tensor_index], false,
+ platform::errors::PreconditionNotMet(
+ "Tensor %s's GRAD must be Tensor, but received "
+ "GRAD is SelectedRows",
+ tensor_name));
+
PADDLE_ENFORCE_EQ(tensor.is_initialized(), true,
platform::errors::PreconditionNotMet(
"Tensor %s is not initialized.", tensor_name));
@@ -480,6 +487,7 @@ void EagerReducer::PrepareForBackward(const std::vector<Tensor> &outputs) {
next_group_ = 0;
std::for_each(groups_.begin(), groups_.end(), [](EagerGroup &group) {
group.pending_ = group.tensor_indices_.size();
+ group.sparse_contents_ = Tensor();
});
// reinitialize vars_marked_ready_ for next iteration
@@ -544,9 +552,6 @@ void EagerReducer::AddDistHook(size_t var_index) {
return;
}
- auto &tensor = tensors_[var_index];
- const auto &grad_node = GetGradNodeFromTensor(&tensor);
-
VLOG(3) << "Tensor[" << var_index << "] [" << tensors_[var_index].name()
<< "@Grad] arrived and triggered disthook";
@@ -608,33 +613,69 @@ void EagerReducer::MarkVarReady(const size_t var_index,
auto &group_tensor = group.dense_tensors_[inside_group_index];
const auto length = group.length_[inside_group_index];
- if (is_used_var) {
- auto *autograd_meta = tensors_[var_index].get_autograd_meta();
- auto &grad_tensor = static_cast(autograd_meta)->Grad();
- group_tensor
- .ShareDataWith(
- *(std::dynamic_pointer_cast(grad_tensor.impl())))
- .Resize({grad_tensor.numel()});
- } else {
- // TODO(shenliang03): maybe save the memory by avoiding tensor construction
- if (!group_tensor.initialized()) {
- group_tensor.Resize({static_cast(length)});
- group_tensor.mutable_data(inner_place_, group.dtype_);
- }
- if (HasGrad(var_index)) {
- VLOG(3) << "Tensor[" << tensors_[var_index].name() << "] has grad";
- auto grad_tensor = egr::EagerUtils::mutable_grad(tensors_[var_index]);
+ if (!group.is_sparse_) {
+ if (is_used_var) {
+ auto *autograd_meta = tensors_[var_index].get_autograd_meta();
+ auto &grad_tensor =
+ static_cast<egr::AutogradMeta *>(autograd_meta)->Grad();
 group_tensor
 .ShareDataWith(*(
- std::dynamic_pointer_cast<phi::DenseTensor>(grad_tensor->impl())))
- .Resize({length});
+ std::dynamic_pointer_cast<phi::DenseTensor>(grad_tensor.impl())))
+ .Resize({grad_tensor.numel()});
} else {
- VLOG(3) << "Tensor[" << tensors_[var_index].name()
- << "] doesn't have grad";
- auto *dev_ctx = platform::DeviceContextPool::Instance().Get(inner_place_);
- group_tensor.Resize({static_cast<int64_t>(length)});
- phi::funcs::set_constant(*dev_ctx, &group_tensor, 0.0);
+ // TODO(shenliang03): maybe save the memory by avoiding tensor
+ // construction
+ if (!group_tensor.initialized()) {
+ group_tensor.Resize({static_cast<int64_t>(length)});
+ group_tensor.mutable_data(inner_place_, group.dtype_);
+ }
+ if (HasGrad(var_index)) {
+ VLOG(3) << "Tensor[" << tensors_[var_index].name() << "] has grad";
+ auto grad_tensor = egr::EagerUtils::mutable_grad(tensors_[var_index]);
+ group_tensor
+ .ShareDataWith(*(std::dynamic_pointer_cast<phi::DenseTensor>(
+ grad_tensor->impl())))
+ .Resize({length});
+ } else {
+ VLOG(3) << "Tensor[" << tensors_[var_index].name()
+ << "] doesn't have grad";
+ auto *dev_ctx =
+ platform::DeviceContextPool::Instance().Get(inner_place_);
+ group_tensor.Resize({static_cast<int64_t>(length)});
+ phi::funcs::set_constant(*dev_ctx, &group_tensor, 0.0);
+ }
}
+ } else {
+ auto *autograd_meta = tensors_[var_index].get_autograd_meta();
+ auto &grad_tensor = static_cast(autograd_meta)->Grad();
+
+ // process sparse group
+ PADDLE_ENFORCE_EQ(
+ HasGrad(var_index), true,
+ platform::errors::PreconditionNotMet(
+ "The sparse parameter[%d][%s] should have gradient. "
+ "Currently, DataParallel does not support sparse "
+ "parameters without generating gradients during training. "
+ "For example, if is_sparese=True is used in Embedding, "
+ "the current step of this parameter cannot generate gradient "
+ "because of stop_gradient/detatch, where error will occur.",
+ var_index, tensors_[var_index].name()));
+
+ // need to check tensor type
+ PADDLE_ENFORCE_EQ(
+ grad_tensor.is_selected_rows(), true,
+ platform::errors::PreconditionNotMet(
+ "The sparse parameter[%d][%s] must have a selectedrows gradient. "
+ "Before forward pass, the parameter type is inferred to be "
+ "SelectedRows, but after backward pass, its actual type becomes "
+ "LodTensor. It is currently not supported by DataParallel. "
+ "For example, if sparse embedding is used, and the weight of "
+ "embedding is shared with subsequent dense parameters, then "
+ "the parameter gradient of the embedding will be converted "
+ "to dense parameters.",
+ var_index, tensors_[var_index].name()));
+
+ group.sparse_contents_.set_impl(grad_tensor.impl());
}
if (--group.pending_ == 0) {
@@ -666,7 +707,11 @@ void EagerReducer::MarkGroupReady(size_t group_index) {
for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
++next_group_) {
UNUSED auto &group = groups_[next_group_];
- FusedAllReduceSchedule(&group, next_group_);
+ if (group.is_sparse_) {
+ AllReduceSparse(&group, next_group_);
+ } else {
+ FusedAllReduceSchedule(&group, next_group_);
+ }
}
}
@@ -725,6 +770,11 @@ void EagerReducer::ProcessUnusedDenseVars() {
const auto inside_group_index = var_locator.inside_group_index;
auto &src_tensor = group.dense_tensors_[inside_group_index];
+ // Sparse gradients are skipped here; find_unused_parameters is not supported for them.
+ if (group.is_sparse_) {
+ continue;
+ }
+
 Tensor grad_value(std::make_shared<phi::DenseTensor>(src_tensor));
auto dest_var_base = tensors_[var_index];
@@ -739,11 +789,15 @@ void EagerReducer::FinalizeBackward() {
groups_need_finalize_ = false;
grad_need_hooks_ = false;
for (auto &group : groups_) {
- group.task->Synchronize();
+ if (!group.is_sparse_) {
+ group.task->Synchronize();
+ }
}
for (auto &group : groups_) {
- group.SplitTensors(inner_place_);
+ if (!group.is_sparse_) {
+ group.SplitTensors(inner_place_);
+ }
}
if (find_unused_vars_each_step_) {
@@ -778,6 +832,127 @@ void EagerReducer::FusedAllReduceSchedule(EagerGroup *group,
// split in FinalizeBackward()
}
+void EagerReducer::AllReduceSparse(EagerGroup *group,
+ const int curr_group_index) {
+ // div nranks
+ Tensor sparse_tensor(group->sparse_contents_);
+ paddle::experimental::scale_(sparse_tensor, 1.0 / nranks_, 0.0, false);
+
+ VLOG(3) << "sparse_group [" << curr_group_index << "] start allreduce.";
+
+ auto *dev_ctx = platform::DeviceContextPool::Instance().Get(inner_place_);
+ if (platform::is_gpu_place(inner_place_)) {
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+ dev_ctx = static_cast<platform::CUDADeviceContext *>(
+ platform::DeviceContextPool::Instance().Get(inner_place_));
+#else
+ PADDLE_THROW(platform::errors::PermissionDenied(
+ "Paddle can't concat grad tensors since it's not compiled with NCCL,"
+ "Please recompile or reinstall Paddle with NCCL support."));
+#endif
+ } else if (platform::is_cpu_place(inner_place_)) {
+ dev_ctx = static_cast<platform::CPUDeviceContext *>(
+ platform::DeviceContextPool::Instance().Get(inner_place_));
+ } else {
+ PADDLE_THROW(platform::errors::Unimplemented(
+ "Split grad tensor not supported on place (%s)", inner_place_));
+ }
+
+ auto src = std::dynamic_pointer_cast<phi::SelectedRows>(
+ group->sparse_contents_.impl());
+ const auto &src_rows = src->rows();
+
+ const auto &rank_ = process_group_->GetRank();
+ const auto &size_ = process_group_->GetSize();
+
+ framework::Vector<int64_t> rows_num_vector(size_);
+ rows_num_vector[rank_] = static_cast(src_rows.size());
+
+ Tensor rows_num_tensor = paddle::experimental::empty(
+ IntArray({static_cast<int64_t>(size_)}), DataType::INT64, inner_place_);
+ auto *rows_num_dense_tensor =
+ std::dynamic_pointer_cast<phi::DenseTensor>(rows_num_tensor.impl()).get();
+ framework::TensorFromVector(rows_num_vector, *dev_ctx,
+ rows_num_dense_tensor);
+
+ distributed::AllreduceOptions opts;
+ opts.reduce_op = ReduceOp::SUM;
+ std::vector<Tensor> reduce_tensors = {rows_num_tensor};
+ process_group_->AllReduce(reduce_tensors, opts)->Synchronize();
+
+ framework::TensorToVector(*rows_num_dense_tensor, *dev_ctx,
+ &rows_num_vector);
+ dev_ctx->Wait();
+
+ const auto *cpu_rows_num_ptr = rows_num_vector.data();
+ auto rows_num = std::accumulate(cpu_rows_num_ptr, cpu_rows_num_ptr + size_,
+ static_cast<int64_t>(0));
+
+ VLOG(3) << "Gather rows: " << string::join_strings(rows_num_vector, ',')
+ << ", total rows number: " << rows_num
+ << ", height: " << src->height();
+
+ dev_ctx->Wait();
+
+ if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + size_,
+ [&](int64_t row) { return row == cpu_rows_num_ptr[0]; })) {
+ // During sparse communication, the number of each card is same.
+ // allgather is used to speed up the allreduce by replacing broadcast.
+
+ VLOG(3) << "allgather replaces broadcast to speed up in sparse allreduce";
+
+ Tensor dst_rows_tensor =
+ paddle::experimental::empty(IntArray({static_cast<int64_t>(rows_num)}),
+ DataType::INT64, inner_place_);
+ Tensor src_rows_tensor = paddle::experimental::empty(
+ IntArray({static_cast<int64_t>((*src).rows().size())}), DataType::INT64,
+ inner_place_);
+ auto *src_rows_dense_tensor =
+ std::dynamic_pointer_cast<phi::DenseTensor>(src_rows_tensor.impl())
+ .get();
+ framework::TensorFromVector((*src).rows(), *dev_ctx,
+ src_rows_dense_tensor);
+
+ std::vector<Tensor> src_rows_tensors = {src_rows_tensor};
+ std::vector<Tensor> dst_rows_tensors = {dst_rows_tensor};
+ process_group_->AllGather(src_rows_tensors, dst_rows_tensors)
+ ->Synchronize();
+
+ framework::Vector<int64_t> dst_rows_vector(rows_num, 0);
+ auto *dst_rows_dense_tensor =
+ std::dynamic_pointer_cast<phi::DenseTensor>(dst_rows_tensor.impl())
+ .get();
+ framework::TensorToVector(*dst_rows_dense_tensor, *dev_ctx,
+ &dst_rows_vector);
+ dev_ctx->Wait();
+
+ Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
+ std::vector<int64_t> dst_shape = src_value_tensor.shape();
+ dst_shape[dst_shape.size() - 2] = rows_num;
+ auto dst_dense_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
+ paddle::experimental::full(IntArray(dst_shape), 0,
+ src_value_tensor.dtype(), inner_place_)
+ .impl());
+
+ auto dst =
+ std::make_shared<phi::SelectedRows>(dst_rows_vector, (*src).height());
+ *(dst->mutable_value()) = *dst_dense_tensor;
+ Tensor dst_value_tensor(std::make_shared<phi::DenseTensor>(dst->value()));
+
+ std::vector<Tensor> src_value_tensors = {src_value_tensor};
+ std::vector<Tensor> dst_value_tensors = {dst_value_tensor};
+ process_group_->AllGather(src_value_tensors, dst_value_tensors)
+ ->Synchronize();
+
+ src->set_rows(dst_rows_vector);
+ *(src->mutable_value()) =
+ *(std::dynamic_pointer_cast<phi::DenseTensor>(dst_value_tensor.impl()));
+ } else {
+ PADDLE_THROW(
+ platform::errors::Unimplemented("This case is not supported."));
+ }
+}
+
std::ostream &operator<<(std::ostream &out, const EagerGroup &group) {
const auto &tensors_ = group.tensor_indices_;
out << "numel: " << group.all_length_ << " ;var number: " << tensors_.size()
diff --git a/paddle/fluid/distributed/collective/reducer.h b/paddle/fluid/distributed/collective/reducer.h
index 848277f5fad4e..12c02509884e9 100644
--- a/paddle/fluid/distributed/collective/reducer.h
+++ b/paddle/fluid/distributed/collective/reducer.h
@@ -47,6 +47,8 @@ std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
class EagerGroup {
public:
Tensor dense_contents_;
+ Tensor sparse_contents_;
+ bool is_sparse_ = false;
// for concat kernel
 std::vector<phi::DenseTensor> dense_tensors_;
@@ -104,6 +106,7 @@ class EagerReducer {
void MarkVarReady(const size_t var_index, const bool is_used_var);
void MarkGroupReady(const size_t group_index);
void FusedAllReduceSchedule(EagerGroup *group, const int curr_group_index);
+ void AllReduceSparse(EagerGroup *group, const int curr_group_index);
void FinalizeBackward();
void TraverseBackwardGraph(const std::vector &outputs);
void ProcessUnusedDenseVars();
diff --git a/paddle/fluid/distributed/ps/service/CMakeLists.txt b/paddle/fluid/distributed/ps/service/CMakeLists.txt
old mode 100644
new mode 100755
index ab6c2e2600274..b8de291072a1f
--- a/paddle/fluid/distributed/ps/service/CMakeLists.txt
+++ b/paddle/fluid/distributed/ps/service/CMakeLists.txt
@@ -39,8 +39,8 @@ cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS})
cc_library(communicator SRCS communicator/communicator.cc DEPS scope client boost table math_function selected_rows_functor ${RPC_DEPS})
cc_library(ps_service SRCS ps_service/service.cc DEPS communicator client server boost ${RPC_DEPS})
-cc_library(heter_server SRCS heter_server.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS})
cc_library(heter_client SRCS heter_client.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS})
+cc_library(heter_server SRCS heter_server.cc DEPS heter_client brpc_utils ${COMMON_DEPS} ${RPC_DEPS})
set_source_files_properties(ps_service/graph_py_service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(graph_py_service SRCS ps_service/graph_py_service.cc DEPS ps_service)
diff --git a/paddle/fluid/distributed/ps/service/brpc_ps_client.cc b/paddle/fluid/distributed/ps/service/brpc_ps_client.cc
index 9674717ffc24b..971c448bf2714 100644
--- a/paddle/fluid/distributed/ps/service/brpc_ps_client.cc
+++ b/paddle/fluid/distributed/ps/service/brpc_ps_client.cc
@@ -55,6 +55,8 @@ DEFINE_int32(pserver_sparse_merge_thread, 1, "pserver sparse merge thread num");
DEFINE_int32(pserver_sparse_table_shard_num, 1000,
"sparse table shard for save & load");
+DEFINE_int32(heter_world_size, 100, "group size");  // configurable
+
namespace paddle {
namespace framework {
class Scope;
@@ -78,7 +80,7 @@ void DownpourPsClientService::service(
const PsRequestMessage *request, PsResponseMessage *response,
::google::protobuf::Closure *done) {
brpc::ClosureGuard done_guard(done);
- int ret = _client->handle_client2client_msg(
+ int ret = _client->HandleClient2ClientMsg(
request->cmd_id(), request->client_id(), request->data());
response->set_err_code(0);
response->set_err_msg("");
@@ -89,8 +91,8 @@ void DownpourPsClientService::service(
}
 // Start the client-side RpcService, used for data exchange between clients
-int32_t BrpcPsClient::start_client_service() {
- if (_service.configure(this, _client_id) != 0) {
+int32_t BrpcPsClient::StartClientService() {
+ if (_service.Configure(this, _client_id) != 0) {
LOG(ERROR)
<< "service initialize failed, service_name:DownpourPsClientService";
return -1;
@@ -106,12 +108,12 @@ int32_t BrpcPsClient::start_client_service() {
return -1;
}
_server_started = true;
- _env->registe_ps_client(butil::my_ip_cstr(), _server.listen_address().port,
- _client_id);
+ _env->RegistePsClient(butil::my_ip_cstr(), _server.listen_address().port,
+ _client_id);
return 0;
}
-int32_t BrpcPsClient::create_client2client_connection(
+int32_t BrpcPsClient::CreateClient2ClientConnection(
int pserver_timeout_ms, int pserver_connect_timeout_ms, int max_retry) {
brpc::ChannelOptions options;
options.protocol = "baidu_std";
@@ -120,12 +122,12 @@ int32_t BrpcPsClient::create_client2client_connection(
options.connect_timeout_ms = pserver_connect_timeout_ms;
options.max_retry = max_retry;
-  std::vector<PSHost> client_list = _env->get_ps_clients();
+  std::vector<PSHost> client_list = _env->GetPsClients();
VLOG(1) << "BrpcPsClient::create_c2c_connection client_list size: "
<< client_list.size();
for (auto cc : client_list) {
VLOG(1) << "BrpcPsClient::create_c2c_connection client_list: "
- << cc.to_string();
+ << cc.ToString();
}
_client_channels.resize(client_list.size());
std::ostringstream os;
@@ -152,7 +154,7 @@ int32_t BrpcPsClient::create_client2client_connection(
return 0;
}
-int32_t BrpcPsClient::initialize() {
+int32_t BrpcPsClient::Initialize() {
_async_call_num = 0;
brpc::ChannelOptions options;
@@ -167,7 +169,7 @@ int32_t BrpcPsClient::initialize() {
std::string client_ip(butil::my_ip_cstr());
   // Fetch the server list and connect to each server
-  std::vector<PSHost> server_list = _env->get_ps_servers();
+  std::vector<PSHost> server_list = _env->GetPsServers();
_server_channels.resize(server_list.size());
for (size_t i = 0; i < server_list.size(); ++i) {
server_ip_port.assign(server_list[i].ip.c_str());
@@ -192,7 +194,7 @@ int32_t BrpcPsClient::initialize() {
os << server_ip_port << ",";
}
   // Start the client listening service and connect the clients to each other
- start_client_service();
+ StartClientService();
   // Initialize the async push request queues
const auto &worker_param = _config.worker_param().downpour_worker_param();
@@ -232,13 +234,13 @@ int32_t BrpcPsClient::initialize() {
_flushing = false;
   // Start the async push threads
_async_push_sparse_thread =
- std::thread(std::bind(&BrpcPsClient::push_sparse_task_consume, this));
+ std::thread(std::bind(&BrpcPsClient::PushSparseTaskConsume, this));
// _async_push_sparse_thread.detach();
_async_push_dense_thread =
- std::thread(std::bind(&BrpcPsClient::push_dense_task_consume, this));
+ std::thread(std::bind(&BrpcPsClient::PushDenseTaskConsume, this));
// for debug
// _print_thread =
- // std::thread(std::bind(&BrpcPsClient::print_queue_size_thread, this));
+ // std::thread(std::bind(&BrpcPsClient::PrintQueueSizeThread, this));
return 0;
}
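+// Initialize() ends by spawning one consumer thread per push direction
+// (PushSparseTaskConsume / PushDenseTaskConsume) that drains a task queue
+// until _running is cleared by FinalizeWorker(). A self-contained sketch of
+// that lifecycle, with a plain std::queue standing in for the real
+// per-table task queues:
+//
+// #include <atomic>
+// #include <chrono>
+// #include <mutex>
+// #include <queue>
+// #include <thread>
+//
+// class PushConsumer {
+//  public:
+//   void Start() {
+//     running_ = true;
+//     worker_ = std::thread([this] { Consume(); });
+//   }
+//   void Stop() {  // mirrors FinalizeWorker(): flip the flag, then join
+//     running_ = false;
+//     worker_.join();
+//   }
+//   void Enqueue(int task) {
+//     std::lock_guard<std::mutex> guard(mu_);
+//     tasks_.push(task);
+//   }
+//
+//  private:
+//   void Consume() {
+//     while (running_) {
+//       std::unique_lock<std::mutex> guard(mu_);
+//       if (tasks_.empty()) {
+//         guard.unlock();
+//         std::this_thread::sleep_for(std::chrono::milliseconds(1));
+//         continue;
+//       }
+//       tasks_.pop();  // the real consumer merges tasks and fires the RPC
+//     }
+//   }
+//
+//   std::atomic<bool> running_{false};
+//   std::thread worker_;
+//   std::mutex mu_;
+//   std::queue<int> tasks_;
+// };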
@@ -284,7 +286,7 @@ std::string DownpourBrpcClosure::get_response(size_t request_idx, int cmd_id) {
return data;
}
-std::future<int32_t> BrpcPsClient::print_table_stat(uint32_t table_id) {
+std::future<int32_t> BrpcPsClient::PrintTableStat(uint32_t table_id) {
size_t request_call_num = _server_channels.size();
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num, [request_call_num, table_id](void *done) {
@@ -317,7 +319,7 @@ std::future<int32_t> BrpcPsClient::print_table_stat(uint32_t table_id) {
closure->request(i)->set_cmd_id(PS_PRINT_TABLE_STAT);
closure->request(i)->set_table_id(table_id);
closure->request(i)->set_client_id(_client_id);
- PsService_Stub rpc_stub(get_cmd_channel(i));
+ PsService_Stub rpc_stub(GetCmdChannel(i));
closure->cntl(i)->set_timeout_ms(
10800000); // cmd msg don't limit timeout for save/load
rpc_stub.service(closure->cntl(i), closure->request(i),
@@ -325,7 +327,7 @@ std::future<int32_t> BrpcPsClient::print_table_stat(uint32_t table_id) {
}
return fut;
}
-std::future<int32_t> BrpcPsClient::send_cmd(
+std::future<int32_t> BrpcPsClient::SendCmd(
     uint32_t table_id, int cmd_id, const std::vector<std::string> &params) {
size_t request_call_num = _server_channels.size();
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
@@ -350,7 +352,7 @@ std::future<int32_t> BrpcPsClient::send_cmd(
     for (const auto &param : params) {
closure->request(i)->add_params(param);
}
- PsService_Stub rpc_stub(get_cmd_channel(i));
+ PsService_Stub rpc_stub(GetCmdChannel(i));
closure->cntl(i)->set_timeout_ms(
10800000 * 2); // cmd msg don't limit timeout for save/load
rpc_stub.service(closure->cntl(i), closure->request(i),
@@ -359,7 +361,7 @@ std::future<int32_t> BrpcPsClient::send_cmd(
return fut;
}
-std::future<int32_t> BrpcPsClient::send_save_cmd(
+std::future<int32_t> BrpcPsClient::SendSaveCmd(
     uint32_t table_id, int cmd_id, const std::vector<std::string> &params) {
size_t request_call_num = _server_channels.size();
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
@@ -390,7 +392,7 @@ std::future<int32_t> BrpcPsClient::send_save_cmd(
     for (const auto &param : params) {
closure->request(i)->add_params(param);
}
- PsService_Stub rpc_stub(get_cmd_channel(i));
+ PsService_Stub rpc_stub(GetCmdChannel(i));
closure->cntl(i)->set_timeout_ms(
10800000); // cmd msg don't limit timeout for save/load
rpc_stub.service(closure->cntl(i), closure->request(i),
@@ -399,65 +401,42 @@ std::future<int32_t> BrpcPsClient::send_save_cmd(
return fut;
}
-std::future<int32_t> BrpcPsClient::shrink(uint32_t table_id,
+std::future<int32_t> BrpcPsClient::Shrink(uint32_t table_id,
const std::string threshold) {
- return send_cmd(table_id, PS_SHRINK_TABLE, {threshold});
+ return SendCmd(table_id, PS_SHRINK_TABLE, {threshold});
}
-std::future<int32_t> BrpcPsClient::load(const std::string &epoch,
+std::future<int32_t> BrpcPsClient::Load(const std::string &epoch,
const std::string &mode) {
- return send_cmd(-1, PS_LOAD_ALL_TABLE, {epoch, mode});
+ return SendCmd(-1, PS_LOAD_ALL_TABLE, {epoch, mode});
}
-std::future<int32_t> BrpcPsClient::load(uint32_t table_id,
+std::future<int32_t> BrpcPsClient::Load(uint32_t table_id,
const std::string &epoch,
const std::string &mode) {
- return send_cmd(table_id, PS_LOAD_ONE_TABLE, {epoch, mode});
-}
-
-std::future<int32_t> BrpcPsClient::Load(const LoadSaveContext &load_context) {
- if (load_context.table_id < 0) {
- return send_cmd(-1, PS_LOAD_ALL_TABLE,
- {load_context.epoch, load_context.mode});
- } else {
- return send_cmd(load_context.table_id, PS_LOAD_ONE_TABLE,
- {load_context.epoch, load_context.mode});
- }
+ return SendCmd(table_id, PS_LOAD_ONE_TABLE, {epoch, mode});
}
-std::future<int32_t> BrpcPsClient::save(const std::string &epoch,
+std::future<int32_t> BrpcPsClient::Save(const std::string &epoch,
const std::string &mode) {
VLOG(1) << "BrpcPsClient::save path " << epoch;
- return send_save_cmd(-1, PS_SAVE_ALL_TABLE, {epoch, mode});
+ return SendSaveCmd(-1, PS_SAVE_ALL_TABLE, {epoch, mode});
}
-std::future<int32_t> BrpcPsClient::save(uint32_t table_id,
+std::future<int32_t> BrpcPsClient::Save(uint32_t table_id,
const std::string &epoch,
const std::string &mode) {
VLOG(1) << "BrpcPsClient::save one table path " << epoch << " table_id "
<< table_id;
- return send_save_cmd(table_id, PS_SAVE_ONE_TABLE, {epoch, mode});
-}
-
-std::future<int32_t> BrpcPsClient::Save(const LoadSaveContext &save_context) {
- if (save_context.table_id < 0) {
- VLOG(1) << "BrpcPsClient::save path " << save_context.epoch;
- return send_save_cmd(-1, PS_SAVE_ALL_TABLE,
- {save_context.epoch, save_context.mode});
- } else {
- VLOG(1) << "BrpcPsClient::save one table path " << save_context.epoch
- << " table_id " << save_context.table_id;
- return send_save_cmd(save_context.table_id, PS_SAVE_ONE_TABLE,
- {save_context.epoch, save_context.mode});
- }
+ return SendSaveCmd(table_id, PS_SAVE_ONE_TABLE, {epoch, mode});
}
-std::future<int32_t> BrpcPsClient::clear() {
-  return send_cmd(-1, PS_CLEAR_ALL_TABLE, {});
+std::future<int32_t> BrpcPsClient::Clear() {
+ return SendCmd(-1, PS_CLEAR_ALL_TABLE, {});
}
-std::future<int32_t> BrpcPsClient::clear(uint32_t table_id) {
-  return send_cmd(table_id, PS_CLEAR_ONE_TABLE, {});
+std::future<int32_t> BrpcPsClient::Clear(uint32_t table_id) {
+ return SendCmd(table_id, PS_CLEAR_ONE_TABLE, {});
}
-std::future<int32_t> BrpcPsClient::flush() {
+std::future<int32_t> BrpcPsClient::Flush() {
VLOG(0) << "BrpcPsClient::flush begin";
_flushing = true;
   std::promise<int32_t> promise;
@@ -470,106 +449,69 @@ std::future<int32_t> BrpcPsClient::flush() {
promise.set_value(0);
_flushing = false;
VLOG(0) << "BrpcPsClient::flush done";
- print_queue_size();
+ PrintQueueSize();
return fut;
}
-void BrpcPsClient::print_queue_size() {
+void BrpcPsClient::PrintQueueSize() {
for (auto &push_sparse_task_itr : _push_sparse_task_queue_map) {
auto table_id = push_sparse_task_itr.first;
auto queue_size = push_sparse_task_itr.second->Size();
- VLOG(0) << "BrpcPsClient::print_queue_size: table " << table_id
+ VLOG(0) << "BrpcPsClient::PrintQueueSize: table " << table_id
<< " size: " << queue_size;
}
for (auto &task_queue_itr : _push_dense_task_queue_map) {
auto table_id = task_queue_itr.first;
auto queue_size = task_queue_itr.second->Size();
- VLOG(0) << "BrpcPsClient::print_queue_size: table " << table_id
+ VLOG(0) << "BrpcPsClient::PrintQueueSize: table " << table_id
<< " size: " << queue_size;
}
}
-void BrpcPsClient::print_queue_size_thread() {
+void BrpcPsClient::PrintQueueSizeThread() {
while (_running) {
usleep(1000000 * 60 * 2);
- print_queue_size();
+ PrintQueueSize();
}
}
-void BrpcPsClient::finalize_worker() {
- flush();
- VLOG(0) << "BrpcPsClient::finalize_worker begin join thread";
+void BrpcPsClient::FinalizeWorker() {
+ Flush();
+ VLOG(0) << "BrpcPsClient::FinalizeWorker begin join thread";
_running = false;
_async_push_dense_thread.join();
_async_push_sparse_thread.join();
// _print_thread.join();
- VLOG(0) << "BrpcPsClient::finalize_worker begin join server";
+ VLOG(0) << "BrpcPsClient::FinalizeWorker begin join server";
_server.Stop(1000);
_server.Join();
_server_started = false;
- VLOG(0) << "BrpcPsClient::finalize_worker done";
+ VLOG(0) << "BrpcPsClient::FinalizeWorker done";
}
-std::future<int32_t> BrpcPsClient::stop_server() {
-  return send_cmd(-1, PS_STOP_SERVER, {});
+std::future<int32_t> BrpcPsClient::StopServer() {
+  return SendCmd(-1, PS_STOP_SERVER, {});
 }
-std::future<int32_t> BrpcPsClient::start_profiler() {
-  return send_cmd(-1, PS_START_PROFILER, {});
+std::future<int32_t> BrpcPsClient::StartProfiler() {
+  return SendCmd(-1, PS_START_PROFILER, {});
 }
-std::future<int32_t> BrpcPsClient::stop_profiler() {
-  return send_cmd(-1, PS_STOP_PROFILER, {});
+std::future<int32_t> BrpcPsClient::StopProfiler() {
+  return SendCmd(-1, PS_STOP_PROFILER, {});
 }
-std::future<int32_t> BrpcPsClient::barrier(size_t table_id,
+std::future<int32_t> BrpcPsClient::Barrier(size_t table_id,
                                            uint32_t barrier_type) {
- return send_cmd(table_id, PS_BARRIER, {std::to_string(barrier_type)});
-}
-
-std::future<int32_t> BrpcPsClient::Pull(RequestContext &pull_context) {
- if (pull_context.value_type == Dense) { // pull dense
- Region *dense_region =
-        reinterpret_cast<Region *>(pull_context.dense_values);
- return pull_dense(dense_region, pull_context.num, pull_context.table);
- } else { // pull sparse
- size_t table_id = pull_context.table;
- size_t num = pull_context.num;
- bool is_training = pull_context.is_training;
- if (pull_context.training_mode == Geo) { // for geo
- return pull_sparse_param(pull_context.sparse_values, table_id,
- pull_context.keys, num, is_training);
- } else if (pull_context.training_mode == Async) { // for async
- return pull_sparse(pull_context.sparse_values, table_id,
- pull_context.keys, num, is_training);
- }
- }
-}
-
-std::future<int32_t> BrpcPsClient::Push(RequestContext &push_context) {
- if (push_context.value_type == Dense) { // push dense
- const Region *dense_region = push_context.push_context.push_dense_values;
- return push_dense(dense_region, push_context.num, push_context.table);
- } else { // push sparse
- size_t table_id = push_context.table;
- size_t num = push_context.num;
- bool is_training = push_context.is_training;
- if (push_context.training_mode == Geo) { // for geo
- // TODO(zhaocaibei)
- } else if (push_context.training_mode == Async) { // for async
- const uint64_t *keys = push_context.push_context.keys;
- const float **update_values = push_context.push_context.push_values;
- return push_sparse(table_id, keys, update_values, num);
- }
- }
+ return SendCmd(table_id, PS_BARRIER, {std::to_string(barrier_type)});
}
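+// Every RPC in this file returns a std::future<int32_t> wired through a
+// closure: the caller attaches a shared promise, brpc runs the closure's
+// callback when all sub-requests finish, and the callback fulfills the
+// promise. A self-contained sketch of that pattern follows; SketchClosure
+// is a hypothetical stand-in for DownpourBrpcClosure (which additionally
+// owns the brpc controllers, requests, and responses):
+//
+// #include <functional>
+// #include <future>
+// #include <memory>
+// #include <vector>
+//
+// class SketchClosure {
+//  public:
+//   explicit SketchClosure(std::function<void(SketchClosure *)> cb)
+//       : cb_(std::move(cb)) {}
+//   void add_promise(std::shared_ptr<std::promise<int32_t>> p) {
+//     promises_.push_back(std::move(p));
+//   }
+//   void set_promise_value(int32_t v) {
+//     for (auto &p : promises_) p->set_value(v);
+//   }
+//   void Run() { cb_(this); }  // brpc invokes this once the RPC completes
+//
+//  private:
+//   std::function<void(SketchClosure *)> cb_;
+//   std::vector<std::shared_ptr<std::promise<int32_t>>> promises_;
+// };
+//
+// std::future<int32_t> SendCmdSketch() {
+//   SketchClosure closure([](SketchClosure *done) {
+//     done->set_promise_value(0);  // 0 == success, as in check_response()
+//   });
+//   auto promise = std::make_shared<std::promise<int32_t>>();
+//   closure.add_promise(promise);
+//   std::future<int32_t> fut = promise->get_future();
+//   closure.Run();  // the real client reaches this via rpc_stub.service(...)
+//   return fut;
+// }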
-std::future<int32_t> BrpcPsClient::pull_geo_param(size_t table_id,
-                                                  std::vector<float> *values,
-                                                  std::vector<uint64_t> *keys,
-                                                  int pserver_idx) {
-  auto *accessor = table_accessor(table_id);
+std::future<int32_t> BrpcPsClient::PullGeoParam(size_t table_id,
+                                                std::vector<float> *values,
+                                                std::vector<uint64_t> *keys,
+                                                int pserver_idx) {
+  auto *accessor = GetTableAccessor(table_id);
DownpourBrpcClosure *closure =
new DownpourBrpcClosure(1, [keys, values, accessor](void *done) {
int ret = 0;
@@ -583,12 +525,12 @@ std::future<int32_t> BrpcPsClient::pull_geo_param(size_t table_id,
io_buffer_itr.copy_and_forward(reinterpret_cast(&shard_nums),
sizeof(uint32_t));
keys->resize(shard_nums);
- values->resize(shard_nums * accessor->GetTableInfo(UPDATE_DIM));
+ values->resize(shard_nums * accessor->GetAccessorInfo().update_dim);
io_buffer_itr.copy_and_forward((void *)(keys->data()), // NOLINT
sizeof(uint64_t) * shard_nums);
io_buffer_itr.copy_and_forward(
(void *)(values->data()), // NOLINT
- shard_nums * accessor->GetTableInfo(UPDATE_SIZE));
+ shard_nums * accessor->GetAccessorInfo().update_size);
closure->set_promise_value(ret);
});
   auto promise = std::make_shared<std::promise<int32_t>>();
@@ -598,7 +540,7 @@ std::future<int32_t> BrpcPsClient::pull_geo_param(size_t table_id,
closure->request(0)->set_cmd_id(PS_PULL_GEO_PARAM);
closure->request(0)->set_table_id(table_id);
closure->request(0)->set_client_id(_client_id);
- PsService_Stub rpc_stub(get_cmd_channel(pserver_idx));
+ PsService_Stub rpc_stub(GetCmdChannel(pserver_idx));
closure->cntl(0)->set_log_id(butil::gettimeofday_ms());
rpc_stub.service(closure->cntl(0), closure->request(0), closure->response(0),
closure);
@@ -606,10 +548,11 @@ std::future<int32_t> BrpcPsClient::pull_geo_param(size_t table_id,
}
// for GEO
-std::future<int32_t> BrpcPsClient::push_sparse_param(
-    size_t table_id, const uint64_t *keys, const float **update_values,
-    size_t num, void *done) {
-  auto *accessor = table_accessor(table_id);
+std::future<int32_t> BrpcPsClient::PushSparseParam(size_t table_id,
+                                                   const uint64_t *keys,
+                                                   const float **update_values,
+                                                   size_t num, void *done) {
+  auto *accessor = GetTableAccessor(table_id);
   // Send the RPC requests
   DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
   auto promise = std::make_shared<std::promise<int32_t>>();
@@ -630,7 +573,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_param(
auto kvs = ids[shard_idx];
auto value_ptr = value_ptrs[shard_idx];
size_t kv_size = kvs.size();
- uint32_t value_size = accessor->GetTableInfo(UPDATE_SIZE);
+ uint32_t value_size = accessor->GetAccessorInfo().update_size;
     // Send the RPC request
auto *push_request = closure->request(shard_idx);
push_request->set_cmd_id(PS_PUSH_SPARSE_PARAM);
@@ -638,16 +581,15 @@ std::future<int32_t> BrpcPsClient::push_sparse_param(
push_request->set_client_id(_client_id);
push_request->add_params((char *)&kv_size, sizeof(uint32_t)); // NOLINT
auto *push_data = push_request->mutable_data();
- push_data->resize(kv_size *
- (sizeof(uint64_t) + accessor->GetTableInfo(UPDATE_SIZE)));
+ push_data->resize(kv_size * (sizeof(uint64_t) + value_size));
char *push_data_ptr = const_cast(push_data->data());
memcpy(push_data_ptr, kvs.data(), kv_size * sizeof(uint64_t));
push_data_ptr += kv_size * sizeof(uint64_t);
for (int i = 0; i < kv_size; ++i) {
- memcpy(push_data_ptr, value_ptr[i], accessor->GetTableInfo(UPDATE_SIZE));
- push_data_ptr += accessor->GetTableInfo(UPDATE_SIZE);
+ memcpy(push_data_ptr, value_ptr[i], value_size);
+ push_data_ptr += value_size;
}
- PsService_Stub rpc_stub(get_sparse_channel(shard_idx));
+ PsService_Stub rpc_stub(GetSparseChannel(shard_idx));
closure->cntl(shard_idx)->set_request_compress_type(
(brpc::CompressType)FLAGS_pserver_communicate_compress_type);
rpc_stub.service(closure->cntl(shard_idx), closure->request(shard_idx),
@@ -656,16 +598,13 @@ std::future<int32_t> BrpcPsClient::push_sparse_param(
return fut;
}
-std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
-                                              size_t region_num,
-                                              size_t table_id) {
+std::future<int32_t> BrpcPsClient::PullDense(Region *regions, size_t region_num,
+                                             size_t table_id) {
   auto timer = std::make_shared<CostTimer>("pserver_client_pull_dense");
- auto *accessor = table_accessor(table_id);
- auto fea_dim = accessor->GetTableInfo(FEA_DIM);
- auto select_size = accessor->GetTableInfo(SELECT_SIZE);
+ auto *accessor = GetTableAccessor(table_id);
+ auto fea_dim = accessor->GetAccessorInfo().fea_dim;
size_t request_call_num = _server_channels.size();
- uint32_t num_per_shard =
- dense_dim_per_shard(accessor->GetTableInfo(FEA_DIM), request_call_num);
+ uint32_t num_per_shard = DenseDimPerShard(fea_dim, request_call_num);
   // The callback fills each shard's result into the regions, in order
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num, [request_call_num, num_per_shard, regions, region_num,
@@ -675,7 +614,7 @@ std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
         size_t region_data_idx = 0;  // data offset within the region being filled
         auto *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
size_t shard_data_size =
- num_per_shard * accessor->GetTableInfo(SELECT_SIZE);
+ num_per_shard * accessor->GetAccessorInfo().select_size;
for (size_t i = 0; i < request_call_num; ++i) {
if (closure->check_response(i, PS_PULL_DENSE_TABLE) != 0) {
ret = -1;
@@ -728,23 +667,24 @@ std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
closure->request(i)->set_client_id(_client_id);
closure->request(i)->add_params((char *)&num_per_shard, // NOLINT
sizeof(num_per_shard));
- PsService_Stub rpc_stub(get_dense_channel(i));
+ PsService_Stub rpc_stub(GetDenseChannel(i));
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
}
return fut;
}
-std::future<int32_t> BrpcPsClient::push_dense_param(const Region *regions,
-                                                    size_t region_num,
-                                                    size_t table_id) {
-  auto *accessor = table_accessor(table_id);
+std::future<int32_t> BrpcPsClient::PushDenseParam(const Region *regions,
+                                                  size_t region_num,
+                                                  size_t table_id) {
+  auto *accessor = GetTableAccessor(table_id);
+  auto accessor_info = accessor->GetAccessorInfo();
   size_t request_call_num = _server_channels.size();
   // 1. Split the Region data into shards so multiple shards can copy in parallel
   std::vector<std::vector<Region>> regions_partition(request_call_num);
uint32_t num_per_shard =
- dense_dim_per_shard(accessor->GetTableInfo(FEA_DIM), request_call_num);
- size_t shard_data_size = num_per_shard * accessor->GetTableInfo(UPDATE_SIZE);
+ DenseDimPerShard(accessor_info.fea_dim, request_call_num);
+ size_t shard_data_size = num_per_shard * accessor_info.update_size;
size_t current_region_idx = 0;
size_t current_region_data_idx = 0;
for (size_t i = 0; i < request_call_num; ++i) {
@@ -807,17 +747,17 @@ std::future<int32_t> BrpcPsClient::push_dense_param(const Region *regions,
fill_num);
fill_remain_size -= fill_num;
}
- PsService_Stub rpc_stub(get_dense_channel(i));
+ PsService_Stub rpc_stub(GetDenseChannel(i));
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
}
return fut;
}
-std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient(
+std::future<int32_t> BrpcPsClient::PushSparseRawGradient(
     size_t table_id, const uint64_t *keys, const float **update_values,
     size_t num, void *done) {
-  auto *accessor = table_accessor(table_id);
+  auto *accessor = GetTableAccessor(table_id);
   // Send the RPC requests
   DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
   auto promise = std::make_shared<std::promise<int32_t>>();
@@ -851,7 +791,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient(
auto value_ptr = value_ptrs[shard_idx];
size_t kv_size = kvs.size();
- uint32_t value_size = accessor->GetTableInfo(UPDATE_SIZE);
+ uint32_t value_size = accessor->GetAccessorInfo().update_size;
     // Send the RPC request
auto *push_request = closure->request(shard_idx);
@@ -860,17 +800,16 @@ std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient(
push_request->set_client_id(_client_id);
push_request->add_params((char *)&kv_size, sizeof(uint32_t)); // NOLINT
auto *push_data = push_request->mutable_data();
- push_data->resize(kv_size *
- (sizeof(uint64_t) + accessor->GetTableInfo(UPDATE_SIZE)));
+ push_data->resize(kv_size * (sizeof(uint64_t) + value_size));
char *push_data_ptr = const_cast(push_data->data());
memcpy(push_data_ptr, kvs.data(), kv_size * sizeof(uint64_t));
push_data_ptr += kv_size * sizeof(uint64_t);
for (int i = 0; i < kv_size; ++i) {
- memcpy(push_data_ptr, value_ptr[i], accessor->GetTableInfo(UPDATE_SIZE));
- push_data_ptr += accessor->GetTableInfo(UPDATE_SIZE);
+ memcpy(push_data_ptr, value_ptr[i], value_size);
+ push_data_ptr += value_size;
}
- PsService_Stub rpc_stub(get_sparse_channel(shard_idx));
+ PsService_Stub rpc_stub(GetSparseChannel(shard_idx));
closure->cntl(shard_idx)->set_request_compress_type(
(brpc::CompressType)FLAGS_pserver_communicate_compress_type);
rpc_stub.service(closure->cntl(shard_idx), closure->request(shard_idx),
@@ -879,7 +818,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient(
return fut;
}
-std::future<int32_t> BrpcPsClient::push_dense_raw_gradient(
+std::future<int32_t> BrpcPsClient::PushDenseRawGradient(
int table_id, float *total_send_data, size_t total_send_data_size,
void *done) {
size_t request_call_num = _server_channels.size();
@@ -887,9 +826,9 @@ std::future<int32_t> BrpcPsClient::push_dense_raw_gradient(
   auto promise = std::make_shared<std::promise<int32_t>>();
   closure->add_promise(promise);
   std::future<int> fut = promise->get_future();
- auto *accessor = table_accessor(table_id);
+ auto *accessor = GetTableAccessor(table_id);
uint32_t num_per_shard =
- dense_dim_per_shard(accessor->GetTableInfo(FEA_DIM), request_call_num);
+ DenseDimPerShard(accessor->GetAccessorInfo().fea_dim, request_call_num);
for (size_t i = 0; i < request_call_num; ++i) {
closure->request(i)->set_cmd_id(PS_PUSH_DENSE_TABLE);
closure->request(i)->set_table_id(table_id);
@@ -903,16 +842,16 @@ std::future<int32_t> BrpcPsClient::push_dense_raw_gradient(
total_send_data + i * num_per_shard, num_per_shard * sizeof(float));
// closure->cntl(i)->set_request_compress_type(
// (brpc::CompressType)FLAGS_pserver_communicate_compress_type);
- PsService_Stub rpc_stub(get_dense_channel(i));
+ PsService_Stub rpc_stub(GetDenseChannel(i));
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
}
return fut;
}
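+// PushDenseRawGradient cuts one flat float buffer into request_call_num
+// slices of num_per_shard entries, one per server. Below is a standalone
+// sketch of that slicing; DenseDimPerShardSketch is an assumption about the
+// rounding (dim / shard_num + 1), not necessarily Paddle's exact
+// DenseDimPerShard helper:
+//
+// #include <algorithm>
+// #include <cstddef>
+// #include <utility>
+// #include <vector>
+//
+// inline size_t DenseDimPerShardSketch(size_t dense_dim_total,
+//                                      size_t shard_num) {
+//   return dense_dim_total / shard_num + 1;  // round up; last shard may be short
+// }
+//
+// std::vector<std::pair<const float *, size_t>> SliceDense(const float *data,
+//                                                          size_t total,
+//                                                          size_t shard_num) {
+//   const size_t per_shard = DenseDimPerShardSketch(total, shard_num);
+//   std::vector<std::pair<const float *, size_t>> slices;
+//   for (size_t i = 0; i < shard_num; ++i) {
+//     const size_t begin = i * per_shard;
+//     if (begin >= total) break;
+//     slices.push_back({data + begin, std::min(per_shard, total - begin)});
+//   }
+//   return slices;
+// }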
-std::future<int32_t> BrpcPsClient::push_global_step(int table_id,
-                                                    int64_t *total_send_data,
-                                                    void *done) {
+std::future<int32_t> BrpcPsClient::PushGlobalStep(int table_id,
+                                                  int64_t *total_send_data,
+                                                  void *done) {
   size_t request_call_num = _server_channels.size();
   DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
   auto promise = std::make_shared<std::promise<int32_t>>();
@@ -931,17 +870,17 @@ std::future<int32_t> BrpcPsClient::push_global_step(int table_id,
memcpy(push_data_ptr + sizeof(uint32_t), total_send_data,
num_per_shard * sizeof(int64_t));
- PsService_Stub rpc_stub(get_dense_channel(i));
+ PsService_Stub rpc_stub(GetDenseChannel(i));
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
}
return fut;
}
-std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
-                                               size_t table_id,
-                                               const uint64_t *keys, size_t num,
-                                               bool is_training) {
+std::future<int32_t> BrpcPsClient::PullSparse(float **select_values,
+                                              size_t table_id,
+                                              const uint64_t *keys, size_t num,
+                                              bool is_training) {
   auto timer = std::make_shared<CostTimer>("pserver_client_pull_sparse");
   auto local_timer =
       std::make_shared<CostTimer>("pserver_client_pull_sparse_local");
@@ -966,9 +905,9 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
shard_sorted_kvs->at(shard_id).push_back({keys[i], select_values[i]});
}
- auto *accessor = table_accessor(table_id);
+ auto *accessor = GetTableAccessor(table_id);
- size_t value_size = accessor->GetTableInfo(SELECT_SIZE);
+ size_t value_size = accessor->GetAccessorInfo().select_size;
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num, [shard_sorted_kvs, value_size](void *done) {
@@ -1053,7 +992,7 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
closure->request(i)->set_client_id(_client_id);
closure->request(i)->add_params((char *)&kv_request_count, // NOLINT
sizeof(uint32_t));
- PsService_Stub rpc_stub(get_cmd_channel(i));
+ PsService_Stub rpc_stub(GetCmdChannel(i));
closure->cntl(i)->set_log_id(butil::gettimeofday_ms());
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
@@ -1063,11 +1002,11 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
}
// for GEO
-std::future<int32_t> BrpcPsClient::pull_sparse_param(float **select_values,
-                                                     size_t table_id,
-                                                     const uint64_t *keys,
-                                                     size_t num,
-                                                     bool is_training) {
+std::future<int32_t> BrpcPsClient::PullSparseParam(float **select_values,
+                                                   size_t table_id,
+                                                   const uint64_t *keys,
+                                                   size_t num,
+                                                   bool is_training) {
   auto timer = std::make_shared<CostTimer>("pserver_client_pull_sparse_param");
size_t request_call_num = _server_channels.size();
@@ -1080,9 +1019,8 @@ std::future<int32_t> BrpcPsClient::pull_sparse_param(float **select_values,
shard_sorted_kvs->at(shard_id).push_back({keys[i], select_values[i]});
}
- auto *accessor = table_accessor(table_id);
- size_t value_size = accessor->GetTableInfo(SELECT_SIZE);
-
+ auto *accessor = GetTableAccessor(table_id);
+ size_t value_size = accessor->GetAccessorInfo().select_size;
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num, [shard_sorted_kvs, value_size](void *done) {
int ret = 0;
@@ -1167,7 +1105,7 @@ std::future<int32_t> BrpcPsClient::pull_sparse_param(float **select_values,
closure->request(i)->set_client_id(_client_id);
closure->request(i)->add_params((char *)&kv_request_count, // NOLINT
sizeof(uint32_t));
- PsService_Stub rpc_stub(get_cmd_channel(i));
+ PsService_Stub rpc_stub(GetCmdChannel(i));
closure->cntl(i)->set_log_id(butil::gettimeofday_ms());
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
@@ -1176,7 +1114,7 @@ std::future<int32_t> BrpcPsClient::pull_sparse_param(float **select_values,
return fut;
}
-std::future<int32_t> BrpcPsClient::send_client2client_msg(
+std::future<int32_t> BrpcPsClient::SendClient2ClientMsg(
     int msg_type, int to_client_id, const std::string &msg) {
   auto promise = std::make_shared<std::promise<int32_t>>();
   std::future<int> fut = promise->get_future();
@@ -1201,11 +1139,11 @@ std::future<int32_t> BrpcPsClient::send_client2client_msg(
return fut;
}
-std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient_partial(
+std::future<int32_t> BrpcPsClient::PushSparseRawGradientPartial(
     size_t table_id, const uint64_t *keys, const float **update_values,
     uint32_t num, void *done, int pserver_idx) {
-  auto *accessor = table_accessor(table_id);
-  size_t value_size = accessor->GetTableInfo(UPDATE_SIZE);
+  auto *accessor = GetTableAccessor(table_id);
+  size_t value_size = accessor->GetAccessorInfo().update_size;
   DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
   auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
@@ -1226,7 +1164,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient_partial(
memcpy(push_data_ptr, update_values[i], value_size);
push_data_ptr += value_size;
}
- PsService_Stub rpc_stub(get_sparse_channel(pserver_idx));
+ PsService_Stub rpc_stub(GetSparseChannel(pserver_idx));
closure->cntl(0)->set_request_compress_type(
(brpc::CompressType)FLAGS_pserver_communicate_compress_type);
rpc_stub.service(closure->cntl(0), closure->request(0), closure->response(0),
@@ -1234,8 +1172,8 @@ std::future<int32_t> BrpcPsClient::push_sparse_raw_gradient_partial(
return fut;
}
-int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id,
- const std::string &path) {
+int32_t BrpcPsClient::RecvAndSaveTable(const uint64_t table_id,
+ const std::string &path) {
// get var information
std::string var_name = "";
int64_t var_num = 0;
@@ -1269,17 +1207,17 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id,
save_vec.push_back(save_huge_vec.data() + i * var_shape);
}
- VLOG(2) << "recv_and_save_table: table_class: " << table_class;
+ VLOG(2) << "RecvAndSaveTable: table_class: " << table_class;
// TODO(zhaocaibei123): new GeoBrpcPSClient, move this to its
- // recv_and_save_table
+ // RecvAndSaveTable
if (table_class == "MemorySparseGeoTable") {
auto status =
-        pull_sparse_param(reinterpret_cast<float **>(save_vec.data()), table_id,
-                          save_key.data(), save_key.size(), true);
+        PullSparseParam(reinterpret_cast<float **>(save_vec.data()), table_id,
+                        save_key.data(), save_key.size(), true);
status.wait();
} else {
-    auto status = pull_sparse(reinterpret_cast<float **>(save_vec.data()),
-                              table_id, save_key.data(), save_key.size(), true);
+    auto status = PullSparse(reinterpret_cast<float **>(save_vec.data()),
+                             table_id, save_key.data(), save_key.size(), true);
status.wait();
}
@@ -1313,15 +1251,15 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id,
return 0;
}
-std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
-                                               const uint64_t *keys,
-                                               const float **update_values,
-                                               size_t num) {
+std::future<int32_t> BrpcPsClient::PushSparse(size_t table_id,
+                                              const uint64_t *keys,
+                                              const float **update_values,
+                                              size_t num) {
   auto push_timer = std::make_shared<CostTimer>("pserver_client_push_sparse");
CostTimer parse_timer("pserver_client_push_sparse_parse");
int push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size();
while (push_sparse_async_num > FLAGS_pserver_max_async_call_num) {
- // LOG(INFO) << "push_sparse Waiting for async_call_num comsume,
+ // LOG(INFO) << "PushSparse Waiting for async_call_num comsume,
// task_num:"
// << push_sparse_async_num
// << ", max_task_limit:" << FLAGS_pserver_max_async_call_num;
@@ -1331,7 +1269,7 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
   auto put_timer = std::make_shared<CostTimer>("client_push_sparse_put");
   thread_local std::vector<std::vector<std::pair<uint64_t, const float *>>>
       shard_sorted_kv_list;
- auto *accessor = table_accessor(table_id);
+ auto *accessor = GetTableAccessor(table_id);
size_t request_call_num = _server_channels.size();
shard_sorted_kv_list.resize(request_call_num);
for (auto &x : shard_sorted_kv_list) {
@@ -1365,7 +1303,7 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
shard_kv_data.kv_num = 0;
continue;
}
- uint32_t value_size = accessor->GetTableInfo(UPDATE_SIZE);
+ uint32_t value_size = accessor->GetAccessorInfo().update_size;
for (size_t kv_idx = 0; kv_idx < sorted_kv_size; ++kv_idx) {
shard_kv_data.key_list[kv_idx] = sorted_kv_list[kv_idx].first;
shard_kv_data.value_list[kv_idx].assign(
@@ -1379,7 +1317,7 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
return fut;
}
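+// Both PullSparse and PushSparse bucket keys by key % request_call_num so
+// each server only receives the keys it owns. A standalone sketch of that
+// partitioning step (toy code, not Paddle's API):
+//
+// #include <cstddef>
+// #include <cstdint>
+// #include <utility>
+// #include <vector>
+//
+// using KeyValue = std::pair<uint64_t, const float *>;
+//
+// std::vector<std::vector<KeyValue>> PartitionByShard(
+//     const uint64_t *keys, const float *const *values, size_t num,
+//     size_t shard_num) {
+//   std::vector<std::vector<KeyValue>> shards(shard_num);
+//   for (size_t i = 0; i < num; ++i) {
+//     shards[keys[i] % shard_num].push_back({keys[i], values[i]});
+//   }
+//   return shards;
+// }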
-void BrpcPsClient::push_sparse_task_consume() {
+void BrpcPsClient::PushSparseTaskConsume() {
uint64_t merge_size = FLAGS_pserver_push_sparse_merge_limit;
   std::vector<std::shared_ptr<SparseAsyncTask>> task_list;
size_t request_call_num = _server_channels.size();
@@ -1390,7 +1328,7 @@ void BrpcPsClient::push_sparse_task_consume() {
   // Process the push tasks of every sparse table
for (auto &push_sparse_task_itr : _push_sparse_task_queue_map) {
auto table_id = push_sparse_task_itr.first;
- auto *accessor = table_accessor(table_id);
+ auto *accessor = GetTableAccessor(table_id);
auto &task_queue = push_sparse_task_itr.second;
auto queue_size = task_queue->Size();
if (queue_size == 0) {
@@ -1469,7 +1407,7 @@ void BrpcPsClient::push_sparse_task_consume() {
for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
merge_status[shard_idx] =
async_push_sparse_shard_threads.enqueue(std::bind(
- &BrpcPsClient::push_sparse_async_shard_push, this, task_list,
+ &BrpcPsClient::PushSparseAsyncShardPush, this, task_list,
request_kv_num, table_id, shard_idx, closure, accessor));
}
for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
@@ -1485,7 +1423,7 @@ void BrpcPsClient::push_sparse_task_consume() {
for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
merge_status[shard_idx] =
async_push_sparse_shard_threads.enqueue(std::bind(
- &BrpcPsClient::push_sparse_async_shard_merge, this, task_list,
+ &BrpcPsClient::PushSparseAsyncShardMerge, this, task_list,
request_kv_num, table_id, shard_idx, accessor));
}
for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
@@ -1511,23 +1449,23 @@ void BrpcPsClient::push_sparse_task_consume() {
void sparse_local_merge(ValueAccessor *accessor, float *merge_data,
const float *another_data) {
- size_t col_num = accessor->GetTableInfo(UPDATE_SIZE) / sizeof(float);
+ size_t col_num = accessor->GetAccessorInfo().update_dim;
float *merge_data_shell[col_num];
const float *another_data_shell[col_num];
for (int i = 0; i < col_num; ++i) {
merge_data_shell[i] = merge_data + i;
another_data_shell[i] = another_data + i;
}
- accessor->merge(merge_data_shell, another_data_shell, 1);
+ accessor->Merge(merge_data_shell, another_data_shell, 1);
}
-int BrpcPsClient::push_sparse_async_shard_merge(
+int BrpcPsClient::PushSparseAsyncShardMerge(
    std::vector<std::shared_ptr<SparseAsyncTask>> &task_list,
    std::vector<int> &request_kv_num, int table_id, int shard_idx,
ValueAccessor *accessor) {
size_t merged_kv_count = 0;
uint64_t min_key = UINT64_MAX;
- uint32_t value_size = accessor->GetTableInfo(UPDATE_SIZE);
+ uint32_t value_size = accessor->GetAccessorInfo().update_size;
  thread_local std::vector<std::pair<uint64_t, const float *>> sorted_kv_list;
sorted_kv_list.clear();
@@ -1613,12 +1551,12 @@ int BrpcPsClient::push_sparse_async_shard_merge(
return 0;
}
-int BrpcPsClient::push_sparse_async_shard_push(
+int BrpcPsClient::PushSparseAsyncShardPush(
    std::vector<std::shared_ptr<SparseAsyncTask>> &task_list,
std::vector