Nest.js 카프카로 이벤트 발행 모듈 만들기

@beygee· May 20, 2024 · 19 min read

TL;DR

  1. Event Emitter를 기반으로 한 이벤트 아키텍처를 Kafka로 이전
  2. kafkajs를 통해 Kafka Module 작성
  3. 데코레이터를 통한 선언적 이벤트 수신
  4. 컨슈머 그룹 별 카프카 커넥션 연결을 통한 리밸런싱 시간 단축
  5. 메트릭 및 로그 관찰을 통한 모니터링 및 장애 대응

Overview.

기존까지 이벤트 송/수신을 nestjs/event-emitter 라이브러리를 활용했습니다. 이에 따라 발행 이벤트도 약 100개 정도 되었고 도메인의 결합도를 낮추었습니다. 하지만 사용하면서 몇 가지 문제점이 있었습니다.

  1. 수평 확장의 어려움

수평 스케일링된 서버에서 메모리에 이벤트를 발행하므로 같은 서버에서 이벤트를 수신하여 처리할 수 밖에 없습니다. 한 서버에서 이벤트를 대량 발행 시 부하를 분산시킬 수 없습니다.

  1. 에러 핸들링의 어려움

이벤트를 수신하여 처리 중 만약 에러가 발생할 경우 유실될 가능성이 매우 높습니다. 이벤트를 영속적으로 저장하는 것이 아닌 메모리에 있으므로 재처리하기 까다롭고 추적하기 어렵습니다.

따라서 위 문제를 해결하기 위해 이벤트 브로커를 서버 외부의 인프라에 두고, 이벤트가 소비되더라도 영속성을 보장하는 Kafka로 구축했습니다.

초기에 빠른 환경 구축을 위해 로컬에서는 도커 컴포즈를 이용하여 카프카를 띄우고, 실제 환경에서는 AWS MSK를 이용했습니다.

Kafka + Nest.js

Nest.js에 카프카를 연결하기 위해 공식 독스에서 제공하는 방안이 있습니다. 독스에서 카프카를 Nest.js에 연결하기 위해서 마이크로서비스 패턴을 활성화하고, MessagePattern(비동기 요청/응답 패턴)을 장려합니다.

하지만 기존의 EventEmitter를 대체하기 위해선 단순히 이벤트를 발행하고, 해당 이벤트를 다른 여러 도메인 모듈에서 구독하여 처리해야 했습니다. 그리고 단순히 카프카를 이용하기 위해 nestjs/microservices 라이브러리를 포함하는 것은 너무 무거운 것 같아 카프카 모듈만 직접 제작하기로 했습니다.

Nest.js에서 kafkajs를 래핑하여 이용하고 있었기 때문에 kafkajs를 활용하여 모듈을 만들기로 합니다.

KafkaModule

카프카 모듈의 스켈레톤 코드는 다음과 같습니다.

// kafka.module.ts
import { Module } from "@nestjs/common"

import { ConsumerService } from "./consumer.service"
import { ProducerService } from "./producer.service"

@Module({
  imports: [],
  providers: [ProducerService, ConsumerService],
  exports: [ProducerService, ConsumerService],
})
export class KafkaModule {}

모듈에서는 별다른 코드는 없고, ConsumerServiceProducerService를 받아 Export 하기만 하고 있습니다. 이후에 KafkaModule에도 코드가 추가되지만, 먼저 ProducerServiceConsumerService의 구현을 보고 다시 돌아오겠습니다.

ProducerService

Producer는 이벤트를 발송하는 주체입니다. 먼저 발행 인터페이스를 정의합니다.

// producer.interface.ts
import type { Message } from 'kafkajs'

export interface IProducer {
  connect: () => Promise<void>
  disconnect: () => Promise<void>
  produce: (message: Message | Message[]) => Promise<void>
}

type JsonType = Record<string, unknown> | string | number | boolean | null | object
export type KafkaMessage<T = JsonType> = Omit<Message, 'value' | 'key'> & {
  value: T
  key?: string | number | null
}

Producer는 연결, 연결해제, 이벤트 발행 3가지 함수가 존재합니다. 이벤트는 단 건 또는 여러건 동시에 발행이 가능합니다. 위 인터페이스를 통해 Producer를 구현합니다.

먼저 kafkajs 의존성을 갖는 KafkajsProducer를 만들고, IProducer를 인터페이스를 받아 kafka 라이브러리와 분리된 ProducerService를 구현합니다.

// kafkajs.producer.ts
import type { Message, Producer } from "kafkajs"
import { Kafka } from "kafkajs"

import type { IProducer } from "./producer.interface"

export class KafkajsProducer implements IProducer {
  private readonly kafka: Kafka
  private readonly producer: Producer

  constructor(private readonly topic: string, brokers: string[]) {
    this.kafka = new Kafka({
      brokers,
      connectionTimeout: 5000,
    })
    this.producer = this.kafka.producer()
  }

  async produce(message: Message | Message[]) {
    await this.producer.send({
      topic: this.topic,
      messages: Array.isArray(message) ? message : [message],
    })
  }

  async connect() {
    await this.producer.connect()
  }

  async disconnect() {
    await this.producer.disconnect()
  }
}

실제 구현 상에는 kafkajs를 통해 실제 명세 구현만 해주면 됩니다.

다음은 ProducerService입니다.

// producer.service.ts
import type { OnApplicationShutdown } from "@nestjs/common"
import { Injectable } from "@nestjs/common"
import { ConfigService } from "@nestjs/config"

import type { KafkaTopic } from "./kafka.constant"
import { KafkajsProducer } from "./kafkajs.producer"
import type { IProducer, KafkaMessage } from "./producer.interface"

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  constructor(private readonly configService: ConfigService) {}
  private readonly producers = new Map<string, IProducer>()

  async produce(
    topic: KafkaTopic,
    messagesOrMessage: KafkaMessage | KafkaMessage[]
  ) {
    const producer = await this.getProducer(topic)

    const messages = Array.isArray(messagesOrMessage)
      ? messagesOrMessage
      : [messagesOrMessage]

    const stringifiedMessages = messages.map(message => {
      const { key, value, ...rest } = message
      return { key: key?.toString(), value: JSON.stringify(value), ...rest }
    })

    await producer.produce(stringifiedMessages)
  }

  private async getProducer(topic: string) {
    let producer = this.producers.get(topic)
    if (!producer) {
      const KAFKA_BROKERS = this.configService.get<string>("KAFKA_BROKERS")
      producer = new KafkajsProducer(topic, KAFKA_BROKERS.split(","))
      await producer.connect()
      this.producers.set(topic, producer)
    }
    return producer
  }

  async onApplicationShutdown() {
    // 이미 인입된 API 트래픽 중 처리 완료 후 producer를 통해 메시지를 보낼 수 있습니다.
    // 이 때 이미 producer가 종료된 상태라면 메시지가 발송되지 않을 수 있으므로 딜레이를 줍니다.
    const delaySeconds = 5
    await new Promise(resolve => setTimeout(resolve, delaySeconds * 1000))
    await Promise.all(
      [...this.producers.values()].map(producer => producer.disconnect())
    )
  }
}

ProducerService는 이벤트 발행을 위해 getProducer 함수를 통해 각 토픽별로 프로듀서 인스턴스를 생성하여 이벤트를 발행합니다. producers 멤버 변수를 통하여 추후에 서버 종료 시 연결을 안전하게 종료합니다.

먼저 여러 Producer 인스턴스를 생성하는 이유는 병렬 전송을 통해 빠르게 이벤트 발행하기 위함입니다. topic별로 Producer를 생성하는 이유는 각 토픽별로 전송할 때 최소한 같은 이벤트는 순서대로, 그리고 방해받지 않게 독립적으로 발행하기 위함인데, 이는 비즈니스 목적에 따라 반드시 동일하게 구현할 필요는 없습니다.

ConsumerService

다음은 Consumer인 이벤트를 구독하여 처리하는 주체입니다.

// kafkajs.consumer.ts
import type { Consumer, ConsumerConfig, ConsumerSubscribeTopics, KafkaMessage } from 'kafkajs'
import { Kafka, logLevel } from 'kafkajs'


export class KafkajsConsumer {
  private readonly kafka: Kafka
  private readonly consumer: Consumer

  constructor(
    private readonly topics: ConsumerSubscribeTopics,
    config: ConsumerConfig,
    brokers: string[],
  ) {
    this.kafka = new Kafka({
      brokers,
      connectionTimeout: 5000,
    })
    this.consumer = this.kafka.consumer(config)
  }

  async consume(
    onMessage: (message: KafkaMessage, topic: string) => Promise<void>,
    { partitionsConsumedConcurrently = 1 }: { partitionsConsumedConcurrently?: number } = {},
  ) {
    await this.consumer.subscribe(this.topics)
    await this.consumer.run({
      eachMessage: async ({ message, partition, topic }) => {
        try {
          await onMessage(message, topic)
        } catch (err) {
          await this.addMessageToDlq(message, topic)
        }
      },
      partitionsConsumedConcurrently,
    })
  }

  private async addMessageToDlq(message: KafkaMessage, topic: string) {
    // ... Process Message Error Log
    // ... Save to DLQ
  }

  async connect() {
    await this.consumer.connect()
  }

  async disconnect() {
    await this.consumer.disconnect()
  }
}

KafkajsConsumer 역시 kafkajs를 이용하여 실제 구현을 해줍니다. 필요하다면 이벤트 처리 중 실패 시 DLQ에 전달하여 별도의 처리도 할 수 있습니다.

다음은 ConsumerService입니다.

// comsumer.service.ts
import type { OnApplicationShutdown } from "@nestjs/common"
import { Injectable } from "@nestjs/common"
import { ConfigService } from "@nestjs/config"
import type { ConsumerConfig, ConsumerSubscribeTopics } from "kafkajs"

import { KafkajsConsumer } from "./kafkajs.consumer"
import type { KafkaMessage } from "./producer.interface"

interface KafkajsConsumerOptions<T> {
  topics: ConsumerSubscribeTopics
  config: ConsumerConfig
  onMessage: (message: KafkaMessage<T>, topic: string) => Promise<void>
  runConfig?: { partitionsConsumedConcurrently?: number }
}
@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  constructor(private readonly configService: ConfigService) {}
  private readonly consumers: KafkajsConsumer[] = []

  async consume<T = object>({
    topics,
    config,
    onMessage,
    runConfig,
  }: KafkajsConsumerOptions<T>) {
    const _config: ConsumerConfig = Object.assign<
      ConsumerConfig,
      Partial<ConsumerConfig>
    >(config, {
      sessionTimeout: 60000,
    })

    const KAFKA_BROKERS = this.configService.get<string>("KAFKA_BROKERS")
    const consumer = new KafkajsConsumer(
      topics,
      _config,
      KAFKA_BROKERS.split(",")
    )
    await consumer.connect()
    consumer.consume(async (message, topic) => {
      const { key, value, ...rest } = message
      await onMessage(
        { key: key?.toString(), value: JSON.parse(value.toString()), ...rest },
        topic
      )
    }, runConfig)
    this.consumers.push(consumer)
  }

  async onApplicationShutdown() {
    await Promise.all(this.consumers.map(consumer => consumer.disconnect()))
  }
}

위 역시 토픽에 따라 Consumer를 생성해주고, 이벤트를 구독할 수 있도록 합니다. consumers 멤버변수에 담아두어 추후에 서버 종료 시 컨슈머 그룹이 안전하게 종료될 수 있도록 합니다.

이벤트 발행/구독 방법

ProducerService와 ConsumerService를 구현하고 KafkaModule을 통하여 Export 하였으므로 이제 비즈니스 로직 상에서 KafkaModule을 Import하여 이용하면 됩니다.

예를 들어 UserModule에서 유저가 생성되었을 때 이벤트를 발행하는 것으로 가정하겠습니다.

// user.module.ts
import { KafkaModule } from "../kafka/kafka.module"
import { UserService } from "./user.service"

@Module({
  imports: [
    // ... Other Modules
    KafkaModule,
  ],
  providers: [UserService],
})
export class UserModule {}
// user.service.ts
@Injectable()
export class UserService {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly producerService: ProducerService
  ) {}

  @Transactional()
  public async createUser({
    email,
    password,
  }: CreateUserCommand): Promise<User> {
    const user = new User({
      email,
      password,
    })

    await this.userRepository.save(user)

    runOnTransactionCommit(async () => {
      await this.producerService.produce(KafkaTopic.USER_CREATED, {
        value: new UserCreatedEvent({ id: user.id }),
      })
    })

    return user
  }
}

위 처럼 ProducerService를 이용하여 이벤트를 발행할 수 있습니다.

그렇다면 구독하는 방법은 어떨까요? 비즈니스 Service에서 this.consumer.consume 함수를 호출해야할까요? 요청에 의해 실행되는 함수에서 구독하는 함수를 호출하는 것이 좀 이상하게 느껴집니다.

서버가 실행 시에 구독하면서 선언적으로 코드를 작성할 수 있는 방법은 무엇이 있을까요? 데코레이터를 선언하여 처리할 수 있을 것 같습니다. Consumer 토픽 구독은 데코레이터를 통하여 처리할 수 있도록 합니다.

Consumer Decorator

Nest.js는 선언적인 코드를 통하여 로직을 처리할 수 있는 다양한 방법이 있습니다. 그 중 하나가 데코레이터입니다.

먼저 토픽 구독을 개시하는 데코레이터 코드를 작성합니다.

// kafka.consume.decorator.ts
import type { ServerType } from "@core/utils/utils"
import { applyDecorators, SetMetadata } from "@nestjs/common"

import type { KafkaTopic } from "./kafka.constant"

export const KAFKA_CONSUME_METADATA = "KAFKA_CONSUME_METADATA"

export function OnKafkaEvent(
  topic: KafkaTopic,
  groupId: string
): MethodDecorator {
  return applyDecorators(
    SetMetadata(KAFKA_CONSUME_METADATA, { topic, groupId })
  )
}

이 데코레이터는 다음과 같은 방식으로 이용할 수 있습니다.

유저 생성 시 환영 알림을 보내는 코드 예제입니다.

// notification.event.listener
export class NotificationEventListener {
  @OnKafkaEvent(KafkaTopic.USER_CREATED, "notification.handler")
  async handleUserCreatedEvent(message: UserCreatedEvent) {
    const { id } = message
    const notificationSendResponse =
      await this.notificationWelcomeUserDelegate.create(id)
    await this.produceNotificationAndPushMessage(notificationSendResponse)
  }
}

데코레이터 선언 시 실제로 Conuser가 토픽을 구독해야하므로 KafkaModule이 초기화할 때 Nest.js의 DiscoveryServiceMetadataScanner를 이용하여 작업을 수행합니다.

// kafka.module.ts
@Module({
  imports: [DiscoveryModule],
  providers: [ProducerService, ConsumerService],
  exports: [ProducerService, ConsumerService],
})
export class KafkaModule implements OnModuleInit {
  constructor(
    private readonly discovery: DiscoveryService,
    private readonly scanner: MetadataScanner,
    private readonly reflector: Reflector,
    private readonly consumerService: ConsumerService,
    private readonly configService: ConfigService
  ) {}

  async onModuleInit() {
    const handlers = this.discovery
      .getProviders()
      .filter(wrapper => wrapper.isDependencyTreeStatic())
      .filter(({ instance }) => instance && Object.getPrototypeOf(instance))
      .map(({ instance }) => {
        const methodNames = this.scanner.getAllMethodNames(
          Object.getPrototypeOf(instance)
        )

        const decoratoredMethods = methodNames
          .map(methodName => {
            const metadata = this.reflector.get(
              KAFKA_CONSUME_METADATA,
              instance[methodName]
            )
            return {
              methodName,
              metadata,
              instance,
            }
          })
          .filter(({ metadata }) => metadata)

        return decoratoredMethods
      })
      .filter(methods => methods.length > 0)

    const methods = flatten(handlers)
    this.subscribeTopics(methods)
  }

  private async subscribeTopics(
    methods: Array<{
      methodName: string
      metadata: KafkaConsumeOptions & { groupId: string; topic: string }
      instance: any
    }>
  ) {
    const groupedByGroupId = groupBy(methods, method => method.metadata.groupId)

    await Promise.all(
      Object.entries(groupedByGroupId).map(async ([groupId, methods]) => {
        const topics = methods.map(method => method.metadata.topic)
        if (topics.length === 0) return

        await this.consumerService.consume({
          topics: { topics },
          config: { groupId },
          onMessage: async (message, topic) => {
            const { value } = message

            const method = methods.find(
              method => method.metadata.topic === topic
            )
            await method.instance[method.methodName](value)
          },
          runConfig: { partitionsConsumedConcurrently: methods.length },
        })
      })
    )
  }
}

onModuleInit에서 DiscoveryServiceMetadataScanner로 기존에 데코레이터 내부의 SetMetadata로 선언한 이벤트 수신 함수를 필터링하여 가져옵니다. 이에 대한 자세한 내용은 토스의 아티클에 상세히 기술되어있습니다.

subscribeTopics 함수에서 데코레이터로 등록된 컨슘 데이터 리스트를 토대로 실제로 구독 함수를 실행합니다.

카프카에 연결되는 Consume 커넥션 포인트 줄이기

groupedByGroupId처럼 같은 컨슈머 그룹을 가진 컨슘 데이터 리스트를 묶어주면 GroupId 1개당 카프카에 연결을 한 번만 해줄 수 있다는 장점이 있습니다.

예를 들어 notificaion.listener라는 컨슈머 그룹이 user.created와 chat.created 두 이벤트를 구독한다고 가정합니다.

// notification.event.listener
export class NotificationEventListener {
  @OnKafkaEvent(KafkaTopic.USER_CREATED, "notification.handler")
  async handleUserCreatedEvent(message: UserCreatedEvent) {
    const { id } = message
    // ... do something !
  }

  @OnKafkaEvent(KafkaTopic.CHAT_CREATED, "notification.handler")
  async handleChatCreatedEvent(message: ChatCreatedEvent) {
    const { id } = message
    // ... do something too !
  }
}

위 처럼 두 번 데코레이터를 선언하면 일반적으로 두 개의 컨슈머 인스턴스를 생성하여 각각 카프카에 커넥션을 요청할 수 있습니다. 하지만 이렇게 되면 컨슈머가 선형적으로 증가하게 되고, 이는 리밸런싱 시간을 크게 증가시킬 수 있습니다.

이를 피하기 위해 groupId를 묶어주고, 실제로 consume할 때 그룹에 묶인 매칭되는 토픽의 함수를 찾아 호출해주는 방식을 취합니다.

  private async subscribeTopics(
    methods: Array<{
      methodName: string
      metadata: KafkaConsumeOptions & { groupId: string; topic: string }
      instance: any
    }>
  ) {
    // 같은 컨슈머 그룹끼리 묶어줍니다.
    const groupedByGroupId = groupBy(methods, method => method.metadata.groupId)

    await Promise.all(
      Object.entries(groupedByGroupId).map(async ([groupId, methods]) => {
        const topics = methods.map(method => method.metadata.topic)
        if (topics.length === 0) return

        await this.consumerService.consume({
          topics: { topics }, // user.created와 chat.created 둘 다 구독합니다.
          config: { groupId },
          onMessage: async (message, topic) => {
            const { value } = message

            // 실제로 어떤 토픽에 의해 이벤트가 실행되었는지 확인하고 해당 토픽에 맞는 함수를 실행시킵니다.
            // 예제에선 user.created를 통해 수신받았으면 handleUserCreatedEvent 함수를
            // chat.created를 통해 수신받았으면 handleChatCreatedEvent 함수를 호출합니다.
            const method = methods.find(
              method => method.metadata.topic === topic
            )
            await method.instance[method.methodName](value)
          },
          runConfig: { partitionsConsumedConcurrently: methods.length },
        })
      })
    )
  }

마지막으로 이 방법으로 문제 없이 실행되려면 kafkajspartitionsConsumedConcurrently 값을 조절해주어야합니다. partitionsConsumedConcurrently는 파티션에서 병렬적으로 카프카 메시지를 수신할 수 있도록 처리량을 조절하는 값입니다.

kafkajs에는 Kafka에서 제공하는 Incremental Cooperative Rebalancing이 구현되어 있지 않습니다.. 그래서 수평 스케일링으로 서버가 뜨거나 다운 시, 모든 카프카 컨슈머가 멈추어 리밸런싱 하는 시간이 존재합니다. 이 시간을 최소화 하고자 위와 같은 전략을 구현했습니다.

Conclusion

이벤트를 송수신 받기 위해 단순히 EventEmitter를 이용하다가 카프카로 마이그레이션 하다보니 구현 상에 복잡한 감이 있지만, 실제로 이용할 때는 무척 편리했고 투명한 모니터링으로 장애 대응이 편했습니다.

Kafka UI를 이용하여 발행된 이벤트 내역을 쉽게 확인할 수 있었고, MSK 메트릭을 프로메테우스로 전달하여 그라파나에서 초당 이벤트 수신량 또는 처리 지연량을 확인할 수도 있었습니다.

Reference

@beygee
미션 달성을 위해 실험적인 도전부터 안정적인 설계까지 구현하는 것을 즐겨합니다.