New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Avro schema registry [HZ-3010] #25349
Introduce Avro schema registry [HZ-3010] #25349
Conversation
d743031
to
f4abe36
Compare
f4abe36
to
301eec4
Compare
The job Click to expand the log file---------ERRORS----------- -------------------------- [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.497 s <<< FAILURE! -- in com.hazelcast.jet.sql.impl.type.PortableNestedFieldsTest -------------------------- [ERROR] com.hazelcast.jet.sql.impl.type.PortableNestedFieldsTest.test_nestedPortablesAreReturnedAsDeserialized -- Time elapsed: 0.188 s <<< ERROR! -------------------------- [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.539 s <<< FAILURE! -- in com.hazelcast.jet.sql.impl.type.CompactNestedFieldsTest -------------------------- [ERROR] com.hazelcast.jet.sql.impl.type.CompactNestedFieldsTest.test_nestedCompactsAreReturnedAsDeserialized -- Time elapsed: 0.581 s <<< ERROR! -------------------------- [ERROR] Errors: -------------------------- [ERROR] CompactNestedFieldsTest.test_nestedCompactsAreReturnedAsDeserialized:101->execute:79 ? HazelcastSql From line 1, column 43 to line 1, column 48: Unknown field 'office' -------------------------- [ERROR] PortableNestedFieldsTest.test_nestedPortablesAreReturnedAsDeserialized:135->execute:112 ? HazelcastSql From line 1, column 43 to line 1, column 48: Unknown field 'office' -------------------------- [ERROR] Tests run: 6006, Failures: 0, Errors: 2, Skipped: 10 -------------------------- [ERROR] -------------------------- |
@@ -111,13 +113,14 @@ | |||
/** | |||
* Base class for Hazelcast tests which provides a big number of convenient test methods. | |||
* <p> | |||
* Has built-in support for {@link TestHazelcastInstanceFactory}, {@link JitterRule} and {@link DumpBuildInfoOnFailureRule}. | |||
* Has built-in support for {@link TestHazelcastInstanceFactory}, {@link JitterRule} and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reformatting the code makes the PR much bigger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As soon as I touch a file, I refactor it. Otherwise, I am constantly distracted by the thoughts about what can be improved on the existing code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem is that we do not enforce consistent formatting settings
...in/java/com/hazelcast/internal/serialization/impl/portable/PortableGenericRecordBuilder.java
Show resolved
Hide resolved
Compact uses much more complicated replication algorithm. IMap is AP data structure and has various caveats (like by default ignoring backup operation failure). I am afraid that this might cause problems eg. in split brain scenarios. Also a general question: AFAIR Jet should not be sending entire Avro |
private static final int AVRO_SCHEMAS_MAX_IDLE_SECONDS = (int) DAYS.toSeconds(7); | ||
|
||
private final Map<Long, Schema> localSchemas = new ConcurrentHashMap<>(); | ||
private final Map<Schema, Long> localSchemaIds = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local caches do not use Rabin fingerprint but Java hashCode + equals, which also should be unique. This is surprising.
private static final String AVRO_SCHEMAS_MAP_NAME = INTERNAL_JET_OBJECTS_PREFIX + "avro.schemas"; | ||
private static final int AVRO_SCHEMAS_MAX_IDLE_SECONDS = (int) DAYS.toSeconds(7); | ||
|
||
private final Map<Long, Schema> localSchemas = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we had lots of trouble with such local cache of values from IMap in case of data connection. I expect similar problems here. The only thing simpler here is that schemas are immutable.
}, true); | ||
|
||
// Update the expiration time of most recently used schemas periodically. | ||
executorService.scheduleAtFixedRate(() -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMap has expiration policies, this looks like reinventing the wheel, because we cache IMap in local CHM.
|
||
// Update the expiration time of most recently used schemas periodically. | ||
executorService.scheduleAtFixedRate(() -> | ||
schemas.putAll(mostRecentlyUsed.getAndSet(new HashMap<>())), 1, 1, HOURS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
race condition with mostRecentlyUsed.get().put(schemaId, schema);
is possible here.
Is update operation (pulAll
) necessary here? Wouldn't get
be enough?
* specification</a>, such fingerprints are safe to be used as a key in schema caches of up | ||
* to a million entries (for such a cache, the chance of a collision is 3x10<sup>-8</sup>). | ||
* <p> | ||
* Schemas that are not used for one week are dropped and the expiration time of the most |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Schemas that are not used for one week are dropped
There is no good TTL for schemas. Records may be kept for long time in window aggregation processors (eg. if the window is 2 weeks).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also a serialized record can be kept in buffer (Outbox or Inbox) for a long time - usually not desired but theoretically possible. If the schema expires between put
and get
, entire job will fail
public AvroSchemaStorage(HazelcastInstance instance) { | ||
instance.getLifecycleService().addLifecycleListener(event -> { | ||
if (event.getState() == LifecycleEvent.LifecycleState.STARTED) { | ||
instance.getConfig().addMapConfig(new MapConfig(AVRO_SCHEMAS_MAP_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the IMap be persistent? Take into account jobs with lossless restart, processing guarantees and long windows.
import static org.apache.avro.SchemaNormalization.parsingFingerprint64; | ||
|
||
/** | ||
* Schemas are stored in an IMap, where the key is the 64-bit Rabin fingerprint of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we build that, we might as well build full-fledged built-in avro schema registry which was also envisioned in the PRDs
return ((LazyImmutableRecord) record).serializedRecord; | ||
} | ||
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { | ||
long schemaId = schemas.put(record.getSchema()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this PR mostly a performance optimisation of Avro GenericRecord serde with the baseline implementation being just sending each record along with schema?
301eec4
to
3d63f50
Compare
f3bef67
to
f9763de
Compare
f9763de
to
e4f286c
Compare
e4f286c
to
8a55eb4
Compare
byte[]: - Cannot be assumed to be the serialized format of a JSON. JsonObject: 1. It has no Getter. 2. It is unlikely to be passed as a conversion source. It is more probable to receive a HazelcastJsonValue that wraps the string representation of a JSON. 3. Obtaining the schema is hard, which defeats the purpose of shortcut. HazelcastJsonValue: 1. It has a Getter. 2. Obtaining the schema requires parsing it, and the parsed form has no Getter. Thus, in the long path, it will be parsed twice.
8a55eb4
to
f2ce43b
Compare
f2ce43b
to
71e36ba
Compare
PR closed by Hazelcast automation as no activity (>3 months). Please reopen with comments, if necessary. Thank you for using Hazelcast and your valuable contributions |
Depends on #25269
Closes HZ-3010
Checklist:
Team:
,Type:
,Source:
,Module:
) and Milestone setAdd to Release Notes
orNot Release Notes content
set@Nonnull/@Nullable
annotations@since
tags in Javadoc