Skip to content

Commit

Permalink
Feature - labels support for RTimeSeries object. #4553
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Oct 18, 2022
1 parent a483625 commit 47f6a39
Show file tree
Hide file tree
Showing 20 changed files with 555 additions and 119 deletions.
4 changes: 2 additions & 2 deletions redisson/src/main/java/org/redisson/Redisson.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ public RedissonReactiveClient reactive() {
}

@Override
public <V> RTimeSeries<V> getTimeSeries(String name) {
public <V, L> RTimeSeries<V, L> getTimeSeries(String name) {
return new RedissonTimeSeries<>(evictionScheduler, commandExecutor, name);
}

@Override
public <V> RTimeSeries<V> getTimeSeries(String name, Codec codec) {
public <V, L> RTimeSeries<V, L> getTimeSeries(String name, Codec codec) {
return new RedissonTimeSeries<>(codec, evictionScheduler, commandExecutor, name);
}

Expand Down
12 changes: 6 additions & 6 deletions redisson/src/main/java/org/redisson/RedissonReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,17 +460,17 @@ public <V> RDequeReactive<V> getDeque(String name, Codec codec) {
}

@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
}

@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name, Codec codec) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(codec, evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions redisson/src/main/java/org/redisson/RedissonRx.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,17 +439,17 @@ public <V> RDequeRx<V> getDeque(String name, Codec codec) {
}

@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(String name) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
new RedissonTimeSeriesRx<V, L>(timeSeries, this), RTimeSeriesRx.class);
}

@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(String name, Codec codec) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(codec, evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
new RedissonTimeSeriesRx<V, L>(timeSeries, this), RTimeSeriesRx.class);
}

@Override
Expand Down
177 changes: 154 additions & 23 deletions redisson/src/main/java/org/redisson/RedissonTimeSeries.java

Large diffs are not rendered by default.

62 changes: 55 additions & 7 deletions redisson/src/main/java/org/redisson/api/RTimeSeries.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.redisson.api;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -26,18 +27,30 @@
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*/
public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsync<V>, RDestroyable {
public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesAsync<V, L>, RDestroyable {

/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
*/
void add(long timestamp, V object);

/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
*/
void add(long timestamp, V object, L label);

/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
Expand All @@ -46,6 +59,13 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void addAll(Map<Long, V> objects);

/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
*/
void addAll(Collection<TimeSeriesEntry<V, L>> entries);

/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
Expand All @@ -57,6 +77,17 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);

/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
*/
void add(long timestamp, V object, L label, Duration timeToLive);

/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
Expand All @@ -67,6 +98,15 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);

/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
*/
void addAll(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);

/**
* Returns size of this set.
*
Expand All @@ -82,6 +122,14 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
V get(long timestamp);

/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
TimeSeriesEntry<V, L> getEntry(long timestamp);

/**
* Removes object by specified <code>timestamp</code>.
*
Expand Down Expand Up @@ -220,7 +268,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param endTimestamp - end timestamp
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp);
Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp);

/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
Expand All @@ -230,7 +278,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp, int limit);
Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp, int limit);

/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
Expand All @@ -239,7 +287,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param endTimestamp - end timestamp
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp);
Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp);

/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
Expand All @@ -249,7 +297,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit);
Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit);

/**
* Returns stream of elements in this time-series collection.
Expand Down
66 changes: 59 additions & 7 deletions redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.redisson.api;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -24,19 +25,32 @@
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*
*/
public interface RTimeSeriesAsync<V> extends RExpirableAsync {
public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {

/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
* @return void
*/
RFuture<Void> addAsync(long timestamp, V object);

/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
*/
RFuture<Void> addAsync(long timestamp, V object, L label);

/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
Expand All @@ -46,6 +60,14 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAllAsync(Map<Long, V> objects);

/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
* @return void
*/
RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries);

/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
Expand All @@ -58,6 +80,18 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAsync(long timestamp, V object, long timeToLive, TimeUnit timeUnit);

/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
* @return void
*/
RFuture<Void> addAsync(long timestamp, V object, L label, Duration timeToLive);

/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
Expand All @@ -69,6 +103,16 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAllAsync(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);

/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
* @return void
*/
RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);

/**
* Returns size of this set.
*
Expand All @@ -84,6 +128,14 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<V> getAsync(long timestamp);

/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
RFuture<TimeSeriesEntry<V, L>> getEntryAsync(long timestamp);

/**
* Removes object by specified <code>timestamp</code>.
*
Expand Down Expand Up @@ -222,7 +274,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param endTimestamp - end timestamp
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp);

/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
Expand All @@ -232,7 +284,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit);

/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
Expand All @@ -241,7 +293,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param endTimestamp - end timestamp
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp);

/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
Expand All @@ -251,6 +303,6 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit);

}

0 comments on commit 47f6a39

Please sign in to comment.