New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make ScheduledEventExecutor task scheduler pluggable #13552
base: main
Are you sure you want to change the base?
Conversation
common/src/main/java/io/netty5/util/internal/ScheduledTaskQueue.java
Outdated
Show resolved
Hide resolved
common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java
Outdated
Show resolved
Hide resolved
/cc @trustin |
33d0a4c
to
1d2ff7f
Compare
1d2ff7f
to
6fd6257
Compare
Hi @He-Pin , @trustin , can you review this PR? 🙇 |
@@ -37,11 +38,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut | |||
private static final RunnableScheduledFutureNode<?>[] | |||
EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0]; | |||
|
|||
private PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue; | |||
private ScheduledTaskQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue; | |||
private Supplier<ScheduledTaskQueue<RunnableScheduledFutureNode<?>>> scheduledTaskQueueSupplier; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about make it a factory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh it's nice! ✅ I introduced TaskSchedulerFactory
.
* the nodes will not be re-inserted into this or any other {@link ScheduledTaskQueue} and it is known that | ||
* the {@link ScheduledTaskQueue} itself will be garbage collected after this call. | ||
*/ | ||
void clearIgnoringIndexes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@injae-kim I would expect a Scheduler
interface which defines methods like scheduleAtFixedRate
., and the ScheduledTaskQueue
may be just an implementation detail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interface TaskScheduler
|- AbstractTaskScheduler
|- DefaultTaskScheduler
interface TaskSchedulerFactory
|- DefaultTaskSchedulerFactory
aha I understood your point. ✅ I introduced TaskScheduler
interface!
2e85f2f
to
7995b22
Compare
Gentle ping @He-Pin , could you review this PR? I need this feature too! thanks a lot for your help 🙇 |
Code Review Agent Run Status
Code Review Overview
>>See detailed code suggestions<< High-level FeedbackThe PR is a well-thought-out enhancement to the Netty scheduling mechanism, introducing flexibility and the potential for performance optimizations with custom TaskScheduler implementations. The use of a factory pattern for creating TaskScheduler instances is a good design choice, allowing for future extensions and customizations. The addition of tests is commendable, ensuring that the new functionality does not break existing behavior. |
private TaskScheduler taskScheduler; | ||
private TaskSchedulerFactory taskSchedulerFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Consider documenting the behavior when both taskScheduler and taskSchedulerFactory are null. It might be beneficial to clarify the expected behavior or fallback mechanism in such cases, ensuring that users of the API have a clear understanding of how task scheduling is handled.
Code Suggestion:
+ /**
+ * Task scheduler instance used for scheduling tasks. If null, the taskSchedulerFactory is used
+ * to create a new instance. If both are null, document the expected behavior or fallback mechanism.
+ */
+ private TaskScheduler taskScheduler;
+ private TaskSchedulerFactory taskSchedulerFactory;
public interface TaskScheduler { | ||
|
||
/** | ||
* Schedule the {@link RunnableScheduledFuture} task for execution. | ||
*/ | ||
<V> Future<V> schedule(RunnableScheduledFuture<V> task); | ||
|
||
/** | ||
* Schedule the given {@link Runnable} task for execution after the given delay. | ||
*/ | ||
Future<Void> schedule(Runnable command, long delay, TimeUnit unit); | ||
|
||
/** | ||
* Schedule the given {@link Callable} task for execution after the given delay. | ||
*/ | ||
<V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit); | ||
|
||
/** | ||
* Schedule the given {@link Runnable} task for periodic execution. | ||
* The first execution will occur after the given initial delay, and the following repeated executions will occur | ||
* with the given period of time between each execution is started. | ||
* If the task takes longer to complete than the requested period, then the following executions will be delayed, | ||
* rather than allowing multiple instances of the task to run concurrently. | ||
* <p> | ||
* The task will be executed repeatedly until it either fails with an exception, or its future is | ||
* {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. | ||
*/ | ||
Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); | ||
|
||
/** | ||
* Schedule the given {@link Runnable} task for periodic execution. | ||
* The first execution will occur after the given initial delay, and the following repeated executions will occur | ||
* with the given subsequent delay between one task completing and the next task starting. | ||
* The delay from the completion of one task, to the start of the next, stays unchanged regardless of how long a | ||
* task takes to complete. | ||
* <p> | ||
* This is in contrast to {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)} which varies the delays | ||
* between the tasks in order to hit a given frequency. | ||
* <p> | ||
* The task will be executed repeatedly until it either fails with an exception, or its future is | ||
* {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. | ||
*/ | ||
Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); | ||
|
||
RunnableScheduledFuture<?> peekScheduledTask(); | ||
|
||
RunnableScheduledFuture<?> pollScheduledTask(); | ||
|
||
/** | ||
* Return the task which is ready to be executed with the given {@code nanoTime}. | ||
*/ | ||
RunnableScheduledFuture<?> pollScheduledTask(long nanoTime); | ||
|
||
void removeNextScheduledTask(); | ||
|
||
void removeScheduled(RunnableScheduledFuture<?> task); | ||
|
||
/** | ||
* Cancel all scheduled tasks. | ||
*/ | ||
void cancelScheduledTasks(); | ||
|
||
int size(); | ||
|
||
boolean isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: For methods that schedule tasks (e.g., schedule, scheduleAtFixedRate, scheduleWithFixedDelay), consider specifying the behavior or exceptions when scheduling tasks in a shutdown state. This clarification can help ensure consistent usage and error handling.
Code Suggestion:
+ /**
+ * Schedules the specified task for execution after the specified delay.
+ * @param task the task to schedule
+ * @param delay the time from now to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be scheduled for execution
+ * @throws IllegalStateException if the scheduler is in a shutdown state
+ */
+ Future<Void> schedule(Runnable command, long delay, TimeUnit unit);
@Test | ||
public void testTaskScheduler_scheduleRunnableZero() { | ||
TestScheduledEventExecutor executor = new TestScheduledEventExecutor( | ||
excutor -> new TestTaskScheduler(excutor)); | ||
Future<?> future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); | ||
assertEquals(0, getDelay(future)); | ||
assertNotNull(executor.pollScheduledTask()); | ||
assertNull(executor.pollScheduledTask()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Enhance the test coverage by adding cases that verify the behavior when a TaskScheduler fails to schedule a task, e.g., throwing an exception. This will ensure the robustness of the scheduling mechanism in error scenarios.
Code Suggestion:
+ @Test
+ public void testTaskScheduler_scheduleFailure() {
+ TestScheduledEventExecutor executor = new TestScheduledEventExecutor(
+ executor -> new FailingTaskScheduler(executor));
+ assertThrows(SchedulingException.class, () -> executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS));
+ }
private TaskScheduler taskScheduler; | ||
private TaskSchedulerFactory taskSchedulerFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The introduction of TaskScheduler and TaskSchedulerFactory as fields increases the memory footprint of each AbstractScheduledEventExecutor instance. In a highly scalable system, where potentially millions of these executors could be instantiated, this additional memory usage could become significant. Moreover, the dynamic selection of TaskScheduler based on a factory pattern, while providing flexibility, introduces additional overhead during executor instantiation, which could impact the startup time of systems with a large number of executors.
Fix: Consider using a static, shared TaskScheduler instance for common scheduling scenarios, reducing the memory footprint per executor instance. This shared instance could be a highly efficient, multi-tenant scheduler designed to handle tasks from multiple executors concurrently. For use cases requiring specialized scheduling, continue to allow the injection of a custom TaskScheduler, but ensure this is an exceptional scenario rather than the norm.
Code Suggestion:
- private TaskScheduler taskScheduler;
- private TaskSchedulerFactory taskSchedulerFactory;
+ private static final TaskScheduler sharedTaskScheduler = DefaultTaskSchedulerFactory.INSTANCE.newTaskScheduler(this);
protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
- this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
+ // Use sharedTaskScheduler for common cases
}
+ TaskScheduler taskScheduler() {
+ return sharedTaskScheduler;
+ }
private DefaultTaskSchedulerFactory() { } | ||
|
||
@Override | ||
public TaskScheduler newTaskScheduler(AbstractScheduledEventExecutor executor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The DefaultTaskSchedulerFactory creates a new instance of DefaultTaskScheduler for each call to newTaskScheduler. This could lead to unnecessary object creation if many executors share the same scheduling logic, impacting the scalability of the system.
Fix: Implement a caching mechanism within DefaultTaskSchedulerFactory to reuse DefaultTaskScheduler instances where possible. This could be based on identifying common configurations or parameters that lead to equivalent scheduling behavior.
Code Suggestion:
Implement a caching mechanism within DefaultTaskSchedulerFactory to reuse DefaultTaskScheduler instances where possible. This could be based on identifying common configurations or parameters that lead to equivalent scheduling behavior.
protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
}
TaskScheduler taskScheduler() {
if (taskScheduler == null && taskSchedulerFactory != null) {
taskScheduler = taskSchedulerFactory.newTaskScheduler(this);
}
if (taskScheduler == null) {
taskScheduler = DefaultTaskSchedulerFactory.ISTANCE.newTaskScheduler(this);
}
return taskScheduler;
}
@@ -91,7 +90,7 @@ this, newPromise(), Executors.callable(() -> { | |||
deadlineNanos(ticker().nanoTime(), SCHEDULE_QUIET_PERIOD_INTERVAL), | |||
-SCHEDULE_QUIET_PERIOD_INTERVAL); | |||
|
|||
scheduledTaskQueue().add(quietPeriodTask); | |||
taskScheduler().schedule(quietPeriodTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The modification to use taskScheduler().schedule directly for quietPeriodTask in GlobalEventExecutor could introduce a delay in the execution of this task due to the overhead of task scheduling. In a highly scalable system, even minor delays introduced in core system tasks can amplify, impacting overall system responsiveness and throughput.
Fix: Review the scheduling requirements for quietPeriodTask to determine if it can be executed immediately or if a more efficient scheduling strategy can be applied. For tasks critical to system performance, consider bypassing the task scheduler and executing directly or using a more lightweight scheduling mechanism.
Code Suggestion:
protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
}
protected TaskScheduler taskScheduler() {
if (taskScheduler == null && taskSchedulerFactory != null) {
taskScheduler = taskSchedulerFactory.newTaskScheduler(this);
}
if (taskScheduler == null) {
taskScheduler = DefaultTaskSchedulerFactory.INSTANCE.newTaskScheduler(this);
}
return taskScheduler;
}
private static boolean isNullOrEmpty(TaskScheduler taskScheduler) {
return taskScheduler == null || taskScheduler.isEmpty();
}
protected final void cancelScheduledTasks() {
assert inEventLoop();
TaskScheduler taskScheduler = this.taskScheduler;
if (isNullOrEmpty(taskScheduler)) {
return;
}
taskScheduler.cancelScheduledTasks();
}
public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) { | ||
requireNonNull(command, "command"); | ||
requireNonNull(unit, "unit"); | ||
if (delay < 0) { | ||
delay = 0; | ||
} | ||
RunnableScheduledFuture<Void> task = newScheduledTaskFor( | ||
callable(command, null), deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0); | ||
return schedule(task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The schedule method in AbstractTaskScheduler directly creates a new instance of RunnableScheduledFuture for each scheduled task. This could lead to a high rate of object creation and garbage collection in systems with a high volume of task scheduling operations, impacting scalability and performance.
Fix: Utilize a pool of RunnableScheduledFuture objects instead of instantiating new ones for each task. This object pooling pattern can significantly reduce the rate of object creation and garbage collection, improving the system's scalability and performance.
Code Suggestion:
+ private final ObjectPool<RunnableScheduledFuture<?>> futurePool;
+ public AbstractTaskScheduler(ObjectPool<RunnableScheduledFuture<?>> futurePool) {
+ this.futurePool = futurePool;
+ }
+ @Override
+ public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
+ RunnableScheduledFuture<Void> task = futurePool.get();
+ task.reset(command, delay, unit);
+ return schedule(task);
+ }
private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR = | ||
Comparable::compareTo; | ||
private static final RunnableScheduledFutureNode<?>[] | ||
EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The DefaultTaskScheduler uses a PriorityQueue to manage scheduled tasks, which can become inefficient for a very high number of concurrent tasks due to its O(log n) complexity for insertion and removal operations. This could become a scalability bottleneck in systems with a high volume of scheduled tasks.
Fix: Implement or utilize a more efficient data structure for managing scheduled tasks, such as a time-wheel or a hierarchical timing wheel, which can offer better performance characteristics (O(1) complexity for insertion and removal) for the scheduling use case.
Code Suggestion:
private TaskScheduler taskScheduler;
protected AbstractScheduledEventExecutor() {
}
protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
this.taskScheduler = taskSchedulerFactory.newTaskScheduler(this);
}
TaskScheduler taskScheduler() {
return this.taskScheduler;
}
private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR = | ||
Comparable::compareTo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Performance Issue: The use of Comparable::compareTo in the DefaultTaskScheduler may not be the most efficient way to compare RunnableScheduledFutureNode instances. This approach relies on the natural ordering of the elements, which might not be optimized for the specific use case of task scheduling, potentially leading to performance degradation when managing large numbers of scheduled tasks.
Fix: Implement a custom Comparator for RunnableScheduledFutureNode instances that is specifically optimized for the task scheduling use case. This custom Comparator should prioritize the most critical factors for task ordering (e.g., execution time) to ensure efficient task scheduling and execution.
Code Suggestion:
private static final Comparator<RunnableScheduledFuture<?>> TASK_SCHEDULER_COMPARATOR = new Comparator<RunnableScheduledFuture<?>>() {
@Override
public int compare(RunnableScheduledFuture<?> o1, RunnableScheduledFuture<?> o2) {
return Long.compare(o1.getDelay(TimeUnit.NANOSECONDS), o2.getDelay(TimeUnit.NANOSECONDS));
}
};
private PriorityQueue<RunnableScheduledFuture<?>> scheduledTaskQueue = new PriorityQueue<>(TASK_SCHEDULER_COMPARATOR);
@Override | ||
protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(Callable<V> callable, long deadlineNanos, | ||
long period) { | ||
return new RunnableScheduledFutureAdapter<>(executor, executor.newPromise(), callable, deadlineNanos, period); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Performance Issue: The RunnableScheduledFutureAdapter instantiation within DefaultTaskScheduler's newScheduledTaskFor method doesn't consider the reuse of instances, which might lead to unnecessary object creation and increased garbage collection overhead.
Fix: Implement a mechanism to reuse RunnableScheduledFutureAdapter instances, such as an object pool. This approach will reduce the number of created objects and mitigate the garbage collection impact, especially under high load.
Code Suggestion:
private final ObjectPool<RunnableScheduledFutureAdapter<?>> runnableScheduledFutureAdapterPool = new DefaultObjectPool<>(new ObjectPool.ObjectCreator<RunnableScheduledFutureAdapter<?>>() {
@Override
public RunnableScheduledFutureAdapter<?> create() {
return new RunnableScheduledFutureAdapter<>(DefaultTaskScheduler.this, newPromise(), callable, deadlineNanos, periodNanos);
}
});
@Override
protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(Callable<V> callable, long deadlineNanos, long periodNanos) {
return (RunnableScheduledFuture<V>) runnableScheduledFutureAdapterPool.get().init(callable, deadlineNanos, periodNanos);
}
Motivation:
PRs Overview
TaskScheduler
interfaceTaskScheduler
on ScheduledEventExecutorHashedWheelTimer
task scheduler and make it pluggable to ScheduledEventExecutor for user who want to use itModification:
TaskScheduler
interfaceResult: