Skip to content

Commit

Permalink
fix: don't emit error event during stream handoff (#1592)
Browse files Browse the repository at this point in the history
* fix: don't emit error event during stream handoff

* properly resolve promises in tests
  • Loading branch information
leahecole committed May 8, 2024
1 parent 903004d commit 2e7d30a
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 71 deletions.
7 changes: 5 additions & 2 deletions gax/src/streamingRetryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ export function streamingRetryRequest(opts: streamingRetryRequestOptions) {
// No more attempts need to be made, just continue on.
retryStream.emit('response', response);
delayStream.pipe(retryStream);
requestStream.on('error', (err: GoogleError) => {
retryStream.destroy(err);
requestStream.on('error', () => {
// retryStream must be destroyed here for the stream handoff part of retries to function properly
// but the error event should not be passed - if it emits as part of .destroy()
// it will bubble up early to the caller
retryStream.destroy();
});
}
}
116 changes: 47 additions & 69 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ async function testWait(client: EchoClient) {
assert.deepStrictEqual(response.content, request.success.content);
}

// a successful streaming call that has retry options passed but does not retry
async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
const finalData: string[] = [];
const backoffSettings = createBackoffSettings(
Expand Down Expand Up @@ -495,7 +496,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -509,8 +510,9 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
//Do Nothing
attemptStream.on('error', (error: GoogleError) => {
// should not reach this
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -524,6 +526,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
});
}

// a streaming call that retries two times and finishes successfully
async function testServerStreamingRetrieswithRetryOptions(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -554,7 +557,7 @@ async function testServerStreamingRetrieswithRetryOptions(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -568,8 +571,8 @@ async function testServerStreamingRetrieswithRetryOptions(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
//Do Nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -583,6 +586,7 @@ async function testServerStreamingRetrieswithRetryOptions(
});
}

// a streaming call that retries twice using shouldRetryFn and finally succeeds
async function testServerStreamingRetriesWithShouldRetryFn(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -617,7 +621,7 @@ async function testServerStreamingRetriesWithShouldRetryFn(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];
const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
Expand All @@ -630,8 +634,9 @@ async function testServerStreamingRetriesWithShouldRetryFn(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// Do nothing
attemptStream.on('error', error => {
// should not reach this
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -645,6 +650,7 @@ async function testServerStreamingRetriesWithShouldRetryFn(
});
}

// streaming call that retries twice using RetryRequestOptions instead of RetryOptions
async function testServerStreamingRetrieswithRetryRequestOptions(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -676,7 +682,7 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -690,8 +696,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// Do Nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -704,6 +710,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
);
});
}

// streaming call that retries twice with RetryRequestOpsions and resumes from where it left off
async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -748,7 +756,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
'This is testing the brand new and shiny StreamingSequence server 3'
);
const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -762,8 +770,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// do nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -777,6 +785,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
});
}

// retries twice but fails with an error not from the streaming sequence
async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -816,9 +825,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum
[1, 2, 11],
'This is testing the brand new and shiny StreamingSequence server 3'
);
const allowedCodes = [4, 14];
const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -831,21 +839,14 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum
);

attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 3);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 3);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

// fails on the first error in the sequence
async function testServerStreamingThrowsClassifiedTransientErrorNote(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -875,7 +876,7 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -887,21 +888,14 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 14);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 14);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

// retries once and fails on the second error in the sequence
async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -931,7 +925,7 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -943,19 +937,11 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 4);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 4);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
Expand Down Expand Up @@ -988,7 +974,7 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -1000,22 +986,14 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 3);
assert.strictEqual(e.code, 3);
assert.match(
err.message,
e.message,
/Cannot set both totalTimeoutMillis and maxRetries/
);
}
);
resolve();
});
});
}

async function main() {
Expand Down

0 comments on commit 2e7d30a

Please sign in to comment.