Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use asyncio for TCP/TLS comms #5450

Merged
merged 16 commits into from
Dec 10, 2021
Merged

Use asyncio for TCP/TLS comms #5450

merged 16 commits into from
Dec 10, 2021

Conversation

jcrist
Copy link
Member

@jcrist jcrist commented Oct 21, 2021

This is a WIP PR for using asyncio instead of tornado for TCP/TLS comms.
There are a few goals here:

  • Reduce our dependency on tornado, in favor of the builtin asyncio
    support
  • Lower latency for small/medium sized messages. Right now the tornado
    call stack in the IOStream interface increases our latency for small
    messages. We can do better here by making use of asyncio protocols.
  • Equal performance for large messages. We should be able to make zero
    copy reads/writes just as before. In this case I wouldn't expect a
    performance increase from asyncio for large (10 MiB+) messages, but we
    also shouldn't see a slowdown.
  • Improved TLS performance. The TLS implementation in asyncio (and
    more-so in uvloop) is better optimized than the implementation in
    tornado. We should see a measurable performance boost here.
  • Reduced GIL contention when using TLS. Right now a single write or
    read through tornado drops and reaquires the GIL several times. If there
    are busy background threads, this can lead to the IO thread having to
    wait for the GIL mutex multiple times, leading to lower IO
    performance. The implementations in asyncio/uvloop don't drop the GIL
    except for IO (instead of also dropping it for TLS operations), which
    leads to fewer chances of another thread picking up the GIL and slowing
    IO.

This PR is still a WIP, and still has some TODOs:

  • Pause the read side of the protocol after a certain limit. Right now
    the protocol will buffer in memory without a reader, providing no
    backpressure.
  • The tornado comms version includes some unnecessary length info in
    it's framing that I didn't notice when implementing the asyncio version
    originally. As is, the asyncio comm can't talk to the tornado comm.
    We'll want to fix this for now, as it would break cross-version support
    (but we can remove the excess framing later if/when we make other
    breaking changes).
  • Right now the asyncio implementation is slower than expected for large
    frames. Need to profile and debug why.
  • Do we want to keep both the tornado and asyncio implementations around
    for a while behind a config knob, or do we want to fully replace the
    implementation with asyncio? I tend towards the latter, but may add an
    interim config in this PR (to be ripped out before merge) to make it
    easier for users to test and benchmark this PR.

Fixes #4513.

This is a WIP PR for using asyncio instead of tornado for TCP/TLS comms.
There are a few goals here:

- Reduce our dependency on tornado, in favor of the builtin asyncio
support
- Lower latency for small/medium sized messages. Right now the tornado
call stack in the IOStream interface increases our latency for small
messages. We can do better here by making use of asyncio protocols.
- Equal performance for large messages. We should be able to make zero
copy reads/writes just as before. In this case I wouldn't expect a
performance increase from asyncio for large (10 MiB+) messages, but we
also shouldn't see a slowdown.
- Improved TLS performance. The TLS implementation in asyncio (and
more-so in uvloop) is better optimized than the implementation in
tornado. We should see a measurable performance boost here.
- Reduced GIL contention when using TLS. Right now a single write or
read through tornado drops and reaquires the GIL several times. If there
are busy background threads, this can lead to the IO thread having to
wait for the GIL mutex multiple times, leading to lower IO
performance. The implementations in asyncio/uvloop don't drop the GIL
except for IO (instead of also dropping it for TLS operations), which
leads to fewer chances of another thread picking up the GIL and slowing
IO.

This PR is still a WIP, and still has some TODOs:
- Pause the read side of the protocol after a certain limit. Right now
the protocol will buffer in memory without a reader, providing no
backpressure.
- The tornado comms version includes some unnecessary length info in
it's framing that I didn't notice when implementing the asyncio version
originally. As is, the asyncio comm can't talk to the tornado comm.
We'll want to fix this for now, as it would break cross-version support
(but we can remove the excess framing later if/when we make other
breaking changes).
- Right now the asyncio implementation is slower than expected for large
frames. Need to profile and debug why.
- Do we want to keep both the tornado and asyncio implementations around
for a while behind a config knob, or do we want to fully replace the
implementation with asyncio? I tend towards the latter, but may add an
interim config in this PR (to be ripped out before merge) to make it
easier for users to test and benchmark this PR.
@jcrist
Copy link
Member Author

jcrist commented Oct 21, 2021

This is by no means ready for review yet, just getting it up to save it somewhere not on my computer.

If people want to try this out (it works, but I have known TODOs) though, please feel free. cc @gjoseph92 and @jakirkham, both of whom may be interested.

@jakirkham
Copy link
Member

Also PR ( MagicStack/uvloop#445 ) may be relevant for us when sending multiple buffers

Bug squashing.
- Workaround cpython bug that prevents listening on all interfaces when
also using a random port.
- Assorted other small cleanups
- Make asyncio protocol compatible with the tornado protocol. This adds
back an unnecessary header prefix, which can be removed later if
desired.
- Add an environment variable for enabling asyncio-based comms. They are
now disabled by default.
The builtin asyncio socket transport makes lots of copies, which can
slow down large writes. To get around this, we implement a hacky wrapper
for the transport that removes the use of copies (at the cost of some
more bookkeeping).
@jcrist
Copy link
Member Author

jcrist commented Nov 5, 2021

Ok, this all works now and is generally (with some caveats) faster than the existing tornado comms. It's surprisingly hard to isolate the comms from everything else (e.g. the protocol) - here's the benchmark I've been using:

Benchmark Script
import argparse
import os
from dask.utils import format_time, format_bytes
from distributed.comm import listen, connect, CommClosedError
from distributed.protocol.serialize import Serialized
import asyncio, time


async def handle_comm(comm):
    try:
        while True:
            msg = await comm.read()
            await comm.write(msg=msg)
    except CommClosedError:
        pass


class Bench:
    def __init__(self, n_bytes, n_frames, seconds):
        self.n_bytes = n_bytes
        self.n_frames = n_frames
        self.seconds = seconds
        self.running = True

    async def _loop(self):
        frames = [os.urandom(self.n_bytes) for _ in range(self.n_frames)]
        msg = Serialized({}, frames)
        async with listen("tcp://127.0.0.1:8080", handle_comm, deserialize=False):
            comm = await connect("tcp://127.0.0.1:8080", deserialize=False)
            start = time.time()
            nmsgs = 0
            while self.running:
                await comm.write(msg=msg)
                await comm.read()
                nmsgs += 1
            end = time.time()
            print(f"{format_time((end - start) / nmsgs)} roundtrip")
            await comm.close()

    async def _stopper(self):
        await asyncio.sleep(self.seconds)
        self.running = False

    async def entry(self):
        asyncio.create_task(self._stopper())
        await self._loop()

    def run(self):
        asyncio.get_event_loop().run_until_complete(self.entry())


def main():
    parser = argparse.ArgumentParser(description="Benchmark channels")
    parser.add_argument(
        "--frames",
        "-f",
        default=1,
        type=int,
        help="Number of frames per message",
    )
    parser.add_argument(
        "--bytes", "-b", default=1000, type=float, help="total payload size in bytes"
    )
    parser.add_argument(
        "--seconds", "-s", default=5, type=int, help="bench duration in secs"
    )
    parser.add_argument("--uvloop", action="store_true", help="Whether to use uvloop")
    args = parser.parse_args()

    if args.uvloop:
        import uvloop

        uvloop.install()

    print(
        f"Benchmarking -- n_frames = {args.frames}, "
        f"bytes_per_frame = {format_bytes(args.bytes)}, "
        f"uvloop={args.uvloop}"
    )

    Bench(int(args.bytes), args.frames, args.seconds).run()


if __name__ == "__main__":
    main()

A summary of results:

Tornado based comms

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -f 1 -b 1e6
Benchmarking -- n_frames = 1, bytes_per_frame = 0.95 MiB, uvloop=False
842.27 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -f 1 -b 1e5
Benchmarking -- n_frames = 1, bytes_per_frame = 97.66 kiB, uvloop=False
318.96 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -f 1 -b 1e4
Benchmarking -- n_frames = 1, bytes_per_frame = 9.77 kiB, uvloop=False
265.72 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -f 1 -b 1e3
Benchmarking -- n_frames = 1, bytes_per_frame = 0.98 kiB, uvloop=False
262.93 us roundtrip

Asyncio based comms

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -f 1 -b 1e6
Benchmarking -- n_frames = 1, bytes_per_frame = 0.95 MiB, uvloop=False
780.15 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -f 1 -b 1e5
Benchmarking -- n_frames = 1, bytes_per_frame = 97.66 kiB, uvloop=False
192.18 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -f 1 -b 1e4
Benchmarking -- n_frames = 1, bytes_per_frame = 9.77 kiB, uvloop=False
151.51 us roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -f 1 -b 1e3
Benchmarking -- n_frames = 1, bytes_per_frame = 0.98 kiB, uvloop=False
114.16 us roundtrip

So we're generally faster for single frame messages.

However! When writing multiple frames per message, we're slower:

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -f 5 -b 1e6
Benchmarking -- n_frames = 5, bytes_per_frame = 0.95 MiB, uvloop=False
3.79 ms roundtrip
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -f 5 -b 1e6
Benchmarking -- n_frames = 5, bytes_per_frame = 0.95 MiB, uvloop=False
4.49 ms roundtrip

I've tracked this down to this line here:

frames = memoryview(bytearray(frames_nbytes))
.

The tornado based comms are allocating a single large array for all frames in a message, then filling that in. In the asyncio comm, I allocate a separate array for each frame. The allocation overhead of creating multiple bytearray objects is the cause of the slowdown here (N calls to malloc are slower than 1 large call), not anything to do with the IO pattern. If I move to a single allocation in the asyncio comms, we're faster again. But I don't think that's a good idea. Allocating one large array for all frames means that the lifetime of all frames are tied together - no frame can be freed until they all can. This can mean that memory will hang around a lot longer than expected. This may be a source of memory issues in distributed (please correct me if I'm wrong).

Note that to get the perf on write we had to muck with asyncio's internals. The default socket transport in asyncio will make unnecessary copies on write (hence the _ZeroCopyWriter wrapper class). This isn't needed with uvloop (where those copies don't happen), but is needed (but not implemented) on windows. I can fix this for windows if we want, just haven't put in the work yet. Also note that this fix as currently written won't be useful in a TLS context on asyncio without uvloop, since it wraps the transport rather than replacing it. Since openssl requires data copies anyway, I suspect this won't matter much, but haven't done enough benchmarking yet to be sure.

@gjoseph92
Copy link
Collaborator

Just curious, what are the results with --uvloop? I'm curious to see both overall performance, and which of tornado vs asyncio is better helped by uvloop out of the box.

If I move to a single allocation in the asyncio comms, we're faster again. But I don't think that's a good idea. Allocating one large array for all frames means that the lifetime of all frames are tied together - no frame can be freed until they all can. This can mean that memory will hang around a lot longer than expected.

This is interesting, and at least deserves an xref to #5208. @jakirkham can speak to this a lot more, but because we currently split frames in our protocol, often 1 frame < 1 output object that needs contiguous memory (such as a NumPy array). So we do want each block of sub-frames to all use the same contiguous memory, so we can zero-copy deserialize them.
However, when there are multiple logical frames/objects per message, then I agree that they should get separate buffers. This seems like yet another argument for not splitting frames at the protocol level, and instead letting the transport handle that, as you've mentioned in the past.

@jcrist
Copy link
Member Author

jcrist commented Nov 8, 2021

Ok, here's a more complete benchmark script. It now supports multiple clients, enabling/disabling uvloop, and enabling/disabling TLS.

Benchmark Script
import argparse
import asyncio
import datetime
import os
from concurrent.futures import ProcessPoolExecutor

from distributed.comm import listen, connect, CommClosedError
from distributed.protocol.serialize import Serialized


DIR = os.path.abspath(os.path.dirname(__file__))
CERT = os.path.join(DIR, "bench-cert.pem")
KEY = os.path.join(DIR, "bench-key.pem")


def ensure_certs():
    if not (os.path.exists(KEY) and os.path.exists(CERT)):
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        key_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        subject = issuer = x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, "ery-bench")]
        )
        now = datetime.datetime.utcnow()
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .sign(key, hashes.SHA256(), default_backend())
        )
        cert_bytes = cert.public_bytes(serialization.Encoding.PEM)
        with open(CERT, "wb") as f:
            f.write(cert_bytes)
        with open(KEY, "wb") as f:
            f.write(key_bytes)


def get_ssl_context():
    import ssl

    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERT, KEY)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context


class BenchProc:
    def __init__(self, n_bytes, n_frames, n_seconds, n_clients, use_tls):
        self.n_bytes = n_bytes
        self.n_frames = n_frames
        self.n_seconds = n_seconds
        self.n_clients = n_clients
        self.use_tls = use_tls

    async def client(self, address, comm_kwargs, msg):
        comm = await connect(address, deserialize=False, **comm_kwargs)

        while self.running:
            await comm.write(msg)
            await comm.read()

        await comm.close()

    def stop(self):
        self.running = False

    async def run(self):
        self.running = True
        if self.use_tls:
            kwargs = {"ssl_context": get_ssl_context()}
            prefix = "tls"
        else:
            kwargs = {}
            prefix = "tcp"

        address = f"{prefix}://127.0.0.1:8080"
        msg = Serialized({}, [os.urandom(self.n_bytes) for _ in range(self.n_frames)])

        loop = asyncio.get_running_loop()
        loop.call_later(self.n_seconds, self.stop)
        tasks = [
            asyncio.create_task(self.client(address, kwargs, msg))
            for _ in range(self.n_clients)
        ]
        await asyncio.gather(*tasks, return_exceptions=True)


def bench_proc_main(n_bytes, n_frames, n_seconds, n_clients, use_tls):
    bench = BenchProc(n_bytes, n_frames, n_seconds, n_clients, use_tls)
    asyncio.run(bench.run())


async def run(n_bytes, n_frames, n_seconds, n_procs, n_clients, use_tls):
    if use_tls:
        ensure_certs()
        kwargs = {"ssl_context": get_ssl_context()}
        prefix = "tls"
    else:
        kwargs = {}
        prefix = "tcp"

    address = f"{prefix}://127.0.0.1:8080"

    loop = asyncio.get_running_loop()

    count = 0

    async def handle_comm(comm):
        nonlocal count
        try:
            while True:
                msg = await comm.read()
                await comm.write(msg=msg)
                count += 1
        except CommClosedError:
            pass

    async with listen(address, handle_comm, deserialize=False, **kwargs):
        with ProcessPoolExecutor(max_workers=n_procs) as executor:
            tasks = [
                loop.run_in_executor(
                    executor,
                    bench_proc_main,
                    n_bytes,
                    n_frames,
                    n_seconds,
                    n_clients,
                    use_tls,
                )
                for _ in range(n_procs)
            ]
            await asyncio.gather(*tasks)

    print(f"{count / n_seconds} RPS")
    print(f"{n_seconds / count * 1e6} us per request")
    print(f"{n_bytes * count / (n_seconds * 1e6)} MB/s each way")


def main():
    parser = argparse.ArgumentParser(description="Benchmark channels")
    parser.add_argument(
        "--procs", "-p", default=1, type=int, help="Number of client processes"
    )
    parser.add_argument(
        "--concurrency",
        "-c",
        default=1,
        type=int,
        help="Number of clients per process",
    )
    parser.add_argument(
        "--frames",
        "-f",
        default=1,
        type=int,
        help="Number of frames per message",
    )
    parser.add_argument(
        "--bytes", "-b", default=1000, type=float, help="total payload size in bytes"
    )
    parser.add_argument(
        "--seconds", "-s", default=5, type=int, help="bench duration in secs"
    )
    parser.add_argument("--uvloop", action="store_true", help="Whether to use uvloop")
    parser.add_argument("--tls", action="store_true", help="Whether to use TLS")
    args = parser.parse_args()

    if args.uvloop:
        import uvloop

        uvloop.install()

    n_bytes = int(args.bytes)

    print(
        f"processes = {args.procs}, "
        f"concurrency = {args.concurrency}, "
        f"bytes = {n_bytes}, "
        f"frames = {args.frames}, "
        f"seconds = {args.seconds}, "
        f"uvloop = {args.uvloop}, "
        f"tls = {args.tls}"
    )
    asyncio.run(
        run(n_bytes, args.frames, args.seconds, args.procs, args.concurrency, args.tls)
    )


if __name__ == "__main__":
    main()

There are many different axis to benchmark on, instead of posting complete results I'll just summarize the current status below. If you're interested in specific numbers, I encourage you to pull this branch locally and run the benchmark yourself.

Does uvloop help?

Kind of? It really depends on the bench flags used.

  • For tornado based comms, there's a minor (negligible) performance improvement with uvloop enabled. This makes sense, since tornado is implementing its own socket handling, and only using the loop to watch sockets for events. We're not running at scale large enough in this benchmark to stress the selector code much, so the default asyncio loop is fine here. This might matter more with thousands of sockets, but for this benchmark tornado with uvloop looks about the same as without.
  • For asyncio based comms, there's a noticeable performance improvement for small messages or with many connections. As message size increases / number of connections decrease the benefit of uvloop decreases. If TLS is enabled uvloop also performs measurably better (we'll get into why later). A few example benchmark runs:
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e4 -p 3 -c 4
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = False, tls = False
15434.4 RPS
64.79033846472814 us per request
154.344 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e4 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = True, tls = False
19144.4 RPS
52.234596017634395 us per request
191.444 MB/s each way

Are the asyncio comms faster than the tornado comms (no TLS)?

Generally yes. This is especially true for small messages - as the message size increases the performance difference decreases. Here we benchmark with uvloop exclusively to show the maximum payoff. Without uvloop, asyncio yields at max 20% lower RPS (for small messages), and is slightly faster for large messages.

Some numbers are below, but in general expect the asyncio comms to be 2x faster than the tornado comms for messages <= 1e4 bytes, decreasing to equal performance for messages of 1e7 bytes.

msg size = 1e3 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e3 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = False
21230.2 RPS
47.10271217416699 us per request
21.2302 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e3 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = False
9412.2 RPS
106.24508616476488 us per request
9.4122 MB/s each way
msg size = 1e4 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e4 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = True, tls = False
19551.4 RPS
51.14723242325358 us per request
195.514 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e4 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = True, tls = False
8996.0 RPS
111.16051578479323 us per request
89.96 MB/s each way
msg size = 1e5 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e5 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 100000, frames = 1, seconds = 5, uvloop = True, tls = False
11346.8 RPS
88.13057425882188 us per request
1134.68 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e5 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 100000, frames = 1, seconds = 5, uvloop = True, tls = False
6839.4 RPS
146.2116559932158 us per request
683.94 MB/s each way
msg size = 1e6 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e6 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 1000000, frames = 1, seconds = 5, uvloop = True, tls = False
1737.6 RPS
575.5064456721916 us per request
1737.6 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e6 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 1000000, frames = 1, seconds = 5, uvloop = True, tls = False
1581.0 RPS
632.5110689437065 us per request
1581.0 MB/s each way
msg size = 1e7 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e7 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 10000000, frames = 1, seconds = 5, uvloop = True, tls = False
149.8 RPS
6675.567423230975 us per request
1498.0 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e7 -p 3 -c 4 --uvloop
processes = 3, concurrency = 4, bytes = 10000000, frames = 1, seconds = 5, uvloop = True, tls = False
143.0 RPS
6993.006993006993 us per request
1430.0 MB/s each way

Are the asyncio comms faster than the tornado comms (with TLS)?

It depends. For small messages, yes. For large messages, no.

  • For messages < 1e5 bytes, asyncio (with uvloop) performs better (almost 2x better for small messages)
  • 1e5 byte messages seem to be the break-even point
  • For messages > 1e5 bytes, tornado performs better (1.3x higher RPS for 1e6 byte messages)

I haven't figured out why yet, but I suspect it's unnecessary buffer copying (probably on the write side).

Note that if uvloop is not used, then the asyncio comm performance tanks even further. This is due to a few things:

  • The asyncio socket transport will copy a buffer once if the full write doesn't go through immediately in a single socket.send call. This is more likely to happen for large messages, which are also already more expensive to copy. In our implementation we hack around this by manually managing the socket (this is what the _ZeroCopyWriter class does), but this doesn't apply when TLS is enabled (for that we'd need to patch out the whole transport, rather than wrap it, something that would be doable but would take a lot more code). The uvloop transports won't copy large buffers, so if uvloop is enabled this hackery is unnecessary.
  • The asyncio SSL protocol class currently also copies data more than necessary on read (it uses data_received rather than get_buffer), resulting in further slowdowns on large buffers.

With some work (mostly copying and modifying code out of asyncio in the stdlib to reduce copies) we could likely close the gap between uvloop & standard asyncio, but I don't think that work would be a good use of time.

msg size = 1e3 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e3 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = True
14501.4 RPS
68.95885914463432 us per request
14.5014 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e3 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = True
8184.6 RPS
122.18068079075336 us per request
8.1846 MB/s each way
msg size = 1e4 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e4 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = True, tls = True
13125.6 RPS
76.18699335649418 us per request
131.256 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e4 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 10000, frames = 1, seconds = 5, uvloop = True, tls = True
7404.0 RPS
135.06212857914642 us per request
74.04 MB/s each way
msg size = 1e5 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e5 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 100000, frames = 1, seconds = 5, uvloop = True, tls = True
3931.2 RPS
254.37525437525434 us per request
393.12 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e5 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 100000, frames = 1, seconds = 5, uvloop = True, tls = True
3996.0 RPS
250.25025025025025 us per request
399.6 MB/s each way
msg size = 1e6 bytes
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -b 1e6 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 1000000, frames = 1, seconds = 5, uvloop = True, tls = True
575.4 RPS
1737.9214459506431 us per request
575.4 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e6 -p 3 -c 4 --uvloop --tls
processes = 3, concurrency = 4, bytes = 1000000, frames = 1, seconds = 5, uvloop = True, tls = True
761.4 RPS
1313.370107696349 us per request
761.4 MB/s each way

Assorted Thoughts

  • With TLS disabled, asyncio comms definitely perform better across all message sizes. At best, we might expect a 2x decrease in comm latency for small messages (e.g. scheduler <-> worker coordination). However! We currently use a batching mechanism (BatchedSend) to batch up small messages into larger payloads, so we the number of small messages on the wire is currently low. Batching comes at a cost (higher latency, higher memory usage, more copying, consistency issues (Properly support restarting BatchedSend #5481), ...), but in general makes throughput higher. We may not want to drop batching entirely, but if we move to using asyncio comms we may be able to reduce the batch interval and thus improve latency in scheduler <-> worker coordination.

  • For large (1e5 bytes +) messages, TLS is slower on asyncio + uvloop than it is on tornado, and I'm not yet sure why. On fast networks this would be a performance decrease, which means we probably wouldn't want to enable asyncio comms by default until they're faster in most cases.

  • None of these numbers might matter on common networks where the bandwidth may be the limiting factor. This may still matter for scheduler performance, where reducing comm CPU usage can make larger clusters more viable, or for clusters using fast networks, but for cloud computing even the tornado numbers are faster than typical bandwidth (see Testing network performance #5258).

  • The _ZeroCopyWriter hack is unfortunate, but I think maintainable. It doesn't support windows, but an equivalent patch for windows may be doable. I'm less enthused about implementing a full transport to speedup asyncio + TLS (without uvloop) performance, as that would be lots more code, but it is doable.

@jcrist
Copy link
Member Author

jcrist commented Nov 8, 2021

often 1 frame < 1 output object that needs contiguous memory (such as a NumPy array). So we do want each block of sub-frames to all use the same contiguous memory, so we can zero-copy deserialize them.

This is unfortunate, and would be good to fix. For messages that contain multiple buffers (e.g. a pandas dataframe composed of multiple numpy arrays), currently all numpy arrays will be backed by the same memoryview after deserialization. If an operation returns a single column, this would keep the whole memoryview in memory a lot longer than expected, and may masquerade as a memory leak. Frame splitting doesn't belong at the protocol level and would be better handled by the transports affected by it IMO.

@fjetter
Copy link
Member

fjetter commented Nov 10, 2021

TLS?

In case of uncertanity betewen no-TLS and TLS, we should emphasize TLS in this analysis. I expect most large scale users care about TLS. Large scale users will also be the ones who care about performance the most. I don't see anybody paying money for large scale deployments and high perf clusters who wouldn't want to protect their cluster with TLS. Although, running the benchmarks myself, I'm a bit shocked about how much we pay for TLS for lage payload... (I meassure a drop to ~30% throughtput using TLS with messages at 1e6 bytes)

How realistic are the benchmarks?

My initial thoughts about these benchmarks are that we have two different usages (actually three) of our comms and we might want to discuss both distinctly. I'm trying to discuss the edge cases where I see the system to be under heavy stress, i.e. the worst case.

Worker<->Worker communication to fetch data / send_recv

The worst case here is that we have many dependencies to fetch from many different workers. We already apply batching until we reach a size of Worker.target_message_size (hard coded to 50e6 bytes).
We have up to 50 (distributed.worker.connections.outgoing) of these concurrently trying to fetch, assuming there are 50 workers holding dependencies.
An interesting attribute of this edge case is that the receiving server is typicall also engaged in up to 10 incoming requests and is writing 10 * 50MB. I guess your benchmark is a decent proxy if we are using many processes and few concurrent clients within a process.

I would assume that something like
DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e6 -p 10 -c 2 -s 60 --tls would be a decent proxy?

This particular command gives me almost identical measurements for new vs old.

In this example, the main process would be the worker and it's communicating to ten other workers (processes). In this approximation we assume the remote workers are not under super heavy load. The concurrency of two is merely there for stressing out the main process.
A better mapping for the realism of this would be if the messages were asymmetric. Incomign small {'op': 'get-data', 'key': 'foo'}, outgoing large (payload).

Scheduler<->Worker administrative stream using BatchedSend in both directions.

It's worth pointing out that we recently did a benchmarking of the submission interval of our BatchedSend which is currently set to 2ms. Our intuition told us that this should be way too small since we don't actually end up batching a lot of messages in that time and started profiling this, see #5036. This benchmark used a task based shuffle and is, I believe, a good benchmark for our network architecture since it requires a lot of administration and a lot of W<->W thoughput at the same time.

The bottom line is that we were right in thinking that not that much batching is happening but the data was way too noisy to show any significance in either direction. However, as part of this evaluation we meassured buffer sizes in bytes and #messages, see histograms in #5036 (comment).

The histograms show the distribution of buffer length for multiple cluster sizes (vertical dim) and batched send intervals (horizontal). As we increase the interval, even up to 500ms, we can see more messages being buffered but the buffer never grows beyond 1e5 bytes even in a very extreme interval case.

Therefore, optimizing our administrative stream connection to messages <<1e5 bytes seems reasonable to me.

I would say this is a decent proxy for simulating the scheduler. Very high concurrency on all sides but small messages DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -b 1e3 -p 10 -c 5 -s 60 --tls?

Of course, this measurement was using the shuffle graph where we do not serialize a lot of user code as part of the task runspec. If user code is invovled, these numbers may easily jump a few orders. However, with short batching intervals, the <<1e5 threshold still feels safe.

I'm wondering if the different comm pattern for this plays a role. We're not doing send-recv-send-recv but it could rather be send-send-send-send-recv-recv-etc.

  1. The third kind of communication are administrative messages which are not using a BatchedSend. I think we can ignore those since they are typically one-off interactions (e.g. register client/worker/etc.). The most notable exception in this category is the worker hearbeat which is a recurring send_recv as in 1.) with small messages.

Conclusion

  1. Focusing on TLS, I cannot meassure a performance difference between asyncio(uvloop) and tornado for messages of size 1e6 (using ~ 10 processes). That reassures me that our W<->W dependency transfer should not be impacted.
  2. Small messages seem to perform genuinely better. Using the parameter set I suggested above, I meassure more than twice the RPS using uvloop. The batching measurements create a certain confidence that this is not too far off a realistic case.
  3. I'm carefully optimistic that we might be able to get rid of the current BatchedSend even. If the new asyncio comm without BatchedSend is better or equally performing than tornado + BatchedSend, I'd be inclined to throw it away entirely in favour of consistency.

Edit: I did ignore the number_frames parameter. Maybe I'm not smart enough for this but I don't see why this should impact us here.

@fjetter
Copy link
Member

fjetter commented Nov 10, 2021

Final remark: I think we should put those benchmarks into ASV or something similar for the sake of preservation. This is likely not the last time we're going to do this.

@mrocklin
Copy link
Member

mrocklin commented Nov 10, 2021 via email

@gjoseph92
Copy link
Collaborator

gjoseph92 commented Nov 10, 2021

My understanding of the relation to GIL is:

  • The C implementation of SSL and sockets would need to be changed to not release the GIL for non-blocking reads/writes.
  • Getting this change upstream will be far easier and faster in uvloop than in cpython
  • We can use uvloop more effectively if we use asyncio directly, rather than tornado

But we've also talked about switching to asyncio as a benefit unto itself, since it reduces the technical debt of depending on tornado.

If we wanted to benchmark how much we could improve GIL convoy effect performance with asyncio vs tornado, then I think we'd want to use both @jcrist's patched uvloop, and patch cpython to do the same, then rerun these benchmarks while a GIL-holding thread is running in the background.

Intuitively though, I think GIL-convoy-effect performance is orthogonal to the asyncio vs tornado question, and even to uvloop vs pure asyncio.

The problem is just that a nonblocking read/write on the socket is in fact blocking, because some other thread holds the GIL when the read/write wants to return. Though uvloop's nonblocking read might be a tiny bit faster than asyncio's, everyone's nonblocking reads are way, way faster than the GIL switch interval.

So basically, when the convoy effect is in play, I'd expect any performance difference between asyncio/uvloop/tornado to be washed away. And if a low-level patch eliminates the convoy effect across the board, then I'd expect the performance differences we see in these current benchmarks to remain about the same whether or not there's a GIL-holding background thread.

Of course, if one library (uvloop) can eliminate the convoy effect while another (asyncio) can't, then that would become the dominant factor and any asyncio-vs-tornado effect might fade out—except if tornado doesn't use uvloop's sockets/SSL in a way that lets it take advantage of that (which IIUC maybe it doesn't?).

@jcrist
Copy link
Member Author

jcrist commented Nov 10, 2021

I agree with everything @gjoseph92 said above.

I made a quick modification to the benchmark script above to measure the effect of the GIL's interaction with the comms (source here).

This adds two new flags --gil-hold-time and --gil-release-time. If --gil-hold-time is set, a background thread is started that loops forever, holding the gil for the hold time and releasing it for the release time (so if only --gil-hold-time is set, the background thread holds the GIL forever). Note that the GIL holding thread only runs in the server, slowing down responses only.

The results are interesting.

First - some background on how cpython's GIL currently works. Given a process with two threads (like the benchmark script), where one thread hogs the GIL and the other does some IO:

  • The background thread should acquire the GIL every time the IO thread releases it.
  • The background thread shouldn't release the GIL until the switch interval is hit (default 0.005 s).

This means that we'd expect the IO thread to support releasing the GIL a maximum of 200 times a second. For asyncio & tornado (where each request has a socket.recv and a socket.send), we'd expect to cap around 100 RPS. UVLoop doesn't use the stdlib socket module, but does release and acquire the GIL as necessary during each iteration of its event loop.

For simplicity, we'll benchmark with a background thread that never releases the GIL (worst case scenario). We'll test 3 configurations:

  • tornado comms
  • asyncio comms
  • asyncio comms with uvloop

One Client, no TLS

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py --gil-hold-time=1
processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
96.52289783563648 RPS
10360.235989835746 us per request
0.09652289783563649 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --gil-hold-time=1
processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
97.04507931799715 RPS
10304.489491148766 us per request
0.09704507931799715 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --gil-hold-time=1 --uvloop
processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
97.2629619923594 RPS
10281.405989656743 us per request
0.09726296199235938 MB/s each way

All 3 are stuck at ~100 RPS, matching the theory from above (each request releases the GIL twice, each release takes 0.005 seconds to reacquire). Now watch what happens if we increase the number of active clients:

6 clients, no TLS

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -p3 -c2 --gil-hold-time=1
processes = 3, concurrency = 2, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
99.26745186019129 RPS
10073.795400816818 us per request
0.09926745186019129 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -p3 -c2 --gil-hold-time=1
processes = 3, concurrency = 2, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
84.39836410347574 RPS
11848.570889051362 us per request
0.08439836410347575 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py -p3 -c2 --gil-hold-time=1 --uvloop
processes = 3, concurrency = 2, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
290.9671084955164 RPS
3436.8145773267333 us per request
0.2909671084955164 MB/s each way

In this case both tornado and asyncio are still capped at ~100 RPS, but uvloop is higher at 290 RPS. This surprised me upon first glance, but upon closer reading of uvloop's code it's due to uvloop being able to avoid excess GIL drop & reacquire calls during a single event loop iteration. This is only possible with multiple clients, as the request-response model means the event loop only has one thing to read/write every loop per socket. In practice a fully-blocking background thread is likely rare, but this does mean that uvloop handles this case better (though still not well). Tornado is not able to take advantage of this behavior of uvloop (since it uses raw socket operations), so no benefits there.

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py -p3 -c2 --gil-hold-time=1 --uvloop
processes = 3, concurrency = 2, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
96.68774637809385 RPS
10342.572223056446 us per request
0.09668774637809385 MB/s each way

1 client, TLS

Lastly, we test TLS behavior. Originally I thought that asyncio would handle this case better - I had misread tornado's SSL handling code and thought that every write tornado made with ssl enabled would release the GIL twice (once in ssl code, once in socket code), but this doesn't appear to be the case. The benchmarks show that actually asyncio code must release the GIL twice for every request, halving performance.

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=0 python bench.py --tls --gil-hold-time=1processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = True
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
96.0717432132376 RPS
10408.887843124005 us per request
0.0960717432132376 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --tls --gil-hold-time=1processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = False, tls = True
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
38.73910709375392 RPS
25813.708033586412 us per request
0.03873910709375392 MB/s each way
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --tls --gil-hold-time=1 --uvloop
processes = 1, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = True
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
39.254480500048835 RPS
25474.798985016652 us per request
0.03925448050004884 MB/s each way

The previously described uvloop behavior helps a bit here with multiple clients, but it's stilll disadvantaged:

$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --tls --gil-hold-time=1 --uvloop -p10
processes = 10, concurrency = 1, bytes = 1000, frames = 1, seconds = 5, uvloop = True, tls = True
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
61.14442014312276 RPS
16354.722109707918 us per request
0.061144420143122755 MB/s each way

I haven't dug in yet to figure out where in the asyncio/uvloop code the GIL is being released twice. I suspect the write buffer size is smaller, so we're making more transport.write calls than tornado does, but that's just a guess.

@mrocklin
Copy link
Member

mrocklin commented Nov 11, 2021 via email

@jakirkham
Copy link
Member

Have caught up on reading this thread. Though not quite all of the code yet. Still digesting.

So this may be adding more dimensions than we want to think about atm, though I recall with ery UNIX domain sockets were explored. This has also come up here ( #3630 ) as well. IIRC these were faster than TCP. Is this something we've explored? No is a fine answer (and understand if other things take priority)

@jcrist
Copy link
Member Author

jcrist commented Nov 15, 2021

If we wanted to benchmark how much we could improve GIL convoy effect performance with asyncio vs tornado, then I think we'd want to use both @jcrist's patched uvloop, and patch cpython to do the same, then rerun these benchmarks while a GIL-holding thread is running in the background.

I've made a patch to CPython here. This results in negligible slowdown of asyncio when mixing IO and CPU intensive threads, provided that IO is mostly saturated. Anytime the asyncio thread would block (waiting for new IO events) the background thread reclaims the GIL and the IO thread will have to wait to reacquire it. This makes sense to me - do IO work when possible (to the detriment of the CPU work), then do CPU work when idle (note that the CPU thread can still steal the GIL after the switch interval if the IO thread doesn't release it by then).

Numbers:

# With the patch
$ DISTRIBUTED_USE_ASYNCIO_FOR_TCP=1 python bench.py --gil-hold-time=1 -p3 -c2
processes = 3, concurrency = 2, bytes = 100, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
3939.522629374948 RPS
253.8378616087965 us per request
0.39395226293749475 MB/s each way

# Without the patch
$ DISTRIBUTED_USE_ASYNCIO_TCP=1 python bench.py --gil-hold-time=1 -p3 -c2
processes = 3, concurrency = 2, bytes = 100, frames = 1, seconds = 5, uvloop = False, tls = False
gil_hold_time = 1.00 s, gil_release_time = 0.00 us
24.09949751510525 RPS
41494.64109669561 us per request
0.002409949751510525 MB/s each way

Since this patch will take a while to get into cpython (and/or may never get in), I don't think it's worth spending a ton more time on it (I spent an hour or so this morning looking into this), but it was an interesting experiment.

@jcrist
Copy link
Member Author

jcrist commented Nov 15, 2021

It might be useful to see how these changes affect the
scheduler performance and shuffle benchmarks directly. I appreciate that
this is possibly a pain, but it might shine more representative light on
the situation.

Agreed. This is flagged as a TODO in our internal queue, but it's good to flag it here as well.

So this may be adding more dimensions than we want to think about atm, though I recall with ery UNIX domain sockets were explored. This has also come up here ( #3630 ) as well. IIRC these were faster than TCP. Is this something we've explored? No is a fine answer (and understand if other things take priority)

This is unrelated, but also not terribly hard to support in asyncio. Unix sockets generally have lower latency on supported platforms, but on osx (and maybe others) they have significantly lower bandwidth than using TCP with loopback. So as to whether they're faster, the answer would be "it depends". They also only work for processes communicating on a single machine, which would be relevant for LocalCluster users and a few other configurations, but wouldn't benefit users deploying on distributed systems.

@gjoseph92
Copy link
Collaborator

@jcrist this CPython GIL experiment is really cool. Would it be possible to also report how many cycles the GIL-holding thread made it through? How much is the CPU thread's performance degraded by improving IO performance? I'm hoping that it's imbalanced (more IO doesn't hurt CPU much); that would make a patch like this much more likely to get into uvloop/cpython someday.

- Add config option for selecting TCP backend
- Parametrize TCP comm tests to test both backends
- A few small code cleanups
@jcrist jcrist marked this pull request as ready for review November 23, 2021 18:45
@jcrist
Copy link
Member Author

jcrist commented Nov 24, 2021

Ok, this should be ready for review. The windows test failures are a known flaky test. I'm not sure about the osx failures, they seem unrelated and don't fail every time.

@jcrist jcrist changed the title WIP: Use asyncio for TCP/TLS comms Use asyncio for TCP/TLS comms Nov 30, 2021
Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed a bit over half of this so far; _ZeroCopyWriter coming up tomorrow. This is really cool!

I hadn't realized until reading this that you're now loading each frame into its own buffer, instead of one big buffer per message like the current TCP comm. I'm curious how much this is affecting the performance numbers, beyond just tornado vs asyncio. I would imagine it's a disadvantage compared to current TCP (more small mallocs). This is definitely the right way to go once we get frame-splitting out of serialization though.

distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
@jakirkham
Copy link
Member

Was rereading your comment above ( #5450 (comment) ) Jim about handling a large number of connections and this is probably already on your radar at Coiled, but one thing that comes up is users wanting to spin up large numbers of workers. For example ( #3691 ). ATM things don't behave particularly well there. Though that may be for all sorts of reasons. That said, faster/more reliable handling of more connections seems like an important first step. If uvloop is better at this, that itself may be an important reason to consider using it.

Due to flaws in distributed's current serialization process, it's easy
for a message to be serialized as hundreds/thousands of tiny buffers. We
now coalesce these buffers into a series of larger buffers following a
few heuristics. The goal is to minimize copies while also minimizing
socket.send calls.

This also moves to using `sendmsg` for writing from multiple buffers.
Both optimizations seem critical for some real world workflows.
Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ZeroCopyWriter is cool!

distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
During extraction of `coalesce_buffers` a bug handling memoryviews of
non-single-byte formats was introduced. This fixes that.
- Adds write backpressure to the `_ZeroCopyWriter` patch
- Sets the high water mark for all transports - the default can be quite
low in some implementations.
- Respond to some feedback (still have more to do).
- Fix windows support (hopefully)
- Lots of comments and docstrings
- A bit of simplification of the buffer management
- Closed comms always raise `CommClosedError` instead of underlying OS
error.
- Simplify buffer peak implementation
- Decrease threshold for always concatenating buffers. With `sendmsg`
there's less pressure to do so, as scatter IO is equally efficient.
Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcrist just curious, why'd you get rid of SENDMSG_MAX_SIZE? Just because there's not really extra cost to giving sendmsg more data than can fit into the socket's buffer?

distributed/comm/asyncio_tcp.py Outdated Show resolved Hide resolved
@jcrist
Copy link
Member Author

jcrist commented Dec 10, 2021

why'd you get rid of SENDMSG_MAX_SIZE? Just because there's not really extra cost to giving sendmsg more data than can fit into the socket's buffer?

Yeah. The main point was to reduce our processing cost for prepping the buffers for each sendmsg/send call, but it turns out that something simpler (but results in more sendmsg than send calls) was just as performant on real world benchmarks. No use for an extra magic config variable if we don't need it.

@jcrist
Copy link
Member Author

jcrist commented Dec 10, 2021

This keeps failing on some unrelated windows tests that I think are flaky (I've seen them fail elsewhere). I'm not sure how the changes here would cause these failures, as the comms used in those cases are still the tornado ones.

Anyway, I believe this to be good to merge now.

@fjetter
Copy link
Member

fjetter commented Dec 10, 2021

I think are flaky (I've seen them fail elsewhere).

The recent test failures on main are visualized on https://ian-r-rose.github.io/distributed/test_report.html can you find the flaky test there?

I would defer approval to @gjoseph92

@jcrist
Copy link
Member Author

jcrist commented Dec 10, 2021

One of the test failures is test_acquire_replicas_already_in_flight, which isn't in that chart, but the rest of the failures are. They also don't fail consistently, but generally I'm seeing at least one windows failure each test run.

@ian-r-rose
Copy link
Collaborator

ian-r-rose commented Dec 10, 2021

One of the test failures is test_acquire_replicas_already_in_flight, which isn't in that chart, but the rest of the failures are. They also don't fail consistently, but generally I'm seeing at least one windows failure each test run.

I see the first failure for test_aquire_replicas_already_in_flight as of a couple hours ago in main. It may indicate a new bug, I haven't looked into it.

Edit: it seems it's a brand new test from @crusaderky

Copy link
Member

@jakirkham jakirkham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this Jim! 😄

Tried to do a more thorough review now that I have a bit more time to do so. Apologies for getting to this late.

distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
distributed/comm/asyncio_tcp.py Show resolved Hide resolved
@jcrist
Copy link
Member Author

jcrist commented Dec 10, 2021

I believe this to be ready for merge.

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've got a LGTM from M!

I'm very excited about this. Even if there are issues—which I don't think there are are after a number of reads over—there's really no risk in merging this, so let's get it in. This is a nice stepping-stone for a number of things we'd like to improve (GIL convoy effect, framing in serialization, serialization in general, proper uvloop support, ...)

@jakirkham
Copy link
Member

Will merge EOD if no comments

@jcrist jcrist merged commit bdc2189 into dask:main Dec 10, 2021
@jcrist jcrist deleted the asyncio-comms branch December 10, 2021 22:05
@crusaderky
Copy link
Collaborator

I see the first failure for test_aquire_replicas_already_in_flight as of a couple hours ago in main. It may indicate a new bug, I haven't looked into it.

Edit: it seems it's a brand new test from @crusaderky

Acknowledged; I'm looking it up

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Using asyncio directly in TCP
7 participants