Skip to content

Commit

Permalink
added a typed consumer update - while JetStream simply does an `add…
Browse files Browse the repository at this point in the history
…`, the update api, simply retrieves the consumer, and assigns values present in the update (typed to show what properties are editable) - note that this is only intended for updates to durable consumers.

modifying a consumer is only supported on server 2.6.4 or newer
  • Loading branch information
aricart committed Nov 29, 2021
1 parent 0f902ff commit ef8543f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.6.3" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.6.5" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
Expand Down
11 changes: 11 additions & 0 deletions nats-base-client/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ConsumerConfig,
ConsumerInfo,
ConsumerListResponse,
ConsumerUpdateConfig,
CreateConsumerRequest,
JetStreamOptions,
Lister,
Expand Down Expand Up @@ -64,6 +65,16 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
return r as ConsumerInfo;
}

async update(
stream: string,
durable: string,
cfg: ConsumerUpdateConfig,
): Promise<ConsumerInfo> {
const ci = await this.info(stream, durable);
const changable = cfg as ConsumerConfig;
return this.add(stream, Object.assign(ci.config, changable));
}

async info(stream: string, name: string): Promise<ConsumerInfo> {
validateStreamName(stream);
validateDurableName(name);
Expand Down
20 changes: 14 additions & 6 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ export interface Lister<T> {
export interface ConsumerAPI {
info(stream: string, consumer: string): Promise<ConsumerInfo>;
add(stream: string, cfg: Partial<ConsumerConfig>): Promise<ConsumerInfo>;
update(
stream: string,
durable: string,
cfg: ConsumerUpdateConfig,
): Promise<ConsumerInfo>;
delete(stream: string, consumer: string): Promise<boolean>;
list(stream: string): Lister<ConsumerInfo>;
}
Expand Down Expand Up @@ -757,25 +762,28 @@ export interface AccountLimits {
"max_consumers": number;
}

export interface ConsumerConfig {
description?: string;
export interface ConsumerConfig extends ConsumerUpdateConfig {
"ack_policy": AckPolicy;
"ack_wait"?: Nanos;
"deliver_policy": DeliverPolicy;
"deliver_subject"?: string;
"deliver_group"?: string;
"durable_name"?: string;
"filter_subject"?: string;
"flow_control"?: boolean; // send message with status of 100 and reply subject
"idle_heartbeat"?: Nanos; // send empty message when idle longer than this
"max_ack_pending"?: number;
"max_deliver"?: number;
"max_waiting"?: number;
"opt_start_seq"?: number;
"opt_start_time"?: string;
"rate_limit_bps"?: number;
"replay_policy": ReplayPolicy;
}

export interface ConsumerUpdateConfig {
description?: string;
"ack_wait"?: Nanos;
"max_deliver"?: number;
"sample_freq"?: string;
"max_ack_pending"?: number;
"max_waiting"?: number;
"headers_only"?: boolean;
}

Expand Down
38 changes: 37 additions & 1 deletion tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
ErrorCode,
headers,
JSONCodec,
nanos,
NatsError,
nuid,
StreamConfig,
Expand All @@ -41,7 +42,7 @@ import {
setup,
} from "./jstest_util.ts";
import { connect } from "../src/mod.ts";
import { assertThrowsAsyncErrorCode, NatsServer } from "./helpers/mod.ts";
import { assertThrowsAsyncErrorCode, notCompatible } from './helpers/mod.ts'
import { validateName } from "../nats-base-client/jsutil.ts";

const StreamNameRequired = "stream name required";
Expand Down Expand Up @@ -912,3 +913,38 @@ Deno.test("jsm - jetstream error info", async () => {
}
await cleanup(ns, nc);
});

Deno.test("jsm - update consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.4")) {
return;
}
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: "dur",
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(2000),
max_ack_pending: 500,
headers_only: false,
max_deliver: 100,
});

// update is simply syntatic sugar for add providing a type to
// help the IDE show editable properties - server will still
// reject options it doesn't deem editable
const ci = await jsm.consumers.update(stream, "dur", {
ack_wait: nanos(3000),
max_ack_pending: 5,
headers_only: true,
max_deliver: 2,
});

assertEquals(ci.config.ack_wait, nanos(3000));
assertEquals(ci.config.max_ack_pending, 5);
assertEquals(ci.config.headers_only, true);
assertEquals(ci.config.max_deliver, 2);

await cleanup(ns, nc);
});

0 comments on commit ef8543f

Please sign in to comment.