Skip to content

Commit

Permalink
fix pulsar-common module check-style (#13729)
Browse files Browse the repository at this point in the history
  • Loading branch information
nlu90 committed Jan 13, 2022
1 parent adcbe0f commit 4dcb166
Show file tree
Hide file tree
Showing 65 changed files with 269 additions and 188 deletions.
13 changes: 13 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static AuthPolicies.Builder builder() {
public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
private Map<String, Set<AuthAction>> namespaceAuthentication = new TreeMap<>();
private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();;
private Map<String, Set<String>> subscriptionAuthentication= new TreeMap<>();;
private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>();;

AuthPoliciesImplBuilder() {
}
Expand Down Expand Up @@ -83,4 +83,4 @@ public String toString() {
+ this.subscriptionAuthentication + ")";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;

import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
Expand All @@ -32,7 +33,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;

import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
Expand All @@ -43,17 +43,13 @@
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;

import static java.nio.charset.StandardCharsets.UTF_8;


/**
* Utils for schemas.
*/
Expand Down Expand Up @@ -336,31 +332,35 @@ public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo>
}

/**
* convert the key/value schema info data to string
* Convert the key/value schema info data to string.
*
* @param kvSchemaInfo the key/value schema info
* @return the convert schema info data string
*/
public static String convertKeyValueSchemaInfoDataToString(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) throws IOException {
public static String convertKeyValueSchemaInfoDataToString(
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) throws IOException {
ObjectMapper objectMapper = ObjectMapperFactory.create();
KeyValue<Object, Object> keyValue = new KeyValue<>(SchemaType.isPrimitiveType(kvSchemaInfo.getKey().getType()) ? ""
: objectMapper.readTree(kvSchemaInfo.getKey().getSchema()), SchemaType.isPrimitiveType(kvSchemaInfo.getValue().getType()) ?
"" : objectMapper.readTree(kvSchemaInfo.getValue().getSchema()));
KeyValue<Object, Object> keyValue = new KeyValue<>(
SchemaType.isPrimitiveType(kvSchemaInfo.getKey().getType()) ? ""
: objectMapper.readTree(kvSchemaInfo.getKey().getSchema()),
SchemaType.isPrimitiveType(kvSchemaInfo.getValue().getType()) ? ""
: objectMapper.readTree(kvSchemaInfo.getValue().getSchema()));
return objectMapper.writeValueAsString(keyValue);
}

private static byte[] getKeyOrValueSchemaBytes(JsonElement jsonElement) {
return KEY_VALUE_SCHEMA_NULL_STRING.equals(jsonElement.toString()) ?
KEY_VALUE_SCHEMA_IS_PRIMITIVE : jsonElement.toString().getBytes(UTF_8);
return KEY_VALUE_SCHEMA_NULL_STRING.equals(jsonElement.toString())
? KEY_VALUE_SCHEMA_IS_PRIMITIVE : jsonElement.toString().getBytes(UTF_8);
}

/**
* convert the key/value schema info data json bytes to key/value schema info data bytes
* Convert the key/value schema info data json bytes to key/value schema info data bytes.
*
* @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
* @return the key/value schema info data bytes
*/
public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
public static byte[] convertKeyValueDataStringToSchemaInfoSchema(
byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8));
byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));
Expand All @@ -374,7 +374,7 @@ public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValue
}

/**
* Serialize schema properties
* Serialize schema properties.
*
* @param properties schema properties
* @return the serialized schema properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public static void registerOOMListener(Consumer<OutOfMemoryError> listener) {
static {
boolean isPooled = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true"));
EXIT_ON_OOM = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, "false"));
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap"));
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(
System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap"));

LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy
.valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;

import java.io.IOException;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;

Expand Down Expand Up @@ -94,7 +93,7 @@ public Map<String, String> getProperties() {
if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
return singleMessageMetadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
(oldValue,newValue) -> newValue));
(oldValue, newValue) -> newValue));
} else if (msgMetadata.getMetadata().getPropertiesCount() > 0) {
return msgMetadata.getMetadata().getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class PulsarEvent {
private ActionType actionType;
private TopicPoliciesEvent topicPoliciesEvent;
/**
* Which remote clusters to replicate to
* Which remote clusters to replicate to.
*/
private HashSet<String> replicateTo;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.common.intercept;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;

public class AppendIndexMetadataInterceptor implements BrokerEntryMetadataInterceptor{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Set;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A tool class for loading BrokerEntryMetadataInterceptor classes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public interface ManagedLedgerPayloadProcessor {
interface Processor {
/**
* Process the input payload and return a new payload
* Process the input payload and return a new payload.
* NOTE: If this processor returns a different ByteBuf instance than the passed one
* DO THE FOLLOWING to avoid memory leaks
* 1. Call inputPayload.release() to release a reference
Expand All @@ -45,15 +45,15 @@ interface Processor {
void release(ByteBuf processedPayload);
}
/**
* Used by ManagedLedger for pre-processing payload before storing in bookkeeper ledger
* Used by ManagedLedger for pre-processing payload before storing in bookkeeper ledger.
* @return Handle to Processor instance
*/
default Processor inputProcessor() {
return null;
}

/**
* Used by ManagedLedger for processing payload after reading from bookkeeper ledger
* Used by ManagedLedger for processing payload after reading from bookkeeper ledger.
* @return Handle to Processor instance
*/
default Processor outputProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
*/
package org.apache.pulsar.common.nar;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
Expand All @@ -45,6 +42,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,24 @@ public ClusterDataImplBuilder listenerName(String listenerName) {
}

public ClusterDataImpl build() {
return new ClusterDataImpl(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl, authenticationPlugin, authenticationParameters, proxyProtocol, peerClusterNames, brokerClientTlsEnabled, tlsAllowInsecureConnection, brokerClientTlsEnabledWithKeyStore, brokerClientTlsTrustStoreType, brokerClientTlsTrustStore, brokerClientTlsTrustStorePassword, brokerClientTrustCertsFilePath, listenerName);
return new ClusterDataImpl(
serviceUrl,
serviceUrlTls,
brokerServiceUrl,
brokerServiceUrlTls,
proxyServiceUrl,
authenticationPlugin,
authenticationParameters,
proxyProtocol,
peerClusterNames,
brokerClientTlsEnabled,
tlsAllowInsecureConnection,
brokerClientTlsEnabledWithKeyStore,
brokerClientTlsTrustStoreType,
brokerClientTlsTrustStore,
brokerClientTlsTrustStorePassword,
brokerClientTrustCertsFilePath,
listenerName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
package org.apache.pulsar.common.policies.data;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.common.util.ObjectMapperFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.pulsar.common.util.ObjectMapperFactory;

public class EnsemblePlacementPolicyConfig {
public static final String ENSEMBLE_PLACEMENT_POLICY_CONFIG = "EnsemblePlacementPolicyConfig";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
"userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
public class FunctionInstanceStatsDataImpl extends FunctionInstanceStatsDataBaseImpl implements FunctionInstanceStatsData {
public class FunctionInstanceStatsDataImpl
extends FunctionInstanceStatsDataBaseImpl
implements FunctionInstanceStatsData {
@JsonProperty("1min")
public FunctionInstanceStatsDataBaseImpl oneMin = new FunctionInstanceStatsDataBaseImpl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import lombok.Data;
import org.apache.pulsar.common.util.ObjectMapperFactory;

Expand Down Expand Up @@ -98,18 +97,18 @@ public FunctionStatsImpl calculateOverall() {
}

oneMin.setReceivedTotal(oneMin.getReceivedTotal() + functionInstanceStatsData.oneMin.getReceivedTotal());
oneMin.setProcessedSuccessfullyTotal(oneMin.getProcessedSuccessfullyTotal() +
functionInstanceStatsData.oneMin.getProcessedSuccessfullyTotal());
oneMin.setSystemExceptionsTotal(oneMin.getSystemExceptionsTotal() +
functionInstanceStatsData.oneMin.getSystemExceptionsTotal());
oneMin.setUserExceptionsTotal(oneMin.getUserExceptionsTotal() +
functionInstanceStatsData.oneMin.getUserExceptionsTotal());
oneMin.setProcessedSuccessfullyTotal(oneMin.getProcessedSuccessfullyTotal()
+ functionInstanceStatsData.oneMin.getProcessedSuccessfullyTotal());
oneMin.setSystemExceptionsTotal(oneMin.getSystemExceptionsTotal()
+ functionInstanceStatsData.oneMin.getSystemExceptionsTotal());
oneMin.setUserExceptionsTotal(oneMin.getUserExceptionsTotal()
+ functionInstanceStatsData.oneMin.getUserExceptionsTotal());
if (functionInstanceStatsData.oneMin.getAvgProcessLatency() != null) {
if (oneMin.getAvgProcessLatency() == null) {
oneMin.setAvgProcessLatency(0.0);
}
oneMin.setAvgProcessLatency(oneMin.getAvgProcessLatency() +
functionInstanceStatsData.oneMin.getAvgProcessLatency());
oneMin.setAvgProcessLatency(oneMin.getAvgProcessLatency()
+ functionInstanceStatsData.oneMin.getAvgProcessLatency());
nonNullInstancesOneMin++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;

import org.apache.pulsar.common.util.ObjectMapperFactory;
import java.io.IOException;
import org.apache.pulsar.common.util.ObjectMapperFactory;

public class FunctionStatusUtil {
public static FunctionStatus decode(String json) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;

/**
* Topic policy hierarchy value container
* Topic policy hierarchy value container.
*/
@Getter
public class HierarchyTopicPolicies {
Expand Down

0 comments on commit 4dcb166

Please sign in to comment.