MLOps

아파치 빔

백악기작은펭귄 2021. 12. 15.
반응형

아파치 빔

다양한 TFX 컴포넌트와 라이브러리는 아파치 빔을 사용하여 파이프라인 데이터를 효율적으로 처리한다. 아파치 빔을 파이프라인 오케스트레이션 툴로 사용하는 방법은 추후 자세히 알아보도록 하고, 여기서는 TFX 컴포넌트에서 아파치 빔이 어떻게 작동하는지 알아보도록 하겠다.

 

아파치 빔(Apache Beam)은 2016년 오픈소스 형태로 공개된 unified programming model로, ETL, 배치 프로세스, 스트리밍 작업을 포함하여 다양한 데이터 처리 파이프라인을 정의하고 실행하기 위한 프로그램이다. TFX는 아파치 빔에 의존하며, 다양한 컴포넌트 내에서 이를 사용하고 있다.


설치

아파치 빔은 다음 명령어로 설치할 수 있다.

pip install apache-beam
pip install 'apache-beam[gcp]'		# GCP상에서 설치
pip install 'apache-beam[boto]'		# AWS상에서 설치

 

만약 앞서 TFX를 pip으로 설치한 경우 아파치 빔은 자동으로 설치되므로 위 명령어는 생략해도 무방하다.


기본 데이터 파이프라인

아파치 빔의 추상화는 Collection과 Transform이라는 두 가지 개념을 기반으로 한다.

 

아파치 빔의 Collection은 지정된 파일 또는 스트림에서 데이터를 읽고 쓰는 작업을, Transform은 데이터를 조작하는 작업을 제공한다. 모든 Collection과 Transform은 파이프라인의 Context에서 실행되고, Context manager 명령어를 이용해 파이썬에서 표시될 수 있다.


기본 Collection 예제

데이터 파이프라인은 데이터를 읽으면서 시작하고, 처리 완료된 데이터를 다시 쓰면서 종료된다. 보통 PCollection(Parallel Collection)을 통해 아파치 빔에서 처리되고, 변환된 최종 결과는 다시 Collection 형태로 표시되어 파일 시스템에 기록된다.

 

아래는 텍스트 파일을 읽고 모든 행을 반환하는 방법이다.

import apache_beam as beam

with beam.Pipeline() as p: # define pipeline with context manager
  lines = p | beam.io.ReadFromText(input_file) # Read text with PCollection
  # 여기서 | 는 파이프 연산자로, 수식의 결과를 다른 수식의 첫 번째 매개변수로 전달하는 역할을 한다.

 

아파치 빔은 또한 텍스트 파일에 Collection을 기록하는 feature인 WriteToText를 제공한다. 일반적으로 쓰기 작업은 모든 변환이 실행된 후에 실행된다.

with beam.Pipeline() as p:
  ...
  output |  beam.io.WriteToText(output_file)

기본 Transform 예제

위에서 언급하였듯이, 아파치 빔에서는 Transform을 이용하여 데이터를 조작한다. Transform은 파이프 연산자를 사용하여 체인으로 연결 가능하다. 만약 같은 유형의 Transfrom 여러 개를 연결하려면 | 와 >> 사이에 문자열 식별자(' ')로 표시된 작업 이름을 넣어주어야 한다.

 

아래 예시에서는 텍스트 파일에서 추출된 행에 모든 Transform을 순차적으로 진행한다.

counts = (
    lines
    | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'PairWithOne' >> beam.Map(lambda x: (x,1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum)
)

 

각각의 Transform을 'Hello, how do you do?'라는 문장과 'I am fine, thank you'라는 문장을 예시로 하여 차례로 좀 더 자세히 살펴보겠다. 

 

먼저 Split transform은 알파벳으로 이루어진 문장을 띄어쓰기를 기준으로 한 토큰 list로 분할한다.

["Hello", "how", "do", "you", "do"]
["I", "am", "fine", "thank", "you"]

 

이렇게 분할된 list는 beam.FlatMap에 의하여 PCollection에 매핑된다.

"Hello", "how", "do", "you", "do", "I", "am", "fine", "thank", "you"

 

다음으로 PairWithOne tranform은 beam.Map을 사용하여 각 토큰과 숫자 1을 튜플로 만들도록 매핑한다. 이때의 1은 토큰의 카운트를 위해 사용되게 된다.

("Hello", 1) ("how", 1) ("do", 1) ("you", 1) ("do", 1) ("I", 1) ("am", 1) ("fine", 1) ("thank", 1) ("you", 1)

 

마지막 transform인 GroupAndSum transform은 위에서 만들어진 튜플을 토큰 별로 요약한다. 토큰 값이 키가 되어 뒤 카운트가 통합되는 것이다.

("Hello", 1) ("how", 1) ("do", 2) ("you", 2) ("I", 1) ("am", 1) ("fine", 1) ("thank", 1)

 

이외에도 파이썬 함수를 transform의 일부로 적용할 수 있다. 아래 예시는 위에서 만들어진 counts를 텍스트 파일에 쓸 수 있는 문자열로 변환한다.

def format_result(word_count):
  # 튜플(word, count)를 문자열로 변환
  (word, count) = word_count
  return f"{word}: {count}"

output = counts | beam.Map(format_result)

 

아파치 빔은 기본적으로 다양한 Transform feature를 제공하지만, 만약 원하는 feature가 없다면 파이썬 함수로 만들어 beam.Map을 이용해 직접 만들 수 있다.

 

다음은 완성된 예제이다.

import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# 텍스트 파일은 구글 클라우드 스토리지 버킷에 저장된다.
input_file = "gs://dataflow-samples/shakespeare/kinglear.txt"
output_file = '/tmp/output.txt'

# 파이프라인 옵션 객체
pipeline_options = PipelineOptions()

# 아파치 빔 파이프라인 설정
with beam.Pipeline(options=pipeline_options) as p:
  lines = p | beam.io.ReadFromText(input_file)
  
  counts = (
    lines
    | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'PairWithOne' >> beam.Map(lambda x: (x,1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum)
    )
  
  def format_result(word_count):
    # 튜플(word, count)를 문자열로 변환
    (word, count) = word_count
    return f"{word}: {count}"

  output = counts | beam.Map(format_result)

  output |  beam.io.WriteToText(output_file)

기본 파이프라인 실행

파이프라인은 아파치 빔의 DirectRunner로 실행할 수 있다. 위 예시 코드를 pipeline_tutorial.py라는 이름으로 저장했다고 가정했을 때, 다음 명령어로 간단하게 실행할 수 있다. (아파치 스파크나 아파치 플링크 등 다른 아파치 빔 실행기에서 파이프라인을 실행하고자 할 때는 pipeline_options로 파이프라인 구성을 설정해줘야 한다.)

python pipeline_tutorial.py
반응형

'MLOps' 카테고리의 다른 글

데이터 준비  (0) 2021.12.21
데이터 수집  (0) 2021.12.19
대화형 파이프라인  (0) 2021.12.13
ML 메타데이터  (0) 2021.12.12
TFX 컴포넌트 개요  (0) 2021.12.11

댓글