Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid volatile writes from Atomics default values #22373

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -72,7 +72,7 @@
public class KinesisIntegrationTest extends AbstractKinesisTest {

public static LocalStackContainer localStack;
public static final AtomicInteger threadCounter = new AtomicInteger(0);
public static final AtomicInteger threadCounter = new AtomicInteger();

private static AwsConfig AWS_CONFIG;
private static AmazonKinesisAsync KINESIS;
Expand Down
Expand Up @@ -130,8 +130,8 @@ public void distributedObjectDestroyed(DistributedObjectEvent event) {

public static class DummyBean implements IDummyBean {

final AtomicBoolean nullCall = new AtomicBoolean(false);
final AtomicBoolean firstCall = new AtomicBoolean(false);
final AtomicBoolean nullCall = new AtomicBoolean();
final AtomicBoolean firstCall = new AtomicBoolean();

@Override
public String getName(int k) {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public static HazelcastRelOptCluster create(RelOptPlanner planner, RexBuilder re
planner,
rexBuilder.getTypeFactory(),
rexBuilder,
new AtomicInteger(0),
new AtomicInteger(),
new HashMap<>(),
ssc
);
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void before() throws InterruptedException {
}
SqlTestSupport.createMapping(instances[0], MAP_NAME, Integer.class, Integer.class);
map = instances[0].getMap(MAP_NAME);
mutatorException = new AtomicReference<>(null);
mutatorException = new AtomicReference<>();
progress = new AtomicInteger();
}

Expand Down
Expand Up @@ -75,8 +75,8 @@ public abstract class AbstractHazelcastCacheManager implements HazelcastCacheMan
protected final URI uri;
protected final Properties properties;

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isDestroyed = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicBoolean isDestroyed = new AtomicBoolean();

private final WeakReference<ClassLoader> classLoaderReference;
private final String cacheNamePrefix;
Expand Down
Expand Up @@ -26,9 +26,9 @@
public class CacheContext {

private final AtomicBoolean implicitMerkleTreeEnableLogged = new AtomicBoolean();
private final AtomicLong entryCount = new AtomicLong(0L);
private final AtomicInteger cacheEntryListenerCount = new AtomicInteger(0);
private final AtomicInteger invalidationListenerCount = new AtomicInteger(0);
private final AtomicLong entryCount = new AtomicLong();
private final AtomicInteger cacheEntryListenerCount = new AtomicInteger();
private final AtomicInteger invalidationListenerCount = new AtomicInteger();

public long getEntryCount() {
return entryCount.get();
Expand Down
Expand Up @@ -106,8 +106,8 @@ abstract class CacheProxySupport<K, V>
private final CopyOnWriteArrayList<Future> loadAllTasks = new CopyOnWriteArrayList<>();

private final AtomicReference<HazelcastServerCacheManager> cacheManagerRef = new AtomicReference<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isDestroyed = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicBoolean isDestroyed = new AtomicBoolean();

private final CacheProxySyncListenerCompleter listenerCompleter = new CacheProxySyncListenerCompleter(this);

Expand Down
Expand Up @@ -134,8 +134,8 @@ abstract class ClientCacheProxySupport<K, V> extends ClientProxy implements ICac
private ILogger logger;
private final AtomicReference<HazelcastClientCacheManager> cacheManagerRef = new AtomicReference<>();
private final ConcurrentMap<Future, CompletionListener> loadAllCalls = new ConcurrentHashMap<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isDestroyed = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicBoolean isDestroyed = new AtomicBoolean();

private final AtomicInteger completionIdCounter = new AtomicInteger();

Expand Down
Expand Up @@ -30,7 +30,7 @@ public class ClusterDiscoveryService {
private final int maxTryCount;
private final LifecycleService lifecycleService;
private final List<CandidateClusterContext> candidateClusters;
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger index = new AtomicInteger();

public ClusterDiscoveryService(List<CandidateClusterContext> candidateClusters,
int maxTryCount, LifecycleService lifecycleService) {
Expand Down
Expand Up @@ -56,7 +56,7 @@ public final class LifecycleServiceImpl implements LifecycleService {
private final HazelcastClientInstanceImpl client;
private final ConcurrentMap<UUID, LifecycleListener> lifecycleListeners
= new ConcurrentHashMap<UUID, LifecycleListener>();
private final AtomicBoolean active = new AtomicBoolean(false);
private final AtomicBoolean active = new AtomicBoolean();
private final BuildInfo buildInfo;
private final ExecutorService executor;

Expand Down
Expand Up @@ -48,7 +48,7 @@ public final class ClientConnectionProcessListenerRunner {
public ClientConnectionProcessListenerRunner(HazelcastClientInstanceImpl client) {
this.client = client;
listeners = new CopyOnWriteArrayList<>();
hasListeners = new AtomicBoolean(false);
hasListeners = new AtomicBoolean();
}

/**
Expand Down
Expand Up @@ -46,7 +46,7 @@ public final class ClientPartitionServiceImpl implements ClientPartitionService
private final ILogger logger;
private final AtomicReference<PartitionTable> partitionTable =
new AtomicReference<>(new PartitionTable(null, -1, new Int2ObjectHashMap<>()));
private final AtomicInteger partitionCount = new AtomicInteger(0);
private final AtomicInteger partitionCount = new AtomicInteger();

public ClientPartitionServiceImpl(HazelcastClientInstanceImpl client) {
this.client = client;
Expand Down
Expand Up @@ -53,7 +53,7 @@ public abstract class AbstractProxySessionManager {
private final ConcurrentMap<RaftGroupId, Object> mutexes = new ConcurrentHashMap<>();
private final ConcurrentMap<RaftGroupId, SessionState> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<BiTuple<RaftGroupId, Long>, Long> threadIds = new ConcurrentHashMap<>();
private final AtomicBoolean scheduleHeartbeat = new AtomicBoolean(false);
private final AtomicBoolean scheduleHeartbeat = new AtomicBoolean();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private boolean running = true;

Expand Down
Expand Up @@ -64,7 +64,7 @@ public final class HazelcastBootstrap {

private static final ILogger LOGGER = Logger.getLogger(HazelcastBootstrap.class);

private static final AtomicBoolean LOGGING_CONFIGURED = new AtomicBoolean(false);
private static final AtomicBoolean LOGGING_CONFIGURED = new AtomicBoolean();

private HazelcastBootstrap() {
}
Expand Down
Expand Up @@ -176,7 +176,7 @@ public class Node {
final ClusterTopologyIntentTracker clusterTopologyIntentTracker;

private final ILogger logger;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private final AtomicBoolean shuttingDown = new AtomicBoolean();
private final NodeShutdownHookThread shutdownHookThread;
private final MemberSchemaService schemaService;
private final InternalSerializationService serializationService;
Expand Down
Expand Up @@ -74,7 +74,7 @@ public abstract class AbstractJoiner
protected final ClusterJoinManager clusterJoinManager;

private final AtomicLong joinStartTime = new AtomicLong(Clock.currentTimeMillis());
private final AtomicInteger tryCount = new AtomicInteger(0);
private final AtomicInteger tryCount = new AtomicInteger();

private final long mergeNextRunDelayMs;
private volatile Address targetAddress;
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class MulticastJoiner extends AbstractJoiner {
private static final int TRY_COUNT_MAX_LAST_DIGITS = 512;
private static final int TRY_COUNT_MODULO = 10;

private final AtomicInteger currentTryCount = new AtomicInteger(0);
private final AtomicInteger currentTryCount = new AtomicInteger();
private final AtomicInteger maxTryCount;

// this deque is used as a stack, the SplitBrainMulticastListener adds to its head and the periodic split brain handler job
Expand Down
Expand Up @@ -25,7 +25,7 @@
final class SplitBrainHandler implements Runnable {

private final Node node;
private final AtomicBoolean inProgress = new AtomicBoolean(false);
private final AtomicBoolean inProgress = new AtomicBoolean();

SplitBrainHandler(Node node) {
this.node = node;
Expand Down
Expand Up @@ -64,7 +64,7 @@ public abstract class ClearExpiredRecordsTask<T, S> implements Runnable {

private final Address thisAddress;
private final OperationServiceImpl operationService;
private final AtomicBoolean singleRunPermit = new AtomicBoolean(false);
private final AtomicBoolean singleRunPermit = new AtomicBoolean();
private final AtomicInteger lostPartitionCounter = new AtomicInteger();
private final AtomicInteger nextExpiryQueueToScanIndex = new AtomicInteger();

Expand Down
Expand Up @@ -52,11 +52,11 @@ public final class ExpirationManager implements LifecycleListener, PartitionLost
private final TaskScheduler globalTaskScheduler;
private final LifecycleService lifecycleService;
private final PartitionService partitionService;
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final AtomicBoolean scheduled = new AtomicBoolean();
/**
* @see #rescheduleIfScheduledBefore()
*/
private final AtomicBoolean scheduledOneTime = new AtomicBoolean(false);
private final AtomicBoolean scheduledOneTime = new AtomicBoolean();

private volatile ScheduledFuture<?> scheduledExpirationTask;

Expand Down
Expand Up @@ -32,8 +32,8 @@ public class LocalStatsDelegate<T> {
private volatile T localStats;
private final StatsSupplier<T> supplier;
private final long intervalMs;
private final AtomicLong lastUpdated = new AtomicLong(0);
private final AtomicBoolean inProgress = new AtomicBoolean(false);
private final AtomicLong lastUpdated = new AtomicLong();
private final AtomicBoolean inProgress = new AtomicBoolean();

public LocalStatsDelegate(StatsSupplier<T> supplier, long intervalSec) {
this.supplier = supplier;
Expand Down
Expand Up @@ -160,7 +160,7 @@ public List<MCEventDTO> pollMCEvents(UUID mcClientUuid) {

private final AtomicReference<String> tmsJson = new AtomicReference<>();
private final TimedMemberStateFactory tmsFactory;
private final AtomicBoolean tmsFactoryInitialized = new AtomicBoolean(false);
private final AtomicBoolean tmsFactoryInitialized = new AtomicBoolean();
private final ConsoleCommandHandler commandHandler;
private final ClientBwListConfigHandler bwListConfigHandler;
private final MCEventStore eventStore;
Expand Down
Expand Up @@ -75,7 +75,7 @@ public class MetricsRegistryImpl implements MetricsRegistry {

private final DefaultMetricDescriptorSupplier staticDescriptorSupplier = new DefaultMetricDescriptorSupplier();

private final AtomicReference<MetricDescriptorReusableData> metricDescriptorReusableData = new AtomicReference<>(null);
private final AtomicReference<MetricDescriptorReusableData> metricDescriptorReusableData = new AtomicReference<>();

/**
* Creates a MetricsRegistryImpl instance.
Expand Down
Expand Up @@ -224,7 +224,7 @@ private void checkKeyFormat(K key) {

private class ExpirationTask implements Runnable {

private final AtomicBoolean expirationInProgress = new AtomicBoolean(false);
private final AtomicBoolean expirationInProgress = new AtomicBoolean();

@Override
public void run() {
Expand Down
Expand Up @@ -61,7 +61,7 @@ public class BatchInvalidator extends Invalidator {
private final int batchSize;
private final int batchFrequencySeconds;
private final UUID nodeShutdownListenerId;
private final AtomicBoolean runningBackgroundTask = new AtomicBoolean(false);
private final AtomicBoolean runningBackgroundTask = new AtomicBoolean();

public BatchInvalidator(String serviceName, int batchSize, int batchFrequencySeconds,
Predicate<EventRegistration> eventFilter, NodeEngine nodeEngine) {
Expand Down
Expand Up @@ -25,8 +25,8 @@

@SerializableByConvention
public final class InvalidationQueue<T> extends ConcurrentLinkedQueue<T> {
private final AtomicInteger elementCount = new AtomicInteger(0);
private final AtomicBoolean flushingInProgress = new AtomicBoolean(false);
private final AtomicInteger elementCount = new AtomicInteger();
private final AtomicBoolean flushingInProgress = new AtomicBoolean();

@Override
public int size() {
Expand Down
Expand Up @@ -80,7 +80,7 @@ public final class RepairingTask implements Runnable {
private final InvalidationMetaDataFetcher invalidationMetaDataFetcher;
private final SerializationService serializationService;
private final MinimalPartitionService partitionService;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean();
private final ConcurrentMap<String, RepairingHandler> handlers = new ConcurrentHashMap<String, RepairingHandler>();
private final ContextMutexFactory contextMutexFactory = new ContextMutexFactory();

Expand Down
Expand Up @@ -141,7 +141,7 @@ public class InternalPartitionServiceImpl implements InternalPartitionService,
private final PartitionEventManager partitionEventManager;

/** Determines if a {@link AssignPartitions} is being sent to the master, used to limit partition assignment requests. */
private final AtomicBoolean masterTriggered = new AtomicBoolean(false);
private final AtomicBoolean masterTriggered = new AtomicBoolean();
private final CoalescingDelayedTrigger masterTrigger;

private final AtomicReference<CountDownLatch> shutdownLatchRef = new AtomicReference<>();
Expand Down
Expand Up @@ -146,7 +146,7 @@ public class MigrationManager {
private final ConcurrentMap<Integer, MigrationInfo> activeMigrations = new ConcurrentHashMap<>();
// both reads and updates will be done under lock!
private final LinkedHashSet<MigrationInfo> completedMigrations = new LinkedHashSet<>();
private final AtomicBoolean promotionPermit = new AtomicBoolean(false);
private final AtomicBoolean promotionPermit = new AtomicBoolean();
private final MigrationStats stats = new MigrationStats();
private volatile MigrationInterceptor migrationInterceptor = new MigrationInterceptor.NopMigrationInterceptor();
private final Lock partitionServiceLock;
Expand Down
Expand Up @@ -52,7 +52,7 @@ class ParallelOperationInvoker {
private final Supplier<Operation> operationSupplier;
private final int maxRetries;
private final long retryDelayMillis;
private final AtomicInteger retryCount = new AtomicInteger(0);
private final AtomicInteger retryCount = new AtomicInteger();
private final InternalCompletableFuture<Collection<UUID>> future = new InternalCompletableFuture<>();
private final Predicate<Member> memberFilter;
private volatile Set<Member> members;
Expand Down
Expand Up @@ -51,7 +51,7 @@ public final class CachedExecutorServiceDelegate implements ExecutorService, Man
private final ExecutorService cachedExecutor;
private final BlockingQueue<Runnable> taskQ;
private final Lock lock = new ReentrantLock();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shutdown = new AtomicBoolean();
private volatile int size;

public CachedExecutorServiceDelegate(String name, ExecutorService cachedExecutor,
Expand Down
Expand Up @@ -25,7 +25,7 @@
public class PoolExecutorThreadFactory extends AbstractExecutorThreadFactory {

private final String threadNamePrefix;
private final AtomicInteger idGen = new AtomicInteger(0);
private final AtomicInteger idGen = new AtomicInteger();
// to reuse previous thread IDs
private final Queue<Integer> idQ = new LinkedBlockingQueue<Integer>(1000);

Expand Down
Expand Up @@ -34,7 +34,7 @@
*/
public class NodeWideUsedCapacityCounter {
private final long maxPerNodeCapacity;
private final AtomicLong nodeWideUsedCapacityCounter = new AtomicLong(0);
private final AtomicLong nodeWideUsedCapacityCounter = new AtomicLong();

public NodeWideUsedCapacityCounter(HazelcastProperties properties) {
this.maxPerNodeCapacity = properties.getLong(ClusterProperty.MAP_WRITE_BEHIND_QUEUE_CAPACITY);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public class WriteBehindStore extends AbstractMapDataStore<Data, Object> {
/**
* Sequence number of store operations.
*/
private final AtomicLong sequence = new AtomicLong(0);
private final AtomicLong sequence = new AtomicLong();

/**
* @see {@link com.hazelcast.config.MapStoreConfig#setWriteCoalescing(boolean)}
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class DefaultPartitionSequencer implements PartitionSequencer {
private final AtomicLong sequence;

public DefaultPartitionSequencer() {
this.sequence = new AtomicLong(0L);
this.sequence = new AtomicLong();
}

@Override
Expand Down
Expand Up @@ -53,7 +53,7 @@ public abstract class AbstractBaseReplicatedRecordStore<K, V> implements Replica
protected final SerializationService serializationService;
protected final ReplicatedMapService replicatedMapService;
protected final AtomicReference<InternalReplicatedMapStorage<K, V>> storageRef;
protected final AtomicBoolean isLoaded = new AtomicBoolean(false);
protected final AtomicBoolean isLoaded = new AtomicBoolean();

private final EntryTaskScheduler<Object, Object> ttlEvictionScheduler;

Expand Down
Expand Up @@ -83,7 +83,7 @@ public class DistributedScheduledExecutorService
public static final CapacityPermit NOOP_PERMIT = new NoopCapacityPermit();

//Testing only
static final AtomicBoolean FAIL_MIGRATIONS = new AtomicBoolean(false);
static final AtomicBoolean FAIL_MIGRATIONS = new AtomicBoolean();

private static final Object NULL_OBJECT = new Object();

Expand Down
Expand Up @@ -56,9 +56,9 @@ public final class ScheduledFutureProxy<V>

private transient HazelcastInstance instance;

private final transient AtomicBoolean partitionLost = new AtomicBoolean(false);
private final transient AtomicBoolean partitionLost = new AtomicBoolean();

private final transient AtomicBoolean memberLost = new AtomicBoolean(false);
private final transient AtomicBoolean memberLost = new AtomicBoolean();

// Single writer, many readers (see partition & member listener)
private volatile ScheduledTaskHandler handler;
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class ScheduledTaskDescriptor

private TaskDefinition definition;

private final AtomicReference<ScheduledTaskResult> resultRef = new AtomicReference<ScheduledTaskResult>(null);
private final AtomicReference<ScheduledTaskResult> resultRef = new AtomicReference<ScheduledTaskResult>();

private transient volatile ScheduledFuture<?> future;

Expand Down
Expand Up @@ -42,7 +42,7 @@ final class SlowOperationLog {

private static final int SHORT_STACKTRACE_LENGTH = 200;

final AtomicInteger totalInvocations = new AtomicInteger(0);
final AtomicInteger totalInvocations = new AtomicInteger();

final String operation;
final String stackTrace;
Expand Down
Expand Up @@ -74,7 +74,7 @@ public class TopicService implements ManagedService, RemoteService, EventPublish
private final ConstructorFunction<String, LocalTopicStatsImpl> localTopicStatsConstructorFunction =
mapName -> new LocalTopicStatsImpl();
private EventService eventService;
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicInteger counter = new AtomicInteger();
private Address localAddress;

@Override
Expand Down