Skip to content

Commit

Permalink
[Java, C#, C++] Fix precedence checks to account for non-contiguous v…
Browse files Browse the repository at this point in the history
…ersions. (#989)

Relates to issue #988.

An SBE message might not change in every version of a schema. For
example, another message might change.

Previously, the field precedence checking model would expect an exact
match for `actingVersion` in `wrap` to enter one of its initial states.
However, it is only aware of the versions in which a message schema has
changed. Therefore, it was possible for `actingVersion` not to match any
of these versions, in which case the initial state that represented a
codec wrapped around the latest known version of the format was picked.

Now, we do _not_ expect an exact match in `wrap`. Instead, we select the
"best" match. For example, if a codec was changed in v1 and v3, and the
acting version is v2, we will decode using v1. However, if the acting
version is v4, we would decode as v3.
  • Loading branch information
ZachBray committed Apr 11, 2024
1 parent c9be629 commit 7e6e503
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 63 deletions.
19 changes: 19 additions & 0 deletions csharp/sbe-tests/FieldAccessOrderCheckTests.cs
Expand Up @@ -3159,6 +3159,25 @@ public void DisallowsIncompleteMessagesDueToMissingNestedGroup1(int bCount, stri
var exception = Assert.ThrowsException<InvalidOperationException>(encoder.CheckEncodingIsComplete);
StringAssert.Contains(exception.Message, $"Not fully encoded, current state: {expectedState}");
}

[TestMethod]
public void AllowsSkippingFutureGroupWhenDecodingFromVersionWithNoChangesInMessage()
{
var encoder = new AddGroupBeforeVarDataV0()
.WrapForEncodeAndApplyHeader(_buffer, Offset, _messageHeader);

_messageHeader.TemplateId = SkipVersionAddGroupBeforeVarDataV2.TemplateId;
_messageHeader.Version = 1;

encoder.A = 42;
encoder.SetB("abc");

var decoder = new SkipVersionAddGroupBeforeVarDataV2()
.WrapForDecodeAndApplyHeader(_buffer, Offset, _messageHeader);

Assert.AreEqual(42, decoder.A);
Assert.AreEqual(decoder.GetB(), "abc");
}

private void ModifyHeaderToLookLikeVersion0()
{
Expand Down
Expand Up @@ -46,7 +46,7 @@ public final class FieldPrecedenceModel
new CodecInteraction.CodecInteractionFactory(groupPathsByField, topLevelBlockFields);
private final Map<CodecInteraction, List<TransitionGroup>> transitionsByInteraction = new LinkedHashMap<>();
private final Map<State, List<TransitionGroup>> transitionsByState = new HashMap<>();
private final Int2ObjectHashMap<State> versionWrappedStates = new Int2ObjectHashMap<>();
private final TreeMap<Integer, State> versionWrappedStates = new TreeMap<>();
private final State notWrappedState = allocateState("NOT_WRAPPED");
private final String generatedRepresentationClassName;
private State encoderWrappedState;
Expand Down Expand Up @@ -103,13 +103,11 @@ public State latestVersionWrappedState()
* Iterates over the states after a codec is wrapped over a particular version of data.
* @param consumer the consumer of the states.
*/
public void forEachWrappedStateByVersion(final IntObjConsumer<State> consumer)
public void forEachWrappedStateByVersionDesc(final IntObjConsumer<State> consumer)
{
final Int2ObjectHashMap<State>.EntryIterator iterator = versionWrappedStates.entrySet().iterator();
while (iterator.hasNext())
for (final Map.Entry<Integer, State> entry : versionWrappedStates.descendingMap().entrySet())
{
iterator.next();
consumer.accept(iterator.getIntKey(), iterator.getValue());
consumer.accept(entry.getKey(), entry.getValue());
}
}

Expand Down
Expand Up @@ -3055,22 +3055,37 @@ private static CharSequence generateDecoderWrapListener(final FieldPrecedenceMod

final StringBuilder sb = new StringBuilder();
sb.append(INDENT).append("void onWrapForDecode(std::uint64_t actingVersion)\n")
.append(INDENT).append("{\n")
.append(INDENT).append(INDENT).append("switch(actingVersion)\n")
.append(INDENT).append(INDENT).append("{\n");

fieldPrecedenceModel.forEachWrappedStateByVersion((version, state) ->
sb.append(INDENT).append(TWO_INDENT).append("case ").append(version).append(":\n")
.append(INDENT).append(THREE_INDENT).append("codecState(")
.append(qualifiedStateCase(state)).append(");\n")
.append(INDENT).append(THREE_INDENT).append("break;\n"));

sb.append(INDENT).append(TWO_INDENT).append("default:\n")
.append(INDENT).append(THREE_INDENT).append("codecState(")
.append(qualifiedStateCase(fieldPrecedenceModel.latestVersionWrappedState())).append(");\n")
.append(INDENT).append(THREE_INDENT).append("break;\n")
.append(INDENT).append(INDENT).append("}\n")
.append(INDENT).append("}\n\n");
.append(INDENT).append("{\n");

final MutableBoolean actingVersionCanBeTooLowToBeValid = new MutableBoolean(true);

fieldPrecedenceModel.forEachWrappedStateByVersionDesc((version, state) ->
{
if (version == 0)
{
actingVersionCanBeTooLowToBeValid.set(false);

sb.append(INDENT).append(" codecState(")
.append(qualifiedStateCase(state)).append(");\n");
}
else
{
sb.append(INDENT).append(" if (actingVersion >= ").append(version).append(")\n")
.append(INDENT).append(" {\n")
.append(INDENT).append(" codecState(")
.append(qualifiedStateCase(state)).append(");\n")
.append(INDENT).append(" return;\n")
.append(INDENT).append(" }\n\n");
}
});

if (actingVersionCanBeTooLowToBeValid.get())
{
sb.append(INDENT)
.append(" throw std::runtime_error(\"Unsupported acting version: \" + actingVersion);\n");
}

sb.append(INDENT).append("}\n\n");

return sb;
}
Expand Down
Expand Up @@ -1912,22 +1912,20 @@ private static CharSequence generateDecoderWrapListener(
}

final StringBuilder sb = new StringBuilder();

sb.append(indent).append("private void OnWrapForDecode(int actingVersion)\n")
.append(indent).append("{\n")
.append(indent).append(INDENT).append("switch(actingVersion)\n")
.append(indent).append(INDENT).append("{\n");
.append(indent).append("{\n");

fieldPrecedenceModel.forEachWrappedStateByVersion((version, state) ->
sb.append(indent).append(TWO_INDENT).append("case ").append(version).append(":\n")
.append(indent).append(THREE_INDENT).append("codecState(")
fieldPrecedenceModel.forEachWrappedStateByVersionDesc((version, state) ->
sb.append(indent).append(" if (actingVersion >= ").append(version).append(")\n")
.append(indent).append(" {\n")
.append(indent).append(" codecState(")
.append(qualifiedStateCase(state)).append(");\n")
.append(indent).append(THREE_INDENT).append("break;\n"));
.append(indent).append(" return;\n")
.append(indent).append(" }\n\n"));

sb.append(indent).append(TWO_INDENT).append("default:\n")
.append(indent).append(THREE_INDENT).append("codecState(")
.append(qualifiedStateCase(fieldPrecedenceModel.latestVersionWrappedState())).append(");\n")
.append(indent).append(THREE_INDENT).append("break;\n")
.append(indent).append(INDENT).append("}\n")
sb.append(indent)
.append(" throw new InvalidOperationException(\"Unsupported acting version: \" + actingVersion);\n")
.append(indent).append("}\n\n");

return sb;
Expand Down
Expand Up @@ -762,21 +762,18 @@ private static CharSequence generateDecoderWrapListener(

final StringBuilder sb = new StringBuilder();
sb.append(indent).append("private void onWrap(final int actingVersion)\n")
.append(indent).append("{\n")
.append(indent).append(" switch(actingVersion)\n")
.append(indent).append(" {\n");
.append(indent).append("{\n");

fieldPrecedenceModel.forEachWrappedStateByVersion((version, state) ->
sb.append(indent).append(" case ").append(version).append(":\n")
.append(indent).append(" codecState(")
fieldPrecedenceModel.forEachWrappedStateByVersionDesc((version, state) ->
sb.append(indent).append(" if (actingVersion >= ").append(version).append(")\n")
.append(indent).append(" {\n")
.append(indent).append(" codecState(")
.append(qualifiedStateCase(state)).append(");\n")
.append(indent).append(" break;\n"));
.append(indent).append(" return;\n")
.append(indent).append(" }\n\n"));

sb.append(indent).append(" default:\n")
.append(indent).append(" codecState(")
.append(qualifiedStateCase(fieldPrecedenceModel.latestVersionWrappedState())).append(");\n")
.append(indent).append(" break;\n")
.append(indent).append(" }\n")
sb.append(indent)
.append(" throw new IllegalStateException(\"Unsupported acting version: \" + actingVersion);\n")
.append(indent).append("}\n\n");

return sb;
Expand Down
Expand Up @@ -139,15 +139,13 @@ public int offset()

private void onWrap(final int actingVersion)
{
switch(actingVersion)
if (actingVersion >= 0)
{
case 0:
codecState(CodecStates.V0_BLOCK);
break;
default:
codecState(CodecStates.V0_BLOCK);
break;
codecState(CodecStates.V0_BLOCK);
return;
}

throw new IllegalStateException("Unsupported acting version: " + actingVersion);
}

public FrameCodecDecoder wrap(
Expand Down
Expand Up @@ -186,15 +186,13 @@ public int offset()

private void onWrap(final int actingVersion)
{
switch(actingVersion)
if (actingVersion >= 0)
{
case 0:
codecState(CodecStates.V0_BLOCK);
break;
default:
codecState(CodecStates.V0_BLOCK);
break;
codecState(CodecStates.V0_BLOCK);
return;
}

throw new IllegalStateException("Unsupported acting version: " + actingVersion);
}

public TokenCodecDecoder wrap(
Expand Down
20 changes: 20 additions & 0 deletions sbe-tool/src/test/cpp/FieldAccessOrderCheckTest.cpp
Expand Up @@ -51,6 +51,7 @@
#include "order_check/NoBlock.h"
#include "order_check/GroupWithNoBlock.h"
#include "order_check/NestedGroupWithVarLength.h"
#include "order_check/SkipVersionAddGroupBeforeVarDataV2.h"

using namespace order::check;
using ::testing::HasSubstr;
Expand Down Expand Up @@ -4935,3 +4936,22 @@ INSTANTIATE_TEST_SUITE_P(
std::make_tuple(2, 2, "V0_B_N_D_N_BLOCK")
)
);

TEST_F(FieldAccessOrderCheckTest, allowsSkippingFutureGroupWhenDecodingFromVersionWithNoChangesInMessagePart1)
{
AddGroupBeforeVarDataV0 encoder;
encoder.wrapForEncode(m_buffer, OFFSET, BUFFER_LEN);
encoder.a(42).putB("abc");

SkipVersionAddGroupBeforeVarDataV2 decoder;
decoder.wrapForDecode(
m_buffer,
OFFSET,
AddGroupBeforeVarDataV0::sbeBlockLength(),
1,
BUFFER_LEN
);

EXPECT_EQ(decoder.a(), 42);
EXPECT_EQ(decoder.getBAsString(), "abc");
}
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.charset.StandardCharsets;
import java.util.Random;
Expand Down Expand Up @@ -3407,6 +3408,116 @@ void disallowsIncompleteMessagesDueToMissingVarDataInNestedGroup(
assertThat(exception.getMessage(), containsString("Not fully encoded, current state: " + expectedState));
}

@Test
void decodesNullValueForFutureVersionBlockFieldWhenCurrentVersionHasNoChangesInMessage()
{
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
final SkipVersionAddPrimitiveV1Encoder encoder = new SkipVersionAddPrimitiveV1Encoder()
.wrapAndApplyHeader(buffer, OFFSET, headerEncoder);

headerEncoder
.templateId(SkipVersionAddPrimitiveV2Encoder.TEMPLATE_ID)
.version(1);

encoder.b("abc");

final SkipVersionAddPrimitiveV2Decoder decoder = new SkipVersionAddPrimitiveV2Decoder()
.wrapAndApplyHeader(buffer, OFFSET, messageHeaderDecoder);

assertThat(decoder.a(), equalTo(SkipVersionAddPrimitiveV2Decoder.aNullValue()));
assertThat(decoder.b(), equalTo("abc"));
}

@Test
void decodesEmptyFutureGroupWhenDecodingFromVersionWithNoChangesInMessage()
{
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
final SkipVersionAddGroupV1Encoder encoder = new SkipVersionAddGroupV1Encoder()
.wrapAndApplyHeader(buffer, OFFSET, headerEncoder);

headerEncoder
.templateId(SkipVersionAddGroupV2Decoder.TEMPLATE_ID)
.version(1);

encoder.a(42);

final SkipVersionAddGroupV2Decoder decoder = new SkipVersionAddGroupV2Decoder()
.wrapAndApplyHeader(buffer, OFFSET, messageHeaderDecoder);

assertThat(decoder.a(), equalTo(42));
assertThat(decoder.b().count(), equalTo(0));
}

@Test
void decodesNullValueForFutureVarDataFieldWhenDecodingFromVersionWithNoChangesInMessage()
{
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
final SkipVersionAddVarDataV1Encoder encoder = new SkipVersionAddVarDataV1Encoder()
.wrapAndApplyHeader(buffer, OFFSET, headerEncoder);

headerEncoder
.templateId(SkipVersionAddVarDataV2Decoder.TEMPLATE_ID)
.version(1);

encoder.a(42);

final SkipVersionAddVarDataV2Decoder decoder = new SkipVersionAddVarDataV2Decoder()
.wrapAndApplyHeader(buffer, OFFSET, messageHeaderDecoder);

assertThat(decoder.a(), equalTo(42));
assertThat(decoder.b(), equalTo(""));
}

@Test
void allowsDecodingUnknownFutureVersions()
{
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
final SkipVersionAddPrimitiveV2Encoder encoder = new SkipVersionAddPrimitiveV2Encoder()
.wrapAndApplyHeader(buffer, OFFSET, headerEncoder);

headerEncoder
.templateId(SkipVersionAddPrimitiveV2Encoder.TEMPLATE_ID)
.version(42);

encoder.a(43).b("abc");

final SkipVersionAddPrimitiveV2Decoder decoder = new SkipVersionAddPrimitiveV2Decoder()
.wrapAndApplyHeader(buffer, OFFSET, messageHeaderDecoder);

assertThat(decoder.a(), equalTo(43));
assertThat(decoder.b(), equalTo("abc"));
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void allowsSkippingFutureGroupWhenDecodingFromVersionWithNoChangesInMessage(final boolean decodeGroup)
{
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
final AddGroupBeforeVarDataV0Encoder encoder = new AddGroupBeforeVarDataV0Encoder()
.wrapAndApplyHeader(buffer, OFFSET, headerEncoder);

headerEncoder
.templateId(SkipVersionAddGroupBeforeVarDataV2Decoder.TEMPLATE_ID)
.version(1);

encoder.a(42).b("abc");

final SkipVersionAddGroupBeforeVarDataV2Decoder decoder = new SkipVersionAddGroupBeforeVarDataV2Decoder()
.wrapAndApplyHeader(buffer, OFFSET, messageHeaderDecoder);

assertThat(decoder.a(), equalTo(42));

if (decodeGroup)
{
for (final SkipVersionAddGroupBeforeVarDataV2Decoder.CDecoder group : decoder.c())
{
group.sbeSkip();
}
}

assertThat(decoder.b(), equalTo("abc"));
}

private void modifyHeaderToLookLikeVersion0()
{
messageHeaderDecoder.wrap(buffer, OFFSET);
Expand All @@ -3418,9 +3529,9 @@ private void modifyHeaderToLookLikeVersion0()
private void modifyHeaderToLookLikeVersion1()
{
messageHeaderDecoder.wrap(buffer, OFFSET);
assert messageHeaderDecoder.version() == 1;
assert messageHeaderDecoder.version() >= 1;
final int v0TemplateId = messageHeaderDecoder.templateId() - 1_000;
messageHeaderEncoder.wrap(buffer, OFFSET);
messageHeaderEncoder.templateId(v0TemplateId);
messageHeaderEncoder.templateId(v0TemplateId).version(1);
}
}

0 comments on commit 7e6e503

Please sign in to comment.