-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
wasm_instance_cache.cc
196 lines (172 loc) · 6.53 KB
/
wasm_instance_cache.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "lang/wasm_instance_cache.hh"
#include "seastar/core/metrics.hh"
#include "seastar/core/scheduling.hh"
#include <seastar/core/units.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/util/defer.hh>
namespace wasm {
static constexpr size_t WASM_PAGE_SIZE = 64 * KB;
// Accessor for this shard's UDF-cache statistics counters; the metrics
// registered in setup_metrics() read through this.
instance_cache::stats& instance_cache::shard_stats() {
    return _stats;
}
// Register this shard's UDF-cache metrics (hit/miss/block counters and
// size gauges) under the "user_functions" group.
void instance_cache::setup_metrics() {
    namespace sm = seastar::metrics;
    _metrics.add_group("user_functions", {
        sm::make_counter("cache_hits", wasm::instance_cache::shard_stats().cache_hits,
            sm::description("The number of user defined function cache hits")),
        sm::make_counter("cache_misses", wasm::instance_cache::shard_stats().cache_misses,
            sm::description("The number of user defined functions loaded")),
        sm::make_counter("cache_blocks", wasm::instance_cache::shard_stats().cache_blocks,
            sm::description("The number of times a user defined function waited for an instance")),
        // NOTE(review): "cache_instace_count_any" misspells "instance", but the
        // metric name is exposed to monitoring systems — renaming it would break
        // existing dashboards/alerts; confirm with operators before fixing.
        sm::make_gauge("cache_instace_count_any", [this] { return _cache.size(); },
            sm::description("The total number of cached wasm instances, instances in use and empty instances")),
        // Tracks only instances resident in the LRU; in-use instances are
        // subtracted from _total_size while borrowed (see get()/recycle()).
        sm::make_gauge("cache_total_size", [this] { return _total_size; },
            sm::description("The total size of instances stored in the user defined function cache")),
    });
}
// Construct a cache holding at most `size` bytes of wasm instances, with a
// periodic timer (every `timer_period`) that expires idle entries.
instance_cache::instance_cache(size_t size, seastar::lowres_clock::duration timer_period)
    : _timer([this] { on_timer(); })
    , _timer_period(timer_period)
    , _max_size(size)
{
    setup_metrics();
    _timer.arm_periodic(_timer_period);
}
// Instantiate the wasm module described by `ctx` and resolve the exported
// function and memory needed to invoke it.
wasm_instance instance_cache::load(wasm::context& ctx) {
    auto new_store = wasmtime::create_store(**ctx.engine_ptr);
    auto new_instance = wasmtime::create_instance(**ctx.engine_ptr, **ctx.module, *new_store);
    auto exported_func = wasmtime::create_func(*new_instance, *new_store, ctx.function_name);
    auto exported_memory = wasmtime::get_memory(*new_instance, *new_store);
    return wasm_instance{
        .store = std::move(new_store),
        .instance = std::move(new_instance),
        .func = std::move(exported_func),
        .memory = std::move(exported_memory),
    };
}
// Drop the least-recently-used cached instance and update the size
// accounting. Precondition: _lru is non-empty and each of its elements
// refers to a live entry in _cache.
void instance_cache::evict_lru() noexcept {
    auto& victim = _lru.front();
    _total_size -= victim.instance_size;
    // Detach the cache entry from its (about to be destroyed) lru node.
    victim.cache_entry->instance = std::nullopt;
    victim.cache_entry->it = _lru.end();
    _lru.pop_front();
}
// Periodic housekeeping: evict every instance that has sat unused in the
// lru for longer than one timer period.
void instance_cache::on_timer() noexcept {
    auto deadline = seastar::lowres_clock::now();
    for (;;) {
        if (_lru.empty() || !(_lru.front().timestamp + _timer_period < deadline)) {
            break;
        }
        evict_lru();
    }
}
static uint32_t get_instance_size(wasm_instance& instance) {
// reserve 1 wasm page for instance data other than the wasm memory
return WASM_PAGE_SIZE * (1 + instance.memory->size(*instance.store));
}
// Look up (or create) the cache entry for the UDF identified by `name`,
// `arg_types` and the current scheduling group, take its per-entry lock, and
// resolve with the entry once an instance is ready for exclusive use.
// The caller must hand the entry back via recycle() when done.
// NOTE(review): the returned future captures `ctx` by reference — presumably
// the caller keeps it alive until the future resolves; verify against callers.
seastar::future<instance_cache::value_type> instance_cache::get(const db::functions::function_name& name, const std::vector<data_type>& arg_types, wasm::context& ctx) {
    // _cache is keyed by name only; scan the equal range for the entry that
    // also matches the scheduling group and argument types.
    auto [it, end_it] = _cache.equal_range(name);
    while (it != end_it) {
        if (it->second->scheduling_group == seastar::current_scheduling_group() && it->second->arg_types == arg_types) {
            break;
        }
        ++it;
    }
    if (it == end_it) {
        // First use of this (name, scheduling group, arg_types) combination:
        // create an empty entry; the instance is loaded below under the lock.
        it = _cache.emplace(name, make_lw_shared<cache_entry_type>(cache_entry_type{
            .scheduling_group = seastar::current_scheduling_group(),
            .arg_types = arg_types,
            .mutex = seastar::shared_mutex(),
            .instance = std::nullopt,
            .it = _lru.end(),
        }));
    }
    auto& entry = it->second;
    // One user at a time per entry; an unavailable lock future means another
    // fiber currently holds the instance.
    auto f = entry->mutex.lock();
    if (!f.available()) {
        ++shard_stats().cache_blocks;
    }
    return f.then([this, entry, &ctx] {
        if (!entry->instance) {
            ++shard_stats().cache_misses;
            entry->instance = load(ctx);
        } else {
            // because we don't want to remove an instance after it starts being used,
            // and also because we can't track its size efficiently, we remove it from
            // lru and subtract its size from the total size until it is no longer used
            ++shard_stats().cache_hits;
            _total_size -= entry->it->instance_size;
            _lru.erase(entry->it);
            entry->it = _lru.end();
        }
        return entry;
    });
}
// Return an instance borrowed via get(): release the per-entry lock and, if
// the instance is small enough and space can be made, put it back into the
// lru with a fresh timestamp; otherwise drop it.
void instance_cache::recycle(instance_cache::value_type val) noexcept {
    val->mutex.unlock();
    size_t size;
    try {
        size = get_instance_size(val->instance.value());
        if (size > 1 * MB) {
            // Oversized instances are not worth caching; drop them.
            val->instance = std::nullopt;
            return;
        }
    } catch (...) {
        // we can't get the instance size, so we can't recycle it
        val->instance = std::nullopt;
        return;
    }
    // make space for the recycled instance if needed. we won't
    // remove the instance itself because it was not in the lru
    while (!_lru.empty() && _total_size + size > _max_size) {
        evict_lru();
    }
    if (_total_size + size > _max_size) {
        // The lru is empty yet there is still no room (i.e. _max_size < size,
        // e.g. a misconfigured tiny cache). The original looped into
        // evict_lru() unconditionally, which dereferences _lru.front() on an
        // empty list — undefined behavior. Drop the instance instead.
        val->instance = std::nullopt;
        return;
    }
    try {
        // new instance_size is set here
        _lru.push_back({val, seastar::lowres_clock::now(), size});
        val->it = --_lru.end();
        _total_size += val->it->instance_size;
    } catch (...) {
        // we can't add the instance to the lru, so we can't recycle it
        val->instance = std::nullopt;
    }
}
// Erase every cached entry for `name` with matching argument types (e.g. on
// DROP FUNCTION), detaching each from the lru and size accounting first.
void instance_cache::remove(const db::functions::function_name& name, const std::vector<data_type>& arg_types) noexcept {
    auto [cur, range_end] = _cache.equal_range(name);
    for (; cur != range_end;) {
        const auto& entry_ptr = cur->second;
        if (entry_ptr->arg_types != arg_types) {
            ++cur;
            continue;
        }
        if (entry_ptr->it != _lru.end()) {
            // Still resident in the lru: stop accounting for it.
            _total_size -= entry_ptr->it->instance_size;
            _lru.erase(entry_ptr->it);
        }
        cur = _cache.erase(cur);
    }
}
// Number of cache entries on this shard (in-use and empty ones included).
size_t instance_cache::size() const {
    return _cache.size();
}
// Configured upper bound, in bytes, for lru-resident instances.
size_t instance_cache::max_size() const {
    return _max_size;
}
// Current total size, in bytes, of instances resident in the lru
// (in-use instances are excluded from this figure).
size_t instance_cache::memory_footprint() const {
    return _total_size;
}
// Shut the cache down: stop the eviction timer. Nothing asynchronous is
// pending, so the returned future is immediately ready.
future<> instance_cache::stop() {
    _timer.cancel();
    return make_ready_future<>();
}
}
namespace std {

// Make scheduling groups printable (e.g. in logs and assertion messages)
// by their name.
inline std::ostream& operator<<(std::ostream& out, const seastar::scheduling_group& sg) {
    return out << sg.name();
}

// Equality functor so scheduling groups can serve as keys in standard
// containers. The original declared operator() with non-const reference
// parameters, which makes the functor uncallable on const objects —
// containers invoke the comparator on const key lvalues, as does the
// primary std::equal_to template. Take const& as required.
template <>
struct equal_to<seastar::scheduling_group> {
    bool operator()(const seastar::scheduling_group& sg1, const seastar::scheduling_group& sg2) const noexcept {
        return sg1 == sg2;
    }
};

}