Revert "Revert "Merge 'cql3: select_statement: coroutinize indexed_ta…
Browse files Browse the repository at this point in the history
…ble_select_statement::do_execute_base_query()' from Avi Kivity""

This reverts commit 22f13e7, and reinstates
commit df8e1da ("Merge 'cql3: select_statement:
coroutinize indexed_table_select_statement::do_execute_base_query()' from
Avi Kivity"). The original commit was reverted due to failures in debug
mode on aarch64, but after commit 224a287
("build: disable -Og in debug mode to avoid coroutine asan breakage"), it
works again.

Closes scylladb#12021
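
For readers skimming the diff below: the change is a mechanical rewrite from Seastar's continuation style (do_with() plus repeat() plus .then() chains) into a C++20 coroutine (plain locals, a while loop, co_await). Here is a minimal sketch of that shape in plain Seastar. It is not ScyllaDB code: the page struct and fetch_page() are invented stand-ins for the real base-table query.

    #include <cstddef>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/do_with.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/loop.hh>

    struct page { size_t rows; bool last; };

    // Hypothetical async step standing in for the storage_proxy query:
    // yields pages of 100 rows and marks every third page as the last one.
    static seastar::future<page> fetch_page() {
        static size_t calls = 0;
        ++calls;
        co_return page{.rows = 100, .last = calls % 3 == 0};
    }

    // Before: continuation style. Loop state must be kept alive via do_with(),
    // each iteration is a lambda handed to repeat(), and the final value flows
    // out through a .then() chain.
    seastar::future<size_t> count_rows_continuation_style() {
        return seastar::do_with(size_t(0), [] (size_t& total) {
            return seastar::repeat([&total] {
                return fetch_page().then([&total] (page p) {
                    total += p.rows;
                    return seastar::stop_iteration(p.last);
                });
            }).then([&total] {
                return total;
            });
        });
    }

    // After: coroutine style. State is ordinary locals kept alive by the
    // coroutine frame, and the loop is a plain while with co_await.
    seastar::future<size_t> count_rows_coroutine_style() {
        size_t total = 0;
        while (true) {
            page p = co_await fetch_page();
            total += p.rows;
            if (p.last) {
                break;
            }
        }
        co_return total;
    }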
avikivity authored and denesb committed Nov 18, 2022
1 parent d7649a8 commit 7da12c6
Showing 1 changed file with 49 additions and 59 deletions.
cql3/statements/select_statement.cc
@@ -655,68 +655,58 @@ indexed_table_select_statement::do_execute_base_query(
     auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state));
     auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
 
-    struct base_query_state {
-        query::result_merger merger;
-        std::vector<primary_key> primary_keys;
-        std::vector<primary_key>::iterator current_primary_key;
-        size_t previous_result_size = 0;
-        size_t next_iteration_size = 0;
-        base_query_state(uint64_t row_limit, std::vector<primary_key>&& keys)
-                : merger(row_limit, query::max_partitions)
-                , primary_keys(std::move(keys))
-                , current_primary_key(primary_keys.begin())
-        {}
-        base_query_state(base_query_state&&) = default;
-        base_query_state(const base_query_state&) = delete;
-    };
+    query::result_merger merger(cmd->get_row_limit(), query::max_partitions);
+    std::vector<primary_key> keys = std::move(primary_keys);
+    std::vector<primary_key>::iterator key_it(keys.begin());
+    size_t previous_result_size = 0;
+    size_t next_iteration_size = 0;
 
-    base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)};
     const bool is_paged = bool(paging_state);
-    return do_with(std::move(query_state), [this, is_paged, &qp, &state, &options, cmd, timeout] (auto&& query_state) {
-        auto &merger = query_state.merger;
-        auto &keys = query_state.primary_keys;
-        auto &key_it = query_state.current_primary_key;
-        auto &previous_result_size = query_state.previous_result_size;
-        auto &next_iteration_size = query_state.next_iteration_size;
-        return utils::result_repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() {
-            // Starting with 1 key, we check if the result was a short read, and if not,
-            // we continue exponentially, asking for 2x more key than before
-            auto already_done = std::distance(keys.begin(), key_it);
-            // If the previous result already provided 1MB worth of data,
-            // stop increasing the number of fetched partitions
-            if (previous_result_size < query::result_memory_limiter::maximum_result_size) {
-                next_iteration_size = already_done + 1;
-            }
-            next_iteration_size = std::min<size_t>({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency});
-            auto key_it_end = key_it + next_iteration_size;
-            auto command = ::make_lw_shared<query::read_command>(*cmd);
+    while (key_it != keys.end()) {
+        // Starting with 1 key, we check if the result was a short read, and if not,
+        // we continue exponentially, asking for 2x more key than before
+        auto already_done = std::distance(keys.begin(), key_it);
+        // If the previous result already provided 1MB worth of data,
+        // stop increasing the number of fetched partitions
+        if (previous_result_size < query::result_memory_limiter::maximum_result_size) {
+            next_iteration_size = already_done + 1;
+        }
+        next_iteration_size = std::min<size_t>({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency});
+        auto key_it_end = key_it + next_iteration_size;
+        auto command = ::make_lw_shared<query::read_command>(*cmd);
 
-            query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
-            return utils::result_map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) {
-                auto command = ::make_lw_shared<query::read_command>(*cmd);
-                // for each partition, read just one clustering row (TODO: can
-                // get all needed rows of one partition at once.)
-                command->slice._row_ranges.clear();
-                if (key.clustering) {
-                    command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
-                }
-                return qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
-                        .then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> {
-                    return std::move(qr.query_result);
-                }));
-            }, std::move(oneshot_merger)).then(utils::result_wrap([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) -> coordinator_result<stop_iteration> {
-                auto is_short_read = result->is_short_read();
-                // Results larger than 1MB should be shipped to the client immediately
-                const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size;
-                previous_result_size = result->buf().size();
-                merger(std::move(result));
-                key_it = key_it_end;
-                return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached);
-            }));
-        }).then(utils::result_wrap([&merger, cmd] () mutable {
-            return make_ready_future<coordinator_result<value_type>>(value_type(merger.get(), std::move(cmd)));
-        }));
-    });
+        query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
+        coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> rresult = co_await utils::result_map_reduce(key_it, key_it_end, coroutine::lambda([&] (auto& key)
+                -> future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>> {
+            auto command = ::make_lw_shared<query::read_command>(*cmd);
+            // for each partition, read just one clustering row (TODO: can
+            // get all needed rows of one partition at once.)
+            command->slice._row_ranges.clear();
+            if (key.clustering) {
+                command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
+            }
+            coordinator_result<service::storage_proxy::coordinator_query_result> rqr
+                    = co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
+            if (!rqr.has_value()) {
+                co_return std::move(rqr).as_failure();
+            }
+            co_return std::move(rqr.value().query_result);
+        }), std::move(oneshot_merger));
+        if (!rresult.has_value()) {
+            co_return std::move(rresult).as_failure();
+        }
+        auto& result = rresult.value();
+        auto is_short_read = result->is_short_read();
+        // Results larger than 1MB should be shipped to the client immediately
+        const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size;
+        previous_result_size = result->buf().size();
+        merger(std::move(result));
+        key_it = key_it_end;
+        if (is_short_read || page_limit_reached) {
+            break;
+        }
+    }
+    co_return value_type(merger.get(), std::move(cmd));
 }
 
 future<shared_ptr<cql_transport::messages::result_message>>
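
The loop's concurrency ramp-up deserves spelling out. It starts with one key and doubles the batch each round while results stay small (next_iteration_size = already_done + 1 yields batches of 1, 2, 4, 8, ...), freezes growth once a result reaches the memory limit, and always clamps to the remaining keys and the concurrency cap. Below is a standalone toy that prints the batch sizes; the cap of 16 and the point where a result turns "large" are invented stand-ins for max_base_table_query_concurrency and query::result_memory_limiter::maximum_result_size.

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        constexpr size_t total_keys = 100;
        constexpr size_t max_concurrency = 16;   // stand-in concurrency cap
        size_t done = 0;
        size_t next = 0;
        bool result_over_limit = false;          // flips when a result is "large"
        while (done < total_keys) {
            if (!result_over_limit) {
                next = done + 1;                 // 1, 2, 4, 8, ... (doubles)
            }
            next = std::min({next, total_keys - done, max_concurrency});
            std::printf("fetch %zu keys concurrently\n", next);
            done += next;
            result_over_limit = done >= 31;      // pretend the 5th batch was large
        }
    }

With 100 keys this prints batches of 1, 2, 4, 8, 16, then holds at 16 until the final partial batch of 5.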

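The other visible change is error propagation: where the old code threaded failures through utils::result_wrap(), the coroutine checks each coordinator_result explicitly and forwards failures with as_failure(). That is the Boost.Outcome idiom (coordinator_result appears to be built on boost::outcome_v2); here is a self-contained illustration with invented parse_digit() and sum_digits().

    #include <boost/outcome.hpp>
    #include <system_error>

    namespace bo = boost::outcome_v2;

    // Invented example: bo::result<int> carries either a value or an error code.
    bo::result<int> parse_digit(char c) {
        if (c < '0' || c > '9') {
            return std::errc::invalid_argument;  // implicit error construction
        }
        return c - '0';
    }

    bo::result<int> sum_digits(const char* s) {
        int total = 0;
        for (; *s; ++s) {
            auto r = parse_digit(*s);
            if (!r.has_value()) {
                // Same shape as the diff: forward the failure to the caller.
                return std::move(r).as_failure();
            }
            total += r.value();
        }
        return total;
    }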