Proposal: PrepareMsg API to more easily parallelize serialization #2432

Open

dfawley opened this issue Nov 1, 2018 · 6 comments

@dfawley (Member) commented Nov 1, 2018

If the time to serialize and compress a message is less than the time to transmit it, a single stream is capable of saturating a network connection, because in grpc-go the N+1th message can be serialized and compressed while the Nth message is being transmitted. However, compression is usually slow and produces a small amount of data that transmits quickly, so with compression enabled a single stream usually cannot saturate the connection. References: #1879, #2355.

We would like to separate the encoding and transmission steps so that users are able to perform multiple encodes simultaneously and take advantage of system parallelism.

Proposed API:

package grpc

type PreparedMsg struct { /* Nothing exported */ }

// Encode prepares msg into p (marshals and compresses) for use with s.SendMsg.
// msg may not be modified until after SendMsg is called with p.  p is not valid
// if a non-nil error is returned.
func (p *PreparedMsg) Encode(s grpc.Stream, msg interface{}) error { ... }

If a PreparedMsg is passed to SendMsg, SendMsg will use the PreparedMsg's internal buffer to send the message on the stream, bypassing the marshal and compress steps.

This API, as opposed to func NewPreparedMsg(msg interface{}) PreparedMsg, would allow users to re-use a PreparedMsg, and may save allocations of internal buffers.
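For illustration, here is a rough sketch of how a caller might use the proposed API to overlap encoding with sending. The sendPrepared helper and the concurrent-encode pattern are hypothetical usage, not part of the proposal; it also assumes Encode may be called concurrently for the same stream, which is the point of the proposal:

package example

import (
    "sync"

    "google.golang.org/grpc"
)

// sendPrepared encodes msgs concurrently, then sends them in order on the
// stream. Encoding (marshal + compress) is the expensive step, so doing it in
// parallel lets a single stream keep the connection busy.
func sendPrepared(stream grpc.ClientStream, msgs []interface{}) error {
    prepared := make([]grpc.PreparedMsg, len(msgs))
    errs := make([]error, len(msgs))

    var wg sync.WaitGroup
    for i, m := range msgs {
        wg.Add(1)
        go func(i int, m interface{}) {
            defer wg.Done()
            // Marshal and compress off the send path.
            errs[i] = prepared[i].Encode(stream, m)
        }(i, m)
    }
    wg.Wait()

    for i := range prepared {
        if errs[i] != nil {
            return errs[i]
        }
        // SendMsg recognizes the prepared message and skips marshal/compress.
        if err := stream.SendMsg(&prepared[i]); err != nil {
            return err
        }
    }
    return nil
}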

@prannayk (Contributor)

This would require updating the grpc.Stream interface to get the required codec and compressors for the PreparedMsg. Right now there is no method to access those universally for any stream.

@dfawley (Member, Author) commented Jan 7, 2019

> This would require updating the grpc.Stream interface to get the required codec and compressors for the PreparedMsg. Right now there is no method to access those universally for any stream.

We can do this without changing grpc.Stream. I was originally thinking we would type assert s to grpc.clientStream, but that would mean Streams implemented by interceptors couldn't support this operation. Instead, we could store this information in the rpcInfo that we already keep in the stream's context:

grpc-go/rpc_util.go (lines 670 to 684 at e441557):

type rpcInfo struct {
    failfast bool
}

type rpcInfoContextKey struct{}

func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
    return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
}

func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
    s, ok = ctx.Value(rpcInfoContextKey{}).(*rpcInfo)
    return
}

...or add a new key/value in the context, but this one seems fine to extend.

An interceptor could mask this context completely, but that should be rare, and there are limits to what we can do given that we can't add methods to grpc.Stream.
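For concreteness, here is a minimal sketch of that approach, assuming rpcInfo is extended with codec/compressor fields (as proposed in the next comment) and that Encode reuses the existing internal encode, compress and msgHeader helpers in rpc_util.go. The PreparedMsg field names (hdr, payload, encodedData) and the error handling are assumptions for illustration, not a committed design:

// Encode looks up the codec and compressors via the stream's context, so no
// new methods are needed on grpc.Stream.
func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
    ri, ok := rpcInfoFromContext(s.Context())
    if !ok || ri.codec == nil {
        return status.Errorf(codes.Internal, "grpc: unable to get codec from rpcInfo")
    }
    // Marshal, then compress, exactly as SendMsg would have done.
    data, err := encode(ri.codec, msg)
    if err != nil {
        return err
    }
    compData, err := compress(data, ri.cp, ri.comp)
    if err != nil {
        return err
    }
    p.encodedData = data
    p.hdr, p.payload = msgHeader(data, compData)
    return nil
}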

@prannayk (Contributor) commented Jan 10, 2019

I think a good approach would be the following.

Change rpcInfo to:

type rpcInfo struct {
    failfast bool
    codec    baseCodec
    cp       Compressor
    comp     encoding.Compressor
}

Following that, we will have to change the implementation of grpc.SendMsg for every stream individually. It should type assert m to *PreparedMsg and, if the assertion succeeds, use the already marshalled and compressed data inside it; if the assertion fails, it falls back to the present implementation. This logic would be the same for all streams, but has to be implemented for every stream individually, since we cannot make any changes to grpc.Stream.

Since we cannot add methods to grpc.Stream, we cannot ensure that anyone implementing a new stream adheres to this, but it does cover all the streams in the present implementation.
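A rough sketch of that fallback, written as a helper the per-stream SendMsg implementations could share. The PreparedMsg field names used here (hdr, payload, encodedData) are assumptions, and the helper itself is illustrative rather than an agreed design:

// prepareMsg returns the header, payload and uncompressed data for m. If m is
// already a *PreparedMsg, its buffers are reused; otherwise the message is
// marshalled and compressed here, exactly as SendMsg does today.
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
    if p, ok := m.(*PreparedMsg); ok {
        return p.hdr, p.payload, p.encodedData, nil
    }
    // Not a prepared message: marshal and compress on the spot.
    data, err = encode(codec, m)
    if err != nil {
        return nil, nil, nil, err
    }
    compData, err := compress(data, cp, comp)
    if err != nil {
        return nil, nil, nil, err
    }
    hdr, payload = msgHeader(data, compData)
    return hdr, payload, data, nil
}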

An alternate implementation is as follows:

  1. Create a BaseStream struct type that has a function-valued field called sendMsgFn.
  2. BaseStream implements SendMsg, which first type asserts the message to *PreparedMsg. On success it passes hdr and payload to the function field as sendMsgFn(hdr, payload); otherwise it creates the PreparedMsg on the fly (see the sketch below).

This would ensure that every stream that embeds BaseStream implements this feature in the right way. Any interceptor that wants to support this can do so by embedding the BaseStream struct. Otherwise, we will have to copy and paste this logic every time we implement a new stream that wants to support it.
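A minimal sketch of that alternative, reusing the prepareMsg helper from the sketch above; the BaseStream type, its fields, and sendMsgFn are hypothetical names, not existing grpc-go code:

// BaseStream would be embedded by concrete stream implementations, which set
// sendMsgFn to their low-level routine for writing already-framed bytes.
type BaseStream struct {
    codec     baseCodec
    cp        Compressor
    comp      encoding.Compressor
    sendMsgFn func(hdr, payload []byte) error
}

// SendMsg takes the PreparedMsg fast path when possible and otherwise
// marshals and compresses on the fly, so every embedder behaves the same way.
func (b *BaseStream) SendMsg(m interface{}) error {
    hdr, payload, _, err := prepareMsg(m, b.codec, b.cp, b.comp)
    if err != nil {
        return err
    }
    return b.sendMsgFn(hdr, payload)
}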

@menghanl (Contributor) commented Jun 5, 2019

This is done for the sending side.

A similar optimization could be done on the receiving side, too.

@eafzali (Contributor) commented Mar 27, 2020

@prannayk is it possible to make this available for ServerStreams as well?

@eafzali (Contributor) commented Mar 27, 2020

Doing something like #3480 solved my problem, at least. But I'm completely new to this codebase, so can someone guide me on how I should proceed? For example, what kind of testing would make sense for this? And what would make sense for failFast?
