FastAPI 에서 Request Body 를 로깅하는 방법

FastAPI 에서 Request Body 를 로깅하는 방법
극찬하지만 쓰면 쓸수록...

의외로 운영에 매우 필요한 기능인데, 깔끔한 핸들링 방법을 못찾는다. 내가 원하는 요구사항은 이랬다.

  1. 절대로 API 로직이 있는 View코드 안에 로깅을 위한 코드가 1줄도 들어가선 안된다.
  2. 모든 함수마다 반복해서 적용해야 하는 것은 안된다.
  3. Middleware로 처리되었으면 좋겠다. (사실 이게 제일 어려운 포인트)

아무리 찾아봤지만, 뾰족하게 생각나는 방법은 없었다. starlette의 Request body를 까는 건 매우 까다로운 일이었다. Headers는 매우 쉽게 접근할 수 있었지만.

"Asynchronous..." 🤖

1. 로깅 코드를 API View 안에 넣으면 안되는 이유

422 Unprocessable Entity - HTTP | MDN
이 응답은 서버가 요청을 이해하고 요청 문법도 올바르지만 요청된 지시를 따를 수 없음을 나타냅니다.

일단, 어떤 요청으로 인해 422 (Unprocessable Entity) 가 일어났는데, 이를 디버깅할 수 없게 된다. view 함수를 실행하기도 전에 response 가 나가버리기 때문에 로깅되지 않을 것이다. 이런 이유때문에 Dependencies injection으로도 안되는 일이다.

2. 모든 함수마다 반복해서 적용해야 하는 것을 피해야 하는 이유

서비스가 커지게 되면, 유지보수에 문제가 생길 것이다. 이게 제일 큰 이유다. 그리고 다른 관점이지만 이는 1번의 이유도 포함된다.

3. Middleware 에서 처리하고 싶었던 이유

미들웨어 - 위키백과, 우리 모두의 백과사전

그러라고 미들웨어 쓰는거기 때문이다. 하지만, starlette 의 asynchronous처리를 위한 디자인때문에 이것또한 불가능하다. (FastAPI 를 쓰고 있다면 Starlette 문서도 꼭 숙지하고 개발하는 것을 추천한다.)

Starlette 의 Request

"미들웨어에서 request body를 컨슈밍하는 것이 문제다!" 라는 컨트리뷰터에게 👎 를 날리는 자들.. starlette 의 디자인을 알면 저런 반응을 할 리가...
Requests - Starlette
The little ASGI library that shines.
공식 문서를 읽어보세요. 껄껄.. 
  • await request.body() : 요청의 body를 bytes 로 받습니다.
  • await request.json() : 요청의 body를 json 포맷으로 받습니다.
  • await request.form() : 요청의 body를 form data 또는 multipart로 받습니다.

여기까지는 모두가 합의할 수 있는 인터페이스일 것이다. 하지만, 아래에서부턴 다른 라이브러리와는 좀 다르다.

request.stream()

타입은 AsyncGenerator[bytes, None] 이다. 소스 코드를 까보면 bytes chunk들을 yield해주는 Generator 함수다. 그리고 이 byte chunk들은 스트리밍되며 어떠한 공간에도 저장되지 않고 말 그대로 흐르기만 한다.

그래서 Stackoverflow에서 사람들은 이렇게 해결하고자 했다.

class CopyRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_body = await request.json()
        request.state.body = request_body

        response = await call_next(request)
        return response

class LogRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Since it'll be loaded after CopyRequestMiddleware it can access request.state.body.
        request_body = request.state.body
        print(request_body)
    
        response = await call_next(request)
        return response
https://stackoverflow.com/questions/64115628/get-starlette-request-body-in-the-middleware-context

"흐르는 body를 state 라는 곳에 저장해보자!"라는 생각에 착안된 코드지만, 디자인적으로 맞지 않다. 이 방법은 request의 body를 middleware에서 미리 consume하기때문에, middleware 이후 body를 이용하고 싶을 때는 state 로 body를 사용해야 한다. 이 점이 매우 만족스럽지 못하다.  그리고 Starlette만 쓸 때는 가능할 수도 있겠다. 당연히 FastAPI에서는 사용 불가능이다.

해결책

Starlette.requests.Request특성 상 미들웨어 단에서 해결못한다는 결론을 내렸다. 그래서 Middleware와 View코드 사이의 순간을 찾아냈는데, FastAPI가 request body를 consume 하여 schema로 파싱하는 시점이다.  이 때 body가 consume되므로 schema를 validate 하는 시점에서 로깅하도록 pydantic 의 BaseRequestSchema 클래스를 아래와 같이 구현하였다.

import logging

from pydantic import BaseModel
from pydantic.class_validators import root_validator
from pythonjsonlogger import jsonlogger


formatter = jsonlogger.JsonFormatter()

logHandler = logging.StreamHandler()
logHandler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logHandler)


class BaseRequestSchema(BaseModel):

    @root_validator(pre=True)
    def log_pre_validation(cls, values):

        logger.info('Request body', extra={'body': values})

        return values

그 외의 구현한 Middleware 도 공유한다.

import json
import logging

from fastapi import Request, Response
from pythonjsonlogger import jsonlogger
from starlette.responses import StreamingResponse


formatter = jsonlogger.JsonFormatter()

logHandler = logging.StreamHandler()
logHandler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logHandler)


async def log_request(request: Request, call_next) -> StreamingResponse:

    # Headers
    headers = dict(zip(request.headers.keys(), request.headers.values()))
    logger.info('Request headers', extra=headers)

    # Query parameters
    if request.query_params:
        logger.info('Query parameters', extra={'query': request.query_params})

    response: StreamingResponse = await call_next(request)

    return response


async def log_response(request: Request, call_next) -> Response:

    response: StreamingResponse = await call_next(request)

    # Headers
    logger.info('Response headers', extra=dict(response.headers))

    response_body: bytes = b''
    async for chunk in response.body_iterator:
        response_body += chunk

    response_body: str = response_body.decode('utf-8')

    try:
        response_body_dict = json.loads(response_body)
    except json.JSONDecodeError:
        # `response_body`` was not a json format. (i.e. swagger page response, ...)
        logger.info('Response body', extra={'body': response_body})

    else:
        # Log json formatted response
        logger.info('Response body', extra={'body': response_body_dict})


    return Response(
        content=response_body,
        status_code=response.status_code,
        headers=dict(response.headers),
        media_type=response.media_type
    )

Updates..

  • 2022.2.18 갑자기 좋은 방식의 로깅이 떠올랐다. 주말에 디깅해보는 것으로!