hill climbing #191

Merged · 10 commits · Aug 22, 2022
70 changes: 69 additions & 1 deletion BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
@@ -80,7 +80,7 @@ public void WhenNewItemsAreAddedTheyArePromotedBasedOnFrequency()
LogLru();

for (int k = 0; k < 2; k++)
{
for (int j = 0; j < 6; j++)
{
for (int i = 0; i < 15; i++)
@@ -264,6 +264,74 @@ public void WriteUpdatesProtectedLruOrder()
cache.TryGet(7, out var _).Should().BeTrue();
}

[Fact]
public void WhenHitRateChangesWindowSizeIsAdapted()
{
cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler());

// First completely fill the cache, push entries into protected
for (int i = 0; i < 20; i++)
{
cache.GetOrAdd(i, k => k);
}

// W [19] Protected [] Probation [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18]
cache.PendingMaintenance();
LogLru();

for (int i = 0; i < 15; i++)
{
cache.GetOrAdd(i, k => k);
}

// W [19] Protected [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] Probation [15,16,17,18]
cache.PendingMaintenance();
LogLru();

// The reset sample size is 200, so do 200 cache hits
// W [19] Protected [12,13,14,15,16,17,18,0,1,2,3,4,5,6,7] Probation [8,9,10,11]
for (int j = 0; j < 10; j++)
for (int i = 0; i < 20; i++)
{
cache.GetOrAdd(i, k => k);
}

cache.PendingMaintenance();
LogLru();

// then miss 200 times
// W [300] Protected [12,13,14,15,16,17,18,0,1,2,3,4,5,6,7] Probation [9,10,11,227]
for (int i = 0; i < 201; i++)
{
cache.GetOrAdd(i + 100, k => k);
}

cache.PendingMaintenance();
LogLru();

// then miss 200 more times (window adaptation adds +1 window slot)
// W [399,400] Protected [14,15,16,17,18,0,1,2,3,4,5,6,7,227] Probation [9,10,11,12]
for (int i = 0; i < 201; i++)
{
cache.GetOrAdd(i + 200, k => k);
}

cache.PendingMaintenance();
LogLru();

// make 2 requests to new keys; if the window size is now 2, both will exist:
cache.GetOrAdd(666, k => k);
cache.GetOrAdd(667, k => k);

cache.PendingMaintenance();
LogLru();

cache.TryGet(666, out var _).Should().BeTrue();
cache.TryGet(667, out var _).Should().BeTrue();

this.output.WriteLine($"Scheduler ran {cache.Scheduler.RunCount} times.");
}
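
A note on the mechanics the test relies on: with a NullScheduler nothing runs in the background, so buffered reads and writes are only drained when PendingMaintenance() is called, and that is also the point at which the partition optimization added to Maintenance() further down takes effect. The snippet below is an illustrative usage sketch mirroring the calls the test already makes; it is not part of the PR.

```csharp
using BitFaster.Caching.Lfu;
using BitFaster.Caching.Scheduler;

// Same construction as the test above: NullScheduler never schedules background work,
// so PendingMaintenance() is what drains the read/write buffers, performs eviction
// and (after this change) re-optimizes the window/protected partition.
var cache = new ConcurrentLfu<int, int>(concurrencyLevel: 1, capacity: 20, new NullScheduler());

for (int i = 0; i < 20; i++)
{
    cache.GetOrAdd(i, k => k);   // writes are buffered, not applied immediately
}

cache.PendingMaintenance();      // apply buffered activity and adapt the partition
```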

[Fact]
public void ReadSchedulesMaintenanceWhenBufferIsFull()
{
154 changes: 154 additions & 0 deletions BitFaster.Caching.UnitTests/Lfu/LfuCapacityPartitionTests.cs
@@ -6,11 +6,19 @@
using BitFaster.Caching.Lfu;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace BitFaster.Caching.UnitTests.Lfu
{
public class LfuCapacityPartitionTests
{
private readonly ITestOutputHelper output;

public LfuCapacityPartitionTests(ITestOutputHelper output)
{
this.output = output;
}

[Fact]
public void WhenCapacityIsLessThan3CtorThrows()
{
@@ -37,5 +45,151 @@ public void CtorSetsExpectedCapacity(int capacity, int expectedWindow, int expec
partition.Protected.Should().Be(expectedProtected);
partition.Probation.Should().Be(expectedProbation);
}

[Fact]
public void WhenHitRateKeepsDecreasingWindowIsCappedAt80Percent()
{
int max = 100;
var partition = new LfuCapacityPartition(max);
var metrics = new TestMetrics();

SetHitRate(partition, metrics, max, 0.9);

for (int i = 0; i < 20; i++)
{
SetHitRate(partition, metrics, max, 0.1);
}

partition.Window.Should().Be(80);
partition.Protected.Should().Be(16);
}

[Fact]
public void WhenHitRateIsStableWindowConverges()
{
int max = 100;
var partition = new LfuCapacityPartition(max);
var metrics = new TestMetrics();

// start by causing some window adaptation so that the steady state is not window = 1
SetHitRate(partition, metrics, max, 0.9);

for (int i = 0; i < 5; i++)
{
SetHitRate(partition, metrics, max, 0.1);
}

this.output.WriteLine("Decrease hit rate");
SetHitRate(partition, metrics, max, 0.0);
// window is now larger

// go into steady state with small up and down fluctuation in hit rate
List<int> windowSizes = new List<int>(200);
this.output.WriteLine("Stable hit rate");

double inc = 0.01;
for (int i = 0; i < 200; i++)
{
double c = i % 2 == 0 ? inc : -inc;
SetHitRate(partition, metrics, max, 0.9 + c);

windowSizes.Add(partition.Window);
}

// verify that the window size has converged: the last 50 samples have low variance
var last50 = windowSizes.Skip(150).Take(50).ToArray();

var minWindow = last50.Min();
var maxWindow = last50.Max();

(maxWindow - minWindow).Should().BeLessThanOrEqualTo(1);
}

[Fact]
public void WhenHitRateFluctuatesWindowIsAdapted()
{
int max = 100;
var partition = new LfuCapacityPartition(max);
var metrics = new TestMetrics();

var snapshot = new WindowSnapshot();

// steady state, window stays at 1 initially
SetHitRate(partition, metrics, max, 0.9);
SetHitRate(partition, metrics, max, 0.9);
snapshot.Capture(partition);

// Decrease hit rate, verify window increases each time
this.output.WriteLine("1. Decrease hit rate");
SetHitRate(partition, metrics, max, 0.1);
snapshot.AssertWindowIncreased(partition);
SetHitRate(partition, metrics, max, 0.1);
snapshot.AssertWindowIncreased(partition);

// Increase hit rate, verify window continues to increase
this.output.WriteLine("2. Increase hit rate");
SetHitRate(partition, metrics, max, 0.9);
snapshot.AssertWindowIncreased(partition);

// Decrease hit rate, verify window decreases
this.output.WriteLine("3. Decrease hit rate");
SetHitRate(partition, metrics, max, 0.1);
snapshot.AssertWindowDecreased(partition);

// Increase hit rate, verify window continues to decrease
this.output.WriteLine("4. Increase hit rate");
SetHitRate(partition, metrics, max, 0.9);
snapshot.AssertWindowDecreased(partition);
SetHitRate(partition, metrics, max, 0.9);
snapshot.AssertWindowDecreased(partition);
}

private void SetHitRate(LfuCapacityPartition p, TestMetrics m, int max, double hitRate)
{
int total = max * 10;
m.Hits += (long)(total * hitRate);
m.Misses += total - (long)(total * hitRate);

p.OptimizePartitioning(m, total);

this.output.WriteLine($"W: {p.Window} P: {p.Protected}");
}

private class WindowSnapshot
{
private int prev;

public void Capture(LfuCapacityPartition p)
{
prev = p.Window;
}

public void AssertWindowIncreased(LfuCapacityPartition p)
{
p.Window.Should().BeGreaterThan(prev);
prev = p.Window;
}

public void AssertWindowDecreased(LfuCapacityPartition p)
{
p.Window.Should().BeLessThan(prev);
prev = p.Window;
}
}

private class TestMetrics : ICacheMetrics
{
public double HitRatio => (double)Hits / (double)Total;

public long Total => Hits + Misses;

public long Hits { get; set; }

public long Misses { get; set; }

public long Evicted { get; set; }

public long Updated { get; set; }
}
}
}
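
For readers unfamiliar with the technique, the sketch below illustrates the kind of hit-rate hill climbing these tests pin down: sample the hit rate once per period, keep moving the window boundary in the same direction while the hit rate is not getting worse, reverse direction when it is, and decay the step so the size converges. This is an illustrative stand-in, not the PR's LfuCapacityPartition.OptimizePartitioning (whose body is not shown in this diff): the class and member names, the 6.25% initial step, the 0.98 decay rate and the initial direction are assumptions; only the 80% window cap and the qualitative behavior come from the assertions above.

```csharp
using System;

// Simplified hill climber over the admission window size (illustration only).
public class WindowHillClimber
{
    private readonly int capacity;
    private double previousHitRate;
    private double step; // signed step in window slots; the sign is the climb direction

    public int Window { get; private set; } = 1;

    public WindowHillClimber(int capacity)
    {
        this.capacity = capacity;

        // Start by trying to shrink the (already minimal) window, so the first sample
        // that degrades the hit rate reverses direction and grows it instead.
        this.step = -Math.Max(1.0, 0.0625 * capacity);
    }

    // Called once per sample period with the hits and misses observed in that period.
    public void Sample(long hits, long misses)
    {
        double hitRate = (double)hits / (hits + misses);
        double delta = hitRate - this.previousHitRate;
        this.previousHitRate = hitRate;

        // Keep climbing in the same direction while the hit rate is not getting worse;
        // reverse when the last adjustment hurt.
        if (delta < 0)
        {
            this.step = -this.step;
        }

        // Decay the step so the window converges once the hit rate is stable.
        this.step *= 0.98;

        int next = this.Window + (int)Math.Round(this.step);

        // The window is bounded: at least 1 slot, at most 80% of total capacity.
        this.Window = Math.Clamp(next, 1, (int)(0.8 * this.capacity));
    }
}
```

Feeding this sketch the same sample sequence as WhenHitRateFluctuatesWindowIsAdapted reproduces the asserted direction changes, and a long run of near-constant samples shrinks the step until the window stops moving, which is what WhenHitRateIsStableWindowConverges checks.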
32 changes: 27 additions & 5 deletions BitFaster.Caching/Lfu/ConcurrentLfu.cs
@@ -22,6 +22,7 @@
using BitFaster.Caching.Buffers;
using BitFaster.Caching.Lru;
using BitFaster.Caching.Scheduler;
using static BitFaster.Caching.Lfu.LfuCapacityPartition;

namespace BitFaster.Caching.Lfu
{
Expand Down Expand Up @@ -75,6 +76,8 @@ public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler)
this.dictionary = new ConcurrentDictionary<K, LfuNode<K, V>>(concurrencyLevel, capacity, comparer);

this.readBuffer = new StripedBuffer<LfuNode<K, V>>(concurrencyLevel, BufferSize);

// TODO: how big should this be in total? The buffer shouldn't be allowed to hold more than some percentage of capacity in pending writes.
this.writeBuffer = new StripedBuffer<LfuNode<K, V>>(concurrencyLevel, BufferSize);

this.cmSketch = new CmSketch<K>(1, comparer);
Expand Down Expand Up @@ -396,8 +399,9 @@ private bool Maintenance()
ArrayPool<LfuNode<K, V>>.Shared.Return(localDrainBuffer);
#endif

// TODO: hill climb
EvictEntries();
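// Adapt the window/protected split based on the sampled hit rate (hill climbing),
// then demote any protected overflow back to probation.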
this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize);
ReFitProtected();

// Reset to idle if either
// 1. We drained both input buffers (all work done)
Expand Down Expand Up @@ -521,13 +525,13 @@ private int EvictFromWindow()

private void EvictFromMain(int candidates)
{
// var victimQueue = Position.Probation;
var victim = this.probationLru.First;
var candidate = this.probationLru.Last;

while (this.windowLru.Count + this.probationLru.Count + this.protectedLru.Count > this.Capacity)
{
// TODO: is this logic reachable?
// TODO: this logic is only reachable if entries have time expiry, and are removed early.
// Search the admission window for additional candidates
//if (candidates == 0)
//{
Expand Down Expand Up @@ -555,7 +559,7 @@ private void EvictFromMain(int candidates)
// break;
//}

// Evict immediately if only one of the entries is present
//if (victim == null)
//{
// var previous = candidate.Previous;
@@ -581,13 +585,17 @@
if (AdmitCandidate(candidate.Key, victim.Key))
{
var evictee = victim;
victim = victim.Previous;

// victim is initialized to first, and iterates forwards
victim = victim.Next;

Evict(evictee);
}
else
{
var evictee = candidate;

// candidate is initialized to last, and iterates backwards
candidate = candidate.Previous;

Evict(evictee);
Expand All @@ -611,6 +619,20 @@ private void Evict(LfuNode<K, V> evictee)
this.metrics.evictedCount++;
}

private void ReFitProtected()
{
// If hill climbing decreased protected, there may be too many items
// - demote overflow to probation.
while (this.protectedLru.Count > this.capacity.Protected)
{
var demoted = this.protectedLru.First;
this.protectedLru.RemoveFirst();

demoted.Position = Position.Probation;
this.probationLru.AddLast(demoted);
}
}

[DebuggerDisplay("{Format()}")]
private class DrainStatus
{