Skip to content

Commit

Permalink
merge: #9889
Browse files Browse the repository at this point in the history
9889: [Backport stable/8.0] Support logging partition id in compact record logger r=korthout a=backport-action

# Description
Backport of #9885 to `stable/8.0`.

relates to #9877

Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout committed Jul 26, 2022
2 parents f22dcfb + 6c6b13a commit d349d79
Showing 1 changed file with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class CompactRecordLogger {
private final int keyDigits;
private final int valueTypeChars;
private final int intentChars;
private final boolean singlePartition;
private final boolean multiPartition;
private final Map<Long, String> substitutions = new HashMap<>();
private final ArrayList<Record<?>> records;

Expand Down Expand Up @@ -117,15 +117,8 @@ public class CompactRecordLogger {

public CompactRecordLogger(final Collection<Record<?>> records) {
this.records = new ArrayList<>(records);
multiPartition = isMultiPartition();

singlePartition =
this.records.stream()
.mapToLong(Record::getKey)
.filter(key -> key != -1)
.map(Protocol::decodePartitionId)
.distinct()
.count()
< 2;
final var highestPosition = this.records.get(this.records.size() - 1).getPosition();

int digits = 0;
Expand Down Expand Up @@ -168,11 +161,21 @@ public void log() {
LOG.info(bulkMessage.toString());
}

private boolean isMultiPartition() {
final long numberOfPartitions =
records.stream()
.map(r -> Math.max(Protocol.decodePartitionId(r.getKey()), r.getPartitionId()))
.filter(x -> x != -1)
.distinct()
.count();
return numberOfPartitions > 1;
}

private void addSummarizedRecords(final StringBuilder bulkMessage) {
bulkMessage
.append("--------\n")
.append(
"\t['C'ommand/'E'event/'R'ejection] [valueType] [intent] - #[position]->#[source record position] P[partitionId]K[key] - [summary of value]\n")
"\t[Partition] ['C'ommand/'E'event/'R'ejection] [valueType] [intent] - #[position]->#[source record position] P[partitionId]K[key] - [summary of value]\n")
.append(
"\tP9K999 - key; #999 - record position; \"ID\" element/process id; @\"elementid\"/[P9K999] - element with ID and key\n")
.append(
Expand All @@ -181,10 +184,7 @@ private void addSummarizedRecords(final StringBuilder bulkMessage) {
"\tLong IDs are shortened (e.g. 'startEvent_5d56488e-0570-416c-ba2d-36d2a3acea78' -> 'star..acea78'\n")
.append("--------\n");

records.forEach(
record -> {
bulkMessage.append(summarizeRecord(record)).append("\n");
});
records.forEach(record -> bulkMessage.append(summarizeRecord(record)).append("\n"));
}

private void addDeployedProcesses(final StringBuilder bulkMessage) {
Expand Down Expand Up @@ -217,6 +217,7 @@ private void addDecomposedKeys(final StringBuilder bulkMessage) {
private StringBuilder summarizeRecord(final Record<?> record) {
final StringBuilder message = new StringBuilder();

message.append(summarizePartition(record));
message.append(summarizeIntent(record));
message.append(summarizePositionFields(record));
message.append(summarizeValue(record));
Expand All @@ -229,6 +230,13 @@ private StringBuilder summarizeRecord(final Record<?> record) {
return message;
}

private String summarizePartition(final Record<?> record) {
if (!multiPartition) {
return "";
}
return record.getPartitionId() + " ";
}

private StringBuilder summarizePositionFields(final Record<?> record) {
return new StringBuilder()
.append(formatPosition(record.getPosition()))
Expand Down Expand Up @@ -608,7 +616,7 @@ private String shortenKey(final long input) {
private String formatKey(final long key) {
final var result = new StringBuilder();

if (!singlePartition) {
if (multiPartition) {
if (key > 0) {
result.append("P").append(Protocol.decodePartitionId(key));
} else {
Expand Down

0 comments on commit d349d79

Please sign in to comment.