CacheLongKeyLIRS concurrency improvements #3069

Draft · wants to merge 22 commits into master
4 changes: 4 additions & 0 deletions h2/src/main/org/h2/engine/Database.java
@@ -1897,6 +1897,10 @@ public TraceSystem getTraceSystem() {
         return traceSystem;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
     public synchronized void setCacheSize(int kb) {
         if (starting) {
             int max = MathUtils.convertLongToInt(Utils.getMemoryMax()) / 2;
21 changes: 21 additions & 0 deletions h2/src/main/org/h2/engine/DbSettings.java
@@ -8,6 +8,7 @@
 import java.util.HashMap;
 
 import org.h2.api.ErrorCode;
+import org.h2.util.Utils;
 import org.h2.message.DbException;
 
 /**
@@ -306,6 +307,26 @@ public class DbSettings extends SettingsBase {
      */
     public final boolean compressData = get("COMPRESS", false);
 
+    /**
+     * Database setting <code>CACHE_CONCURRENCY</code>
+     * (default: 16).<br />
+     * Set the read cache concurrency.
+     */
+    public final int cacheConcurrency = get("CACHE_CONCURRENCY", 16);
+
+    /**
+     * Database setting <code>AUTO_COMMIT_BUFFER_SIZE_KB</code>
+     * (default: depends on max heap).<br />
+     * Set the size of the write buffer, in KB disk space (for file-based
+     * stores). Unless auto-commit is disabled, changes are automatically
+     * saved if there are more than this amount of changes.
+     *
+     * When the value is set to 0 or lower, data is not automatically
+     * stored.
+     */
+    public final int autoCommitBufferSize = get("AUTO_COMMIT_BUFFER_SIZE_KB",
+            Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024);
Comment on lines +310 to +328

Contributor
Please explain why you need these settings.

Author
We use H2 as a cache for binary chunks in a high-concurrency environment (currently 60 threads). After we increased cacheConcurrency to 1024, H2 performed really well most of the time (on par with or even better than PostgreSQL). The only problem arose after we emptied the database content: we observed a significant degradation in throughput and CPU usage.
Here is what we ran into:

  1. CPU contention in Segment.get (optimized in this PR). It looks like when the primary index is small and all 60 threads want to use it, they compete for exactly the same segments. The only way to fix this is to improve LIRS concurrency.
  2. After fix #1 the write rate increased, and the next bottleneck was the 'back pressure' feature:
    // if unsaved memory creation rate is to high,
    // some back pressure need to be applied
    // to slow things down and avoid OOME
    It looks like 2 MB is too small for 150 MB per second. This is why we need to introduce AUTO_COMMIT_BUFFER_SIZE_KB.
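
For context, a minimal sketch of how the two new settings could be supplied, assuming they are read like any other DbSettings entry from the connection URL (the setting names come from the diff above; the values and the wrapper class are illustrative only, not recommendations):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class CacheTuningSketch {
        public static void main(String[] args) throws Exception {
            // More LIRS segments for read concurrency, and a larger
            // write buffer so back pressure kicks in later.
            String url = "jdbc:h2:./chunk-cache"
                    + ";CACHE_CONCURRENCY=1024"
                    + ";AUTO_COMMIT_BUFFER_SIZE_KB=32768";
            try (Connection conn = DriverManager.getConnection(url)) {
                // high-concurrency read/write workload goes here
            }
        }
    }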


+
     /**
      * Database setting <code>IGNORE_CATALOGS</code>
      * (default: false).<br />
150 changes: 124 additions & 26 deletions h2/src/main/org/h2/mvstore/cache/CacheLongKeyLIRS.java
@@ -6,6 +6,10 @@
 package org.h2.mvstore.cache;
 
 import java.lang.ref.WeakReference;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -161,9 +165,13 @@ public V put(long key, V value, int memory) {
         // check whether resize is required: synchronize on s, to avoid
         // concurrent resizes (concurrent reads read
         // from the old segment)
-        synchronized (s) {
-            s = resizeIfNeeded(s, segmentIndex);
-            return s.put(key, hash, value, memory);
+        s.l.lock();
+        try {
+            Segment<V> s2 = resizeIfNeeded(s, segmentIndex);
+            return s2.put(key, hash, value, memory);
+        }
+        finally {
+            s.l.unlock();
         }
     }
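
This hunk (and the matching one in remove() below) swaps the intrinsic monitor for an explicit ReentrantLock stored on the segment. The reason becomes visible in Segment.get() further down: unlike synchronized, a ReentrantLock offers a non-blocking tryLock(). A standalone sketch of that capability, not code from this PR:

    import java.util.concurrent.locks.ReentrantLock;

    class TryLockSketch {
        private final ReentrantLock l = new ReentrantLock();

        void bookkeeping() {
            if (l.tryLock()) {   // returns false instead of blocking
                try {
                    // optional work under the lock
                } finally {
                    l.unlock();
                }
            }
            // on contention: skip the work entirely
        }
    }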

@@ -208,9 +216,13 @@ public V remove(long key) {
         // check whether resize is required: synchronize on s, to avoid
         // concurrent resizes (concurrent reads read
         // from the old segment)
-        synchronized (s) {
-            s = resizeIfNeeded(s, segmentIndex);
-            return s.remove(key, hash);
+        s.l.lock();
+        try {
+            Segment<V> s2 = resizeIfNeeded(s, segmentIndex);
+            return s2.remove(key, hash);
+        }
+        finally {
+            s.l.unlock();
         }
     }

@@ -312,7 +324,7 @@ public long getMaxMemory() {
      *
      * @return the entry set
      */
-    public synchronized Set<Map.Entry<Long, V>> entrySet() {
+    public Set<Map.Entry<Long, V>> entrySet() {
         return getMap().entrySet();
     }
}

@@ -389,7 +401,7 @@ public long getHits() {
     public long getMisses() {
         int x = 0;
         for (Segment<V> s : segments) {
-            x += s.misses;
+            x += s.misses.get();
         }
         return x;
     }
@@ -491,9 +503,13 @@ public void putAll(Map<Long, ? extends V> m) {
      */
     public void trimNonResidentQueue() {
         for (Segment<V> s : segments) {
-            synchronized (s) {
+            s.l.lock();
+            try {
                 s.trimNonResidentQueue();
+            }
+            finally {
+                s.l.unlock();
             }
         }
     }

@@ -527,7 +543,7 @@ private static class Segment<V> {
         /**
          * The number of cache misses.
          */
-        long misses;
+        final AtomicLong misses;
 
         /**
          * The map array. The size is always a power of 2.
@@ -602,6 +618,16 @@ private static class Segment<V> {
          */
         private int stackMoveCounter;
 
+        /*
+         * Holds entries that were accessed concurrently by get()
+         */
+        private final ConcurrentSkipListSet<Entry<V>> concAccess;
+
+        /*
+         * Serializes access to this segment
+         */
+        private final ReentrantLock l;
+
         /**
          * Create a new cache segment.
          * @param maxMemory the maximum memory to use
@@ -632,6 +658,10 @@ private static class Segment<V> {
             @SuppressWarnings("unchecked")
             Entry<V>[] e = new Entry[len];
             entries = e;
+
+            misses = new AtomicLong();
+            concAccess = new ConcurrentSkipListSet<>();
+            l = new ReentrantLock();
         }
 
         /**
@@ -646,7 +676,7 @@ private static class Segment<V> {
             this(old.maxMemory, old.stackMoveDistance, len,
                     old.nonResidentQueueSize, old.nonResidentQueueSizeHigh);
             hits = old.hits;
-            misses = old.misses;
+            misses.set(old.misses.get());
             Entry<V> s = old.stack.stackPrev;
             while (s != old.stack) {
                 Entry<V> e = new Entry<>(s);
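
The next hunk is the heart of the change: Segment.get() no longer takes the segment lock unconditionally. Three details cooperate here. The miss counter is an AtomicLong, so a miss can be recorded without any lock; the value is read via Entry.getValue(), which is safe outside the lock because the WeakReference is now created eagerly in the Entry constructor (see the constructor hunk further down); and the LIRS stack/queue reordering is treated as best-effort bookkeeping that is simply skipped when another thread holds the lock. A self-contained sketch of the pattern (illustrative names, not PR code):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    class BestEffortReadSketch<V> {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicLong misses = new AtomicLong();
        private long hits; // guarded by lock

        V get(V value) {
            if (value == null) {
                misses.incrementAndGet(); // lock-free miss accounting
                return null;
            }
            if (!lock.tryLock()) {
                return value;             // contended: skip bookkeeping
            }
            try {
                hits++;                   // LIRS would also reorder here
                return value;
            } finally {
                lock.unlock();
            }
        }
    }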
@@ -710,17 +740,37 @@ private void addToMap(Entry<V> e) {
          * @param e the entry
          * @return the value, or null if there is no resident entry
          */
-        synchronized V get(Entry<V> e) {
+        V get(Entry<V> e) {
             V value = e == null ? null : e.getValue();
             if (value == null) {
                 // the entry was not found
                 // or it was a non-resident entry
-                misses++;
-            } else {
+                misses.incrementAndGet();
+                return null;
+            }
+            if (!l.tryLock()) {
+                //concAccess.add(e);
+                return value;
+            }
+            try {
                 access(e);
                 hits++;
+
+                // process entries that were accessed concurrently
+                // while (true) {
+                //     Entry<V> p = concAccess.pollFirst();
+                //     if (p == null) {
+                //         break;
+                //     }
+                //     access(p);
+                //     hits++;
+                // }
+
+                return value;
+            }
+            finally {
+                l.unlock();
             }
-            return value;
         }
 
         /**
@@ -749,9 +799,8 @@ private void access(Entry<V> e) {
             V v = e.getValue();
             if (v != null) {
                 removeFromQueue(e);
-                if (e.reference != null) {
+                if (e.value == null) {
                     e.value = v;
-                    e.reference = null;
                     usedMemory += e.memory;
                 }
                 if (e.stackNext != null) {
@@ -787,7 +836,17 @@ private void access(Entry<V> e) {
          * @param memory the memory used for the given entry
          * @return the old value, or null if there was no resident entry
          */
-        synchronized V put(long key, int hash, V value, int memory) {
+        V put(long key, int hash, V value, int memory) {
+            l.lock();
+            try {
+                return putUnlocked(key, hash, value, memory);
+            }
+            finally {
+                l.unlock();
+            }
+        }
+
+        private V putUnlocked(long key, int hash, V value, int memory) {
             Entry<V> e = find(key, hash);
             boolean existed = e != null;
             V old = null;
@@ -832,7 +891,17 @@ synchronized V put(long key, int hash, V value, int memory) {
          * @param hash the hash
          * @return the old value, or null if there was no resident entry
         */
-        synchronized V remove(long key, int hash) {
+        V remove(long key, int hash) {
+            l.lock();
+            try {
+                return removeUnlocked(key, hash);
+            }
+            finally {
+                l.unlock();
+            }
+        }
+
+        private V removeUnlocked(long key, int hash) {
             int index = hash & mask;
             Entry<V> e = entries[index];
             if (e == null) {
@@ -897,7 +966,6 @@ private void evictBlock() {
             Entry<V> e = queue.queuePrev;
             usedMemory -= e.memory;
             removeFromQueue(e);
-            e.reference = new WeakReference<>(e.value);
             e.value = null;
             addToQueue(queue2, e);
             // the size of the non-resident-cold entries needs to be limited
@@ -1031,7 +1099,17 @@ private void removeFromQueue(Entry<V> e) {
          * @param nonResident true for non-resident entries
          * @return the key list
          */
-        synchronized List<Long> keys(boolean cold, boolean nonResident) {
+        List<Long> keys(boolean cold, boolean nonResident) {
+            l.lock();
+            try {
+                return keysUnlocked(cold, nonResident);
+            }
+            finally {
+                l.unlock();
+            }
+        }
+
+        private List<Long> keysUnlocked(boolean cold, boolean nonResident) {
             ArrayList<Long> keys = new ArrayList<>();
             if (cold) {
                 Entry<V> start = nonResident ? queue2 : queue;
@@ -1053,7 +1131,17 @@ synchronized List<Long> keys(boolean cold, boolean nonResident) {
          *
          * @return the set of keys
          */
-        synchronized Set<Long> keySet() {
+        Set<Long> keySet() {
+            l.lock();
+            try {
+                return keySetUnlocked();
+            }
+            finally {
+                l.unlock();
+            }
+        }
+
+        private Set<Long> keySetUnlocked() {
             HashSet<Long> set = new HashSet<>();
             for (Entry<V> e = stack.stackNext; e != stack; e = e.stackNext) {
                 set.add(e.key);
@@ -1074,7 +1162,6 @@ synchronized Set<Long> keySet() {
         void setMaxMemory(long maxMemory) {
             this.maxMemory = maxMemory;
         }
-
     }
 
     /**
@@ -1086,7 +1173,7 @@ void setMaxMemory(long maxMemory) {
      *
      * @param <V> the value type
      */
-    static class Entry<V> {
+    static class Entry<V> implements Comparable<Entry<V>> {
 
         /**
          * The key.
@@ -1148,10 +1235,15 @@ static class Entry<V> {
             this.key = key;
             this.memory = memory;
             this.value = value;
+            if (value != null) {
+                this.reference = new WeakReference<>(value);
+            }
         }
 
         Entry(Entry<V> old) {
-            this(old.key, old.value, old.memory);
+            this.key = old.key;
+            this.memory = old.memory;
+            this.value = old.value;
+            this.reference = old.reference;
             this.topMove = old.topMove;
         }
@@ -1166,12 +1258,18 @@ boolean isHot() {
         }
 
         V getValue() {
-            return value == null ? reference.get() : value;
+            final V v = value;
+            return v == null ? reference.get() : v;
         }
 
         int getMemory() {
             return value == null ? 0 : memory;
         }
+
+        @Override
+        public int compareTo(Entry<V> tgt) {
+            return Long.compare(this.key, tgt.key);
+        }
     }
 
     /**
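
A note on the Entry changes above: Entry implements Comparable because ConcurrentSkipListSet (the type of the new concAccess field) keeps its elements sorted and requires them to be mutually comparable; keys are unique within a segment, so ordering by key is consistent. The concAccess processing in get() is still commented out in this draft, so the set is not yet populated. A self-contained illustration of the requirement, using only the JDK (stand-in names, not PR code):

    import java.util.concurrent.ConcurrentSkipListSet;

    class SkipListDemo {
        // Minimal stand-in for Entry: ordered by a long key.
        static class Item implements Comparable<Item> {
            final long key;
            Item(long key) { this.key = key; }
            @Override public int compareTo(Item o) { return Long.compare(key, o.key); }
        }

        public static void main(String[] args) {
            ConcurrentSkipListSet<Item> pending = new ConcurrentSkipListSet<>();
            pending.add(new Item(42));   // add() needs Comparable (or a Comparator)
            System.out.println(pending.pollFirst().key); // prints 42
        }
    }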
4 changes: 4 additions & 0 deletions h2/src/main/org/h2/mvstore/db/Store.java
@@ -123,6 +123,10 @@ public Store(Database db) {
             // otherwise background thread would compete for store lock
             // with maps opening procedure
             builder.autoCommitDisabled();
+
+            builder.cacheConcurrency(db.getSettings().cacheConcurrency);
+            builder.autoCommitBufferSize(db.getSettings().autoCommitBufferSize);
+            builder.cacheSize(db.getCacheSize());
         }
         this.encrypted = encrypted;
         try {
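
Wired this way, the same tuning also remains available to direct MVStore users through the existing builder API. A hypothetical snippet (the builder methods appear in the diff above; the values are illustrative):

    import org.h2.mvstore.MVStore;

    class BuilderSketch {
        public static void main(String[] args) {
            MVStore store = new MVStore.Builder()
                    .fileName("cache.mv.db")
                    .cacheConcurrency(1024)          // LIRS segment count
                    .autoCommitBufferSize(32 * 1024) // write buffer, in KB
                    .cacheSize(256)                  // read cache, in MB
                    .open();
            store.close();
        }
    }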