Nest.js 세마포어로 요청 동시성 제어하기

@beygee· August 12, 2024 · 11 min read

Overview

Flutter에서 시작해 JavaScript, 그리고 Nest.js까지 이어지는 동시성 제어 기법에 대해 이야기해보려 합니다.

Completer를 이용한 Lock 구현부터 시작해서 이를 JavaScriptPromise로 확장하고 최종적으로 세마포어를 이용하여 Nest.js에서의 요청 동시성 제어까지 다뤄보겠습니다.

Flutter에서 Completer를 이용한 Lock 구현

우선 Completer란 무엇일까요?

Flutter에서 Completer는 미래에 완료될 비동기 작업을 나타내는 Future(JS에선 Promise와 비슷) 객체를 수동으로 제어할 수 있게 해주는 클래스입니다. 일반적인 Future와 달리 Completer는 외부에서 완료(complete)를 호출하여 해당 Future를 완료 상태로 만들 수 있습니다.

비동기 작업의 순서를 제어하거나 특정 조건이 충족될 때까지 대기해야 하는 경우 Completer를 활용하여 동기화 메커니즘을 구현할 수 있습니다. 특히 공유 자원에 대한 접근을 제어하기 위해 Lock이나 Mutex를 구현할 때 유용합니다.

여러 비동기 작업이 동시에 실행되지 않고 순차적으로 실행되도록 보장하는 예제를 살펴보겠습니다.

import 'dart:async';

class TaskQueue {
  Completer<void>? _completer;

  Future<T> enqueue<T>(Future<T> Function() task) async {
    // 이전 작업이 완료될 때까지 대기
    while (_completer != null && !_completer!.isCompleted) {
      await _completer!.future;
    }

    _completer = Completer<void>();

    try {
      // 작업 실행
      T result = await task();
      return result;
    } finally {
      // 작업 완료 후 completer 완료 처리
      _completer!.complete();
    }
  }
}

동작 원리는 다음과 같습니다.

  1. enqueue 함수는 새로운 작업을 큐에 추가합니다.
  2. _completer가 존재하고 완료되지 않았다면 이전 작업이 완료될 때까지 await로 대기합니다.
  3. 현재 작업을 수행하고 완료되면 _completer.complete()를 호출하여 다음 작업이 진행될 수 있도록 합니다.

실제로 실행하면 결과는 다음과 같습니다.

void main() async {
  TaskQueue taskQueue = TaskQueue();

  // 여러 작업을 동시에 추가하지만 순차적으로 실행됨
  taskQueue.enqueue(() async {
    print('작업 1 시작');
    await Future.delayed(Duration(seconds: 2));
    print('작업 1 완료');
  });

  taskQueue.enqueue(() async {
    print('작업 2 시작');
    await Future.delayed(Duration(seconds: 1));
    print('작업 2 완료');
  });

  taskQueue.enqueue(() async {
    print('작업 3 시작');
    await Future.delayed(Duration(seconds: 3));
    print('작업 3 완료');
  });
}

// 실행결과
작업 1 시작
작업 1 완료
작업 2 시작
작업 2 완료
작업 3 시작
작업 3 완료

작업들이 추가된 순서대로 순차적으로 실행되는 것을 볼 수 있습니다.

Completer를 이용하여 현재 실행 중인 작업이 완료될 때까지 다음 작업이 대기해서 공유 자원에 대한 접근을 순차적으로 처리합니다.

이는 마치 하나의 작업이 완료될 때까지 다른 작업이 Lock을 획득하지 못하도록 하는 Mutex와 같은 역할을 합니다.

JavaScript에서 Promise를 Completer로 래핑하여 Mutex 구현

JavaScript에서는 비동기 작업을 처리하기 위해 Future 대신 Promise를 사용합니다. 이를 활용하여 Mutex를 구현할 수 있습니다.

먼저 Promise를 래핑하여 Completer 클래스를 구현해보겠습니다.

class Completer {
  constructor() {
    this.isCompleted = false
    this.promise = new Promise((resolve, reject) => {
      this._resolve = resolve
    })
  }

  complete(value) {
    if (!this.isCompleted) {
      this.isCompleted = true
      this._resolve(value)
    }
  }
}

Completer를 이용하여 Mutex를 구현하면 다음과 같습니다.

class Mutex {
  constructor() {
    this._completer = null
  }

  async acquire() {
    while (this._completer && !this._completer.isCompleted) {
      await this._completer.promise
    }
    this._completer = new Completer()
  }

  release() {
    if (this._completer && !this._completer.isCompleted) {
      this._completer.complete()
    }
  }
}

원리는 FlutterCompleter와 같으므로, 동시 작업 수를 지정할 수 있는 Semaphore 구현으로 바로 확장해보겠습니다.

class Semaphore {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent
    this.currentCount = 0
    this._completer = null
  }

  async acquire() {
    while (this.currentCount >= this.maxConcurrent) {
      if (this._completer && !this._completer.isCompleted) {
        await this._completer.promise
      } else {
        this._completer = new Completer()
        await this._completer.promise
      }
    }
    this.currentCount++
  }

  release() {
    this.currentCount--
    if (this._completer && !this._completer.isCompleted) {
      this._completer.complete()
    }
  }
}

maxConcurrent 만큼은 동시 작업이 가능한 세마포어입니다. 그 이상으로 넘어가면 completer가 생성되어 다른 요청의 acquire를 대기시킬 수 있습니다.

여기서 while 문이 있는 이유에 대해 궁금할 수 있습니다. 다음 예제를 보겠습니다.

async function run() {
  const semaphore = new Semaphore(3) // 한 번에 3개의 요청만 처리 가능
  const promises = []

  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        await semaphore.acquire()
        try {
          // 가짜 API 요청 처리. 1초 대기 시간을 갖습니다.
          await fakeApiRequest()
        } finally {
          semaphore.release()
        }
      })()
    )
  }

  await Promise.all(promises)
}

만약 while문이 아니라 if문으로 대체되어있었다면 위 코드는 3개를 동시에 실행 후, 나머지 7개를 동시에 실행하고 종료합니다.

while문이 아니라면 이 시점에 모두 acquire를 성공적으로 마치고 currentCountmaxConcurrent를 초과하게 됩니다. 아래 예제를 실행해보겠습니다. 주석이 있는 부분이 변경 지점입니다.

class Semaphore {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent
    this.currentCount = 0
    this._completer = null
  }

  async acquire() {
    // while 문에서 if 문으로 변경하여 테스트합니다.
    // while (this.currentCount >= this.maxConcurrent) {
    if (this.currentCount >= this.maxConcurrent) {
      if (this._completer && !this._completer.isCompleted) {
        await this._completer.promise
      } else {
        this._completer = new Completer()
        await this._completer.promise
      }
    }
    this.currentCount++
    // 어떠한 경우에도 maxConcurrent를 넘기면 안됩니다.
    console.log("currentCount:", this.currentCount)
  }

  release() {
    this.currentCount--
    if (this._completer && !this._completer.isCompleted) {
      this._completer.complete()
    }
  }
}

async function fakeApiRequest() {
  await new Promise(resolve => setTimeout(resolve, 1000))
}

async function run() {
  const semaphore = new Semaphore(3)
  const promises = []

  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        await semaphore.acquire()
        try {
          await fakeApiRequest()
        } finally {
          semaphore.release()
        }
      })()
    )
  }

  await Promise.all(promises)
}

run()

실행 결과는 다음과 같습니다.

currentCount: 1
currentCount: 2
currentCount: 3
# 1초 이후
currentCount: 3
currentCount: 4
currentCount: 5
currentCount: 6
currentCount: 7
currentCount: 8
currentCount: 9

이유는 await이 동작하는 방식이 Node.js의 이벤트 큐와 관련이 있기 때문입니다.

Node.js에서 await는 비동기 작업이 완료될 때까지 이벤트 큐에 작업을 밀어 넣고 기다립니다. 이는 while 문을 통해 maxConcurrent에 따라 작업이 제한적으로 실행되는 동시성 제어와 연결됩니다.

따라서 위의 예제의 경우 await this._completer.promise 코드 줄에서 7개의 요청이 기다리고 있는 중입니다. 먼저 들어간 요청 중 1개라도 완료가 되는 시점에 await이 풀리고 다음 코드를 실행하게 될 겁니다.

언어적인 특성을 고려하여 if 대신 while을 사용하여 세마포어를 안전하게 구현합니다.

Nest.js에서의 응용: Semaphore를 이용한 요청 동시성 제어

서버 애플리케이션에서 특정 API 엔드포인트에 대한 동시 요청 수를 제한해야 하는 경우가 있습니다. 예를 들어 데이터베이스 또는 서버 부하를 줄이거나 외부 API 호출 제한합니다.

Nest.js 환경에서 인터셉터를 통하여 요청 동시성을 컨트롤 할 수 있습니다.

위에서 구현한 Semaphore를 토대로 ConcurrencyInterceptor를 구현합니다.

import {
  CallHandler,
  ExecutionContext,
  Injectable,
  NestInterceptor,
} from "@nestjs/common"
import { Observable } from "rxjs"
import { finalize } from "rxjs/operators"

@Injectable()
export class ConcurrencyInterceptor implements NestInterceptor {
  private semaphore: Semaphore

  constructor(maxConcurrent: number) {
    this.semaphore = new Semaphore(maxConcurrent)
  }

  async intercept(
    context: ExecutionContext,
    next: CallHandler
  ): Promise<Observable<any>> {
    await this.semaphore.acquire()

    return next.handle().pipe(
      finalize(() => {
        this.semaphore.release()
      })
    )
  }
}

ConcurrencyInterceptor는 최대 동시 실행수를 인자로 받아 요청을 제한합니다.

maxConcurrent를 초과하면 새로운 요청은 semaphore.acquire()에서 대기하게 됩니다. 기존 요청이 완료되어 semaphore.release()가 호출되면 대기 중인 요청이 진행됩니다.

위 인터셉터를 이용하는 방법은 간단합니다.

import { Controller, Post, UseInterceptors } from "@nestjs/common"
import { ConcurrencyInterceptor } from "./concurrency.interceptor"

@Controller("example")
export class ExampleController {
  @Post()
  @UseInterceptors(new ConcurrencyInterceptor(3)) // 동시성을 3으로 설정
  async handleHeavyRequest(): Promise<any> {
    // 요청 처리 로직
  }
}

위처럼 설정하면 해당 요청은 최대 동시에 3번 실행됩니다. 만약 3번 이상의 요청이 들어오면 기다렸다가 이전 요청이 완료되면 작업이 시작됩니다.

CPU 및 메모리 바운드가 정해진 서버에서 무거운 요청 또는 동시성을 제한해야만 하는 엔드포인트에만 인터셉터를 적용하여 컨트롤 할 수 있습니다.

Summary

Flutter 코드 작성 중 Completer 컨셉에 흥미를 느껴 TS 진영에서도 구현해보았습니다.

물론 Flutter의 Future나 JS의 Promise만 이용해서도 위 문제를 풀어낼 수 있어 반드시 필요한 개념은 아니지만, 좀 더 선언적으로 Lock을 구현할 수 있는 것 같습니다.

마침 동시성 제한을 해야하는 경우가 생겨 응용까지 해볼 수 있었습니다. 공유 자원을 제어해야 하는 상황에서 유용하게 사용할 수 있을 것 같습니다.

@beygee
미션 달성을 위해 실험적인 도전부터 안정적인 설계까지 구현하는 것을 즐겨합니다.