
Dissecting gunicorn: WSGI and the pre-fork server model

What is WSGI and why is it important?

What is WSGI, and why do we need it? WSGI is a specification that decouples a Python web framework from the web server.

Let’s start from the beginning. Back in the day, when you wanted to serve dynamic content on the web, you had to delegate the work to an executable of your choice. The web server took a request, parsed it and launched the executable. The executable was responsible for generating a response, which was then piped back to the web server. To make executables interchangeable, the Common Gateway Interface (CGI) specification was created. It boils down to a set of environment variables used to pass the parsed request data to the executable. For example, in Python you could read the request method like this:

import os

REQUEST_METHOD = os.environ.get('REQUEST_METHOD', 'GET')
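To round this out, here is a minimal sketch of a complete CGI-style program. The helper name build_cgi_response is made up for illustration; the only things taken from CGI itself are the REQUEST_METHOD and PATH_INFO variables and the convention that the program writes headers, a blank line and then the body to standard output.

```python
import os
import sys


def build_cgi_response(environ):
    """Build CGI output (headers, blank line, body) from CGI variables.

    Hypothetical helper for illustration; `environ` is any mapping that
    looks like os.environ.
    """
    method = environ.get("REQUEST_METHOD", "GET")
    path = environ.get("PATH_INFO", "/")
    body = f"{method} {path}\n"
    # A blank line separates the headers from the body.
    return "Content-Type: text/plain\n\n" + body


if __name__ == "__main__":
    # The web server sets the variables and reads our stdout.
    sys.stdout.write(build_cgi_response(os.environ))
```

Every request handled this way starts a fresh interpreter, runs the script once and exits.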

But there is a problem with that: launching an executable for each request is a slow and expensive operation, and even more so for interpreted languages like Python, which pay the interpreter start-up cost every time. This is where WSGI comes into play.

WSGI borrows from CGI, but instead of launching a process for each request it calls a function inside an already running process. Now we have a web server that sends data to a translation layer, e.g. the WSGI HTTP server gunicorn. Gunicorn parses the HTTP request into the WSGI format and calls the WSGI application. A WSGI application is a callable that takes two arguments: environ and start_response. environ is a dictionary containing the request data (it uses CGI-style keys such as REQUEST_METHOD and PATH_INFO), and start_response is a callable that takes a status string and a list of header tuples. The application returns the response body as an iterable of bytes. Here is an example of a WSGI callable:

def application(environ, start_response):
    if environ.get('PATH_INFO') == '/':
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'Hello, World!\n']
    else:
        status = '404 Not Found'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'Not Found\n']

Note that a WSGI application is not responsible for parsing the request; it is just a callable that takes parsed request data and returns a response body. This is why WSGI is important: it separates the web framework from the web server. WSGI itself is a Python interface, so the application side is always a Python callable, but any server that can call it the way the specification describes, and any framework that exposes such a callable, can work together.
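To see the server side of this contract, here is a small sketch that plays the role of the server by hand: it builds environ, supplies start_response and joins the returned iterable. The names application and call_wsgi_app are made up for this example; setup_testing_defaults comes from the standard library's wsgiref.util and fills in the environ keys PEP 3333 requires.

```python
from wsgiref.util import setup_testing_defaults


def application(environ, start_response):
    """Tiny WSGI app (hypothetical): echoes the requested path."""
    body = f"You asked for {environ['PATH_INFO']}\n".encode()
    headers = [("Content-Type", "text/plain"),
               ("Content-Length", str(len(body)))]
    start_response("200 OK", headers)
    return [body]


def call_wsgi_app(app, path="/"):
    """Do what a WSGI server does after it has parsed the HTTP request."""
    environ = {}
    setup_testing_defaults(environ)  # populate required WSGI keys
    environ["PATH_INFO"] = path

    captured = {}

    def start_response(status, headers, exc_info=None):
        # A real server would start writing the status line and headers here.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_wsgi_app(application, "/hello")
print(status, body)
```

A real server such as gunicorn does the same thing, except that environ is filled from the parsed HTTP request and the status, headers and body are written to the client socket.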

HTTP server - WSGI - Django, as seen in WSGI for Web Developers (Ryan Wilson-Perkin) - YouTube

The official Python specification is PEP 3333 – Python Web Server Gateway Interface v1.0.1, which superseded PEP 333 – Python Web Server Gateway Interface v1.0.

What is a pre-fork server?

WSGI overcomes the limitations of CGI and helps push more traffic through our app, but the GIL is still there, and scaling a Python application over multiple cores on the same machine is challenging. This is where the pre-fork server model comes into play. A pre-fork server forks a number of child processes ahead of time to handle incoming connections. Before forking, it opens a socket, binds to it and starts listening. Each child process then accepts and handles incoming connections on that shared socket. Let’s see some code:

import socket
import os
import signal
import sys
import asyncio
import logging
import functools


def setup_server_socket(addr, port=8080):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sock.setblocking(False)  # asyncio expects a non-blocking socket
    sock.bind((addr, port))
    sock.listen(100)
    return sock


class AsyncWorker:
    def __init__(self, child_id, sock, loop=None):
        self.child_id = child_id
        self.sock = sock
        # The worker is built in the child after fork(), so give it a fresh loop.
        self._loop = asyncio.new_event_loop() if loop is None else loop
        self._task_pool = set()
        self._main_task = None
        self.logger = logging.getLogger(f"Worker-{child_id}")
        logging.basicConfig(level=logging.INFO)

    async def handle_connection(self, reader, writer):
        try:
            request_data = await reader.read(1024)
            self.logger.info(f"Worker {self.child_id} received {request_data}")
            # Hard-coded response; Content-Length matches len(b"Hello world!").
            writer.write(b"HTTP/1.1 200 OK\r\n")
            writer.write(b"Content-Type: text/plain\r\n")
            writer.write(b"Content-Length: 12\r\n")
            writer.write(b"\r\n")
            writer.write(b"Hello world!")
            await writer.drain()
        except Exception:
            self.logger.exception("Unexpected error while handling connection")
        finally:
            # Close the connection on both the success and the error path.
            writer.close()

    async def worker_loop(self):
        try:
            # Wrap the inherited listening socket in an asyncio server.
            srv = await asyncio.start_server(self.handle_connection, sock=self.sock)
            # serve_forever() closes the server when this task is cancelled.
            await srv.serve_forever()
        except asyncio.CancelledError:
            self.logger.info(f"Worker {self.child_id} loop cancelled")
        except Exception:
            self.logger.exception("Unexpected error")

    def run(self):
        try:
            self._main_task = self._loop.create_task(self.worker_loop())
            self._loop.add_signal_handler(signal.SIGTERM, functools.partial(self.exit_gracefully, signal.SIGTERM))
            self._loop.run_until_complete(self._main_task)
        finally:
            self.logger.info("Main loop ended")
            self._loop.run_until_complete(self._loop.shutdown_asyncgens())
            self._loop.close()
            sys.exit(0)

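    def exit_gracefully(self, signum):
        self.logger.info(f"Worker {self.child_id} received signal {signum}")
        # Cancelling the main task lets run_until_complete() return normally;
        # stopping the loop here would interrupt it before the task finishes.
        self._main_task.cancel()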

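def main():
    sock = setup_server_socket("localhost", 8080)
    pids = []
    for i in range(3):
        pid = os.fork()
        if pid == 0:
            # Child: serve on the inherited socket; run() ends with sys.exit(0).
            worker = AsyncWorker(i, sock)
            worker.run()
        else:
            pids.append(pid)
            print(f"Started worker {pid}")
    print('Press Ctrl+C to exit')
    while True:
        try:
            pid, status = os.wait()
            print(f"Worker {pid} exited with status {status}")
        except ChildProcessError:
            # No children left to wait for.
            break
        except KeyboardInterrupt:
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass  # this worker has already exited
            break


if __name__ == "__main__":
    main()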

Wait, but what about the Python docs stating that:

The newly created socket is non-inheritable.

A file descriptor has an “inheritable” flag which indicates if the file descriptor can be inherited by child processes. Since Python 3.4, file descriptors created by Python are non-inheritable by default.

On UNIX, non-inheritable file descriptors are closed in child processes at the execution of a new program, other file descriptors are inherited.

Well, this is confusing, but the docs apply to calls that execute a new program, such as os.execv(), the os.spawn*() family and subprocess, not to os.fork(). This makes sense from a security standpoint, since we don’t want to leak file descriptors to programs we don’t trust. With os.fork() the child process gets a copy of all of the parent’s file descriptors, regardless of the inheritable flag. This is why the child processes can use sock to accept incoming connections.
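A quick way to check this is with a pipe instead of a socket. The sketch below (the function name fork_inherits_pipe is made up) creates a pipe whose descriptors are non-inheritable, forks, and has the child write through its copy of the descriptor. It assumes a Unix system, since os.fork() is not available on Windows.

```python
import os


def fork_inherits_pipe():
    """Show that fork() copies descriptors even when they are non-inheritable."""
    read_fd, write_fd = os.pipe()
    inheritable = os.get_inheritable(write_fd)  # default flag on new descriptors

    pid = os.fork()
    if pid == 0:
        # Child: the descriptor is still open here because no exec() happened.
        os.close(read_fd)
        os.write(write_fd, b"hello from child")
        os._exit(0)

    # Parent: close our write end, then read what the child sent.
    os.close(write_fd)
    message = os.read(read_fd, 1024)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return inheritable, message


print(fork_inherits_pipe())
```

The flag only matters once the child replaces itself with a new program, which a pre-fork worker never does.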

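Forking a process is expensive, and the pre-fork model avoids paying that cost per request: child processes are created once and then reused to handle incoming connections. Why not threads? Because of the GIL, only one thread in a process executes Python bytecode at a time, so threads don’t help CPU-bound Python code use more than one core. Separate processes each have their own interpreter and their own GIL, so forking lets us use multiple cores on a single machine.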
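The reuse of workers can be seen in a stripped-down, blocking variant of the server above. This is only a sketch under a few assumptions: a Unix system, a free loopback port, and made-up helper names (serve, prefork_demo). Each worker replies with its own PID, so the client can see which process picked up each connection.

```python
import os
import signal
import socket


def serve(sock):
    """Worker loop: accept on the shared socket and reply with our PID."""
    while True:
        conn, _ = sock.accept()
        with conn:
            conn.sendall(str(os.getpid()).encode())


def prefork_demo(num_workers=3, num_requests=10):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    sock.listen(16)
    port = sock.getsockname()[1]

    worker_pids = []
    for _ in range(num_workers):
        pid = os.fork()
        if pid == 0:
            # Child: make sure SIGTERM terminates us, then serve forever.
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            try:
                serve(sock)
            finally:
                os._exit(0)
        worker_pids.append(pid)

    served_by = []
    for _ in range(num_requests):
        with socket.create_connection(("127.0.0.1", port)) as client:
            data = b""
            while chunk := client.recv(64):  # read until the worker closes
                data += chunk
            served_by.append(int(data))

    # Workers were forked once and reused; now shut them down and reap them.
    for pid in worker_pids:
        os.kill(pid, signal.SIGTERM)
        os.waitpid(pid, 0)
    sock.close()
    return worker_pids, served_by


workers, served_by = prefork_demo()
print("workers:", workers)
print("served by:", served_by)
```

The parent never calls accept(); the kernel hands each queued connection to whichever worker is waiting in accept(), so the distribution across PIDs can be uneven.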

WSGI and asynchronous execution

WSGI as defined by PEP 3333 is a synchronous interface: the server calls the application and gets the response back from that same call. This is a problem if one wants to squeeze as much as possible out of a web server, since asynchronous execution is often a good fit for I/O-heavy request handling. Two ways to handle this are covered below: running a non-WSGI async worker inside gunicorn, and ASGI.

Let’s dive into the details. Gunicorn defines an Arbiter, which is responsible for managing workers. A worker is a process that handles incoming connections. aiohttp.GunicornWebWorker is a gunicorn worker class provided by aiohttp, an asynchronous HTTP client/server framework for asyncio; the gunicorn documentation recommends it for async applications. Let’s see how it’s done:

class Arbiter(object):
    ...
    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                                    self.app, self.timeout/2.0, self.cfg)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            self.WORKERS[pid] = worker
            return
        # Process Child
        worker_pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s" % worker_pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
    ...

self.LISTENER is the socket opened by the arbiter and then passed down to each worker.

class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    ...
    def init_process(self) -> None:
        # create new event_loop after fork
        asyncio.get_event_loop().close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif asyncio.iscoroutinefunction(self.wsgi):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                "async function returning Application, got {}".format(self.wsgi)
            )

        if runner is None:
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        await runner.setup()

        ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

        runner = runner
        assert runner is not None
        server = runner.server
        assert server is not None
        for sock in self.sockets:
            site = web.SockSite(
                runner,
                sock,
                ssl_context=ctx,
            )
            await site.start()
...

The GunicornWebWorker class creates a new event loop after the fork and then runs the worker. It is responsible for creating the web.AppRunner and one web.SockSite per listening socket, which are used to handle incoming connections.

class SockSite(BaseSite):
    ...
    async def start(self) -> None:
        await super().start()
        loop = asyncio.get_event_loop()
        server = self._runner.server
        assert server is not None
        self._server = await loop.create_server(
            server, sock=self._sock, ssl=self._ssl_context, backlog=self._backlog
        )

web.SockSite creates a server using loop.create_server, which accepts a previously opened socket and starts listening for incoming connections. This is similar to asyncio.start_server, though loop.create_server is a bit more low-level and lets the caller pass a protocol_factory directly.
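The same pattern can be tried without aiohttp. In the sketch below, HelloProtocol and demo are made-up names; the protocol class stands in for the protocol factory and is called once per connection, and loop.create_server is given an already bound socket through sock=.

```python
import asyncio
import socket


class HelloProtocol(asyncio.Protocol):
    """Toy protocol: answers any request with a fixed HTTP response."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        body = b"Hello world!"
        head = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/plain\r\n"
            b"Content-Length: %d\r\n"
            b"Connection: close\r\n"
            b"\r\n"
        ) % len(body)
        self.transport.write(head + body)
        self.transport.close()


async def demo():
    # Open and bind the socket ourselves, as a pre-fork parent would.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    sock.setblocking(False)
    port = sock.getsockname()[1]

    loop = asyncio.get_running_loop()
    # The class itself is the protocol factory: one instance per connection.
    server = await loop.create_server(HelloProtocol, sock=sock)
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
        await writer.drain()
        response = await reader.read()  # read until the server closes
        writer.close()
        await writer.wait_closed()
    return response


response = asyncio.run(demo())
print(response.decode())
```

In aiohttp the factory is a web.Server instance rather than a class, but the idea is the same: the event loop calls it for every new connection and drives the returned protocol object.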

class Server:
    ...
    def __call__(self) -> RequestHandler:
        try:
            return RequestHandler(self, loop=self._loop, **self._kwargs)
        except TypeError:
            # Failsafe creation: remove all custom handler_args
            kwargs = {
                k: v
                for k, v in self._kwargs.items()
                if k in ["debug", "access_log_class"]
            }
            return RequestHandler(self, loop=self._loop, **kwargs)
    ...

The protocol factory, web.Server, is called when a new connection comes in and creates a RequestHandler object, which implements an asyncio protocol. Most of the connection handling and parsing is done by this object.

As you can see, aiohttp.GunicornWebWorker makes it possible to run asynchronous code within a gunicorn worker, but it is not WSGI compliant: it uses its own HTTP parser and doesn’t adhere to the WSGI specification. This problem is meant to be solved by the Asynchronous Server Gateway Interface (ASGI). The ASGI interface looks like this:

async def application(scope, receive, send):
    event = await receive()
    ...
    await send({"type": "websocket.send", ...})

and if an HTTP request comes in, the event would look like this:

{
    "type": "http.request",
    "body": b"Hello World",
    "more_body": False,
}
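As a rough sketch of the HTTP case, the block below defines a tiny ASGI application and drives it by hand, the way a server would. The helper call_asgi_app and the scope contents are made up for illustration; the event types http.request, http.response.start and http.response.body come from the ASGI HTTP specification.

```python
import asyncio


async def application(scope, receive, send):
    """Tiny ASGI app (hypothetical): echoes the request body back."""
    assert scope["type"] == "http"
    event = await receive()  # an "http.request" event
    body = b"You sent: " + event.get("body", b"")
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": body})


async def call_asgi_app(app, request_body=b""):
    """Play the server's role: provide scope, receive and send."""
    scope = {"type": "http", "method": "POST", "path": "/", "headers": []}
    sent = []

    async def receive():
        return {"type": "http.request", "body": request_body, "more_body": False}

    async def send(event):
        # A real server would turn these events into bytes on the socket.
        sent.append(event)

    await app(scope, receive, send)
    return sent


events = asyncio.run(call_asgi_app(application, b"Hello World"))
print(events)
```

Unlike WSGI, both directions are awaitable, so the application can wait for more request data or stream its response without blocking the worker.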

Uvicorn is an example of an ASGI server; it’s a kind of gunicorn for ASGI applications. FastAPI is an example of a popular ASGI web framework, much like Flask is for WSGI.

Conclusions

  • the pre-fork server model works around the GIL and lets a Python server use multiple cores on a single machine
  • the WSGI specification separates the web framework from the web server but doesn’t support asynchronous execution
  • aiohttp.GunicornWebWorker is a gunicorn worker class provided by aiohttp and recommended by the gunicorn documentation for async applications, but it’s not WSGI compliant; for example, it has its own HTTP parser
  • there is no load balancing between gunicorn workers: whichever process is first to accept a connection handles it
  • ASGI gained steam as it’s more modular than aiohttp and allows running asynchronous code for web applications; notable examples are FastAPI and Starlette
  • ASGI is not yet fully baked: there is no official PEP for it, and there is some criticism of it, for example:

    For instance, ASGI design is built around the idea that the socket transport and the threading paradigm is handled by Python itself; a condition that might lead to inefficient paradigms when looking at implementation coming from different languages. We can summarise this concept into this phrase: ASGI expects the lower protocol to be handled by Python. - granian/docs/spec/RSGI.md at master · emmett-framework/granian

References

  1. os — Miscellaneous operating system interfaces — Python 3.11.8 documentation
  2. WSGI for Web Developers (Ryan Wilson-Perkin) - YouTube
  3. Jacob Kaplan Moss Let’s build a web framework! PyCon 2017 - YouTube
