Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV prefix handling #239

Merged
merged 7 commits into from
Jan 18, 2022
8 changes: 4 additions & 4 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class ViewsImpl implements Views {
this.js = js;
jetstreamPreview(this.js.nc);
}
async kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
return Bucket.create(this.js.nc, name, opts);
kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
return Bucket.create(this.js, name, opts);
}
}

Expand Down Expand Up @@ -398,7 +398,7 @@ export class JetStreamClientImpl extends BaseApiClient
ErrorCode.ApiError,
);
}
jsi.config.deliver_subject = createInbox();
jsi.config.deliver_subject = createInbox(this.nc.options.inboxPrefix);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this.nc. does not have a create inbox function that makes use of the inbox Prefix automatically?
As is the case in go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it doesn't - it is just a function. But anywhere it is called in the lib it is honored.

jsi.config.ack_policy = AckPolicy.None;
jsi.config.max_deliver = 1;
jsi.config.flow_control = true;
Expand Down Expand Up @@ -560,7 +560,7 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
if (this.info === null || this.sub.isClosed()) {
return;
}
const newDeliver = createInbox(this.js.opts.apiPrefix);
const newDeliver = createInbox(this.js.nc.options.inboxPrefix);
const nci = this.js.nc;
nci._resub(this.sub, newDeliver);
const info = this.info;
Expand Down
44 changes: 22 additions & 22 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import {
AckPolicy,
ConsumerConfig,
ConsumerInfo,
DeliverPolicy,
DiscardPolicy,
Empty,
JetStreamClient,
JetStreamManager,
JetStreamOptions,
JetStreamPublishOptions,
JsHeaders,
JsMsg,
Expand All @@ -33,7 +33,6 @@ import {
KvPutOptions,
KvRemove,
KvStatus,
NatsConnection,
PurgeOpts,
PurgeResponse,
RetentionPolicy,
Expand All @@ -49,7 +48,6 @@ import { millis, nanos } from "./jsutil.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { deferred } from "./util.ts";
import { headers, MsgHdrs } from "./headers.ts";
import { createInbox } from "./protocol.ts";
import { consumerOpts } from "./mod.ts";

export function Base64KeyCodec(): KvCodec<string> {
Expand Down Expand Up @@ -158,24 +156,35 @@ export class Bucket implements KV, KvRemove {
bucket: string;
codec!: KvCodecs;
_prefixLen: number;
subjPrefix: string;

constructor(bucket: string, jsm: JetStreamManager, js: JetStreamClient) {
validateBucket(bucket);
this.jsm = jsm;
this.js = js;
this.bucket = bucket;
this._prefixLen = 0;
this.subjPrefix = kvSubjectPrefix;

const jsi = js as JetStreamClientImpl;
const prefix = jsi.prefix || "$JS.API";
if (prefix !== "$JS.API") {
this.subjPrefix = `${prefix}.${kvSubjectPrefix}`;
}
}

static async create(
nc: NatsConnection,
js: JetStreamClient,
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
name: string,
opts: Partial<KvOptions> = {},
): Promise<KV> {
validateBucket(name);
const to = opts.timeout || 2000;
const jsm = await nc.jetstreamManager({ timeout: to });
const bucket = new Bucket(name, jsm, nc.jetstream({ timeout: to }));
const jsi = js as JetStreamClientImpl;
let jsopts = jsi.opts || {} as JetStreamOptions;
jsopts = Object.assign(jsopts, { timeout: to });
const jsm = await jsi.nc.jetstreamManager(jsopts);
const bucket = new Bucket(name, jsm, js);
await bucket.init(opts);
return bucket;
}
Expand Down Expand Up @@ -212,10 +221,14 @@ export class Bucket implements KV, KvRemove {
}

subjectForBucket(): string {
return `${kvSubjectPrefix}.${this.bucket}.>`;
return `${this.subjPrefix}.${this.bucket}.>`;
}

subjectForKey(k: string): string {
return `${this.subjPrefix}.${this.bucket}.${k}`;
}

fullKeyName(k: string): string {
return `${kvSubjectPrefix}.${this.bucket}.${k}`;
}

Expand Down Expand Up @@ -337,7 +350,7 @@ export class Bucket implements KV, KvRemove {
this.validateKey(ek);
try {
const sm = await this.jsm.streams.getMessage(this.bucketName(), {
last_by_subj: this.subjectForKey(ek),
last_by_subj: this.fullKeyName(ek),
});
return this.smToEntry(k, sm);
} catch (err) {
Expand Down Expand Up @@ -398,25 +411,12 @@ export class Bucket implements KV, KvRemove {
? DeliverPolicy.All
: DeliverPolicy.LastPerSubject,
"ack_policy": AckPolicy.None,
"filter_subject": this.subjectForKey(ek),
"filter_subject": this.fullKeyName(ek),
"flow_control": true,
"idle_heartbeat": nanos(5 * 1000),
}, opts) as Partial<ConsumerConfig>;
}

/**
* @deprecated
*/
consumerOn(k: string, history = false): Promise<ConsumerInfo> {
return this.jsm.consumers.add(
this.stream,
this._buildCC(k, history, {
ack_policy: AckPolicy.Explicit,
deliver_subject: createInbox(),
}),
);
}

remove(k: string): Promise<void> {
return this.purge(k);
}
Expand Down
9 changes: 4 additions & 5 deletions tests/auth_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ import { assert } from "../nats-base-client/denobuffer.ts";

const conf = {
authorization: {
PERM: {
subscribe: "bar",
publish: "foo",
},
users: [{
user: "derek",
password: "foobar",
permission: "$PERM",
permission: {
subscribe: "bar",
publish: "foo",
},
}],
},
};
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ export function toConf(o: any, indent?: string): string {
} else {
if (!Array.isArray(o)) {
if (
typeof v === "string" && v.startsWith("$JS.")
typeof v === "string" && v.startsWith("$")
) {
buf.push(`${pad}${k}: "${v}"`);
} else if (
Expand Down
9 changes: 4 additions & 5 deletions tests/iterators_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,13 @@ Deno.test("iterators - autounsub breaks and closes", async () => {
Deno.test("iterators - permission error breaks and closes", async () => {
const conf = {
authorization: {
PERM: {
subscribe: "bar",
publish: "foo",
},
users: [{
user: "derek",
password: "foobar",
permission: "$PERM",
permission: {
subscribe: "bar",
publish: "foo",
},
}],
},
};
Expand Down