MLOps

데이터 수집

백악기작은펭귄 2021. 12. 19.
반응형

데이터 수집

 

TFX를 이용한 머신러닝 파이프라인에서는 기본 TFX 설정과 ML 메타데이터스토어를 사용하여 다양한 컴포넌트에서 활용할 수 있는 데이터셋을 수집할 수 있다.

 

머신러닝 파이프라인 중 데이터 수집 단계

 

TFX는 파일이나 서비스로부터 데이터를 수집하는 컴포넌트를 제공한다. 이는 수집뿐만 아니라 학습 데이터와 검증 데이터로의 분할, 그리고 추출된 데이터를 하나의 데이터셋으로 결합하는 작업까지의 프로세스를 모두 수행한다.


TFRecord & ExampleGen

TFRecord는 대용량 데이터셋 스트리밍에 최적화된 경량화 포맷이다. TFRecord는 직렬화된 프로토콜 버퍼를 포함하여 거의 모든 바이너리 데이터의 저장을 지원한다.

import tensorflow as tf

with tf.io.TFRecordWriter('test.tfrecord') as w:
  w.write(b'First record')
  w.write(b'Second record')

for record in tf.data.TFRecordDataset('test.tfrecord'):
  print(record)
>> tf.Tensor(b'First Record', shape=(), dtype=string)
>> tf.Tensor(b'Second Record', shape=(), dtype=string)

데이터셋은 로컬 및 원격 폴더에서 읽을 수 있을 뿐만 아니라 구글 클라우드 빅쿼리 등의 데이터 서비스에서 요청할 수도 있다.


로컬 데이터 파일 수집

데이터셋을 수집, 분할, 변환하는 프로세스는 ExampleGen 컴포넌트가 수행한다. 이는 CSV, 사전 계산된 TFRecord 파일, 아파치 아브로(Apache Avro), 아파치 파케이(Apache Parquet)의 직렬화 출력 등 다양한 데이터 구조를 수집할 수 있다.

 

CSV 데이터를 tf.Example로 변환하기

머신러닝에서 흔히 다루는 정형 데이터 혹은 텍스트 데이터는 CSV 파일에 저장된 경우가 많다. TFX는 이런 CSV 파일을 읽고 tf.Example로 변환하는 feature를 제공한다. 다음 코드는 예제 프로젝트의 CSV 데이터를 포함하는 데이터 폴더의 수집을 수행하는 코드이다.

import os
from tfx.components import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

dir_path = os.getcwd()

# 데이터 경로 정의
imput_base = os.path.join(dir_path, "..", "..", "data", "taxi")

# 파이프라인 컴포넌트 인스턴스화
example_gen = CsvExampleGen(input_base=input_base)

# 대화형 파이프라인 실행
context.run(example_gen)

 

컴포넌트를 대화형 파이프라인으로 실행하면 실행 메타데이터가 출력된다. 이는 학습 및 평가 데이터셋의 스토리지 위치를 강조 표시한다.


기존 TFRecord 파일 가져오기

이미지, 대용량 텍스트 데이터 등을 표현하기 위해서 CSV 파일 형태는 적절치 않다. 이처럼 종종 CSV로 표현이 어렵거나 불편한 데이터들이 존재하는데, 이때는 데이터셋을 TFRecord 데이터 구조로 변환 후 ImportExampleGen 컴포넌트로 로드하면 편리하다.

아래는 TFRecord 파일을 로드하는 예제 코드이다.

import os
from tfx.components import ImportExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

dir_path = os.getcwd()
tfrecord_dir = os.path.join(dir_path, "..", "..", "data", "tfrecord_data")

example_gen = ImportExampleGen(input_base=tfrecord_dir)

context.run(example_gen)

 

데이터셋은 이미 TFRecord 파일 내부에 tf.Example 형태로 저장되어 있기 때문에 따로 변환이 필요치 않다.


Parquet로 직렬화된 데이터를 tf.Example로 변환하기

위에서 설명한 데이터 외에도 다양한 데이터 형태가 존재한다. 기본적으로 새로운 파일 형식을 로드하기 위해서는 새로운 컴포넌트가 필요하지만, TFX의 경우 파케이(Parquet)로 직렬화된 데이터를 비롯해 다양한 파일 형식을 로드하는 실행자 클래스가 포함되어 있기 때문에 executor_class를 재정의하는 것만으로도 새로운 컴포넌트의 작성 없이 수행할 수 있다.

다음 예시는 executor_class를 재정의하여 파일을 읽어들이는 동작을 변경하는 코드이다. 이는 일반 파일 로더 컴포넌트인 FileBasedExampleGen을 사용하여 executor_class를 재정의함으로써 수행할 수 있다.

from tfx.components.base import executor_spec

# 일반 파일 로더 컴포넌트
from tfx.components import FileBasedExampleGen

# Parquet 관련 실행자
from tfx.components.example_gen.custom_executors import parquet_executor

parquet_dir_path = 'parquet_data'

# Executor 재정의
example_gen = FileBasedExampleGen(
    input_base=parquet_dir_path,
    custom_executor_spec=executor_spec.ExecutorClassSpec(
        parquet_executor.Executor
  )
)

Avro로 직렬화된 데이터를 tf.Example로 변환하기

executor_class는 대부분의 파일 형식을 지원한다. 따라서 위와 유사한 방식으로 아브로(Avro)로 직렬화된 데이터 또한 로드할 수 있다.

from tfx.components.base import executor_spec

from tfx.components import FileBasedExampleGen

# Avro 관련 실행자
from tfx.components.example_gen.custom_executors import avro_executor

avro_dir_path = 'avro_data'

example_gen = FileBasedExampleGen(
    input_base=avro_dir_path,
    custom_executor_spec=executor_spec.ExecutorClassSpec(
        avro_executor.Executor
  )
)

 

위 두 번에 걸쳐 볼 수 있듯이, 다른 파일 형식을 로드하기 위해서는 먼저 해당 유형 실행자를 작성하여 Executor 재정의를 진행하면 된다.


사용자 지정 데이터를 TFRecord 데이터 구조로 변환하기

웹 상에서 데이터를 다운로드한 후 용도에 맞게 전처리를 진행해보도록 하자.

import os
import tensorflow as tf
import pandas as pd
import numpy as np
import shutil
from pathlib import Path

# 원시 데이터 다운로드
filepath = tf.keras.utils.get_file(
    'complaints.csv.zip',
    'http://files.consumerfinance.gov/ccdb/complaints.csv.zip'
)

dir_path = os.getcwd()
data_dir = os.path.join(dir_path, "..", "..", "data")
processed_dir = os.path.join(dir_path, "..", "..", "data", "processed")
Path(processed_dir).mkdir(parents=True, exist_ok=True)

# 압축 해제
shutil.unpack_archive(filepath, data_dir)

df = pd.read_csv(os.path.join(data_dir, 'complaints.csv'))

df.columns = [
              'datae_recieved', 'product', 'sub_product', 'issue', 'sub_issue',
              'consumer_complaint_narrative', 'company_public_response',
              'company', 'state', 'zip_code', 'tags',
              'consumer_consent_provided', 'submitted_via',
              'date_sent_to_company', 'company_response',
              'timely_response', 'consumer_disputed', 'complaint_id'
]

df.loc[df['consumer_disputed'] == "", "consumer_disputed"] = np.nan

# 주요 필드 결측치일 경우 레코드 제외
df = df.dropna(subset=['consumer_complaint_narrative', 'consumer_disputed'])

# consumer_disputed의 Yes와 No를 1과 0으로 변경
df.loc[df['consumer_disputed'] == 'Yes', 'consumer_disputed'] = 1
df.loc[df['consumer_disputed'] == 'No', 'consumer_disputed'] = 0

# zip_code
df.loc[df['zip_code'] == "", "zip_code"] = '000000'
df.loc[pd.isna(df['zip_code']), 'zip_code'] = '000000'

df = df[df['zip_code'].str.len() == 5]
df['zip_code'] = df['zip_code'].str.replace("XX", "00")
df = df.reset_index(drop=True)
df['zip_code'] = pd.to_numeric(df['zip_code'], errors='coerce')

# dataframe to csv
df.to_csv(os.path.join(processed_dir, 'processed-complaints.csv'), index=False)

 

기존 데이터셋을 TFRecord 데이터 구조로 변환 후 ImportExampleGen을 통해 수집하는 것이 더 간단한 경우도 있지만, 이와 같은 방식은 효율적인 데이터 스트리밍이 어려운 플랫폼을 통해 데이터를 사용하는 경우에 유용하다.

 

데이터는 CSV 뿐만 아니라 JSON, XML 등의 형태로 제공되기도 하는데, 이 경우 해당 데이터의 형식을 tf.Example 구조로 만들어야만 ImportExmpleGen 컴포넌트를 통해 데이터를 수집할 수 있다.

 

tf.Example 구조

 

TFRecord 데이터 구조의 경우 tf.Example 내부에 키-값 매핑이 포함된 Dictionary를 사용할 수 있는 tf.Features 객체를 포함한다. 이때의 키는 해당 열을 나타내는 string 형태의 식별자이며, 값은 tf.train.Feature 객체이다.

# TFRecord 데이터 구조
tf.Example
	tf.Features
    	'column A':tf.train.Feature
        'column B':tf.train.Feature
        'column C':tf.train.Feature
        .
        .
        .

 

tf.train.Feature는 다음 3가지 데이터 타입을 허용한다.

  1. tf.train.ByteList
  2. tf.train.FloatList
  3. tf.train.Int64List

데이터 구조를 변환하는 헬퍼 함수를 정의하면 코드 중복을 줄일 수 있다. 헬퍼 함수를 사용해서 데모 데이터셋을 TFRecord 데이터 형식의 파일로 변환해보자.

import os
import re

import tensorflow as tf
import pandas as pd

# helper functions
def _bytes_feature(value):
  return tf.train.Feature(
      bytes_list=tf.train.BytesList(value=[value.encode()])
      )
  
def _float_feature(value):
  return tf.train.Feature(
      float_list=tf.train.FloatList(value=[value])
      )

def _int64_feature(value):
  return tf.train.Feature(
      int64_list=tf.train.Int64List(value=[value])
      )

def clean_rows(row):
  if pd.isna(row['zip_code']):
    row['zip_code'] = '99999'
  return row

def convert_zipcode_to_int(zipcode):
  nums = re.findall(r'\d+', zipcode)
  if len(nums) > 0:
    int_zipcode = int(nums[0])
  else:
    int_zipcode = 99999
  return int_zipcode

# path
dir_path = os.getcwd()
data_dir = os.path.join(dir_path, "..", "..", "data")
tfrecord_dir = os.path.join(dir_path, "..", "..", "data", "tfrecord")

df = pd.read_csv(os.path.join(data_dir, 'processed/processed-complaints.csv'))

Path(tfrecord_dir).mkdir(parents=True, exist_ok=True)

tfrecord_filename = 'consumer-complaints.tfrecord'
tfrecord_filepath = os.path.join(tfrecord_dir, tfrecord_filename)

# TFRecord 객체 생성
tf_record_writer = tf.io.TFRecordWriter(tfrecord_filepath)

for idx, row in df.iterrows():
  row = clean_rows(row)

  # 모든 데이터 레코드를 tf.train.Example로 변환
  example = tf.train.Example(
      features=tf.train.Features(
          feature={
              'product': _bytes_feature(str(row['product'])),
              'sub_product': _bytes_feature(str(row['sub_product'])),
              'issue': _bytes_feature(str(row['issue'])),
              'sub_issue': _bytes_feature(str(row['sub_issue'])),
              'state': _bytes_feature(str(row['state'])),
              'zip_code': _int64_feature(convert_zipcode_to_int(str(row['zip_code']))),
              'company': _bytes_feature(str(row['company'])),
              'company_response': _bytes_feature(str(row['company_response'])),
              'timely_response': _bytes_feature(str(row['timely_response'])),
              'consumer_disputed': _float_feature(row['consumer_disputed']),
          }
      )
  )

  # 데이터 구조 직렬화
  tf_record_writer.write(example.SerializeToString())
tf_record_writer.close()

 

이제 생성된 TFRecord 파일인 consumer-complaints.tfrecord를 ImportExampleGen 컴포넌트로 가져올 수 있다.


원격 데이터 파일 수집

ExampleGen 컴포넌트는 구글 클라우드 스토리지(Google Cloud Storage)나 AWS S3(Simple Storage Service)등의 원격 클라우드 저장소 버킷에서 파일을 읽어올 수 있다. 다음 예시는 ExampleGen 내 input_base네 external_input에 대한 버킷 경로를 설정하는 방법이다.

from tfx.components import CsvExampleGen
example_gen = CsvExampleGen(input_base=("gs://example_compliance_data/"))

 

개인 클라우드 스토리지 버킷에 액세스 하기 위해서는 클라우드 제공자 자격 증명의 설정이 필요하다. AWS는 사용자별 액세스 키와 암호로 사용자를 인증하며 GCP의 경우 서비스 계정으로 사용자를 인증한다.


데이터베이스에서 직접 수집

TFX는 데이터베이스에서 직접 데이터셋을 수집할 수 있도록 하는 두 가지 컴포넌트를 제공하는데, 빅쿼리 테이블의 데이터를 쿼리 하는 BigQueryExampleGen과 프레스토(Presto) 데이터베이스의 데이터를 쿼리 하는 PrestoExampleGen이 그것이다.

 

구글 클라우드 빅쿼리

TFX가 제공하는 BigQueryExampleGen 컴포넌트는 GCP 생태계에서 머신러닝 파이프라인을 실행할 때 정형 데이터를 매우 효율적으로 수집할 수 있도록 한다.

구글 클라우드 자격증명
BigQueryExampleGen 컴포넌트를 실행하기 위해서는 로컬 환경에서 필요한 구글 클라우드 자격 증명 설정이 요구된다. BigQuery Data Viewer나 BigQuery Job User 등의 역할로 서비스 계정을 생성하여야 한다.

아파치 빔이나 아파치 에어플로를 사용하여 대화형 콘텍스트에서 컴포넌트를 실행할 경우, 다음과 같이 환경 변수 GOOGLE_APPLICATION_CREDENTIALS로 서비스 계정 인증 파일 경로를 지정해줘야 한다.
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/credential_file.json"​

 

다음 예시는 빅쿼리 테이블을 쿼리 하는 가장 간단한 방법이다.

from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen

query = """
	SELECT * FROM `<project_id>.<database>.<table_name>`
"""

example_gen = BigQueryExampleGen(query=query)

프레스토 데이터베이스

프레스토 데이터베이스에서 데이터를 수집하기 위해서는 PrestoExampleGen 컴포넌트를 사용하면 된다. 사용법은 위에서 본 BigQueryExampleGen과 굉장히 유사한데, PrestoExampleGen의 경우 데이터베이스의 연결 세부 정보를 지정하는 추가 구성이 필요하다.

pip install presto-python-client
from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component
import PrestoExampleGen

query = """
	SELECT * FROM `<proect_id>.<database>.<table_name>`
"""

# 데이터베이스 연결 세부 정보 지정
presto_config = presto_config_pb2.PrestoConnConfig(
	host='localhost',
    port=8080)

example_gen = PrestoExampleGen(presto_config, query=query)
반응형

'MLOps' 카테고리의 다른 글

데이터 검증  (0) 2021.12.27
데이터 준비  (0) 2021.12.21
아파치 빔  (0) 2021.12.15
대화형 파이프라인  (0) 2021.12.13
ML 메타데이터  (0) 2021.12.12

댓글