[Python] WSGI 서버를 직접!! 만들어보자

OSI 7계층, 소켓, 파이썬 WSGI를 통해 직접 웹서버를 만들어보았다.

계기

사실 이 포스팅은 원래 서버의 동시성에 대해 포스팅하려고 했었다. 그런데 글을 작성하다 보니 생각 외로 꼬리 질문이 많이 생겨났다. 그래서 그 의문점을 해소해 보고자 그냥 직접 HTTP 서버를 만들면서 동시성을 테스트해 보기로 했다. 이번에는 파이썬을 이용해 개발했다. 파이썬에서는 GIL 때문에 동시성이 어려운데, 이를 어떻게 해결했는지도 궁금해서 파이썬으로 골랐다.

전체 코드는 깃허브에서 확인해 볼 수 있다.

OSI 7계층

OSI 7계층과 TCP/IP 4계층

우선 OSI 7계층에 대해 알아보자.

네트워크 통신 체계는 계층마다 특정 프로토콜로 이루어져 있고, 데이터 전달은 높은 계층에서 낮은 계층으로 각각의 프로토콜에 맞게 캡슐화(encapsulation)되고 데이터 수신은 낮은 계층에서 높은 계층으로 역캡슐화(decapsulation)된다.

OSI 7계층에서 전송과 수신

OSI 7계층에서의 L1, L2, L3, L4의 저수준 계층의 경우 하드웨어를 직접 조작하는 것이 아니면, 일반적으로 접근할 방법이 없다. 그래서 추상화가 중요해지는데 OS 단계에서 제공하는 추상화가 바로 소켓이다.

여기서 말하는 추상화란?
추상화는 비슷하지만, 다른 두 가지의 타입을 하나로 여기고 작업을 할 수 있도록 하는 것을 말한다. 소켓은 모두 다른 L1, L2, L3, L4 하드웨어 드라이버를 신경 쓰지 않고 하나의 코드로 네트워크 작업을 할 수 있도록 도와준다. 만약 소켓이 없었다면, 개발자는 L1, L2, L3, L4 하드웨어 드라이버에 맞게 코드를 짜야 한다. 또한 그렇게 작성한 코드는 동일한 하드웨어 구조가 아니면 다른 기기에서 작동하지 않는다.

소켓

소켓은 앞서 말한 것처럼, 추상화된 네트워크 접속 방법이다. 그렇다면 그 위치는 어디 있을까?

소켓 위치

바로 전송 계층 위에 존재한다. 대부분의 유명한 프로토콜들은(HTTP, SMTP, FTP, DNS)등은 응용 계층 프로토콜이다. 소켓을 사용할 줄 알고, 프로토콜에 대한 명세를 알고 있다면, 대부분 비슷하게 따라 만들 수 있다.

그래서 필자도 직접 웹서버를 만들어 볼 생각을 할 수 있었다.

HTTP

그럼 HTTP는 실제로 어떻게 요청을 받고 어떻게 응답할까?

생각보다 간단하다.

  • 요청(Request)

    • 기본 구조
      {method} {path} {version}
      {header_key_1}: {header_value_1}
      {header_key_2}: {header_value_2}
      {header_key_3}: {header_value_3}
      ...
      
      {body}
      
    • 예시
      GET / HTTP/1.1
      Host: fhdufhdu.github.io
      Accept: */*
      

      POST / HTTP/1.1
      Host: fhdufhdu.github.io
      Accept: */*
      Content-Type: application/json
      
      {"test": "테스트 데이터 입니다."}
      
  • 응답(Reponse)

    • 기본 구조
      {version} {status}
      {header_key_1}: {header_value_1}
      {header_key_2}: {header_value_2}
      {header_key_3}: {header_value_3}
      ...
      
      {body}
      
    • 예시
      HTTP/1.1 200 OK
      Date: Sat, 09 Oct 2010 14:28:02 GMT
      Server: Apache
      Last-Modified: Tue, 01 Dec 2009 20:18:22 GMT
      ETag: "51142bc1-7449-479b075b2891b"
      Accept-Ranges: bytes
      Content-Length: 29769
      Content-Type: text/html
      
      <html>...</html>
      

위의 형태를 소켓에 주고 받기를 하면 된다.

WSGI

파이썬에는 WSGI라는 것이 있다. Web Server Gateway Interface 라는 것의 약자로, 웹 서버와 웹 어플리케이션 사이의 요청/응답 방법을 정의해놓은 것이다.

구조도

WSGI는 WSGI Server와 WSGI Application으로 구성된다.

  • WSGI Server
    • 소켓을 이용해서 TCP 통신을 하는 곳
    • 받은 데이터에서 path, method, query, body, header 등 HTTP Request 데이터 역캡슐화
    • WSGI Application에서 받은 응답 데이터를 HTTP Response 명세에 맞게 캡슐화
  • WSGI Application
    • WSGI Server에게 받은 데이터로 처리 후 응답
    • 대표적인 WSGI Application에는 Django가 있음
    • Django가 하는 일이 WSGI Application의 역할임

WSGI Application 구현

WSGI Application은 WSGI Server가 전달한 데이터를 받고 처리를 거친 후 데이터를 반환할 것이다.

그렇게하기 위해서는 WSGI Server(이하 server)와 WSGI Application(이하 app) 간에 약속된 호출 방식과 데이터 구조가 있어야한다.

  • app은 Callable Object 이어야한다
    • app(environ, start_response) 형태로 실행 가능 해야 한다
  • server는 app에게 두 가지 정보를 전달해주어야 한다.
    1. environ
      • method, path, query, version, host, body 등의 정보를 담은 Dict 객체
    2. start_response
      • status, headers를 파라미터로 가지는 Callback 함수
      • app에서 이 함수를 호출해서 server에게 status와 header를 전달해야한다.
  • app은 Iterator[bytes]를 반환해야한다.
    • 이 데이터는 HTTP body에 들어갈 데이터이다.
    • 이터레이터를 반환해야 한다는 말은,
      for x in app(environ, start_response)가 작동해야 한다는 의미이다.

그럼 간단한 app 구현체를 보자. 파이썬의 빌트인 라이브러리에서 간단한 app을 제공해준다.

## from wsgiref.simple_server import demo_app

def demo_app(environ, start_response):
    start_response("200 OK", [('Content-Type','text/plain; charset=utf-8')])
    return ["Hello World!!".encode("utf-8")]

이 app은 함수로 구성되어 있다. 파이썬에서는 함수도 Callable Object이기 때문에 유효한 구현이다.

그리고 start_response를 통해 status와 header를 서버에게 전달해준다. return 또한 list로 반환하기 때문에 for x in demo_app(environ, start_response)가 구동 가능하므로 WSGI를 충족한다.

이제 함수로 구성된 app을 클래스로 변경해보자

class DemoApp:
    def __init__(self, environ, start_response):
        self.environ = environ
        self.start_response = start_response
    
    def __iter__(self):
        self.start_response("200 OK", [('Content-Type','text/plain; charset=utf-8')])
        yield "Hello World!!".encode("utf-8")

조금은 복잡해졌다. 이 코드를 이해하려면 __iter__, __next__, generator를 이해해야 한다. 이 게시글에서는 설명하지 않도록 하겠다.

__iter__함수가 generator이므로 for x in DemoApp(environ, start_response)가 구동 가능하다. 그러므로 WSGI를 충족한다.

WSGI Server 구현

app이 어떻게 구성되는지 알아보았다. 이제 핵심은 server를 구현해보고자 한다. 앞서 간단한 app을 만들어 보았지만, app 부분은 이번 구현 목적에서 크게 중요한 것이 아니기 때문에 Django로 대체하고자 한다.

소켓으로 HTTP 서버 만들기

import socket

## TCP로 소켓 연결
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ss:
    # 소켓이 사용하던 포트가 종료되면, 기본적으로 해당 포트를 일정시간 동안 사용을 막음. 그 설정을 풀어주는 코드

    ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # 포트에서 사용할 host와 port
    ss.bind(('localhost', 1026))
    # 포트를 실제로 점유하고, accept 요청을 얼마나 저장할 지 설정(=100)
    ss.listen(100)

    while True:
        # 클라이언트 접속
        cc, address = ss.accept()

        # 클라이언트로부터 데이터 수신
        recv_data = cc.recv(1048576)

        # http request 양식대로 분류 작업
        recv_data = recv_data.replace(b'\r\n', b'\n')

        splited_recv_data = recv_data.split(b'\n\n', 1)
        if len(splited_recv_data) == 2:
            http_info, body = splited_recv_data
        else:
            http_info = recv_data
            body = b''
        # 분류 작업 끝
        
        # 받은 데이터 출력
        print("[http 정보]",  http_info.decode('utf-8'))
        print("body 정보", body)
        
        # 클라이언트에게 데이터 발신
        cc.sendall(b'HTTP/1.1 200 OK\nDate: Sat, 09 Oct 2010 14:28:02 GMT\nServer: Apache\nLast-Modified: Tue, 01 Dec 2009 20:18:22 GMT\nETag: "51142bc1-7449-479b075b2891b"\nAccept-Ranges: bytes\nContent-Length: 29769\nContent-Type: text/html\n\n<html><body>Hello World!!</body></html>')
        cc.close()

이 코드를 실행시키고 localhost:1026으로 접속하면 아래와 같이 접속이 잘 되는 것을 확인 할 수 있다.

소켓 서버 HTTP로 접속 성공

또한 서버 내 로그도 아래와 같이 잘 작동하는 모습을 보여준다.

소켓 서버 로그

소켓 서버를 WSGI로 변경하기

자 이제 소켓으로 HTTP 서버를 열어보았다. 이제 WSGI에 충족하도록 WSGI Server를 제작해보자.

전체코드는 아래와 같다. 코드는 계속해서 업데이트 될 예정이므로, 깃허브 링크도 참고하자!

import importlib
import io
import multiprocessing
import re
import signal
import socket
from multiprocessing import Process


class MyGunicornHandler:
    def __init__(self, ss: socket.socket, app_path: str):
        self.ss = ss
        self.app_path = app_path
        self.status_and_headers = {"status": 200, "headers": []}
    

    def run(self):
        try:
            c_proc = multiprocessing.current_process()
            _from, _import = self.app_path.split(":", 1)

            module = importlib.import_module(_from)
            app = getattr(module, _import)

            while True:
                print("START: ", c_proc.name, " || ", "PID: ", c_proc.pid)
                conn, address = self.ss.accept()

                raw_data = conn.recv(1048576)
                if not raw_data:
                    conn.close()
                    break

                raw_data = raw_data.replace(b"\r\n", b"\n")
                splited_raw_data = raw_data.split(b"\n\n", 1)

                if len(splited_raw_data) == 2:
                    b_headers, b_body = splited_raw_data
                else:
                    b_headers, b_body = (raw_data, b"")

                headers = b_headers.decode("utf-8")
                headers = headers.rsplit("\n")

                method, path, version_of_protocol = headers[0].split(" ")
                if "?" in path:
                    path, query = path.split("?", 1)
                else:
                    path, query = path, ""
                environ = {
                    "REQUEST_METHOD": method,
                    "SERVER_PROTOCOL": version_of_protocol,
                    "SERVER_SOFTWARE": "WOOSEONG_WSGI",
                    "PATH_INFO": path,
                    "QUERY_STRING": query,
                    "REMOTE_HOST": address[0],
                    "REMOTE_ADDR": address[0],
                    "wsgi.input": io.BytesIO(b_body),
                    "wsgi.url_scheme": "http",
                    "wsgi.version": (1, 0),
                }

                for idx, header in enumerate(headers):
                    if idx == 0:
                        continue
                    key, value = re.split(r"\s*:\s*", header, 1)

                    key = key.replace("-", "_").upper()
                    value = value.strip()

                    make_key = lambda x: "HTTP_" + x
                    if key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                        environ[key] = value
                    elif make_key(key) in environ:
                        environ[make_key(key)] += "," + value
                    else:
                        environ[make_key(key)] = value

                def start_response(status, headers):
                    self.status_and_headers["status"] = status
                    self.status_and_headers["headers"] = headers

                response_body = app(environ, start_response)
                # 응답 첫번째 라인 구성
                response_first = (
                    f"{version_of_protocol} {self.status_and_headers['status']}"
                )
                # 응답 헤더부분 구성
                response_headers = "\r\n".join(
                    list(
                        map(
                            lambda x: f"{x[0]}: {x[1]}",
                            self.status_and_headers["headers"],
                        )
                    )
                )
                # 응답 첫번째 라인 + 헤더 부분
                response = (
                    response_first
                    + ("\r\n" if response_headers else "")
                    + response_headers
                    + "\r\n\r\n"
                )
                # byte로 인코딩
                response = response.encode("utf-8")
                # response_body 붙이기
                for b in response_body:
                    response += b

                conn.send(response)
                conn.close()

                print("END: ", c_proc.name, " || ", "PID: ", c_proc.pid)
        except KeyboardInterrupt:
            pass


class MyGunicorn:
    def __init__(self):
        # 소켓 생성
        self.ss = self.__init_socket__()
        # 프로세스 리스트
        self.ps = []
        # graceful shutdown 추가
        signal.signal(signal.SIGINT, self.close)
        signal.signal(signal.SIGTERM, self.close)

    # 소켓 생성
    def __init_socket__(self):
        ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return ss

    def run(
        self,
        app_path: str,
        host: str = "localhost",
        port: int = 1026,
        backlog: int = 100,
        worker=4,
    ):
        self.ss.bind((host, port))
        self.ss.listen(backlog)

        for _ in range(worker):
            Process(
                target=MyGunicornHandler.run_on_process, args=(self.ss, app_path)
            ).start()

    def close(self, signum, frame):
        print(f"shutdown: {signum}")
        self.ss.close()


if __name__ == "__main__":
    MyGunicorn().run(app_path="wsgiserver.wsgi:application", worker=16, backlog=1000)

소켓을 사용하는 서버부분

class MyGunicorn:
    def __init__(self):
        # 소켓 생성
        self.ss = self.__init_socket__()
        # graceful shutdown 추가
        signal.signal(signal.SIGINT, self.close)
        signal.signal(signal.SIGTERM, self.close)

    # 소켓 생성
    def __init_socket__(self):
        ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return ss

    def run(
        self,
        app_path: str,
        host: str = "localhost",
        port: int = 1026,
        backlog: int = 100,
        worker=4,
    ):
        self.ss.bind((host, port))
        self.ss.listen(backlog)

        for _ in range(worker):
            handler = MyGunicornHandler(ss=ss, app_path=app_path)
            Process(target=handler.run).start()

    def close(self, signum, frame):
        print(f"shutdown: {signum}")
        self.ss.close()

서버가 생성되면, 소켓을 먼저 설정한다. 그리고 종료시 안전하게 종료될 수 있도록 signal를 이용해서 종료 신호에 반응하도록 한다.

이후 동시성을 위해 멀티 프로세싱을 이용해 프로세스를 여러 개 띄운다. 이 프로세스에서 클라이언트의 요청을 실제로 처리하게 된다. 실제로 Gunicorn은 동시성을 위해 스레드를 사용하는 것이 아니라, 프로세스를 사용한다. 그 이유는 맨 처음 언급했듯이, GIL 때문이다.

GIL(Global Interpreter Lock)
GIL은 하나의 스레드가 활성화 되면 다른 스레드가 활성화 되지 않도록 락을 걸고 사용하는 것을 의미한다. 이는 CPython에만 존재하며 다른 파이썬 구현체에는 존재하지 않는다. I/O 바운드에는 GIL이 잘 적용되지 않아서 스레드가 효율적이고, CPU 바운드에는 GIL이 적극적으로 적용된다고 한다. 웹 서버의 경우 CPU 바운드와 I/O 바운드가 적절하게 존재하므로 스레드를 사용하는 것보다 프로세스를 사용하는 것이 더 효율적이다.

요청을 처리하는 부분

각 프로세스마다 Handler 클래스를 하나씩 만들어 실행시킨다.

이후 해당 run 함수 부분에서는 크게 “request를 파싱해서 environstart_response를 적절하게 만들어서 app에게 제공하는 부분”, “app에게 response를 받아서 소켓을 통해 발신하는 부분"으로 나뉜다.

  1. environstart_response를 적절하게 만들어서 app에게 제공하는 부분
    • 상세 명세는 해당 문서를 확인하자
                # 클라이언트 연결
                conn, address = self.ss.accept()

                # 데이터 수신
                raw_data = conn.recv(1048576)

                # 데이터가 없으면 종료
                if not raw_data:
                    conn.close()
                    break

                # 데이터를 받아서 http info 부분과 body 부분 분리
                raw_data = raw_data.replace(b"\r\n", b"\n")
                splited_raw_data = raw_data.split(b"\n\n", 1)

                if len(splited_raw_data) == 2:
                    b_headers, b_body = splited_raw_data
                else:
                    b_headers, b_body = (raw_data, b"")

                headers = b_headers.decode("utf-8")
                headers = headers.rsplit("\n")
                
                # http info 첫 번째 라인에서 method, paht, version_of_protocol 추출
                method, path, version_of_protocol = headers[0].split(" ")
                if "?" in path:
                    path, query = path.split("?", 1)
                else:
                    path, query = path, ""
                
                # environ 제작
                environ = {
                    "REQUEST_METHOD": method,
                    "SERVER_PROTOCOL": version_of_protocol,
                    "SERVER_SOFTWARE": "WOOSEONG_WSGI",
                    "PATH_INFO": path,
                    "QUERY_STRING": query,
                    "REMOTE_HOST": address[0],
                    "REMOTE_ADDR": address[0],
                    "wsgi.input": io.BytesIO(b_body),
                    "wsgi.url_scheme": "http",
                    "wsgi.version": (1, 0),
                }

                # environ에 http header 정보 추가
                for idx, header in enumerate(headers):
                    if idx == 0:
                        continue
                    key, value = re.split(r"\s*:\s*", header, 1)

                    key = key.replace("-", "_").upper()
                    value = value.strip()

                    make_key = lambda x: "HTTP_" + x
                    if key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                        environ[key] = value
                    elif make_key(key) in environ:
                        environ[make_key(key)] += "," + value
                    else:
                        environ[make_key(key)] = value

                # app에게서 response status와 header를 받는 함수 개발
                def start_response(status, headers):
                    self.status_and_headers["status"] = status
                    self.status_and_headers["headers"] = headers
  1. app에게 response를 받아서 소켓을 통해 발신하는 부분
                response_body = app(environ, start_response)
                # 응답 첫번째 라인 구성
                response_first = (
                    f"{version_of_protocol} {self.status_and_headers['status']}"
                )
                # 응답 헤더부분 구성
                response_headers = "\r\n".join(
                    list(
                        map(
                            lambda x: f"{x[0]}: {x[1]}",
                            self.status_and_headers["headers"],
                        )
                    )
                )
                # 응답 첫번째 라인 + 헤더 부분
                response = (
                    response_first
                    + ("\r\n" if response_headers else "")
                    + response_headers
                    + "\r\n\r\n"
                )
                # byte로 인코딩
                response = response.encode("utf-8")
                # response_body 붙이기
                for b in response_body:
                    response += b

                conn.send(response)
                conn.close()

결과

실행 시

첫 실행에 프로세스가 제대로 띄워지는 것을 볼 수 있다.

이후 브라우저에서 접속을 하면

접속 시

아래 처럼 잘 접속되는 것을 볼 수 있다.

동시성 테스트도 진행해보았다. 조건은 아래와 같다.

  • 0.1초 단위로 요청 전송
  • 동시에 1000명 접속
  • worker=16
  • backlog =1000

동시성 테스트 결과

왼쪽이 필자가 구현한 MyGunicorn이고 오른쪽이 Gunicorn이다. 상당히 유사한 그래프를 그리는 것을 볼 수 있다.

이 결과를 보고나서, 정말 기뻤다. 생각보다 성능이 밀리지 않는 것 같아서 기분이 매우 좋더라. 물론, Gunicorn에 비해 많은 허점이 있을 것이다.

그렇지만 처음부터 공식문서만 보고 하나하나 시도해서 성공했다는 것에 너무 기쁘고 성장한 것 같아서 기분이 좋다.

Last updated on 2024-02-15 AM 9:00