Ordered send_recv pattern for RPCs #7480

Open
fjetter opened this issue Jan 17, 2023 · 2 comments · May be fixed by #8430
Labels
asyncio core enhancement Improve existing functionality or make things work better feature Something is missing

Comments

@fjetter
Member

fjetter commented Jan 17, 2023

We currently support two ways to trigger RPCs.

  1. PooledRPCs allow many-to-many send_recv communication patterns. Every request uses a dedicated connection that is pooled and potentially reused after successful completion of the RPC. [1]
  2. BatchedComm uses a bidirectional one-to-one connection that is typically used to submit multiple smaller messages; it submits a batch of messages every X ms.

A notable difference between the two methods is that 2.) guarantees message ordering while 1.) does not.

For many applications ordering is essential, e.g. to implement consistent distributed transactions.
It is very easy for developers to fall back to the pooled RPC approach, which does not provide ordering, whenever a response is useful. If both a response and ordering are required, there is currently no proper way to implement this.
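
To make the ordering difference concrete, here is a minimal sketch that uses plain asyncio rather than the actual comm classes. It assumes that a pooled request can be modelled as an independent task with its own latency and that the batched stream can be modelled as a single FIFO queue; `pooled_style` and `batched_style` are hypothetical names.

```python
import asyncio


async def pooled_style(delays):
    """Each request travels on its own connection, modelled as its own task."""
    arrived = []

    async def request(i, delay):
        await asyncio.sleep(delay)  # per-connection latency
        arrived.append(i)

    await asyncio.gather(*(request(i, d) for i, d in enumerate(delays)))
    return arrived


async def batched_style(delays):
    """All messages share one FIFO stream, modelled as a single queue."""
    stream = asyncio.Queue()
    for i, delay in enumerate(delays):
        stream.put_nowait((i, delay))
    arrived = []
    while not stream.empty():
        i, delay = stream.get_nowait()
        await asyncio.sleep(delay)  # a slow message delays, but never overtakes
        arrived.append(i)
    return arrived


delays = [0.05, 0.0]  # the first request is slower than the second
pooled_order = asyncio.run(pooled_style(delays))
batched_order = asyncio.run(batched_style(delays))
```

In the pooled model the fast second request overtakes the slow first one, whereas the single stream delivers both in submission order.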

This issue intends to discuss the possibility of introducing standard functionality to support this use case, i.e. to provide ordered send_recv functionality.

[1] Using a dedicated connection is primarily useful to submit large amounts of payload data s.t. the stream is not blocked for other smaller messages while the large payload is being submitted.

Naive approach w/out additional infrastructure

It is possible to write a send_recv-like pattern using the batched stream by breaking a coroutine into two, e.g.

# Server A

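async def batched_ordered():
    bcommToB: BatchedSend  # assumed to be an established batched comm to Server B
    bcommToB.write({
        "op": "inc",
        "x": 1,  # key matches the argument name of handle_inc on Server B
    })

async def handle_inc_response(x):
    assert x == 2


# Server B
async def handle_inc(comm, x):
    comm.write({
        "op": "inc-response",
        "x": x + 1,  # key matches the argument name of handle_inc_response
    })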

This approach has a couple of downsides:

  1. Writing to a BatchedSend in the first place is awkward since we need to assemble the message ourselves. The PooledRPC builds the message for us, mimicking an actual remote call.
  2. The handler that needs/wants to respond cannot simply return the result but has to take care of writing to the comm itself.
  3. ServerA needs to implement a dedicated response handler, which is cumbersome and decouples the response from the caller, i.e. it is impossible to share local context.

Suggested approach

Ideally, the batched, ordered communication would allow an access pattern similar to PooledRPCCall and construct the message behind the scenes. Further, any returned result would be sent back to the caller.
Effectively, stream handlers and comm handlers would be identical and there would no longer be a need to distinguish between the two.

# Server A

async def batched_ordered():
    bcommToB: OrderedBatchedSendRcv
    expected = 2
    res = await bcommToB.inc(1)
    assert res == expected

# Server B
async def handle_inc(x):
    return x + 1

To enable such an API we would need to change both sender and receiver of the stream to deal with responses.

Sender

On the sender side, we are striving to provide an API similar to what PooledRPCCall currently offers.
A notable difference between PooledRPCCall and this suggested approach is that the batched comm we are using here is already used/read in Server.handle_stream, i.e. we cannot wait for the response ourselves. Instead, we can introduce a new handler that listens to all responses (or special-case/inline this into handle_stream).

import asyncio


class BatchedResponseMixin:  # Could be directly part of the server, ofc

    def __init__(self):
        # Registered per instance because the handler is a bound method
        self.stream_handlers = {
            "bcomm_recv": self.handle_send_recv_batched_response
        }
        # Pending requests: comm -> {request_id -> asyncio.Future}
        self._response_events = {}

    async def handle_send_recv_batched_response(self, comm, response, request_id):
        # Every request is answered once, so the future can be dropped here
        fut = self._response_events[comm].pop(request_id)
        if response["status"] == "OK":
            fut.set_result(response["result"])
        else:
            fut.set_exception(response["result"])

    def send_recv_batched(self, bcomm, **msg):
        request_id: int = _gen_unique_id()  # e.g. a counter
        msg["__request__id__"] = request_id
        fut = asyncio.Future()
        self._response_events.setdefault(bcomm, {})[request_id] = fut
        bcomm.send(msg)  # the message is sent as a single dict
        return fut

# Should mimic PooledRPCCall as closely as possible
class RPCCallBatched:
    def __init__(self, addr, server: BatchedResponseMixin):
        self.server = server
        self.bcomm = server.batched_comms[addr]

    def __getattr__(self, key):
        # A kwarg reply==False version could just construct the message, send and return
        # w/out a future to await or instantiate the future and set a None result immediately
        # for API consistency. This way we could get rid of all the {"op": ...} messages in the
        # code base but we'd have a reply kwarg... not sure what's the lesser evil

        async def send_recv_batched(**kwargs):
            return await self.server.send_recv_batched(
                self.bcomm, op=key, **kwargs
            )
        return send_recv_batched

The sender generates a unique ID for every request (this could simply be a monotonic counter) and attaches it to the request. It also creates an asyncio.Future (or some other synchronization primitive) that is mapped to that request ID and can be awaited to receive the result once it is available. A dedicated stream handler, handle_send_recv_batched_response, accepts the response and sets the result on the future. The receiver side submits its response to this dedicated response handler together with the original request_id.

Receiver

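async def _wrap_stream_handler(bcomm, request_id, handler, **kwargs):
    try:
        res = await handler(**kwargs)
        response = {"status": "OK", "result": res}
    except Exception as e:
        # Any status other than "OK" makes the sender set an exception on the future
        response = {"status": "error", "result": e}
    bcomm.send({
        "op": "bcomm_recv",  # routes the message to the sender's response handler
        "response": response,
        "request_id": request_id,
    })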
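
For illustration, the sender and receiver halves can be put together in a self-contained sketch that runs without distributed. Everything here is an assumption made for the example: `OrderedChannel` stands in for one direction of a batched comm, `Caller` plays the role of the mixin above, and `serve`/`pump_responses` replace `handle_stream` on either side. It only shows the request ID/future correlation, not batching or error recovery.

```python
import asyncio
import itertools


class OrderedChannel:
    """Hypothetical stand-in for one direction of a batched comm (FIFO)."""

    def __init__(self):
        self.queue = asyncio.Queue()

    def send(self, msg):
        self.queue.put_nowait(msg)


class Caller:
    def __init__(self, outbox):
        self.outbox = outbox
        self._ids = itertools.count()  # monotonic request IDs
        self._pending = {}  # request_id -> asyncio.Future

    def send_recv(self, op, **kwargs):
        request_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        self.outbox.send({"op": op, "request_id": request_id, **kwargs})
        return fut

    def handle_response(self, request_id, response):
        fut = self._pending.pop(request_id)
        if response["status"] == "OK":
            fut.set_result(response["result"])
        else:
            fut.set_exception(response["result"])


async def serve(inbox, outbox, handlers):
    """Receiver loop: run handlers in arrival order and send each result back."""
    while (msg := await inbox.queue.get()) is not None:
        op, request_id = msg.pop("op"), msg.pop("request_id")
        try:
            response = {"status": "OK", "result": await handlers[op](**msg)}
        except Exception as exc:
            response = {"status": "error", "result": exc}
        outbox.send({"request_id": request_id, "response": response})


async def pump_responses(inbox, caller):
    """Sender-side loop: hand every incoming response to the waiting future."""
    while (msg := await inbox.queue.get()) is not None:
        caller.handle_response(**msg)


async def inc(x):
    return x + 1


async def main():
    a_to_b, b_to_a = OrderedChannel(), OrderedChannel()
    caller = Caller(a_to_b)
    loops = [
        asyncio.create_task(serve(a_to_b, b_to_a, {"inc": inc})),
        asyncio.create_task(pump_responses(b_to_a, caller)),
    ]
    results = await asyncio.gather(*(caller.send_recv("inc", x=i) for i in range(3)))
    a_to_b.send(None)  # sentinel: stop both loops
    b_to_a.send(None)
    await asyncio.gather(*loops)
    return results


results = asyncio.run(main())
```

Because the receiver awaits each handler before reading the next message, responses in this sketch come back in request order. A real implementation would have to decide whether it wants that behaviour for coroutine handlers.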

Caveats

The above pseudo code is obviously not complete. I believe the most critical missing component is us dealing with dead remotes. The current send_recv will naturally fail since the used comm is being closed once the remote is dead. Since we are not awaiting the comm.read we will need to handle this case explicitly in the handle_stream try/except/finally.
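
One possible shape for this, as a sketch only: keep the pending futures in a small table and fail all of them from the `finally` clause of the stream loop. `PendingRequests` and `read_stream` are hypothetical names, and the built-in `ConnectionError` is used here in place of whatever exception the comm layer would actually raise.

```python
import asyncio


class PendingRequests:
    """Hypothetical per-comm table of in-flight requests."""

    def __init__(self):
        self._futures = {}  # request_id -> asyncio.Future

    def register(self, request_id):
        fut = asyncio.get_running_loop().create_future()
        self._futures[request_id] = fut
        return fut

    def resolve(self, request_id, result):
        fut = self._futures.pop(request_id, None)
        if fut is not None and not fut.done():
            fut.set_result(result)

    def fail_all(self, exc):
        """Wake up every caller that is still waiting for a reply."""
        futures, self._futures = self._futures, {}
        for fut in futures.values():
            if not fut.done():
                fut.set_exception(exc)


async def read_stream(messages, pending):
    """Toy stream loop: `messages` holds the replies received before the remote dies."""
    try:
        for request_id, result in messages:
            pending.resolve(request_id, result)
            await asyncio.sleep(0)  # give other coroutines a chance to run
    finally:
        # The stream ended, so nobody will ever answer the remaining requests
        pending.fail_all(ConnectionError("remote closed the stream"))


async def main():
    pending = PendingRequests()
    answered = pending.register(0)
    orphaned = pending.register(1)
    await read_stream([(0, "ok")], pending)
    return await asyncio.gather(answered, orphaned, return_exceptions=True)


outcome = asyncio.run(main())
```

Without the `finally` branch, the caller awaiting the second future would hang forever once the remote is gone.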

cc @hendrikmakait @graingert

@fjetter fjetter added enhancement Improve existing functionality or make things work better core feature Something is missing asyncio labels Jan 17, 2023
@hendrikmakait
Member

One thing to define is which ordering guarantees we want to provide. For example, handle_stream only guarantees the correct processing order for synchronous handlers (#7486 (comment)).
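
A toy dispatch loop can illustrate why this matters. It is not the actual handle_stream implementation; it merely assumes that synchronous handlers run inline while coroutine handlers are scheduled as background tasks, and all names are made up for the example.

```python
import asyncio

processed = []


def sync_handler(i):
    processed.append(("sync", i))


async def async_handler(i, delay):
    await asyncio.sleep(delay)  # any await lets later messages overtake
    processed.append(("async", i))


async def dispatch(messages):
    """Toy stream loop: sync handlers run inline, coroutines become background tasks."""
    background = []
    for handler, kwargs in messages:
        result = handler(**kwargs)
        if asyncio.iscoroutine(result):
            background.append(asyncio.create_task(result))
    await asyncio.gather(*background)


asyncio.run(dispatch([
    (async_handler, {"i": 0, "delay": 0.05}),
    (sync_handler, {"i": 1}),
    (async_handler, {"i": 2, "delay": 0.0}),
]))
```

The messages arrive in the order 0, 1, 2 but are processed in the order 1, 2, 0, so an ordered stream alone does not imply ordered processing.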

@fjetter
Member Author

fjetter commented Jan 27, 2023

There are also use cases where such a send/reply pattern comes with a certain complexity in the context of async code.

async def handle_operation(self, *args, **kwargs):
    do_import_stuff()
    # At this point we would like to queue up the message to the stack of messages on batched send (#7486).
    return_response("response")
    # Only afterwards we want to await something but we have to send a reply first because we do not
    # know if any other coroutine might produce another message that could conflict with foo and destroy the ordering
    await cleanup()

Real-world example: #7486 (comment)

A possible way around this would be to use an async generator s.t. the syntax would look like

async def handle_operation(self, *args, **kwargs):
    do_import_stuff()
    # At this point we would like to queue up the message to the stack of messages on batched send (#7486).
    yield "response"
    # Only afterwards we want to await something but we have to send a reply first because we do not
    # know if any other coroutine might produce another message that could conflict with foo and destroy the ordering
    await cleanup()
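
A wrapper that drives such a handler could look roughly like the sketch below. It assumes that the first yielded value is the reply and that any further yields are ignored; `run_handler` and `send_reply` are hypothetical names, and a generator that never yields is not handled.

```python
import asyncio
import inspect

log = []


async def run_handler(handler, send_reply, **kwargs):
    """Reply with the returned value, or with the first value an async generator yields."""
    result = handler(**kwargs)
    if inspect.isasyncgen(result):
        reply = await result.__anext__()  # run the handler up to its first yield
        send_reply(reply)  # enqueue the reply before the handler continues
        async for _ in result:  # run the remainder, e.g. cleanup
            pass
    else:
        send_reply(await result)


async def cleanup():
    await asyncio.sleep(0)
    log.append("cleanup done")


async def handle_operation(x):
    log.append("work")
    yield x + 1
    await cleanup()


async def handle_simple(x):
    return x * 2


async def main():
    replies = []

    def send_reply(value):
        log.append("reply sent")
        replies.append(value)

    await run_handler(handle_operation, send_reply, x=1)
    await run_handler(handle_simple, send_reply, x=5)
    return replies


replies = asyncio.run(main())
```

The reply is queued before `cleanup()` is awaited, which is the property the generator syntax is meant to provide. The ordering only holds if the handler does not await anything before its first `yield`.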

A less sophisticated version could also pass the BatchedSend object to the coroutine to send replies, similar to how we do it with comms. However, this would also require a relatively costly refactoring for a mediocre solution.
