FastAPI 에서 Request Body 를 로깅하는 방법

의외로 운영에 매우 필요한 기능인데, 깔끔한 핸들링 방법을 못찾는다. 내가 원하는 요구사항은 이랬다.
- 절대로 API 로직이 있는 View코드 안에 로깅을 위한 코드가 1줄도 들어가선 안된다.
- 모든 함수마다 반복해서 적용해야 하는 것은 안된다.
- Middleware로 처리되었으면 좋겠다. (사실 이게 제일 어려운 포인트)
아무리 찾아봤지만, 뾰족하게 생각나는 방법은 없었다. starlette의 Request body를 까는 건 매우 까다로운 일이었다. Headers는 매우 쉽게 접근할 수 있었지만.

1. 로깅 코드를 API View 안에 넣으면 안되는 이유

일단, 어떤 요청으로 인해 422 (Unprocessable Entity) 가 일어났는데, 이를 디버깅할 수 없게 된다. view 함수를 실행하기도 전에 response 가 나가버리기 때문에 로깅되지 않을 것이다. 이런 이유때문에 Dependencies injection으로도 안되는 일이다.
2. 모든 함수마다 반복해서 적용해야 하는 것을 피해야 하는 이유
서비스가 커지게 되면, 유지보수에 문제가 생길 것이다. 이게 제일 큰 이유다. 그리고 다른 관점이지만 이는 1번의 이유도 포함된다.
3. Middleware 에서 처리하고 싶었던 이유

그러라고 미들웨어 쓰는거기 때문이다. 하지만, starlette 의 asynchronous처리를 위한 디자인때문에 이것또한 불가능하다. (FastAPI 를 쓰고 있다면 Starlette 문서도 꼭 숙지하고 개발하는 것을 추천한다.)
Starlette 의 Request

👎
를 날리는 자들.. starlette 의 디자인을 알면 저런 반응을 할 리가...
await request.body()
: 요청의 body를 bytes 로 받습니다.await request.json()
: 요청의 body를 json 포맷으로 받습니다.await request.form()
: 요청의 body를 form data 또는 multipart로 받습니다.
여기까지는 모두가 합의할 수 있는 인터페이스일 것이다. 하지만, 아래에서부턴 다른 라이브러리와는 좀 다르다.
request.stream()
타입은 AsyncGenerator[bytes, None]
이다. 소스 코드를 까보면 bytes chunk들을 yield해주는 Generator 함수다. 그리고 이 byte chunk들은 스트리밍되며 어떠한 공간에도 저장되지 않고 말 그대로 흐르기만 한다.
그래서 Stackoverflow에서 사람들은 이렇게 해결하고자 했다.
class CopyRequestMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_body = await request.json()
request.state.body = request_body
response = await call_next(request)
return response
class LogRequestMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Since it'll be loaded after CopyRequestMiddleware it can access request.state.body.
request_body = request.state.body
print(request_body)
response = await call_next(request)
return response
"흐르는 body를 state 라는 곳에 저장해보자!"라는 생각에 착안된 코드지만, 디자인적으로 맞지 않다. 이 방법은 request의 body를 middleware에서 미리 consume하기때문에, middleware 이후 body를 이용하고 싶을 때는 state 로 body를 사용해야 한다. 이 점이 매우 만족스럽지 못하다. 그리고 Starlette만 쓸 때는 가능할 수도 있겠다. 당연히 FastAPI에서는 사용 불가능이다.
해결책
Starlette.requests.Request
의 특성 상 미들웨어 단에서 해결못한다는 결론을 내렸다. 그래서 Middleware와 View코드 사이의 순간을 찾아냈는데, FastAPI가 request body를 consume 하여 schema로 파싱하는 시점이다. 이 때 body가 consume되므로 schema를 validate 하는 시점에서 로깅하도록 pydantic 의 BaseRequestSchema
클래스를 아래와 같이 구현하였다.
import logging
from pydantic import BaseModel
from pydantic.class_validators import root_validator
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter()
logHandler = logging.StreamHandler()
logHandler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logHandler)
class BaseRequestSchema(BaseModel):
@root_validator(pre=True)
def log_pre_validation(cls, values):
logger.info('Request body', extra={'body': values})
return values
그 외의 구현한 Middleware 도 공유한다.
import json
import logging
from fastapi import Request, Response
from pythonjsonlogger import jsonlogger
from starlette.responses import StreamingResponse
formatter = jsonlogger.JsonFormatter()
logHandler = logging.StreamHandler()
logHandler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logHandler)
async def log_request(request: Request, call_next) -> StreamingResponse:
# Headers
headers = dict(zip(request.headers.keys(), request.headers.values()))
logger.info('Request headers', extra=headers)
# Query parameters
if request.query_params:
logger.info('Query parameters', extra={'query': request.query_params})
response: StreamingResponse = await call_next(request)
return response
async def log_response(request: Request, call_next) -> Response:
response: StreamingResponse = await call_next(request)
# Headers
logger.info('Response headers', extra=dict(response.headers))
response_body: bytes = b''
async for chunk in response.body_iterator:
response_body += chunk
response_body: str = response_body.decode('utf-8')
try:
response_body_dict = json.loads(response_body)
except json.JSONDecodeError:
# `response_body`` was not a json format. (i.e. swagger page response, ...)
logger.info('Response body', extra={'body': response_body})
else:
# Log json formatted response
logger.info('Response body', extra={'body': response_body_dict})
return Response(
content=response_body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
Updates..
- 2022.2.18 갑자기 좋은 방식의 로깅이 떠올랐다. 주말에 디깅해보는 것으로!