FastAPI에 Rate-limit(Throttling) 구현


Implementation
Redis 클러스터 혹은 인스턴스 생성
k8s 프로젝트 내에 redis 클러스터를 추가하였다. 운영 환경에 따라 다를 수도 있겠지만, 아래와 같은 manifest를 사용한다. 사실 꼭 redis를 사용할 필요는 없지만, 파이썬 라이브러리의 성숙함과 멀티 Pod으로 운영되었을 때를 고려한다면 쓰는 것이 좋다.
apiVersion: v1
kind: Pod
metadata:
name: <APPLICATION>-redis
labels:
app: redis
spec:
containers:
- name: redis
image: redis:latest
command:
- redis-server
env:
- name: MASTER
value: "true"
ports:
- containerPort: 6379
name: redis
volumeMounts:
- mountPath: /redis-master-data
name: data
volumes:
- name: data
emptyDir: { }
---
apiVersion: v1
kind: Service
metadata:
name: <APPLICATION>-redis
labels:
app: redis
spec:
selector:
app: redis
ports:
- name: redis
protocol: TCP
port: 6379
targetPort: 6379
Redis client 초기화
FastAPI 프로젝트는 아래의 구조를 갖고있다.
.
├── api
├── cache
├── core
├── dependencies
├── factories
├── middlewares
├── models
├── schemas
├── system
├── tasks
├── tests
├── utils
├── app_api.py
└── app_worker.py
redis 초기화 관련 코드는 cache
모듈을 통해 관리한다. aioredis
를 사용하였다.
from typing import Union
from aioredis import Redis, from_url
from core.config import settings
_redis: Union[Redis, None] = None
def set_client() -> None:
global _redis
_redis = from_url(url=settings.REDIS_HOST)
def get_client() -> Redis:
global _redis
return _redis
async def discard_client() -> None:
global _redis
await _redis.close()
from .client import discard_client, get_client, set_client
__all__ = ['discard_client', 'get_client', 'set_client']
app_api.py
내에 FastAPI가 초기화되는데, (흔히 main.py
에 쓰이는 것들이 여기 모여있다.) 여기에 이벤트 핸들러를 통해 rate-limit 기능을 위한 redis 연결을 초기화하고 어플리케이션 종료 시 redis client 를 정리할 수 있도록 한다.
# ...
# Application
app = FastAPI(**metadata)
# ...
# Redis
app.add_event_handler('startup', cache.set_client)
app.add_event_handler('shutdown', cache.discard_client)
# ...
Dependency 구현
프로젝트에서 사용하는 dependency 들은 dependencies
모듈에 구현된다. dependencies/limiter.py
는 아래와 같이 구현된다.
from fastapi import HTTPException, Request
from cache import get_client
class RateLimiter:
def __init__(self, name: str, limit_per_minutes: int, by_resource: bool = False):
"""
Initialize RateLimiter
Args:
name: Unique name required
limit_per_minutes: threshold of limiter, positive integer required.
"""
self.name: str = name
self.limit_per_minutes: int = limit_per_minutes
self.by_resource: bool = by_resource
async def limiter(self, key):
redis = get_client()
async with redis.client() as conn:
requests = await conn.incr(key)
if requests == 1:
await conn.expire(key, 60)
ttl = 60
else:
ttl = await conn.ttl(key)
return {
'call': requests <= self.limit_per_minutes,
'ttl': ttl
}
async def __call__(self, request: Request):
client_ip = request.client.host
resource = f'{request.method}:{request.url.path}' if self.by_resource else ':'
key = f'{self.name}::{client_ip}::{resource}'
res = await self.limiter(key)
if not res.get('call'):
raise HTTPException(503, detail='Rate limit exceeded.')
적용은 아래와 같다.
from fastapi import APIRouter, Depends
from dependencies.rate_limiter import RateLimiter
rate_limiter = RateLimiter('resources', 10, by_resource=True)
router = APIRouter(dependencies=[Depends(rate_limiter)])
@router.post(path='/resources/')
def create():
return {'key': 'value'}
@router.get(path='/resources/')
def retrieve():
return {'key': 'value'}
@router.patch(path='/resources/')
def update():
return {'key': 'value'}
물론 하나의 API 에도 가능하다.
rate_limiter = RateLimiter('resources', 5)
@router.get(path='/hardwork/', dependencies=[Depends(rate_limiter)])
def calculate():
return {'key': 'value'}
필요한 rate throttling algorithm들은 구현 쪽 (`__call__.py` 등)을 손보면 어지간한 원하는 목표는 달성할 것이다. FastAPI와 같은 신생(?) 마이크로프레임워크를 쓸 때 필요한 것이 있다면 직접 만들어 쓸 수 있는 장점이자 단점이 있다. 특히 python은 원하는 로직을 개발하는데 까다롭지 않기 때문에, golang같은 언어보다 쉽게 확장이 가능하다고 생각한다. 하지만 이런 프레임워크와 개발환경일수록 문서화가 절실히 필요하다. Docstring을 꼭 빠짐없이 작성하도록 하자.