Skip to content

Commit

Permalink
[Java Client] Fix producer data race to get cnx (#13176)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Dec 7, 2021
1 parent ab652b8 commit 42469de
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ public boolean add(OpSendMsg opSendMsg) {
}

@Override
protected boolean shouldWriteOpSendMsg() {
protected ClientCnx getCnxIfReady() {
if (dropOpSendMessages) {
return false;
return null;
} else {
return super.shouldWriteOpSendMsg();
return super.getCnxIfReady();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,22 @@ private synchronized void closeAndClearPendingMessages() {

@Override
public boolean isConnected() {
return connectionHandler.cnx() != null && (getState() == State.Ready);
return getCnxIfReady() != null;
}

/**
* Hook method for testing. By returning null, it's possible to prevent messages
* being delivered to the broker.
*
* @return cnx if OpSend messages should be written to open connection. Caller must
* verify that the returned cnx is not null before using reference.
*/
protected ClientCnx getCnxIfReady() {
if (getState() == State.Ready) {
return connectionHandler.cnx();
} else {
return null;
}
}

@Override
Expand Down Expand Up @@ -1830,8 +1845,9 @@ protected void processOpSendMsg(OpSendMsg op) {
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
last -> Math.max(last, getHighestSequenceId(op)));
}
if (shouldWriteOpSendMsg()) {
ClientCnx cnx = cnx();

final ClientCnx cnx = getCnxIfReady();
if (cnx != null) {
if (op.msg != null && op.msg.getSchemaState() == None) {
tryRegisterSchema(cnx, op.msg, op.callback, this.connectionHandler.getEpoch());
return;
Expand All @@ -1854,16 +1870,6 @@ protected void processOpSendMsg(OpSendMsg op) {
}
}

/**
* Hook method for testing. By returning false, it's possible to prevent messages
* being delivered to the broker.
*
* @return true if OpSend messages should be written to open connection
*/
protected boolean shouldWriteOpSendMsg() {
return isConnected();
}

// Must acquire a lock on ProducerImpl.this before calling method.
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
Expand Down

0 comments on commit 42469de

Please sign in to comment.