Simple load balancer as an exercise - pure asyncio implementation

In my previous post I presented a simple load balancer built with Python and Starlette. I was wondering, though, what such a solution could look like if implemented with Python streams: asyncio.start_server and asyncio.open_connection. Here is my take on it, though it is by no means perfect. My intention was to write a solution based on asynchronous streams, without external dependencies, in order to compare it with the baseline (uvicorn/Starlette-based) solution. I was curious how complicated it would be to implement and how its performance would differ from the baseline.

HTTP server

I’m leveraging asyncio.start_server to listen on a socket and create a task for each incoming connection. Each connection is handled by the _handle_connection handler. First, the request is parsed and wrapped in a handy Request object. The handle_request function can return a ProxyResponse object or None. If it returns a ProxyResponse, the request was either a registration request or could not be routed because no targets were available. Otherwise, the request has been proxied to the upstream and the connection is closed. The HTTPLoadBalancer object keeps a reference to RoundRobinTargets, which tracks which upstream server should receive the next incoming connection.

class HTTPLoadBalancer:
    def __init__(self, host, port, buffer_limit=1024 * 1024 * 64, backlog=100):
        self._host = host
        self._port = port
        self._buffer_limit = buffer_limit
        self._backlog = backlog
        self._server = None
        self._lb_targets = RoundRobinTargets()

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        request = Request(reader)
        response = None
        logger.debug("Handling new connection")
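        try:
            await request.parse()
        except Exception:
            # A request that cannot be parsed must not reach handle_request,
            # so answer with 400 Bad Request and stop here.
            logger.exception("Unable to parse request")
            await self.send_response(ProxyResponse(400), writer)
            return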
        try:
            response = await handle_request(self._lb_targets, request, writer)
        except asyncio.TimeoutError:
            response = ProxyResponse(504)
        except asyncio.CancelledError:
            response = ProxyResponse(500)
        except Exception:
            logger.exception("Unhandled exception")
            response = ProxyResponse(500)
        finally:
            if response:
                await self.send_response(response, writer)

    async def send_response(self, response: ProxyResponse, writer: asyncio.StreamWriter):
        try:
            writer.write(response.as_bytes())
            await writer.drain()
            writer.close()
            await writer.wait_closed()
        except Exception:
            logger.exception("Unable to send response to the client")

    async def start(self):
        server_options = {"limit": self._buffer_limit, "backlog": self._backlog}
        self._server = await asyncio.start_server(self._handle_connection, self._host, self._port, **server_options)
        async with self._server:
            await self._server.serve_forever()

    async def stop(self):
        self._server.close()
        await self._server.wait_closed()
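
RoundRobinTargets itself is not shown in this post. To give a rough idea of what it keeps track of, here is a minimal sketch. The Target dataclass, the register method and the longest-prefix matching on the request path are assumptions made purely for illustration, and the class in the full source may well look different. The only behaviour taken from the code in this post is that get_next(path) raises ValueError when no target is available.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Target:
    # Hypothetical description of an upstream; the field names mirror
    # the attributes used elsewhere in this post (ip_address, port, path).
    ip_address: str
    port: int
    path: str = "/"


class RoundRobinTargets:
    def __init__(self):
        self._targets = {}   # path prefix -> list of registered targets
        self._counters = {}  # path prefix -> index of the next target to use

    def register(self, target: Target) -> None:
        self._targets.setdefault(target.path, []).append(target)
        self._counters.setdefault(target.path, 0)

    def get_next(self, path: str) -> Target:
        # Assumption: the longest registered prefix matching the path wins.
        prefixes = [p for p in self._targets if path.startswith(p)]
        if not prefixes:
            raise ValueError(f"No targets registered for {path}")
        prefix = max(prefixes, key=len)
        targets = self._targets[prefix]
        index = self._counters[prefix] % len(targets)
        self._counters[prefix] = index + 1
        return targets[index]
```

Because everything runs in a single event loop and get_next never awaits, the counter update needs no locking in this sketch.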

Proxying request

The main HTTP handler decides whether a request is a registration request or should be proxied to the upstream. If it needs to be proxied, the headers are sanitized and the request path is rewritten according to the target’s parameters. Since the request stream has already been read up to the start of the request body, I’m creating an asynchronous generator which takes the data read so far (request line and sanitized headers) and the StreamReader object. It is then used by proxify as the stream of data to be sent to the upstream.

async def handle_request(
    lb_targets: RoundRobinTargets, request: Request, writer: asyncio.StreamWriter
) -> Optional[ProxyResponse]:
    if request.method == "POST" and request.path == "/register":
        response = await handle_target_registration(lb_targets, request)
        return response
    else:
        try:
            target = lb_targets.get_next(request.path)
        except ValueError:
            return ProxyResponse(503, "No targets available")
        sanitized_headers = get_sanitized_headers(request.headers, target)
        rewritten_path = request.path
        if target.path != "/":
            rewritten_path = request.path.replace(target.path, "/")
        headers_as_bytes = sanitized_headers.as_bytes()
        request_line = f"{request.method} {rewritten_path} {request.http_version}\r\n".encode("utf-8")
        refreshed_reader = stream(request._reader, initial_data=request_line + headers_as_bytes)
        await proxify(refreshed_reader, writer, target)
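
The stream helper used above is not listed here. A minimal sketch of such an asynchronous generator, assuming it takes the reader plus the bytes that were already consumed, could look like this. The chunk_size parameter is an assumption, and for brevity the sketch reads until EOF, whereas a real proxy would have to stop after Content-Length bytes or after the final chunk of a chunked body.

```python
import asyncio
from typing import AsyncIterator


async def stream(
    reader: asyncio.StreamReader, initial_data: bytes = b"", chunk_size: int = 64 * 1024
) -> AsyncIterator[bytes]:
    # Replay what was already consumed while parsing
    # (the request line and the sanitized headers).
    if initial_data:
        yield initial_data
    # Then hand over whatever is still waiting in the reader (the request body).
    while True:
        chunk = await reader.read(chunk_size)
        if not chunk:  # read() returns b"" on EOF
            break
        yield chunk
```

Wrapping the reader this way lets proxify treat the already-parsed head and the unread body as one continuous stream of bytes.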

The proxify function opens a connection to the upstream server and replays the request by forwarding data from local_reader to upstream_writer. Another task is created to forward the upstream response to the client. This time, though, ProxyResponseTransformer wraps upstream_reader, which makes it possible to transform the upstream response before it is forwarded to the client. It’s not crucial, but it allows removing the Connection header. I was interested in how such an implementation would let me deal with real-life scenarios like AWS chunked stream.

async def proxify(local_reader: AsyncIterator[bytes], local_writer, upstream_host: Host, connection_timeout: int = 5):
    logger.debug("Proxying connection to %s", upstream_host)
    connection = asyncio.open_connection(upstream_host.ip_address, upstream_host.port)
    upstream_reader, upstream_writer = await asyncio.wait_for(connection, timeout=connection_timeout)
    try:
        async with asyncio.TaskGroup() as tg:
            forward_request = tg.create_task(stream_copy(local_reader, upstream_writer))
            transformed_upstream_reader = ProxyResponseTransformer(upstream_reader)
            forward_response = tg.create_task(stream_copy(transformed_upstream_reader.transformed(), local_writer))
            forward_request.add_done_callback(lambda _: logger.debug("Request forwarding done"))
            forward_response.add_done_callback(lambda _: logger.debug("Response forwarding done"))
    finally:
        await upstream_writer.drain()
        upstream_writer.close()
        await upstream_writer.wait_closed()
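
Both tasks in proxify rely on stream_copy, which is also not listed in this post. A minimal sketch, assuming it simply pumps chunks from an async iterator into a StreamWriter, might be as small as the following. Whether the real helper also closes the writer when the source is exhausted is not shown here, so the sketch leaves the writer open.

```python
import asyncio
from typing import AsyncIterator


async def stream_copy(source: AsyncIterator[bytes], writer: asyncio.StreamWriter) -> None:
    async for chunk in source:
        writer.write(chunk)
        # drain() waits while the write buffer is above its high-water mark,
        # so a slow receiver slows down how fast we read from the source.
        await writer.drain()
```

Awaiting drain after every write keeps memory use bounded when one side of the proxy is much faster than the other.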

Conclusions

I’m surprised how easy it was to implement. On the other hand, it’s not exactly on par with what can be done using uvicorn/Starlette. Consider, for example, connection pooling, which comes for free with httpx.AsyncClient (connection pools are in fact implemented in HTTPCore) and can speed up connect time by an order of magnitude. Another thing is limited concurrency. The pure asyncio implementation runs as a single process, so it can’t make use of all available CPUs, while uvicorn can run many workers out of the box. To be fair, the Starlette-based implementation I’ve already done is not going to meet the requirements when run with many uvicorn workers either. That’s because RoundRobinTargets is initialized at the application level, so each uvicorn worker has its own instance of RoundRobinTargets. This will of course start and even forward requests to the upstream servers, but it won’t distribute requests according to the round-robin algorithm. The pure asyncio solution can be extended with some effort to make use of all CPUs, which will be fun to explore. It might even be easier to do just that than to achieve the same with uvicorn.

Finally, here is the full source code if you would like to explore it in more detail.