Skip to content

Commit

Permalink
Per Stream Session Limits (#1598)
Browse files Browse the repository at this point in the history
* [Java] Flatten out StreamInterest structure and separate out active images into its own map.

* [Java] Implement per-stream session limits.

* [Java] Add system test for session limits.

* [C] Separate state from image in data packet dispatcher.

* [Java] Add null check when adding publication image in data packet dispatcher.

* [C] Start applying session limits.

* [C] Add test to validate rejection of too many sessions on a given stream.

* [C] Add experimental features and stream session limits to print configuration.

* [Java] Enable tests on C media driver by enabling stream session limit configuration option.

* [Java] Catch NumberFormatException and report more useful error.
  • Loading branch information
mikeb01 committed May 15, 2024
1 parent 8956a8a commit 02b3b11
Show file tree
Hide file tree
Showing 20 changed files with 701 additions and 215 deletions.
4 changes: 4 additions & 0 deletions aeron-client/src/main/c/collections/aeron_int64_counter_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ extern int aeron_int64_counter_map_get_and_dec(aeron_int64_counter_map_t *map, c
extern void aeron_int64_counter_map_for_each(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_for_each_func_t func, void *clientd);

extern void aeron_int64_counter_map_remove_if(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_predicate_func_t func, void *clientd);


31 changes: 31 additions & 0 deletions aeron-client/src/main/c/collections/aeron_int64_counter_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ inline int aeron_int64_counter_map_get_and_dec(aeron_int64_counter_map_t *map, c
}

typedef void (*aeron_int64_counter_map_for_each_func_t)(void *clientd, int64_t key, int64_t value);
typedef bool (*aeron_int64_counter_map_predicate_func_t)(void *clientd, int64_t key, int64_t value);

inline void aeron_int64_counter_map_for_each(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_for_each_func_t func, void *clientd)
Expand All @@ -354,4 +355,34 @@ inline void aeron_int64_counter_map_for_each(
}
}

inline void aeron_int64_counter_map_remove_if(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_predicate_func_t func, void *clientd)
{
size_t i = 0;
size_t remaining = map->size;

while (i < map->entries_length && remaining > 0)
{
bool is_removed = false;
int64_t key = map->entries[i];
int64_t value = map->entries[i + 1];
if (map->initial_value != value)
{
if (func(clientd, key, value))
{
is_removed = (map->initial_value != aeron_int64_counter_map_remove(map, key));
}
}

if (is_removed)
{
--remaining;
}
else
{
i += 2;
}
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef AERON_INT32_TO_PTR_HASH_MAP_H
#define AERON_INT32_TO_PTR_HASH_MAP_H
#ifndef AERON_INT64_TO_PTR_HASH_MAP_H
#define AERON_INT64_TO_PTR_HASH_MAP_H

#include <errno.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,38 @@ TEST_F(Int64CounterMapTest, shouldForEachNonEmptyMap)

ASSERT_EQ(called, 1u);
}

TEST_F(Int64CounterMapTest, shouldRemoveIfValueMatches)
{
const int64_t initialValue = -2;
ASSERT_EQ(aeron_int64_counter_map_init(&m_map, initialValue, 8, AERON_MAP_DEFAULT_LOAD_FACTOR), 0);

for (int64_t i = 0; i < 10; i++)
{
int64_t value = i / 2;
aeron_int64_counter_map_put(&m_map, i, value, nullptr);
}

const int64_t value_to_remove = 3;
aeron_int64_counter_map_remove_if(
&m_map,
[](void *clientd, int64_t key, int64_t value)
{
int64_t client_v = *(int64_t *)clientd;
return client_v == value;
},
(void *)&value_to_remove);

for (int64_t i = 0; i < 10; i++)
{
int64_t value = i / 2;
if (value_to_remove == value)
{
EXPECT_EQ(initialValue, aeron_int64_counter_map_get(&m_map, i));
}
else
{
EXPECT_EQ(value, aeron_int64_counter_map_get(&m_map, i));
}
}
}

0 comments on commit 02b3b11

Please sign in to comment.