Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect the producer index in case of OutOfMemoryError #241

Merged
merged 1 commit into from Apr 6, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -260,7 +260,7 @@ public boolean offer(final E e)
case QUEUE_FULL:
return false;
case QUEUE_RESIZE:
resize(mask, buffer, pIndex, e);
resize(mask, buffer, pIndex, e, null);
return true;
}
}
Expand Down Expand Up @@ -563,7 +563,7 @@ public int fill(Supplier<E> s, int batchSize)
case QUEUE_FULL:
return 0;
case QUEUE_RESIZE:
resize(mask, buffer, pIndex, s.get());
resize(mask, buffer, pIndex, null, s);
return 1;
}
}
Expand Down Expand Up @@ -641,10 +641,21 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
}
}

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e)
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
assert (e != null && s == null) || (e == null || s != null);
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer = allocate(newBufferLength);
final E[] newBuffer;
try
{
newBuffer = allocate(newBufferLength);
}
catch (OutOfMemoryError oom)
{
assert lvProducerIndex() == pIndex + 1;
soProducerIndex(pIndex);
throw oom;
}

producerBuffer = newBuffer;
final int newMask = (newBufferLength - 2) << 1;
Expand All @@ -653,7 +664,7 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, E e)
final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);

soElement(newBuffer, offsetInNew, e);// element in new array
soElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

// ASSERT code
Expand Down