본문 바로가기

프로그래밍/golang

[Golang] io.Pipe

 

대용량 데이터를 처리할 때 slice나 구조체를 적절히 잘 사용하는 것도 좋지만,

Go언어에서 제공하는 표준 라이브러라 "io.Pipe"를 사용한다면 "OOM"을 방지하는데 도움이 됩니다.


1.  io.Pipe

  • 왜 사용 하는가?
    보통 slice, buffer으로 데이터를 적재하고 이동시키는 방법은 데이터의 크기 만큼 RAM을 점유하기 때문에,
    OOM에 취약합니다.

    대신 io.Pipe는 데이터를 쌓아두지 않고, 생성된 데이터는 즉시 바로 목적지에 전달하는 실시간 통로를 제공합니다.
    데이터가 아무리 크더라도 전송 되는 동안에는, 설정된 버퍼만큼만 유지되며 속도도 빠릅니다.

  • 동작원리
    io.Pipe는 메모리 내부에서 Reader와 Writer가 연결된 동시식 통로를 만듭니다.

    Writer에서 데이터를 파이프로 넣으려고 하더라도, 반대편 Reader에서 읽을 준비가 되어 있지 않는다면 Writer는 대기 합니다.
    (이러한 것을 Backpressure이라고 하며, 소비 속도에 맞춰서 생산 속도를 맞춰 조절합니다.)

 

2.  예제 코드

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sync"
	"time"
)

type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// 파이프 생성 (In-memory 동기식 파이프)
	pr, pw := io.Pipe()

	var wg sync.WaitGroup
	wg.Add(2)

	// 1. Producer: 데이터를 생성하여 파이프에 쓰기
	go func() {
		defer wg.Done()

		var prodErr error
		defer func() {
			if prodErr != nil {
				pw.CloseWithError(prodErr)
			} else {
				pw.Close()
			}
		}()

		bw := bufio.NewWriterSize(pw, 1024) // 데이터 크기가 1024 이상 자동 pipe
		defer bw.Flush()                    // 수동

		enc := json.NewEncoder(bw)

		for i := 1; i <= 10; i++ {
			select {
			case <-ctx.Done():
				prodErr = ctx.Err()
				return
			default:
				msg := event{ID: i, Name: fmt.Sprintf("evt-%d", i)}
				if err := enc.Encode(msg); err != nil {
					prodErr = fmt.Errorf("encode fail: %w", err)
					return
				}

				time.Sleep(50 * time.Millisecond) // temp logic
			}
		}
	}()

	// 2. Consumer: 파이프에서 데이터를 읽어 처리하기
	go func() {
		defer wg.Done()
		defer pr.Close()

		dec := json.NewDecoder(pr)
		for {
			var e event
			// Decode는 내부적으로 필요한 만큼 Reader(pr)에서 데이터를 읽어옵니다.
			if err := dec.Decode(&e); err != nil {
				if err == io.EOF {
					log.Println("consumer: [SUCCESS] streaming finished")
					return
				}
				// Producer가 CloseWithError로 보낸 에러가 여기서 잡힙니다.
				log.Printf("consumer: [ERROR] %v\n", err)
				return
			}
			log.Printf("consumer: [RECEIVED] id=%d, name=%s\n", e.ID, e.Name)
		}
	}()

	wg.Wait()
	log.Println("main: all routines finished")
}

 

 

3.  전체 코드

https://github.com/reochoi109/go-handbook/blob/main/io/example/main.go

 

go-handbook/io/example/main.go at main · reochoi109/go-handbook

A personal handbook of Go patterns and best practices. Lightweight, practical code snippets for real-world backend development. - reochoi109/go-handbook

github.com

 

 

4.  참조 자료

https://go.dev/blog/pipelines

 

Go Concurrency Patterns: Pipelines and cancellation - The Go Programming Language

Go Concurrency Patterns: Pipelines and cancellation Sameer Ajmani 13 March 2014 Introduction Go’s concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples o

go.dev