Skip to content

Commit

Permalink
Merge pull request #2243 from murgatroid99/grpc-js_retry
Browse files Browse the repository at this point in the history
grpc-js: Big refactor in preparation for implementing retries
  • Loading branch information
murgatroid99 committed Oct 14, 2022
2 parents fd76558 + 63d9f6a commit 3e0a037
Show file tree
Hide file tree
Showing 39 changed files with 2,029 additions and 2,001 deletions.
Expand Up @@ -231,7 +231,7 @@ const NUMBER_REGEX = /\d+/;
let totalActiveFaults = 0;

class FaultInjectionFilter extends BaseFilter implements Filter {
constructor(private callStream: CallStream, private config: FaultInjectionConfig) {
constructor(private config: FaultInjectionConfig) {
super();
}

Expand Down Expand Up @@ -316,7 +316,7 @@ class FaultInjectionFilter extends BaseFilter implements Filter {
}
}
if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) {
this.callStream.cancelWithStatus(abortStatus, 'Fault injected');
return Promise.reject({code: abortStatus, details: 'Fault injected', metadata: new Metadata()});
}
}
return metadata;
Expand All @@ -333,8 +333,8 @@ class FaultInjectionFilterFactory implements FilterFactory<FaultInjectionFilter>
}
}

createFilter(callStream: experimental.CallStream): FaultInjectionFilter {
return new FaultInjectionFilter(callStream, this.config);
createFilter(): FaultInjectionFilter {
return new FaultInjectionFilter(this.config);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/http-filter/router-filter.ts
Expand Up @@ -26,7 +26,7 @@ class RouterFilter extends BaseFilter implements Filter {}
class RouterFilterFactory implements FilterFactory<RouterFilter> {
constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {}

createFilter(callStream: experimental.CallStream): RouterFilter {
createFilter(): RouterFilter {
return new RouterFilter();
}
}
Expand Down
28 changes: 5 additions & 23 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -146,24 +146,6 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
}
}

class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private onCallEnd: () => void) {
super();
}
receiveTrailers(status: StatusObject) {
this.onCallEnd();
return status;
}
}

class CallTrackingFilterFactory implements FilterFactory<CallEndTrackingFilter> {
constructor(private onCallEnd: () => void) {}

createFilter(callStream: CallStream) {
return new CallEndTrackingFilter(this.onCallEnd);
}
}

/**
* This class load balances over a cluster by making an EDS request and then
* transforming the result into a configuration for another load balancing
Expand Down Expand Up @@ -217,9 +199,6 @@ export class EdsLoadBalancer implements LoadBalancer {
* balancer. */
if (dropCategory === null) {
const originalPick = originalPicker.pick(pickArgs);
const trackingFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
this.concurrentRequests -= 1;
});
return {
pickResultType: originalPick.pickResultType,
status: originalPick.status,
Expand All @@ -228,7 +207,10 @@ export class EdsLoadBalancer implements LoadBalancer {
originalPick.onCallStarted?.();
this.concurrentRequests += 1;
},
extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory)
onCallEnded: status => {
originalPick.onCallEnded?.(status);
this.concurrentRequests -= 1;
}
};
} else {
let details: string;
Expand All @@ -247,7 +229,7 @@ export class EdsLoadBalancer implements LoadBalancer {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactories: [],
onCallEnded: null,
onCallStarted: null
};
}
Expand Down
31 changes: 4 additions & 27 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Expand Up @@ -108,29 +108,6 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
}
}

/**
* Filter class that reports when the call ends.
*/
class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {
super();
}

receiveTrailers(status: StatusObject) {
this.localityStatsReporter.addCallFinished(status.code !== Status.OK);
return status;
}
}

class CallEndTrackingFilterFactory
implements FilterFactory<CallEndTrackingFilter> {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {}

createFilter(callStream: Call): CallEndTrackingFilter {
return new CallEndTrackingFilter(this.localityStatsReporter);
}
}

/**
* Picker that delegates picking to another picker, and reports when calls
* created using those picks start and end.
Expand All @@ -144,9 +121,6 @@ class LoadReportingPicker implements Picker {
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
const trackingFilterFactory = new CallEndTrackingFilterFactory(
this.localityStatsReporter
);
return {
pickResultType: PickResultType.COMPLETE,
subchannel: wrappedPick.subchannel,
Expand All @@ -155,7 +129,10 @@ class LoadReportingPicker implements Picker {
wrappedPick.onCallStarted?.();
this.localityStatsReporter.addCallStarted();
},
extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory),
onCallEnded: status => {
wrappedPick.onCallEnded?.(status);
this.localityStatsReporter.addCallFinished(status !== Status.OK);
}
};
} else {
return wrappedPick;
Expand Down
4 changes: 2 additions & 2 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts
Expand Up @@ -107,8 +107,8 @@ class XdsClusterManagerPicker implements Picker {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactories: [],
onCallStarted: null
onCallStarted: null,
onCallEnded: null
};
}
}
Expand Down
86 changes: 0 additions & 86 deletions packages/grpc-js/src/call-credentials-filter.ts

This file was deleted.

0 comments on commit 3e0a037

Please sign in to comment.