nodejs

Buffers & streams

Buffers

일반적으로 저장된 파일을 readFile이나 readFileSync와 같은 메서드를 통해 특정 파일을 읽는 경우에, 해당 파일의 텍스트를 확인하기 위해서는 toString()이라는 메서드를 활용하여 출력을 변환합니다. 왜냐하면 읽어진 파일은 바로 Buffer이기 때문입니다.


기본적으로 Nodejs는 파일을 읽을때, 메모리에 파일 크기만큼 공간을 할당하고 파일 데이터를 메모리에 저장한 후에 사용자가 조작할 수 있도록 만들게 됩니다. 이때 버퍼를 사용할 수 있도록 만들어 주는것이 바로 Buffer 클래스 입니다. 사용법은 아래와 같습니다.

Buffer
function runBufferExamples(): void {
  // 1. Buffer.from
  const buffer: Buffer = Buffer.from('Welcome to hanpy world!')
  console.log('from():', buffer)
  // from() :  <Buffer 57 65 6c 63 6f 6d 65 20 74 6f 20 68 61 6e 70 79 20 77 6f 72 6c 64 21>
  console.log('length:', buffer.length)
  // length:  23
  console.log('toString():', buffer.toString())
  // toString():  Welcome to hanpy world!
 
  // 2. Buffer.concat
  const testArray: Buffer[] = [
    Buffer.from('This '),
    Buffer.from('is '),
    Buffer.from('hanpy blog'),
  ]
  const buffer2: Buffer = Buffer.concat(testArray)
  console.log('concat():', buffer2.toString())
  // concat() :  This is hanpy blog
 
  const buffer3: Buffer = Buffer.alloc(4)
  console.log('alloc() : ' buffer3)
  // alloc() :  <Buffer 00 00 00 00>
}
runBufferExamples()

Buffer 객체의 메소드

  • from(문자열) : 문자여을 버퍼로 변경합니다.
  • .length : 버퍼의 크기를 표시합니다.
  • toString(버퍼) : 버퍼를 문자열로 다시 반환합니다.
  • concat(배열) : 배열 내의 버퍼들을 병합합니다.
  • alloc(바이트) : 빈 버퍼를 생성하고, 인수로 바이트를 넣어 해당 크기의 버퍼를 생성합니다.

Streams - File Read

Buffer의 한계

저장된어 있는 파일을 읽는다고(readFile) 가정해보겠습니다. 읽는 파일 크기에 따라 메모리에 그만큼의 메모리를 할당해야합니다. 만약 300MB 파일을 읽기 위해서는 메모리에 300MB의 버퍼를 할당하게 됩니다. 이는 서버의 운영관점에서 굉장히 크리티컬한 문제가 됩니다. 불확실한 트레픽과 버퍼 처리 완료를 예상없이 기다린다는 점은 서버 장애의 확률이 올라가게 됩니다. 이러한 문제점을 해결하기 위해 strame 방법이 등장하게 됩니다.


스트림의 원리는 간단합니다. 버퍼의 크기를 쪼개서 작게만들고, 한번에 보내는 것이 아닌 여러 번에 걸쳐서 나눠서 보내는 방식을 의미합니다. 조금 더 쉽게 예시를 들면 300MB 파일을 1MB씩 300번 걸처서 보내는 방법을 통해 메모리 사용을 최소화 하는 것을 의미합니다.

createReadStream
import * as fs from 'fs'
import { Buffer } from 'buffer'
 
function readFileStream(filePath: string): void {
  console.log('start')
  const data: Buffer[] = []
 
  const readStream: fs.ReadStream = fs.createReadStream(filePath, {
    highWaterMark: 4,
  })
 
  readStream.on('data', (chunk: Buffer) => {
    data.push(chunk)
    console.log('data :', chunk, chunk.length)
  })
 
  readStream.on('end', (): void => {
    const content = Buffer.concat(data).toString()
    console.log('end :', content)
  })
 
  readStream.on('error', (err: NodeJS.ErrnoException): void => {
    console.log('error :', err)
  })
}
 
// 실행
readFileStream('./readme-sample.txt')
 
// data :  <Buffer 77 65 6c 63> 4
// data :  <Buffer 6f 6d 65 20> 4
// data :  <Buffer 74 6f 20 68> 4
// data :  <Buffer 61 6e 70 79> 4
// data :  <Buffer 20 62 6c 6f> 4
// data :  <Buffer 67> 1
// end :  welcome to hanpy blog
  • createReadStream()의 첫 번째 인수는 읽을 파일의 경로를 나타내고, 두 번째 인수는 옵션을 넣을 수 있는 객체로 highWaterMark 옵션은 버퍼의 크기를 바이트 단위로 정할 수 있습니다. 기본 값은 64KB이고 위의 예시에서는 4Byte로 지정했습니다.
  • createReadStream()로 생성한 readStream 변수에는 아래의 이벤트 리스너를 사용 할 수 있습니다.
    • data : 파일 읽기가 시작되면 발생하는 이벤트 입니다.
    • end : 파일을 모두 읽으면 발생하는 이벤트 입니다.
    • error : error 발생 시 발생하는 이벤트 입니다.
  • 위 코드는 생성되는 4byte의 chunk를 data array에 하나씩 push 합니다. 그리고 마지막에 concat을 통해 합쳐서 문자열로 변환 했습니다.
  • 결과 값을 보면 21byte를 5번에 chunk를 통해 파일을 읽을 것을 확인 할 수 있습니다.

Streams - File Write

createWriteStream을 통한 파일 읽기 방법을 알아봅시다.

createReadStream
import * as fs from 'fs'
 
function writeFileStream(filePath: string): void {
  const writeStream: fs.WriteStream = fs.createWriteStream(filePath)
 
  writeStream.on('finish', (): void => {
    console.log('Finish - File Write')
  })
 
  writeStream.write('hi, ')
  writeStream.write('hanpy')
  writeStream.end()
}
 
// 실행
writeFileStream('./writeme-sample.txt')
  • 위 실행결과는 writeme-sample.txt 파일에 hi, hanpy라는 text가 포함되어 생성됩니다.

Streams - Pipeline

파일을 스트리밍으로 읽고 변환후에 저장하는 파이프라인을 만들어 봅시다. 기본 돈작은 아래와 같습니다.

pipeline
import { pipeline } from 'stream/promises';
 
await pipeline(
  readableStream,
  transformStream1,
  transformStream2,
  ..., 
  writableStream
);

입력 스트림(readableStream) 에서 중간 변환(transformStream1, transformStream2, …) 을 거쳐 출력 스트림(writableStream) 으로 자동으로 데이터를 흘려보냅니다.

.pipe() vs pipeline

항목.pipe()pipeline()
에러 처리개별 스트림마다 on('error') 달아야 함단 한 번의 catch 로 모두 처리 가능
종료 처리writableStream.on('finish') 별도 핸들러 필요await 이후 코드가 바로 완료 시점에서 실행
코드 복잡도이벤트 핸들러가 늘어나 가독성 저하Promise 기반으로 깔끔한 try/catch/finally
pipeline
// Case 1. .pipe()
const rs = fs.createReadStream('input.txt');
const ts = toUpperCaseTransform;
const ws = fs.createWriteStream('output.txt');
 
rs.pipe(ts).pipe(ws);
 
// 에러·완료를 각각 잡아야 함
rs.on('error', err => console.error('read error', err));
ts.on('error', err => console.error('transform error', err));
ws.on('error', err => console.error('write error', err));
 
ws.on('finish', () => console.log('완료!!'));
 
// Case 2. pipeline()
import { pipeline } from 'stream/promises';
 
try {
  await pipeline(
    fs.createReadStream('input.txt'),
    toUpperCaseTransform,
    fs.createWriteStream('output.txt'),
  );
  console.log('전체 완료!');
} catch (err) {
  console.error('파이프라인 중 에러:', err);
}
  • 에러 핸들러가 3곳, 종료 핸들러 한 곳. 중간에 놓친 이벤트가 있으면 종료 콜백이 실행되지 않거나, 에러가 누락될 수 있습니다.
  • 한 번의 await/catch 로 파이프라인 전체를 감싸 처리하고, 내부에서 모든 스트림을 자동으로 연결하고, 에러 발생 시 중단됩니다.

간단한 파일 복사 예시는 아래와 같습니다.

pipeline
import * as fs from 'fs';
import { pipeline } from 'stream/promises';
 
async function copyFile(src: string, dest: string) {
  try {
    await pipeline(
      fs.createReadStream(src),
      fs.createWriteStream(dest),
    );
    console.log('파일 복사 완료!');
  } catch (err) {
    console.error('파일 복사 실패:', err);
  }
}
 
copyFile('a.txt', 'b.txt');

조금 더 응용한 예시를 봅시다.

pipeline
// transform-file.ts
import * as fs from 'fs';
import { pipeline } from 'stream/promises';
import { Transform } from 'stream';
import * as readline from 'readline';
 
async function transformFile(
  srcPath: string,
  destPath: string
): Promise<void> {
  if (!fs.existsSync(srcPath)) {
    throw new Error(`Source file not found: ${srcPath}`);
  }
 
  const readStream = fs.createReadStream(srcPath, { encoding: 'utf8' });
  // 2. 쓰기 스트림 (필요한 디렉터리 생성)
  await fs.promises.mkdir(require('path').dirname(destPath), { recursive: true });
  const writeStream = fs.createWriteStream(destPath, { encoding: 'utf8' });
 
  // 3. 라인별로 처리하기 위한 Transform 스트림
  const toUpperCaseTransform = new Transform({
    transform(chunk: Buffer | string, _encoding: string, callback: TransformCallback) {
      const data = chunk.toString().toUpperCase();
      callback(null, data);
    },
  });
 
  // 4. 파이프라인 연결
  try {
    await pipeline(
      readStream,
      toUpperCaseTransform,
      writeStream
    );
    console.log(`✔ Transformed "${srcPath}" → "${destPath}"`);
  } catch (err) {
    console.error('✖ Pipeline failed:', err);
    throw err;
  }
}
 
// 실행 예시
(async () => {
  const src = './data/input.txt';
  const dest = './data/output.txt';
  await transformFile(src, dest);
})();
  • 파일을 스트림(fs.createReadStream, fs.createWriteStream)으로 읽고 쓸 수 있도록 각각 생성합니다. 쓰기 전에 fs.promises.mkdir으로 디렉터리를 보장합니다.
  • Transform을 통해 들어오는 청크를 모두 대문자로 변경하는 로직을 추가합니다.
  • readStream → transform → writeStream 을 에러 핸들링과 함께 Promise로 연결합니다. 에러 발생 시 catch로 잡아내고, 완료 시 await가 풀리면서 사용자에게 완료 메시지를 출력합니다.

new Transform

Transform은 Node.js의 stream 모듈이 제공하는 클래스 중 하나로, 입력 스트림(ReadStream) 을 받아 데이터를 가공한 뒤 출력 스트림(WriteStream) 으로 넘겨주는 역할을 합니다. 쉽게 말해, 파이프라인에서 “필터”나 “변환기” 같은 역할을 합니다.

pipeline
import { Transform } from 'stream';
 
// 1) 단순 대문자 변환 스트림 정의
const toUpperCaseTransform = new Transform({
  transform(chunk, _encoding, callback) {
    // chunk를 문자열로 바꿔서 대문자로 변환
    const upper = chunk.toString().toUpperCase();
    // 변환된 데이터를 다음으로 넘겨줌
    callback(null, upper);
  }
});
 
// 2) 사용 예시: ReadStream → Transform → WriteStream
import { pipeline } from 'stream/promises';
 
await pipeline(
  fs.createReadStream('input.txt'),
  toUpperCaseTransform,
  fs.createWriteStream('output.txt'),
);
 
  • 읽기(readable)와 쓰기(writable) 인터페이스를 모두 가지고, 들어온 데이터를 내부에서 처리(transform)한 뒤 내보내는 구조입니다
  • transform(chunk, encoding, callback) 메서드
    • chunk : 들어오는 데이터 조각(버퍼 또는 문자열)
    • encoding : 문자열 인코딩(버퍼가 아니라 문자열일 때)
    • callback(error?, data?) : 에러가 있으면 error를 넘기고, 변환된 데이터를 넘겨주면 다음 단계로 흘러갑니다.

대용량 파일을 메모리에 한 번에 올리지 않고 읽으면서 바로 (chunk 단위로) 변환·압축·암호화·파싱 등을 해야 할 때, 이 경우 Transform을 쓰면 메모리 부담을 줄이면서도 파이프라인으로 깔끔하게 처리할 수 있습니다.