Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed preview notice from JetStream APIs #234

Merged
merged 2 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ bundle:
deno bundle --log-level info --unstable src/mod.ts ./nats.js

fmt:
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ jetstream.md README.md
27 changes: 24 additions & 3 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ The new generation of Javascript clients:
- [nats.ws](https://github.com/nats-io/nats.ws)
- [nats.deno](https://github.com/nats-io/nats.deno)

all support JetStream, however the functionality is a _preview_, and the APIs
are subject to change. Please report any issues you find.
all support JetStream. Please report any issues you find.

## JetStreamManager

Expand Down Expand Up @@ -254,6 +253,28 @@ setInterval(() => {
Note the above example is contrived, as the pull interval is fixed based on some
interval.

#### Consumer Binding

JetStream's `subscribe()`, and `pullSubscribe()` can `bind` to a specific
durable consumer. The consumer must already exist, note that if your consumer is
working on a stream that is sourced from another `bind` is the only way you can
attach to the correct consumer on the correct stream:

```typescript
const inbox = createInbox();
await jsm.consumers.add("A", {
durable_name: "me",
ack_policy: AckPolicy.None,
deliver_subject: inbox,
});

const opts = consumerOpts();
opts.bind("A", "me");

const sub = await js.subscribe(subj, opts);
// process messages...
```

#### JetStream Queue Consumers

Queue Consumers allow scaling the processing of messages stored in a stream. To
Expand Down Expand Up @@ -436,7 +457,7 @@ As the ordered consumer processes messages, it enforces that messages are
presented to the client with the correct sequence. If a gap is detected, the
consumer is recreated at the expected sequence.

Most consumer options are rejected, as the ordered consumer has manages its
Most consumer options are rejected, as the ordered consumer manages its
configuration in a very specific way.

To create an ordered consumer (assuming a stream that captures `my.messages`):
Expand Down
21 changes: 21 additions & 0 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import { createInbox } from "./protocol.ts";
import { headers } from "./headers.ts";
import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand All @@ -86,6 +87,7 @@ class ViewsImpl implements Views {
js: JetStreamClientImpl;
constructor(js: JetStreamClientImpl) {
this.js = js;
jetstreamPreview(this.js.nc);
}
async kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
return Bucket.create(this.js.nc, name, opts);
Expand Down Expand Up @@ -737,3 +739,22 @@ function autoAckJsMsg(data: JsMsg | null) {
data.ack();
}
}

const jetstreamPreview = (() => {
let once = false;
return (nci: NatsConnectionImpl) => {
if (!once) {
once = true;
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream's materialized views functionality in ${lang} is beta functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream's materialized views functionality is beta functionality \u001B[0m`,
);
}
}
};
})();
21 changes: 0 additions & 21 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ export class NatsConnectionImpl implements NatsConnection {
async jetstreamManager(
opts: JetStreamOptions = {},
): Promise<JetStreamManager> {
jetstreamPreview(this);
const adm = new JetStreamManagerImpl(this, opts);
try {
await adm.getAccountInfo();
Expand All @@ -301,26 +300,6 @@ export class NatsConnectionImpl implements NatsConnection {
jetstream(
opts: JetStreamOptions = {},
): JetStreamClient {
jetstreamPreview(this);
return new JetStreamClientImpl(this, opts);
}
}

const jetstreamPreview = (() => {
let once = false;
return (nci: NatsConnectionImpl) => {
if (!once) {
once = true;
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream functionality in ${lang} is preview functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream functionality is preview functionality \u001B[0m`,
);
}
}
};
})();
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,6 @@ export interface AccountLimits {
export interface ConsumerConfig extends ConsumerUpdateConfig {
"ack_policy": AckPolicy;
"deliver_policy": DeliverPolicy;
"deliver_subject"?: string;
"deliver_group"?: string;
"durable_name"?: string;
"filter_subject"?: string;
Expand All @@ -788,6 +787,7 @@ export interface ConsumerUpdateConfig {
"max_ack_pending"?: number;
"max_waiting"?: number;
"headers_only"?: boolean;
"deliver_subject"?: string;
}

export interface Consumer {
Expand Down
26 changes: 26 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2759,3 +2759,29 @@ Deno.test("jetstream - bind with diff subject fails", async () => {
);
await cleanup(ns, nc);
});

Deno.test("jetstream - bind example", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
const subj = `A.*`;
await jsm.streams.add({
name: "A",
subjects: [subj],
});

const inbox = createInbox();
await jsm.consumers.add("A", {
durable_name: "me",
ack_policy: AckPolicy.None,
deliver_subject: inbox,
});

const opts = consumerOpts();
opts.bind("A", "me");

const sub = await js.subscribe(subj, opts);
assertEquals(sub.getProcessed(), 0);

await cleanup(ns, nc);
});