MLOps

TFT를 머신러닝 파이프라인에 통합하기

백악기작은펭귄 2022. 1. 8.
반응형

TFT를 머신러닝 파이프라인에 통합하기

앞선 데이터 검증 단계에서 우리는 데이터셋의 피처 별 타입 및 분포부터 문자열 표현과 벡터형 표현까지 피처의 특성을 파악했다. 이는 전처리 단계에서 feature engineering을 정의하는 데에 중요하다.

 

다음 코드에서는 피처를 정의한다. 간편한 처리를 위해 원-핫 인코딩 피처, 버킷 처리 피처, 원시 문자열 표현 세 그룹으로 그룹화하였다.

import tensorflow as tf
import tensorflow_transform as tft

LABEL_KEY = 'consumer_disputed'

# 'feature_name':feature_dimension
ONE_HOT_FEATURES = {
    'product':11,
    'sub+product':45,
    'company_response':5,
    'state':60,
    'issue':90
}

# 'feature_name':bucket_count
BUCKET_FEATURES = {
    'zip_code':10
}

# 'feature_name':non-defined value
TEXT_FEATURES = {
    'consumer_complaint_narrative':None
}

헬퍼 함수

먼저 데이터를 효율적으로 변환하는 몇 가지 헬퍼 함수를 정의하도록 하겠다.

 

다음 함수는 피처 이름에 접미사(ex. _xf)를 추가하는 함수이다. 접미사를 사용하면 오류가 입력과 출력 중 어디서 발생하는지 구별하는데 도움이 되며, 변환되지 않은 피쳐를 모델이 사용하게 되는 불상사를 미연에 방지할 수 있다.

def transformed_name(key):
  return key + '_xf'

 

TFT는 변환 출력이 dense 하다고 가정한다. 다음 함수를 통해 sparse feature를 dense feature로 변환하고, 결측값을 기본값으로 채울 수 있다.

def fill_in_missing(x):
  default_value = '' if x.dtype == tf.string or to_string else 0
  if type(x) == tf.SparseTensor:
    x = tf.sparse.to_dense(
        tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
        default_value
    )

  return tf.squeeze(x, axis=1)

 

우리 모델에서는 대부분의 입력 피처를 원-핫 인코딩 벡터로 나타낸다. 다음 함수는 지정된 인덱스를 원-핫 인코딩 표현으로 변환 후 벡터를 반환하는 함수이다.

def convert_num_to_one_hot(label_tensor, num_labels=2):
  one_hot_tensor = tf.one_hot(label_tensor, num_labels)

  return tf.reshape(one_hot_tensor, [-1, num_labels])

 

우리는 또한 문자열로 표시된 우편번호(zip code)를 실수형으로 변환하는 함수가 필요하다. 다음 코드는 누락된 자리 표시자를 0으로 교체하고 실수형 결괏값으로 만든 후 이를 10개의 버킷으로 버킷화하는 함수이다.

def convert_zip_code(zip_code):
  if zip_code == '':
    zip_code = '00000'
  zip_code = tf.strings.regex_replace(zip_code, r'X{0.5}', '0')
  zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)

  return zip_code

피처 변환

위에서 정의한 헬퍼 함수들을 활용하여 피처를 변환할 수 있다.

 

피처를 원-핫 피처로 변환하기 위해서는 범주 이름을 tft.compute_and_apply_vocabulary()를 사용해 인덱스로 변환한 후 헬퍼 함수 convert_num_to_oh()를 사용하여 인덱스를 원-핫 벡터 표현으로 변환하면 된다.

def preprocessing_fn(inputs):
  outputs = {}
  for key in ONE_HOT_FEATURES. keys():
    dim = ONE_HOT_FEATURES[key]
    index = tft.compute_and_apply_vocabulary(
        fill_in_missing(inputs[key]), top_k=dim+1        
    )
    outputs[transformed_name(key)] = convert_num_to_one_hot(
        index, num_labels=dim+1
    )

    return outputs

 

버킷 피처에 대한 처리 방식을 보자. 이 데이터셋에서는 핫 인코딩 된 우편번호가 너무 sparse 하므로 우편번호를 버킷화하였다. 각 피처는 버킷 10개로 버킷화하고 버킷의 인덱스는 원-핫 벡터로 인코딩한다.

for key, bucket_count in BUCKET_FEATURES.items():
  temp_feature = tft.bucketize(
      convert_zip_code(fill_in_missing(inputs[key])),
      bucket_count,
      always_return_num_quantiles=False
  )
  outputs[transformed_name(key)] = convert_num_to_one_hot(
      temp_feature,
      num_labels=bucket_count+1
  )

 

텍스트 입력 피처와 레이블 열은 변환할 필요가 없으므로, 피처가 희소할 때 dense feature로 변환하기만 하면 된다.

for key in TEXT_FEATURES.keys():
  outputs[transformed_name(key)] = fill_in_missing(inputs[key])

 outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])

 

파이프라인에서 TFX의 Transform 컴포넌트를 사용하면 별도의 파이썬 파일로 변환 코드가 제공된다. 모듈 파일 이름은 사용자가 설정할 수 있지만 preprocessing_fn()이 모듈 내에 포함되어야 하며, 함수의 이름을 바꿀 수 없다.

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('module_file_name.py')
)

context.run(transform)

 

TFX는 Transform 컴포넌트 실행 시 로드된 데이터에 위로부터 만들어진 모듈 파일에 정의된 변환을 적용한다. 컴포넌트는 변환된 데이터, 변환 그래프, 필수 메타데이터를 출력한다. 이렇게 출력된 변환 데이터와 변환 그래프는 다음 단계인 모델 학습 단계의 Trainer 컴포넌트에서 사용할 수 있다. 


# 코드
from typing import Union

import tensorflow as tf
import tensorflow_transform as tft

LABEL_KEY = 'consumer_disputed'

# 'feature_name':feature_dimension
ONE_HOT_FEATURES = {
    'product':11,
    'sub+product':45,
    'company_response':5,
    'state':60,
    'issue':90
}

# 'feature_name':bucket_count
BUCKET_FEATURES = {
    'zip_code':10
}

# 'feature_name':non-defined value
TEXT_FEATURES = {
    'consumer_complaint_narrative':None
}

def transformed_name(key):
  return key + '_xf'

def fill_in_missing(x):
  default_value = '' if x.dtype == tf.string or to_string else 0
  if type(x) == tf.SparseTensor:
    x = tf.sparse.to_dense(
        tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
        default_value
    )

  return tf.squeeze(x, axis=1)

def convert_num_to_one_hot(label_tensor, num_labels=2):
  one_hot_tensor = tf.one_hot(label_tensor, num_labels)

  return tf.reshape(one_hot_tensor, [-1, num_labels])

def convert_zip_code(zip_code):
  if zip_code == '':
    zip_code = '00000'
  zip_code = tf.strings.regex_replace(zip_code, r'X{0.5}', '0')
  zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)

  return zip_code

def preprocessing_fn(inputs):
  outputs = {}
  for key in ONE_HOT_FEATURES. keys():
    dim = ONE_HOT_FEATURES[key]
    index = tft.compute_and_apply_vocabulary(
        fill_in_missing(inputs[key]), top_k=dim+1        
    )
    outputs[transformed_name(key)] = convert_num_to_one_hot(
        index, num_labels=dim+1
    )

    return outputs

  for key, bucket_count in BUCKET_FEATURES.items():
    temp_feature = tft.bucketize(
        convert_zip_code(fill_in_missing(inputs[key])),
        bucket_count,
        always_return_num_quantiles=False
    )
    outputs[transformed_name(key)] = convert_num_to_one_hot(
        temp_feature,
        num_labels=bucket_count+1
    )
  
  for key in TEXT_FEATURES.keys():
    outputs[transformed_name(key)] = fill_in_missing(inputs[key])

  outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])

  return outputs

 

반응형

'MLOps' 카테고리의 다른 글

TFX Trainer 컴포넌트  (0) 2022.01.11
TFX 모델 학습 - 모델 정의하기  (0) 2022.01.10
TFT 독립 실행형으로 실행하기  (0) 2022.01.06
TFT를 이용한 데이터 전처리  (0) 2022.01.05
데이터 전처리  (0) 2022.01.04

댓글