Merge branch 'gpugraph' of https://github.com/xuewujiao/Paddle into add_random_settings_v2
DesmonDay committed Oct 11, 2022
2 parents d6c50e8 + 7899bd6 commit f33d447
Showing 11 changed files with 360 additions and 196 deletions.
43 changes: 34 additions & 9 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -72,6 +72,11 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
   std::vector<uint64_t> node_id_array[task_pool_size_];
   std::vector<paddle::framework::GpuPsFeaInfo>
       node_fea_info_array[task_pool_size_];
+  slot_feature_num_map_.resize(slot_num);
+  for (int k = 0; k < slot_num; ++k) {
+    slot_feature_num_map_[k] = 0;
+  }
+
   for (size_t i = 0; i < bags.size(); i++) {
     if (bags[i].size() > 0) {
       tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
@@ -92,13 +97,17 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
             int total_feature_size = 0;
             for (int k = 0; k < slot_num; ++k) {
               v->get_feature_ids(k, &feature_ids);
-              total_feature_size += feature_ids.size();
+              int feature_ids_size = feature_ids.size();
+              if (slot_feature_num_map_[k] < feature_ids_size) {
+                slot_feature_num_map_[k] = feature_ids_size;
+              }
+              total_feature_size += feature_ids_size;
               if (!feature_ids.empty()) {
                 feature_array[i].insert(feature_array[i].end(),
                                         feature_ids.begin(),
                                         feature_ids.end());
                 slot_id_array[i].insert(
-                    slot_id_array[i].end(), feature_ids.size(), k);
+                    slot_id_array[i].end(), feature_ids_size, k);
               }
             }
             x.feature_size = total_feature_size;
@@ -111,6 +120,13 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
     }
   }
   for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
+
+  std::stringstream ss;
+  for (int k = 0; k < slot_num; ++k) {
+    ss << slot_feature_num_map_[k] << " ";
+  }
+  VLOG(0) << "slot_feature_num_map: " << ss.str();
+
   paddle::framework::GpuPsCommGraphFea res;
   uint64_t tot_len = 0;
   for (int i = 0; i < task_pool_size_; i++) {
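The slot_feature_num_map_ bookkeeping above records, for each slot, the largest number of feature ids seen on any single node, then logs the per-slot maxima once all shard tasks have joined. A minimal standalone sketch of the same max-tracking pattern, using made-up counts rather than real graph data:

    #include <iostream>
    #include <vector>

    int main() {
      const int slot_num = 3;
      // Hypothetical per-node feature-id counts, indexed by slot.
      const std::vector<std::vector<int>> node_counts = {{2, 0, 5}, {4, 1, 3}};

      // Same pattern as the diff: start every slot at 0, then keep the
      // largest count seen for that slot across all nodes.
      std::vector<int> slot_feature_num_map(slot_num, 0);
      for (const auto &counts : node_counts) {
        for (int k = 0; k < slot_num; ++k) {
          if (slot_feature_num_map[k] < counts[k]) {
            slot_feature_num_map[k] = counts[k];
          }
        }
      }

      for (int k = 0; k < slot_num; ++k) {
        std::cout << "slot " << k << ": max " << slot_feature_num_map[k] << "\n";
      }
      // Prints: slot 0: max 4, slot 1: max 1, slot 2: max 5
    }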
@@ -1844,9 +1860,14 @@ int GraphTable::parse_feature(int idx,
   // "")
   thread_local std::vector<paddle::string::str_ptr> fields;
   fields.clear();
-  const char c = feature_separator_.at(0);
+  char c = slot_feature_separator_.at(0);
   paddle::string::split_string_ptr(feat_str, len, c, &fields);
+
+  thread_local std::vector<paddle::string::str_ptr> fea_fields;
+  fea_fields.clear();
+  c = feature_separator_.at(0);
+  paddle::string::split_string_ptr(fields[1].ptr, fields[1].len, c, &fea_fields);

   std::string name = fields[0].to_string();
   auto it = feat_id_map[idx].find(name);
   if (it != feat_id_map[idx].end()) {
@@ -1857,26 +1878,26 @@ int GraphTable::parse_feature(int idx,
       //    string_vector_2_string(fields.begin() + 1, fields.end(), ' ',
       //    fea_ptr);
       FeatureNode::parse_value_to_bytes<uint64_t>(
-          fields.begin() + 1, fields.end(), fea_ptr);
+          fea_fields.begin(), fea_fields.end(), fea_ptr);
       return 0;
     } else if (dtype == "string") {
-      string_vector_2_string(fields.begin() + 1, fields.end(), ' ', fea_ptr);
+      string_vector_2_string(fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
       return 0;
     } else if (dtype == "float32") {
       FeatureNode::parse_value_to_bytes<float>(
-          fields.begin() + 1, fields.end(), fea_ptr);
+          fea_fields.begin(), fea_fields.end(), fea_ptr);
       return 0;
     } else if (dtype == "float64") {
       FeatureNode::parse_value_to_bytes<double>(
-          fields.begin() + 1, fields.end(), fea_ptr);
+          fea_fields.begin(), fea_fields.end(), fea_ptr);
       return 0;
     } else if (dtype == "int32") {
       FeatureNode::parse_value_to_bytes<int32_t>(
-          fields.begin() + 1, fields.end(), fea_ptr);
+          fea_fields.begin(), fea_fields.end(), fea_ptr);
       return 0;
     } else if (dtype == "int64") {
       FeatureNode::parse_value_to_bytes<uint64_t>(
-          fields.begin() + 1, fields.end(), fea_ptr);
+          fea_fields.begin(), fea_fields.end(), fea_ptr);
       return 0;
     }
   } else {
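For context, parse_feature now performs a two-level split: the raw record is first cut on slot_feature_separator_ into slot name and value list (fields[0] / fields[1]), and the value list is then cut on feature_separator_ into individual values (fea_fields) that feed parse_value_to_bytes. A standalone sketch of that idea using std::string; the record layout and separator choices are assumptions for illustration, not taken from Paddle:

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    int main() {
      // Hypothetical record: slot name, separator, then the value list.
      const std::string feat_str = "user_tag\t101 102 103";
      const char slot_sep = '\t';  // plays the role of slot_feature_separator_
      const char fea_sep = ' ';    // plays the role of feature_separator_

      // First split: slot name vs. value list (fields[0] / fields[1]).
      const std::size_t pos = feat_str.find(slot_sep);
      const std::string name = feat_str.substr(0, pos);
      const std::string values = feat_str.substr(pos + 1);

      // Second split: the individual feature values (fea_fields).
      std::vector<std::string> fea_fields;
      std::istringstream ss(values);
      for (std::string v; std::getline(ss, v, fea_sep);) {
        fea_fields.push_back(v);
      }

      std::cout << name << " -> " << fea_fields.size() << " values\n";
      // Prints: user_tag -> 3 values
    }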
@@ -2113,6 +2134,10 @@ void GraphTable::set_feature_separator(const std::string &ch) {
   feature_separator_ = ch;
 }

+void GraphTable::set_slot_feature_separator(const std::string &ch) {
+  slot_feature_separator_ = ch;
+}
+
 int32_t GraphTable::get_server_index_by_id(uint64_t id) {
   return id % shard_num / shard_num_per_server;
 }
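With the new setter, callers configure both separators before loading features; a hypothetical call sequence (the tab/space values are illustrative; the diff only shows that both default to a single space):

    // table is a GraphTable *; the separator values below are assumptions.
    table->set_slot_feature_separator("\t");  // between slot name and value list
    table->set_feature_separator(" ");        // between individual feature values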
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -680,6 +680,7 @@ class GraphTable : public Table {
   int32_t make_complementary_graph(int idx, int64_t byte_size);
   int32_t dump_edges_to_ssd(int idx);
   int32_t get_partition_num(int idx) { return partitions[idx].size(); }
+  std::vector<int> slot_feature_num_map() const { return slot_feature_num_map_; }
   std::vector<uint64_t> get_partition(int idx, int index) {
     if (idx >= (int)partitions.size() || index >= (int)partitions[idx].size())
       return std::vector<uint64_t>();
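The slot_feature_num_map() accessor added above exposes the per-slot maxima gathered in make_gpu_ps_graph_fea. One hypothetical use, not shown in this diff, is sizing a worst-case fixed-width row for feature storage:

    // Hypothetical caller; table is a GraphTable *. Not part of this diff.
    std::vector<int> max_per_slot = table->slot_feature_num_map();
    std::size_t row_width = 0;
    for (int n : max_per_slot) row_width += n;  // worst-case ids per node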
@@ -697,6 +698,7 @@
 #endif
   virtual int32_t add_comm_edge(int idx, uint64_t src_id, uint64_t dst_id);
   virtual int32_t build_sampler(int idx, std::string sample_type = "random");
+  void set_slot_feature_separator(const std::string &ch);
   void set_feature_separator(const std::string &ch);
   std::vector<std::vector<GraphShard *>> edge_shards, feature_shards;
   size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
@@ -735,7 +737,9 @@
   // std::shared_ptr<GraphSampler> graph_sampler;
   // REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
 #endif
+  std::string slot_feature_separator_ = std::string(" ");
   std::string feature_separator_ = std::string(" ");
+  std::vector<int> slot_feature_num_map_;
 };

 /*
