Overview
Flutter
에서 시작해 JavaScript
, 그리고 Nest.js
까지 이어지는 동시성 제어 기법에 대해 이야기해보려 합니다.
Completer
를 이용한 Lock
구현부터 시작해서 이를 JavaScript
의 Promise
로 확장하고 최종적으로 세마포어를 이용하여 Nest.js
에서의 요청 동시성 제어까지 다뤄보겠습니다.
Flutter에서 Completer를 이용한 Lock 구현
우선 Completer란 무엇일까요?
Flutter에서 Completer는 미래에 완료될 비동기 작업을 나타내는 Future(JS에선 Promise와 비슷)
객체를 수동으로 제어할 수 있게 해주는 클래스입니다. 일반적인 Future와 달리 Completer는 외부에서 완료(complete)를 호출하여 해당 Future를 완료 상태로 만들 수 있습니다.
비동기 작업의 순서를 제어하거나 특정 조건이 충족될 때까지 대기해야 하는 경우 Completer
를 활용하여 동기화 메커니즘을 구현할 수 있습니다. 특히 공유 자원에 대한 접근을 제어하기 위해 Lock
이나 Mutex
를 구현할 때 유용합니다.
여러 비동기 작업이 동시에 실행되지 않고 순차적으로 실행되도록 보장하는 예제를 살펴보겠습니다.
import 'dart:async';
class TaskQueue {
Completer<void>? _completer;
Future<T> enqueue<T>(Future<T> Function() task) async {
// 이전 작업이 완료될 때까지 대기
while (_completer != null && !_completer!.isCompleted) {
await _completer!.future;
}
_completer = Completer<void>();
try {
// 작업 실행
T result = await task();
return result;
} finally {
// 작업 완료 후 completer 완료 처리
_completer!.complete();
}
}
}
동작 원리는 다음과 같습니다.
enqueue
함수는 새로운 작업을 큐에 추가합니다._completer
가 존재하고 완료되지 않았다면 이전 작업이 완료될 때까지await
로 대기합니다.- 현재 작업을 수행하고 완료되면
_completer.complete()
를 호출하여 다음 작업이 진행될 수 있도록 합니다.
실제로 실행하면 결과는 다음과 같습니다.
void main() async {
TaskQueue taskQueue = TaskQueue();
// 여러 작업을 동시에 추가하지만 순차적으로 실행됨
taskQueue.enqueue(() async {
print('작업 1 시작');
await Future.delayed(Duration(seconds: 2));
print('작업 1 완료');
});
taskQueue.enqueue(() async {
print('작업 2 시작');
await Future.delayed(Duration(seconds: 1));
print('작업 2 완료');
});
taskQueue.enqueue(() async {
print('작업 3 시작');
await Future.delayed(Duration(seconds: 3));
print('작업 3 완료');
});
}
// 실행결과
작업 1 시작
작업 1 완료
작업 2 시작
작업 2 완료
작업 3 시작
작업 3 완료
작업들이 추가된 순서대로 순차적으로 실행되는 것을 볼 수 있습니다.
Completer
를 이용하여 현재 실행 중인 작업이 완료될 때까지 다음 작업이 대기해서 공유 자원에 대한 접근을 순차적으로 처리합니다.
이는 마치 하나의 작업이 완료될 때까지 다른 작업이 Lock
을 획득하지 못하도록 하는 Mutex
와 같은 역할을 합니다.
JavaScript에서 Promise를 Completer로 래핑하여 Mutex 구현
JavaScript
에서는 비동기 작업을 처리하기 위해 Future
대신 Promise
를 사용합니다. 이를 활용하여 Mutex
를 구현할 수 있습니다.
먼저 Promise
를 래핑하여 Completer
클래스를 구현해보겠습니다.
class Completer {
constructor() {
this.isCompleted = false
this.promise = new Promise((resolve, reject) => {
this._resolve = resolve
})
}
complete(value) {
if (!this.isCompleted) {
this.isCompleted = true
this._resolve(value)
}
}
}
위 Completer
를 이용하여 Mutex
를 구현하면 다음과 같습니다.
class Mutex {
constructor() {
this._completer = null
}
async acquire() {
while (this._completer && !this._completer.isCompleted) {
await this._completer.promise
}
this._completer = new Completer()
}
release() {
if (this._completer && !this._completer.isCompleted) {
this._completer.complete()
}
}
}
원리는 Flutter
의 Completer
와 같으므로, 동시 작업 수를 지정할 수 있는 Semaphore
구현으로 바로 확장해보겠습니다.
class Semaphore {
constructor(maxConcurrent) {
this.maxConcurrent = maxConcurrent
this.currentCount = 0
this._completer = null
}
async acquire() {
while (this.currentCount >= this.maxConcurrent) {
if (this._completer && !this._completer.isCompleted) {
await this._completer.promise
} else {
this._completer = new Completer()
await this._completer.promise
}
}
this.currentCount++
}
release() {
this.currentCount--
if (this._completer && !this._completer.isCompleted) {
this._completer.complete()
}
}
}
maxConcurrent
만큼은 동시 작업이 가능한 세마포어입니다. 그 이상으로 넘어가면 completer
가 생성되어 다른 요청의 acquire
를 대기시킬 수 있습니다.
여기서 while
문이 있는 이유에 대해 궁금할 수 있습니다. 다음 예제를 보겠습니다.
async function run() {
const semaphore = new Semaphore(3) // 한 번에 3개의 요청만 처리 가능
const promises = []
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
await semaphore.acquire()
try {
// 가짜 API 요청 처리. 1초 대기 시간을 갖습니다.
await fakeApiRequest()
} finally {
semaphore.release()
}
})()
)
}
await Promise.all(promises)
}
만약 while
문이 아니라 if
문으로 대체되어있었다면 위 코드는 3개를 동시에 실행 후, 나머지 7개를 동시에 실행하고 종료합니다.
while
문이 아니라면 이 시점에 모두 acquire
를 성공적으로 마치고 currentCount
가 maxConcurrent
를 초과하게 됩니다. 아래 예제를 실행해보겠습니다. 주석이 있는 부분이 변경 지점입니다.
class Semaphore {
constructor(maxConcurrent) {
this.maxConcurrent = maxConcurrent
this.currentCount = 0
this._completer = null
}
async acquire() {
// while 문에서 if 문으로 변경하여 테스트합니다.
// while (this.currentCount >= this.maxConcurrent) {
if (this.currentCount >= this.maxConcurrent) {
if (this._completer && !this._completer.isCompleted) {
await this._completer.promise
} else {
this._completer = new Completer()
await this._completer.promise
}
}
this.currentCount++
// 어떠한 경우에도 maxConcurrent를 넘기면 안됩니다.
console.log("currentCount:", this.currentCount)
}
release() {
this.currentCount--
if (this._completer && !this._completer.isCompleted) {
this._completer.complete()
}
}
}
async function fakeApiRequest() {
await new Promise(resolve => setTimeout(resolve, 1000))
}
async function run() {
const semaphore = new Semaphore(3)
const promises = []
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
await semaphore.acquire()
try {
await fakeApiRequest()
} finally {
semaphore.release()
}
})()
)
}
await Promise.all(promises)
}
run()
실행 결과는 다음과 같습니다.
currentCount: 1
currentCount: 2
currentCount: 3
# 1초 이후
currentCount: 3
currentCount: 4
currentCount: 5
currentCount: 6
currentCount: 7
currentCount: 8
currentCount: 9
이유는 await
이 동작하는 방식이 Node.js
의 이벤트 큐와 관련이 있기 때문입니다.
Node.js
에서 await
는 비동기 작업이 완료될 때까지 이벤트 큐에 작업을 밀어 넣고 기다립니다. 이는 while
문을 통해 maxConcurrent
에 따라 작업이 제한적으로 실행되는 동시성 제어와 연결됩니다.
따라서 위의 예제의 경우 await this._completer.promise
코드 줄에서 7개의 요청이 기다리고 있는 중입니다. 먼저 들어간 요청 중 1개라도 완료가 되는 시점에 await
이 풀리고 다음 코드를 실행하게 될 겁니다.
언어적인 특성을 고려하여 if
대신 while
을 사용하여 세마포어를 안전하게 구현합니다.
Nest.js에서의 응용: Semaphore를 이용한 요청 동시성 제어
서버 애플리케이션에서 특정 API 엔드포인트에 대한 동시 요청 수를 제한해야 하는 경우가 있습니다. 예를 들어 데이터베이스 또는 서버 부하를 줄이거나 외부 API 호출 제한합니다.
Nest.js
환경에서 인터셉터를 통하여 요청 동시성을 컨트롤 할 수 있습니다.
위에서 구현한 Semaphore
를 토대로 ConcurrencyInterceptor
를 구현합니다.
import {
CallHandler,
ExecutionContext,
Injectable,
NestInterceptor,
} from "@nestjs/common"
import { Observable } from "rxjs"
import { finalize } from "rxjs/operators"
@Injectable()
export class ConcurrencyInterceptor implements NestInterceptor {
private semaphore: Semaphore
constructor(maxConcurrent: number) {
this.semaphore = new Semaphore(maxConcurrent)
}
async intercept(
context: ExecutionContext,
next: CallHandler
): Promise<Observable<any>> {
await this.semaphore.acquire()
return next.handle().pipe(
finalize(() => {
this.semaphore.release()
})
)
}
}
ConcurrencyInterceptor
는 최대 동시 실행수를 인자로 받아 요청을 제한합니다.
maxConcurrent
를 초과하면 새로운 요청은 semaphore.acquire()
에서 대기하게 됩니다. 기존 요청이 완료되어 semaphore.release()
가 호출되면 대기 중인 요청이 진행됩니다.
위 인터셉터를 이용하는 방법은 간단합니다.
import { Controller, Post, UseInterceptors } from "@nestjs/common"
import { ConcurrencyInterceptor } from "./concurrency.interceptor"
@Controller("example")
export class ExampleController {
@Post()
@UseInterceptors(new ConcurrencyInterceptor(3)) // 동시성을 3으로 설정
async handleHeavyRequest(): Promise<any> {
// 요청 처리 로직
}
}
위처럼 설정하면 해당 요청은 최대 동시에 3번 실행됩니다. 만약 3번 이상의 요청이 들어오면 기다렸다가 이전 요청이 완료되면 작업이 시작됩니다.
CPU 및 메모리 바운드가 정해진 서버에서 무거운 요청 또는 동시성을 제한해야만 하는 엔드포인트에만 인터셉터를 적용하여 컨트롤 할 수 있습니다.
Summary
Flutter 코드 작성 중 Completer 컨셉에 흥미를 느껴 TS 진영에서도 구현해보았습니다.
물론 Flutter의 Future나 JS의 Promise만 이용해서도 위 문제를 풀어낼 수 있어 반드시 필요한 개념은 아니지만, 좀 더 선언적으로 Lock을 구현할 수 있는 것 같습니다.
마침 동시성 제한을 해야하는 경우가 생겨 응용까지 해볼 수 있었습니다. 공유 자원을 제어해야 하는 상황에서 유용하게 사용할 수 있을 것 같습니다.