bigquery/storage/write: getting error stream.Recv(): rpc error: code = Unavailable desc = the connection is draining #6595

Closed
Av1shay opened this issue Aug 31, 2022 · 13 comments
Assignees
Labels
api: bigquery Issues related to the BigQuery API. type: question Request for information or clarification. Not an issue.

Comments

@Av1shay

Av1shay commented Aug 31, 2022

Client

BigQueryWriteClient (cloud.google.com/go/bigquery v1.32.0)

Environment

Compute VM instances in GCP

Go Environment

1.18

Code

package main

func main() {
    // ...

    lenRows := len(rows)

    // Create chunks of chunkSize rows so we don't send too many rows at once.
    chunkSize := 15000
    chunks := make([]interface{}, 0, int(math.Ceil(float64(lenRows)/float64(chunkSize))))
    for i := 0; i < lenRows; i += chunkSize {
        end := i + chunkSize
        if end > lenRows {
            end = lenRows
        }
        chunks = append(chunks, rows[i:end])
    }

    stream, err := bigqueryWriteClient.AppendRows(ctx)
    if err != nil {
        log.Fatalf("AppendRows: %s", err)
    }
    rowProto, ok := rows[0].(protoreflect.ProtoMessage)
    if !ok {
        log.Fatalf("row type should be <protoreflect.ProtoMessage>, got: %T", rows[0])
    }
    descriptor, err := adapt.NormalizeDescriptor(rowProto.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatalf("NormalizeDescriptor(): %s", err)
    }
    streamName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s/streams/_default", projectID, datasetID, tableID)

    // Send the requests from a separate goroutine and read responses below.
    go func() {
        if err := sendToStream(stream, streamName, descriptor, chunks); err != nil {
            log.Fatalf("sendToStream(): %s", err)
        }
    }()

    for {
        _, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("stream.Recv(): %s", err)
        }
    }
}

func sendToStream(stream storagepb.BigQueryWrite_AppendRowsClient, streamName string, descriptor *descriptorpb.DescriptorProto, dataChunks []interface{}) error {
    streamReqs, err := buildStreamRequests(streamName, descriptor, dataChunks)
    if err != nil {
        return err
    }

    for _, req := range streamReqs {
        if err := stream.Send(req); err != nil {
            return fmt.Errorf("stream.Send(): %s", err)
        }
    }

    return stream.CloseSend()
}

func buildStreamRequests(streamName string, descriptor *descriptorpb.DescriptorProto, dataChunks []interface{}) ([]*storagepb.AppendRowsRequest, error) {
    streamReqs := make([]*storagepb.AppendRowsRequest, len(dataChunks))

    for i, chunk := range dataChunks {
        rowsArr, ok := chunk.([]interface{})
        if !ok {
            return nil, fmt.Errorf("chunk must be an array, got %T", chunk)
        }

        data := make([][]byte, len(rowsArr))
        for j, row := range rowsArr {
            rowProto, ok := row.(protoreflect.ProtoMessage)
            if !ok {
                return nil, fmt.Errorf("row type should be <protoreflect.ProtoMessage>, got: %T", row)
            }
            buf, err := proto.Marshal(rowProto)
            if err != nil {
                return nil, fmt.Errorf("proto.Marshal(): %s", err)
            }
            data[j] = buf
        }

        streamReqs[i] = &storagepb.AppendRowsRequest{
            WriteStream: streamName,
            Rows: &storagepb.AppendRowsRequest_ProtoRows{
                ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                    WriterSchema: &storagepb.ProtoSchema{
                        ProtoDescriptor: descriptor,
                    },
                    Rows: &storagepb.ProtoRows{
                        SerializedRows: data,
                    },
                },
            },
        }
    }
    return streamReqs, nil
}

Expected behavior

The stream should be able to process all requests without error.

Actual behavior

Sometimes I get an error from stream.Recv(): "rpc error: code = Unavailable desc = the connection is draining"

Additional context

Depending on the load, the process sometimes needs to handle a lot of rows, which is why I split the rows into chunks of 15,000, so as not to exceed the limits.
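
For reference, a hedged sketch of chunking by serialized bytes instead of a fixed row count, given the documented 10 MB per-AppendRows request size limit; chunkBySize and the 9 MiB cap are illustrative, not part of the code above.

// Hypothetical helper: chunk by serialized size rather than row count so each
// AppendRowsRequest stays comfortably under the per-request size limit.
func chunkBySize(rows []interface{}) ([][][]byte, error) {
    const maxRequestBytes = 9 * 1024 * 1024 // headroom below the 10 MB request limit
    var chunks [][][]byte
    var current [][]byte
    currentBytes := 0
    for _, row := range rows {
        rowProto, ok := row.(protoreflect.ProtoMessage)
        if !ok {
            return nil, fmt.Errorf("row type should be <protoreflect.ProtoMessage>, got: %T", row)
        }
        buf, err := proto.Marshal(rowProto)
        if err != nil {
            return nil, fmt.Errorf("proto.Marshal(): %s", err)
        }
        if currentBytes+len(buf) > maxRequestBytes && len(current) > 0 {
            chunks = append(chunks, current)
            current, currentBytes = nil, 0
        }
        current = append(current, buf)
        currentBytes += len(buf)
    }
    if len(current) > 0 {
        chunks = append(chunks, current)
    }
    return chunks, nil
}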

@Av1shay Av1shay added the triage me I really want to be triaged. label Aug 31, 2022
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the BigQuery API. label Aug 31, 2022
@shollyman
Contributor

The error is communicating that the remote end of the gRPC AppendRows stream connection is going away. It's unclear whether that's in response to your request traffic or some other factor. Regardless, the resolution here is to (re)open a new stream connection.

Since you're using the raw storage API, you'll want to open a new stream via bigqueryWriteClient.AppendRows().

You may also want to look at the managedwriter package as it handles things like stream reconnection on your behalf. It appears you're already using the adapt subpackage of managedwriter to handle the descriptor normalization.
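
A minimal reconnect-and-resend sketch for the raw-client path might look like the following; appendWithReconnect, the retry budget, and the decision to resend all chunks are illustrative assumptions, reusing sendToStream from the snippet above and the apiv1 BigQueryWriteClient.

// Hypothetical sketch (not a library API): reopen the AppendRows stream when the
// server reports codes.Unavailable (e.g. "the connection is draining") and resend
// the chunks via sendToStream from the snippet above.
func appendWithReconnect(ctx context.Context, client *storage.BigQueryWriteClient,
    streamName string, descriptor *descriptorpb.DescriptorProto, chunks []interface{}) error {
    const maxAttempts = 3 // assumed retry budget
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        stream, err := client.AppendRows(ctx)
        if err != nil {
            return fmt.Errorf("AppendRows: %s", err)
        }

        // Send in the background (as in the snippet above) and read responses here.
        sendErrCh := make(chan error, 1)
        go func() {
            sendErrCh <- sendToStream(stream, streamName, descriptor, chunks)
        }()

        lastErr = nil
        for {
            if _, err := stream.Recv(); err == io.EOF {
                break
            } else if err != nil {
                lastErr = err
                break
            }
        }
        if sendErr := <-sendErrCh; lastErr == nil {
            lastErr = sendErr
        }
        if lastErr == nil {
            return nil
        }
        if status.Code(lastErr) != codes.Unavailable {
            return lastErr // only the draining/Unavailable case is retried in this sketch
        }
        // Unavailable: loop around, open a fresh stream, and resend everything.
    }
    return fmt.Errorf("appendWithReconnect: retries exhausted: %s", lastErr)
}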

@shollyman shollyman added type: question Request for information or clarification. Not an issue. and removed triage me I really want to be triaged. labels Aug 31, 2022
@Av1shay
Author

Av1shay commented Aug 31, 2022

When should I open the new stream(s)?
Should I create x streams in advance to handle the chunks, or only create a new stream when this error is returned and retry sending the data that failed?

@shollyman
Contributor

When should I open the new stream(s)?

In this single-writer pattern, when the existing stream closes.

You can potentially issue concurrent appends by managing multiple connections. Regardless, you also want to keep track of which appends failed, which doesn't appear to be part of the sample code in the issue. Responses return in the same order the appends are issued; there's no identifier on individual requests to correlate beyond that ordering.
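
A hedged sketch of that bookkeeping against the raw stream, using a simple FIFO of pending requests; the variable names are illustrative, and stream and streamReqs are assumed to come from the earlier snippet.

// Sketch: responses arrive in the same order the requests were sent, so a FIFO of
// pending requests tells us which append each response (or stream error) belongs
// to, and therefore what must be resent on a new connection.
pending := append([]*storagepb.AppendRowsRequest(nil), streamReqs...)
var failed []*storagepb.AppendRowsRequest

go func() {
    for _, req := range streamReqs {
        if err := stream.Send(req); err != nil {
            return // Recv below will surface the stream error
        }
    }
    stream.CloseSend()
}()

for len(pending) > 0 {
    resp, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        // pending[0] triggered this error and pending[1:] never got a response,
        // so everything still in pending has to be resent.
        failed = append(failed, pending...)
        break
    }
    if respErr := resp.GetError(); respErr != nil {
        failed = append(failed, pending[0]) // acknowledged, but rejected by the server
    }
    pending = pending[1:]
}
// failed now holds the requests to replay on a fresh AppendRows stream.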

@Av1shay
Author

Av1shay commented Sep 1, 2022

Thanks,
I've refactored the code to use managedwriter; I will check its behavior under high traffic.

func do() error {
    // ....
    chunkSize := 10000
    chunks := make([]interface{}, 0, int(math.Ceil(float64(lenRows)/float64(chunkSize))))
    for i := 0; i < lenRows; i += chunkSize {
        end := i + chunkSize
        if end > lenRows {
            end = lenRows
        }
        chunks = append(chunks, rows[i:end])
    }

    destTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, importer.Key())
    rowProto, ok := rows[0].(protoreflect.ProtoMessage)
    if !ok {
        return fmt.Errorf("row type should be <protoreflect.ProtoMessage>, got: %T", rows[0])
    }
    descriptor, err := adapt.NormalizeDescriptor(rowProto.ProtoReflect().Descriptor())
    if err != nil {
        return fmt.Errorf("NormalizeDescriptor(): %s", err)
    }
    stream, err := bigqueryWriteClient.NewManagedStream(ctx,
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.WithDestinationTable(destTable),
        managedwriter.WithSchemaDescriptor(descriptor))
    if err != nil {
        return fmt.Errorf("NewManagedStream(): %s", err)
    }

    if err := streamData(ctx, stream, chunks); err != nil {
        return fmt.Errorf("streamData(): %s", err)
    }

    return nil
}

func streamData(ctx context.Context, stream *managedwriter.ManagedStream, dataChunks []interface{}) error {
    // todo use workers pool to send chunks to stream
    for _, chunk := range dataChunks {
        rowsArr, ok := chunk.([]interface{})
        if !ok {
            return fmt.Errorf("chunk must be an array, got %T", chunk)
        }

        data := make([][]byte, len(rowsArr))
        for j, row := range rowsArr {
            rowProto, ok := row.(protoreflect.ProtoMessage)
            if !ok {
                return fmt.Errorf("row type suppose to be <protoreflect.ProtoMessage>, got: %T", row)
            }
            buf, err := proto.Marshal(rowProto)
            if err != nil {
                return fmt.Errorf("proto.Marshal(): %s", err)
            }
            data[j] = buf
        }

        result, err := stream.AppendRows(ctx, data)
        if err != nil {
            return fmt.Errorf("stream.AppendRows(): %s", err)
        }
        _, err = result.GetResult(ctx)
        if err != nil {
            return fmt.Errorf("stream.GetResult(): %s", err)
        }
    }

    return nil
}

@Av1shay
Author

Av1shay commented Sep 2, 2022

Using the code above with high throughput also generated the error "stream.GetResult(): rpc error: code = Unavailable desc = the connection is draining", as well as a quota error: "stream.AppendRows(): rpc error: code = ResourceExhausted desc = Exceeds 'AppendRows throughput' quota, user_id: $USER_ID (status: INSUFFICIENT_TOKENS), you can issue a raise quota request through Google Cloud Console. Be sure to include this full error message in the request description. Entity: projects/project_id/datasets/dataset_id/tables/table_id/streams/_default"

I'm not sure why I see this error now and didn't see it with the raw-package implementation: I'm now issuing an AppendRows request for each chunk of data (in the original code I called AppendRows once and used it for all chunks), so the throughput should be smaller.

Anyway, I'm switching back to the original package and will try to manage the reconnections myself.

@shollyman
Contributor

No worries, thanks for following up with your feedback. Some more information that may be useful to you:

In the first case (draining error), the response coming back from the server means the connection is going away and you'll need to reconnect. I should have been clearer that because it's an error, the write was rejected and you'll need to re-send it after the reconnection. The managedwriter doesn't re-enqueue writes automatically, but it's something we could consider for default streams where the user isn't doing exactly-once writes or controlling offset management directly.

The second (throughput error) is likely a signal that you're writing faster under the managedwriter implementation. Looking at your initial implementation again, the reason for this is a subtle performance improvement you can mirror in your own code: you only need to populate the schema for the first AppendRowsRequest on a connection; it can be omitted from subsequent writes on an already-open connection.

More info about the throughput quota: https://cloud.google.com/bigquery/quotas#write-api-limits
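
If you stay on the raw client, that optimization could be mirrored inside the request-building loop from the first snippet, along these lines (a sketch, assuming all requests built here go out in order on a single connection):

// Sketch: attach WriterSchema only to the first AppendRowsRequest on a
// connection; later requests on the same open connection can omit it.
protoData := &storagepb.AppendRowsRequest_ProtoData{
    Rows: &storagepb.ProtoRows{SerializedRows: data},
}
if i == 0 { // first request on this connection carries the schema
    protoData.WriterSchema = &storagepb.ProtoSchema{ProtoDescriptor: descriptor}
}
streamReqs[i] = &storagepb.AppendRowsRequest{
    WriteStream: streamName,
    Rows:        &storagepb.AppendRowsRequest_ProtoRows{ProtoRows: protoData},
}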

shollyman added a commit to shollyman/google-cloud-go that referenced this issue Sep 2, 2022
This PR augments the reconnection logic to include the gRPC transport
stream drain error as a condition where we should force a reconnect,
rather than waiting for the io.EOF of the connection fully closing.

Related: googleapis#6595
shollyman added a commit that referenced this issue Sep 2, 2022
This PR augments the reconnection logic to include the gRPC transport
stream drain error as a condition where we should force a reconnect,
rather than waiting for the io.EOF of the connection fully closing.

Related: #6595
@Av1shay
Author

Av1shay commented Sep 3, 2022

Thanks for the help!
I see now that I need to watch out not to write too fast because of the MB/s quota. I'm still learning how this API works; we just migrated from the legacy insertAll API two weeks ago. It was much easier to manage, but too slow for our use case.

@Av1shay
Author

Av1shay commented Sep 3, 2022

Here's another implementation with retry logic; in case of specific errors I try again with a new stream connection.
One thing I'm not sure about is how to handle the throughput error (ResourceExhausted): whether the whole chunk of data failed and I can safely resend it, or whether some rows may have been sent successfully and I need to remove them from the chunk before resending.
In the second case, how can I tell which rows are left to send, since I get offset -1 from GetResult(ctx) (as I'm not using offsets)?

if err := streamData(ctx, bigqueryWriteClient, destTable, descriptor, chunks, 0); err != nil {
	return fmt.Errorf("streamData(): %s", err)
}

func streamData(ctx context.Context,
	client *managedwriter.Client,
	destTable string,
	descriptor *descriptorpb.DescriptorProto,
	dataChunks []interface{},
	retry int,
) error {
	if len(dataChunks) == 0 {
		return nil
	}

	stream, err := client.NewManagedStream(ctx,
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithDestinationTable(destTable),
		managedwriter.WithSchemaDescriptor(descriptor))
	if err != nil {
		return fmt.Errorf("NewManagedStream(): %s", err)
	}

	for i, chunk := range dataChunks {
		rowsArr, ok := chunk.([]interface{})
		if !ok {
			return fmt.Errorf("chunk must be an array, got %T", chunk)
		}

		data := make([][]byte, len(rowsArr))
		for j, row := range rowsArr {
			rowProto, ok := row.(protoreflect.ProtoMessage)
			if !ok {
				return fmt.Errorf("row type suppose to be <protoreflect.ProtoMessage>, got: %T", row)
			}
			buf, err := proto.Marshal(rowProto)
			if err != nil {
				return fmt.Errorf("proto.Marshal(): %s", err)
			}
			data[j] = buf
		}

		result, err := stream.AppendRows(ctx, data)
		if err != nil {
			return fmt.Errorf("stream.AppendRows(): %s", err)
		}
		_, err = result.GetResult(ctx)
		if err != nil {
			if shouldRetry(retry, err) {
				log.Warningf(ctx, "trying to retry for the %d time because of an error %s", retry+1, err)
				retry++
				leftDataChunks := dataChunks[i:]
				if err := stream.Close(); err != nil { // close current stream
					return fmt.Errorf("stream.Close(): %s", err)
				}
				bo := time.Duration(500*retry) * time.Millisecond
				if err := gax.Sleep(ctx, bo); err != nil {
					return fmt.Errorf("gax.Sleep(): %s", err)
				}
				if status.Code(err) == codes.ResourceExhausted {
					// todo maybe some of the rows sent successfully and we need to re-send only part of dataChunks[i]
				}
				return streamData(ctx, client, destTable, descriptor, leftDataChunks, retry)
			}
			return fmt.Errorf("stream.GetResult(): %s", err)
		}
	}

	return nil
}

func shouldRetry(retry int, err error) bool {
	return retry < maxStreamRetry && (errors.Is(err, status.Error(codes.Unavailable, "the connection is draining")) || status.Code(err) == codes.ResourceExhausted)
}

@shollyman
Contributor

Committing of rows within a single AppendRowsRequest is either all or none. You don't need to worry about partial commit.

You can potentially constrain the throughput on a single ManagedStream by setting more aggressive flow control (there's options to limit inflight bytes/requests), but the quota is on the aggregate write traffic per-project-per-region so backing off within a single streams write will likely still be a valuable response mechanism.

I've been contemplating augmenting AppendResult to make the work to re-enqueuing an append easier, but I haven't finalized anything at this point.
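
For reference, the flow-control options mentioned above look roughly like this when constructing the stream (option names per the managedwriter package docs; the numeric limits are purely illustrative):

// Sketch: cap how much work can be outstanding on a single ManagedStream so a
// writer backs off before hitting the aggregate throughput quota.
stream, err := client.NewManagedStream(ctx,
    managedwriter.WithType(managedwriter.DefaultStream),
    managedwriter.WithDestinationTable(destTable),
    managedwriter.WithSchemaDescriptor(descriptor),
    managedwriter.WithMaxInflightRequests(100),       // illustrative: at most 100 unacknowledged appends
    managedwriter.WithMaxInflightBytes(16*1024*1024), // illustrative: ~16 MiB of pending row data
)
if err != nil {
    return fmt.Errorf("NewManagedStream(): %s", err)
}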

@Av1shay
Author

Av1shay commented Sep 11, 2022

Hi @shollyman
There are other errors we encounter every now and then:

  • stream.GetResult(): rpc error: code = Internal desc = Internal error encountered. Entity: projects/project-id/datasets/dataset-id/tables/table-id/streams/_default
  • stream.GetResult(): rpc error: code = Aborted desc = The operation was aborted. Entity: projects/table-id/datasets/dataset-id/tables/table-id/streams/_default

These errors are missing some details, so I'm not sure how I should handle them. Would adding them to the retry flow be valuable?

@shollyman
Contributor

Yes, both should be safe to retry.

See https://pkg.go.dev/google.golang.org/grpc/codes for longer descriptions of the kinds of errors that get bucketed into each classification.
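
Applied to the shouldRetry helper from the earlier comment, that could look like the following sketch (the retry budget and backoff policy stay as they were):

// Sketch: treat Unavailable (draining), ResourceExhausted (quota), Internal and
// Aborted as retryable, per the discussion above; everything else fails fast.
func shouldRetry(retry int, err error) bool {
    if retry >= maxStreamRetry {
        return false
    }
    switch status.Code(err) {
    case codes.Unavailable, codes.ResourceExhausted, codes.Internal, codes.Aborted:
        return true
    default:
        return false
    }
}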

@shollyman
Contributor

FYI working on retries, see #6695 for some of the details.

@shollyman
Contributor

Closing this issue out. Please open a new issue if something else comes up here.
