FastAPI에 Rate-limit(Throttling) 구현

FastAPI에 Rate-limit(Throttling) 구현
Redis로 Throttling 기능을 구현합니다.
너어어무 쉽다 이말이야!

Implementation

Redis 클러스터 혹은 인스턴스 생성

k8s 프로젝트 내에 redis 클러스터를 추가하였다. 운영 환경에 따라 다를 수도 있겠지만, 아래와 같은 manifest를 사용한다. 사실 꼭 redis를 사용할 필요는 없지만, 파이썬 라이브러리의 성숙함과 멀티 Pod으로 운영되었을 때를 고려한다면 쓰는 것이 좋다.

apiVersion: v1
kind: Pod
metadata:
  name: <APPLICATION>-redis
  labels:
    app: redis
spec:
  containers:
    - name: redis
      image: redis:latest
      command:
        - redis-server
      env:
        - name: MASTER
          value: "true"
      ports:
        - containerPort: 6379
          name: redis
      volumeMounts:
        - mountPath: /redis-master-data
          name: data
  volumes:
    - name: data
      emptyDir: { }
---
apiVersion: v1
kind: Service
metadata:
  name: <APPLICATION>-redis
  labels:
    app: redis
spec:
  selector:
    app: redis
  ports:
    - name: redis
      protocol: TCP
      port: 6379
      targetPort: 6379

Redis client 초기화

FastAPI 프로젝트는 아래의 구조를 갖고있다.

.
├── api
├── cache
├── core
├── dependencies
├── factories
├── middlewares
├── models
├── schemas
├── system
├── tasks
├── tests
├── utils
├── app_api.py
└── app_worker.py
Celery & FastAPI 가 공존하는 어플리케이션
GitHub - aio-libs/aioredis-py: asyncio (PEP 3156) Redis support
asyncio (PEP 3156) Redis support. Contribute to aio-libs/aioredis-py development by creating an account on GitHub.

redis 초기화 관련 코드는 cache 모듈을 통해 관리한다. aioredis 를 사용하였다.

from typing import Union

from aioredis import Redis, from_url

from core.config import settings

_redis: Union[Redis, None] = None


def set_client() -> None:
    global _redis
    _redis = from_url(url=settings.REDIS_HOST)


def get_client() -> Redis:
    global _redis
    return _redis


async def discard_client() -> None:
    global _redis
    await _redis.close()
cache/client.py
from .client import discard_client, get_client, set_client


__all__ = ['discard_client', 'get_client', 'set_client']
cache/__init__.py

app_api.py 내에 FastAPI가 초기화되는데,  (흔히 main.py 에 쓰이는 것들이 여기 모여있다.) 여기에 이벤트 핸들러를 통해 rate-limit 기능을 위한 redis 연결을 초기화하고 어플리케이션 종료 시 redis client 를 정리할 수 있도록 한다.

# ...


# Application
app = FastAPI(**metadata)

# ...

# Redis
app.add_event_handler('startup', cache.set_client)
app.add_event_handler('shutdown', cache.discard_client)


# ...
app_api.py

Dependency 구현

프로젝트에서 사용하는 dependency 들은 dependencies 모듈에 구현된다. dependencies/limiter.py 는 아래와 같이 구현된다.

from fastapi import HTTPException, Request

from cache import get_client


class RateLimiter:
    def __init__(self, name: str, limit_per_minutes: int, by_resource: bool = False):
        """
        Initialize RateLimiter
        Args:
            name: Unique name required
            limit_per_minutes: threshold of limiter, positive integer required.
        """
        self.name: str = name
        self.limit_per_minutes: int = limit_per_minutes
        self.by_resource: bool = by_resource

    async def limiter(self, key):
        redis = get_client()

        async with redis.client() as conn:
            requests = await conn.incr(key)
            if requests == 1:
                await conn.expire(key, 60)
                ttl = 60
            else:
                ttl = await conn.ttl(key)
            return {
                'call': requests <= self.limit_per_minutes,
                'ttl': ttl
            }

    async def __call__(self, request: Request):
        client_ip = request.client.host
        resource = f'{request.method}:{request.url.path}' if self.by_resource else ':'

        key = f'{self.name}::{client_ip}::{resource}'

        res = await self.limiter(key)

        if not res.get('call'):
            raise HTTPException(503, detail='Rate limit exceeded.')

적용은 아래와 같다.

from fastapi import APIRouter, Depends

from dependencies.rate_limiter import RateLimiter


rate_limiter = RateLimiter('resources', 10, by_resource=True)
router = APIRouter(dependencies=[Depends(rate_limiter)])


@router.post(path='/resources/')
def create():

    return {'key': 'value'}
    
    
@router.get(path='/resources/')
def retrieve():

    return {'key': 'value'}
    
    
@router.patch(path='/resources/')
def update():

    return {'key': 'value'}

물론 하나의 API 에도 가능하다.

rate_limiter = RateLimiter('resources', 5)

@router.get(path='/hardwork/', dependencies=[Depends(rate_limiter)])
def calculate():

    return {'key': 'value'}

필요한 rate throttling algorithm들은 구현 쪽 (`__call__.py` 등)을 손보면 어지간한 원하는 목표는 달성할 것이다. FastAPI와 같은 신생(?) 마이크로프레임워크를 쓸 때 필요한 것이 있다면 직접 만들어 쓸 수 있는 장점이자 단점이 있다. 특히 python은 원하는 로직을 개발하는데 까다롭지 않기 때문에, golang같은 언어보다 쉽게 확장이 가능하다고 생각한다. 하지만 이런 프레임워크와 개발환경일수록 문서화가 절실히 필요하다. Docstring을 꼭 빠짐없이 작성하도록 하자.