Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35049][state] Implement Map Async State API for ForStStateBackend #24812

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

fredia
Copy link
Contributor

@fredia fredia commented May 20, 2024

What is the purpose of the change

This PR implements Map Async State API for ForStStateBackend.

Brief change log

  • Introduce ForStDBIterRequest and ForStIterateOperation for Map#iterator
  • Introduce ForStDBBunchPutRequest for Map#putAll and Map#clear
  • Utilize ForStDBPutRequest for Map#put, and ForStDBGetRequest for Map#value

Verifying this change

This change added tests and can be verified as follows:

  • ForStDBIterateOperationTest
  • Othe map related tests are located in subclasses of ForStDBOperationTestBase.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no )
  • The runtime per-record code paths (performance sensitive): (no )
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented May 20, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR! I'm still reviewing and will put comments incrementally.

@@ -31,13 +32,19 @@
*/
public class ForStStateRequestClassifier implements StateRequestContainer {

private final StateRequestHandler stateRequestHandler;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unused?

*
* @param <K> The type of key in original state access request.
*/
public class ForStDBBunchPutRequest<K> extends ForStDBPutRequest<ContextKey<K>, Map<?, ?>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this extension lacks practical meaning......

I'd suggest we re-organize the interfaces. ForStWriteBatchOperation manipulating some new introduced ForStSingleWriteOperation, which is wrapped and iterative given by ForStWriteBatchOperation and ForStDBPutRequest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the logic of interacting with the db to the ForStDBPutRequest and ForStDBBunchPutRequest, to avoid the classifications in ForStWriteBatchOperation.

Copy link
Contributor

@jectpro7 jectpro7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fredia, thanks for the PR. I left several comments, but I am still beginner of rockdb, please correct me if I am wrong :-)

public ForStDBBunchPutRequest(
ContextKey<K> key, Map value, ForStMapState table, InternalStateFuture<Void> future) {
super(key, value, table, future);
Preconditions.checkArgument(table instanceof ForStMapState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check seems redundant.

Comment on lines 96 to 101
if (!find) {
byte[] newBytes = new byte[len + 1];
System.arraycopy(bytes, 0, newBytes, 0, len);
newBytes[len] = 1;
return newBytes;
}
Copy link
Contributor

@jectpro7 jectpro7 May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, if the byteArr is maximum value. The next should like this:

Suggested change
if (!find) {
byte[] newBytes = new byte[len + 1];
System.arraycopy(bytes, 0, newBytes, 0, len);
newBytes[len] = 1;
return newBytes;
}
if (!find) {
byte[] newBytes = new byte[len + 1];
System.arraycopy(bytes, 0, newBytes, 0, len);
newBytes[len] = Byte.MIN_VALUE + 1;
return newBytes;
}

Copy link
Contributor Author

@fredia fredia Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, after reading the code in rocksdb, I think newBytes[len] = Byte.MAX_VALUE is enough.

inline int Slice::compare(const Slice& b) const {
  assert(data_ != nullptr && b.data_ != nullptr);
  const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
  int r = memcmp(data_, b.data_, min_len);
  if (r == 0) {
    if (size_ < b.size_)
      r = -1;
    else if (size_ > b.size_)
      r = +1;
  }
  return r;
}

this.key = key;
this.table = table;
this.future = future;
this.toBoolean = toBoolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toBoolean seems not very clear. I would suggest to add ForStDBExistRequest for this kind of operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍,I added ForStDBMapCheckRequest for this kind of operation.

if (request instanceof ForStDBBunchPutRequest) {
ForStDBBunchPutRequest<?> bunchPutRequest =
(ForStDBBunchPutRequest<?>) request;
byte[] primaryKey = bunchPutRequest.buildSerializedKey(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I cannot understand this line, why serialize null here? Why the Bunch remove and Single remove are using different serialization method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For map state, there are two types of key: primary key and user key.
Logically it is organized like this:
<primary key, <user key, user value>

We concatenate primary key and user key together as the key of rocksdb, and physically organized like this:
< <primary key - user key> , user value>

For Bunch remove, we will remove all user keys under one primary key, so we use the prefix to match all keys.
For Single remove, we use the bytes of <primary key-user key> to match one specific key.

Copy link
Contributor

@masteryhx masteryhx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR.
PTAL my comments.

ForStDBBunchPutRequest<?> bunchPutRequest =
(ForStDBBunchPutRequest<?>) request;
byte[] primaryKey = bunchPutRequest.buildSerializedKey(null);
byte[] endKey = ForStDBBunchPutRequest.nextBytes(primaryKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may missed something.
What's the logic of calculating and using deleteRange ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We concatenate primary key and user key together as the key of rocksdb, and physically organized like this:

< <primary key1 - user key1> , user value1> 
< <primary key1 - user key2> , user value2> 
< <primary key1 - user key3> , user value3> 
< <primary key1 - user key4> , user value4> 

Here, we use primary key as a prefix to calculate an interval to leverage deleteRange.

But too much deleteRange may affect the read performance, and we will consider changing it iterator.delete later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that deleteRange may be ambiguous in some cases, such as when the primary key is an array of Byte. MAX_VALUE , so I changed it to iterator and delete one by one.


private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
private final boolean toBoolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this have a subclass to make the structure more clear ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍I added ForStDBMapCheckRequest for this kind of operation.

executor.execute(
() -> {
// todo: config read options
try (RocksIterator iter = db.newIterator(request.getColumnFamilyHandle())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems a bit complex.
Could you add some descriptions in some key steps or split them into methods ?

}

public boolean valueIsNull() {
return value == null;
}

public boolean valueIsMap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as GetRequest.
Maybe we could have a more clear structure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the logic of interacting with the db to the ForStDBPutRequest and ForStDBBunchPutRequest, valueIsMap is deleted now.

request.completeStateFuture(stateIterator);
} catch (Exception e) {
LOG.warn("Error when process iterate operation for forStDB", e);
future.completeExceptionally(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to completeExceptionally for InternalStateFuture.

dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest));
return;
}
case MAP_IS_EMPTY:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Same as MAP_GET ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants