Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Big refactor in preparation for implementing retries #2243

Merged
merged 5 commits into from Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -231,7 +231,7 @@ const NUMBER_REGEX = /\d+/;
let totalActiveFaults = 0;

class FaultInjectionFilter extends BaseFilter implements Filter {
constructor(private callStream: CallStream, private config: FaultInjectionConfig) {
constructor(private config: FaultInjectionConfig) {
super();
}

Expand Down Expand Up @@ -316,7 +316,7 @@ class FaultInjectionFilter extends BaseFilter implements Filter {
}
}
if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) {
this.callStream.cancelWithStatus(abortStatus, 'Fault injected');
return Promise.reject({code: abortStatus, details: 'Fault injected', metadata: new Metadata()});
}
}
return metadata;
Expand All @@ -333,8 +333,8 @@ class FaultInjectionFilterFactory implements FilterFactory<FaultInjectionFilter>
}
}

createFilter(callStream: experimental.CallStream): FaultInjectionFilter {
return new FaultInjectionFilter(callStream, this.config);
createFilter(): FaultInjectionFilter {
return new FaultInjectionFilter(this.config);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/http-filter/router-filter.ts
Expand Up @@ -26,7 +26,7 @@ class RouterFilter extends BaseFilter implements Filter {}
class RouterFilterFactory implements FilterFactory<RouterFilter> {
constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {}

createFilter(callStream: experimental.CallStream): RouterFilter {
createFilter(): RouterFilter {
return new RouterFilter();
}
}
Expand Down
28 changes: 5 additions & 23 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -146,24 +146,6 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
}
}

class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private onCallEnd: () => void) {
super();
}
receiveTrailers(status: StatusObject) {
this.onCallEnd();
return status;
}
}

class CallTrackingFilterFactory implements FilterFactory<CallEndTrackingFilter> {
constructor(private onCallEnd: () => void) {}

createFilter(callStream: CallStream) {
return new CallEndTrackingFilter(this.onCallEnd);
}
}

/**
* This class load balances over a cluster by making an EDS request and then
* transforming the result into a configuration for another load balancing
Expand Down Expand Up @@ -217,9 +199,6 @@ export class EdsLoadBalancer implements LoadBalancer {
* balancer. */
if (dropCategory === null) {
const originalPick = originalPicker.pick(pickArgs);
const trackingFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
this.concurrentRequests -= 1;
});
return {
pickResultType: originalPick.pickResultType,
status: originalPick.status,
Expand All @@ -228,7 +207,10 @@ export class EdsLoadBalancer implements LoadBalancer {
originalPick.onCallStarted?.();
this.concurrentRequests += 1;
},
extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory)
onCallEnded: status => {
originalPick.onCallEnded?.(status);
this.concurrentRequests -= 1;
}
};
} else {
let details: string;
Expand All @@ -247,7 +229,7 @@ export class EdsLoadBalancer implements LoadBalancer {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactories: [],
onCallEnded: null,
onCallStarted: null
};
}
Expand Down
31 changes: 4 additions & 27 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Expand Up @@ -108,29 +108,6 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
}
}

/**
* Filter class that reports when the call ends.
*/
class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {
super();
}

receiveTrailers(status: StatusObject) {
this.localityStatsReporter.addCallFinished(status.code !== Status.OK);
return status;
}
}

class CallEndTrackingFilterFactory
implements FilterFactory<CallEndTrackingFilter> {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {}

createFilter(callStream: Call): CallEndTrackingFilter {
return new CallEndTrackingFilter(this.localityStatsReporter);
}
}

/**
* Picker that delegates picking to another picker, and reports when calls
* created using those picks start and end.
Expand All @@ -144,9 +121,6 @@ class LoadReportingPicker implements Picker {
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
const trackingFilterFactory = new CallEndTrackingFilterFactory(
this.localityStatsReporter
);
return {
pickResultType: PickResultType.COMPLETE,
subchannel: wrappedPick.subchannel,
Expand All @@ -155,7 +129,10 @@ class LoadReportingPicker implements Picker {
wrappedPick.onCallStarted?.();
this.localityStatsReporter.addCallStarted();
},
extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory),
onCallEnded: status => {
wrappedPick.onCallEnded?.(status);
this.localityStatsReporter.addCallFinished(status !== Status.OK);
}
};
} else {
return wrappedPick;
Expand Down
4 changes: 2 additions & 2 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts
Expand Up @@ -107,8 +107,8 @@ class XdsClusterManagerPicker implements Picker {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactories: [],
onCallStarted: null
onCallStarted: null,
onCallEnded: null
};
}
}
Expand Down
86 changes: 0 additions & 86 deletions packages/grpc-js/src/call-credentials-filter.ts

This file was deleted.