database

BullMQ - Queue

웹 애플리케이션이나 백엔드 시스템을 개발할 때 장시간 소요되는 작업을 비동기적으로 처리하거나, 트래픽 급증 상황에서도 안정적으로 작업을 처리하려면 작업 큐(Job Queue)를 사용합니다. Node.js 생태계에서 가장 인기 있고 신뢰성 높은 큐 관리 도구 중 하나가 바로 BullMQ입니다.

이 글에서는 BullMQ를 활용하여 작업 큐를 생성하고 관리하는 방법을 기초부터 심화까지 단계별로 소개합니다.


Queue(큐)란 무엇인가?

Queue는 처리되기를 기다리는 작업(Job)의 목록입니다. 작업은 작고 간단한 메시지 형태일 수도 있고, 오랜 시간이 걸리는 대규모 작업일 수도 있습니다. BullMQ에서 Queue는 간단하면서도 강력한 Queue 클래스에 의해 관리됩니다.

Queue 생성

Queue 클래스의 인스턴스를 생성하면 기존의 큐가 있을 경우 이를 그대로 가져오거나, 새롭게 생성됩니다.

BullMQ - Queue
const { Queue } = require('bullmq')
const IORedis = require('ioredis')
 
const connection = new IORedis()
const queue = new Queue('Cars', { connection })

Redis 연결 정보는 반드시 제공되어야 합니다. 자세한 정보는 Connections 문서를 참조하세요.


작업(Job) 추가하기

BullMQ에서 가장 중요한 메서드 중 하나가 바로 add입니다. 이 메서드를 사용하여 큐에 다양한 작업을 추가할 수 있습니다.

BullMQ - Queue
await queue.add('paint', { color: 'red' })

위 코드는 paint라는 이름의 작업을 { color: 'red' }라는 데이터를 포함하여 큐에 추가합니다. 이렇게 추가된 작업은 Redis 내에 저장되고, Worker가 연결되었을 때 처리됩니다.

작업 옵션 사용하기

작업 추가 시 다양한 옵션을 지정하여 작업의 동작 방식을 크게 바꿀 수 있습니다.

지연(Delay) 작업

지정된 시간만큼 지연된 후 처리됩니다.

BullMQ - Queue
await queue.add('paint', { color: 'blue' }, { delay: 5000 })

이 예제는 5초 후에 처리됩니다.

BullMQ 2.0 이전 버전에서는 지연 작업을 사용하려면 반드시 QueueScheduler가 필요했습니다. 하지만 BullMQ 2.0부터는 QueueScheduler가 필요하지 않습니다.

작업 자동 제거(Auto-removal)

기본적으로 작업이 완료되거나 실패하면, 이 작업들은 각각 'completed' 또는 'failed' 세트에 저장됩니다. 하지만 운영 환경에서는 Redis에 불필요한 데이터가 쌓이는 것을 방지하기 위해 자동 제거 옵션을 사용할 수 있습니다.

BullMQ - Queue
// 모든 작업을 즉시 제거
await queue.add(
  'test',
  { foo: 'bar' },
  { removeOnComplete: true, removeOnFail: true },
)
 
// 특정 개수의 작업만 보관
await queue.add(
  'test',
  { foo: 'bar' },
  { removeOnComplete: 1000, removeOnFail: 5000 },
)
 
// 특정 기간 동안만 작업을 보관
await queue.add(
  'test',
  { foo: 'bar' },
  {
    removeOnComplete: { age: 3600, count: 1000 },
    removeOnFail: { age: 86400 },
  },
)

이 기능은 작업이 완료 또는 실패할 때마다 지연(lazy) 방식으로 동작하며, 새로운 작업의 완료나 실패가 발생할 때만 이전 작업들을 정리합니다.


BullMQ에서 멱등성을 구현할 때 주로 사용하는 방법 중 하나는 작업에 고유한 Job ID를 부여하는 것입니다. 이미 큐에 존재하는 ID를 가진 작업을 다시 추가하면 해당 작업은 무시되고 중복 이벤트가 발생합니다. 이때 자동 제거 옵션을 사용한 작업은 제거되면 더 이상 큐에 존재하지 않는 것으로 간주되므로, 같은 ID의 새로운 작업이 추가될 때 중복으로 처리되지 않습니다. 따라서 자동 제거 옵션을 사용할 때는 이 점을 반드시 고려해야 합니다.

작업 대량 추가(Bulk add)

한 번에 여러 작업을 원자적으로 추가해야 하는 경우도 있습니다. 예를 들어 모든 작업이 성공적으로 큐에 추가되거나 전혀 추가되지 않아야 하는 경우, 다음과 같이 사용할 수 있습니다.

BullMQ - Queue
const jobs = await queue.addBulk([
  { name: 'paint', data: { paint: 'car' } },
  { name: 'paint', data: { paint: 'house' } },
  { name: 'paint', data: { paint: 'boat' } },
])

이 작업은 원자적(atomicity)으로 동작하며 모든 작업이 추가되거나 하나도 추가되지 않습니다. 즉, "모두 성공" 아니면 "전부 실패" 방식으로 작동합니다. addBulk()는 전체 작업을 하나의 단위로 처리하여, 일부만 추가되는 일관성 없는 상태를 방지합니다.

Global Concurrency(전역 동시성)

Global Concurrency는 **큐(queue)**에 등록된 모든 작업자(worker) 인스턴스들을 통틀어 동시에 처리할 수 있는 최대 작업 수를 제한하는 설정입니다. 한 번에 너무 많은 작업이 실행되면 DB, CPU, 네트워크 등에 부하가 생길 수 있으므로 이를 제한하기 위해 사용합니다. 또한, 외부 API 호출이 포함된 작업의 경우, 한 번에 너무 많이 호출하면 차단당할 수 있어 전역 제한이 필요합니다.

BullMQ - Queue
// 최대 작업 수 제한
await queue.setGlobalConcurrency(4)
 
// 설정된 동시성 값 확인
const globalConcurrency = await queue.getGlobalConcurrency()

전체 시스템에서 동시에 최대 4개의 작업만 처리되도록 제한하겠다는 것입니다. 아무리 많은 worker 인스턴스를 띄워도, 동시에 돌아가는 작업은 4개까지만 허용된다는 뜻입니다. 여기서 주의해야하는 것은 Worker와 Global Concurrency의 관계를 확인해 보아야 합니다.

BullMQ - Queue
// worker concurrency
const worker = new Worker(
  'queue-name',
  async (job) => {
    // 작업 처리
  },
  { concurrency: 10 },
)
 
// global concurrency
await queue.setGlobalConcurrency(4)
  • worker concurrency: 그 워커 하나가 동시에 처리할 수 있는 작업 수
  • global concurrency: 전체 큐에 대해 모든 워커가 동시에 처리 가능한 총 작업 수 (상한선)

정리하면, 각 워커는 많이 처리할 수 있어도, 전체 합은 전역 제한을 넘지 못함을 의미합니다.

항목의미
setGlobalConcurrency(n)전체 워커에서 동시에 실행 가능한 작업 수 제한
getGlobalConcurrency()현재 설정된 전역 동시성 값 반환
워커별 concurrency 설정개별 워커가 동시에 실행 가능한 작업 수 (단, 전역 제한에 종속됨)

작업 제거(Removing Jobs)

작업을 큐에서 제거할 때 사용할 수 있는 메서드는 다음과 같습니다:

메서드제거 대상예외/설명
drain()대기 중(waiting), 지연 중(delayed) 작업 제거실행 중(active), 완료/실패 작업은 유지됨
clean()특정 상태의 작업을 시간 조건에 맞춰 제거완료된/실패한/지연된 작업 등을 선택적으로 제거
obliterate()큐 자체와 모든 작업 완전 삭제위험함! 되돌릴 수 없음
BullMQ - Queue
// 대기/지연된 작업만 제거
await queue.drain()
 
// 특정 상태 + 조건에 맞는 오래된 작업 정리
const deletedJobIds = await queue.clean(60000, 1000, 'paused') // 'paused' 상태에서 1분 이상된 작업 1000개까지 삭제, 'completed', 'failed', 'delayed', 'wait', 'paused' 등을 작업 상태로 사용.
 
// 큐 전체 삭제 (작업 + 설정 포함)
await queue.obliterate()

Obliterate: 큐와 그 내용을 완전히 제거합니다.


작업 조회하기

getDelayed()

Queue 인스턴스에서 아직 실행되지 않고 지연되어 대기 중인 작업들을 조회할 때 사용합니다. 사용 파라미터는 아래와 같습니다.

BullMQ - Queue
queue.getDelayed(start?: number, end?: number, asc?: boolean): Promise<Job[]>
파라미터설명기본값
start시작 인덱스 (페이징용)0
end끝 인덱스 (포함)-1 (전체)
asc오래된 순서대로 정렬할지 여부false (최신 순서)
BullMQ - Queue
// 이 작업들은 delay 옵션으로 등록된 후 아직 실행되지 않은 상태
const delayedJobs: Job[] = await queue.getDelayed();
 
// 전체 지연 작업 가져오기
const jobs = await queue.getDelayed(); // start=0, end=-1, 최신 순
 
// 오래된 순서대로 상위 10개 지연 작업 가져오기
const jobs = await queue.getDelayed(0, 9, true);

getDelayed() 자체에는 시간 필터 기능이 없기 때문에, 가져온 후 직접 필터링해야 합니다.

BullMQ - Queue
const allDelayedJobs = await queue.getDelayed();
const now = Date.now();
 
// 예: 앞으로 30분 안에 실행될 작업만 보기
const within30Min = allDelayedJobs.filter(job => {
  const scheduledAt = job.timestamp + job.delay;
  return scheduledAt <= now + 30 * 60 * 1000;
});
 
console.log(`30분 내 실행될 작업 수: ${within30Min.length}`);
  • job.timestamp: 작업이 큐에 추가된 시점 (ms)
  • job.delay: 설정된 지연 시간 (ms)
  • timestamp + delay = 실제 실행 예정 시각

작업 처리하기 (Worker)

추가된 작업을 실제로 처리하기 위해 Worker를 사용합니다.

BullMQ - Queue
const { Worker } = require('bullmq')
 
const worker = new Worker(
  'Cars',
  async (job) => {
    if (job.name === 'paint') {
      console.log(`Painting car with color ${job.data.color}`)
      // 실제 작업 처리 로직
    }
  },
  { connection },
)

Worker가 연결되면 자동으로 큐에서 작업을 가져와 처리합니다.


참고 자료