다시 이음

Pytorch - Bert 사용하여 Fine-tuning하기 본문

Pre_Onboarding by Wanted(자연어 처리)

Pytorch - Bert 사용하여 Fine-tuning하기

Taeho(Damon) 2022. 3. 8. 20:17

코드 위주보다는 순서를 확인하여 'BERT 모델을 Fine_tuning 하는 과정'에 집중하려고 합니다.

 

Process

 

1. 데이터 불러오기

2. 데이터 전처리

  • 직접 pandas를 사용하여 전처리 혹은 Pytorch Dataset 클래스 사용

3. CustomClassifier 클래스 구현 ( 가중치 freeze, unfreeze )

  • 사전 학습(pre-trained)된 BERT 모델을 불러와 그 위에 1 hidden layer와 binary classifier layer를 쌓아 fine-tunning 모델을 생성
  • hidden layer 1개와 output layer(binary classifier layer)를 갖는 CustomClassifier 클래스를 구현

4. Iterator 구현

  • 학습 데이터를 배치 단위로 저장하는 이터레이터 함수 data_iterator 구현 혹은 Pytorch DataLoader 클래스 사용
  • text를 토큰 id로 변환하고 label은 텐서로 변환해 배치만큼 잘라 (input, target) 튜플 형태의 이터레이터를 생성

5. 모델 훈련하기

  • data_iterator 함수로 생성한 이터레이터를 for loop 돌면서 배치 단위의 데이터를 모델에 학습하는 train() 함수 구현
  • Loss, Optimizer 구현

 

1&2. 데이터 처리

 

데이터 전처리 같은 경우 결측치 제거, 랜덤 샘플링(라벨 비율 동일하게) 과 같은 부분을 진행합니다.

 

직접 진행할 경우와 Dataset 클래스를 사용하여 전처리를 진행하는 방법이 있습니다.

 

-Dataset 클래스 : 데이터 전처리, dict 또는 list 타입으로 변경

from torch.utils.data import Dataset

class CustomDataset(Dataset):
  """
  - input_data: list of string
  - target_data: list of int
  """

  def __init__(self, input_data:list, target_data:list):
      self.X = input_data
      self.Y = target_data
      
#필수로 지정해야하는 len,getitem
  def __len__(self):
      return len(self.Y)

  def __getitem__(self, index):
      return self.X[index], self.Y[index]
      
 #dataset 정의
 dataset = CustomDataset(input_data, target_data)

 

랜덤 샘플링의 경우, random 모듈을 사용하여 직접 함수를 정의하여 사용하는 방법과 생성되어 있는 모듈을 불러와 쓰는 방법이 있습니다.

 

 

3. CustomClassifier 클래스 구현 ( 가중치 freeze, unfreeze )

 

목표 : 사전 학습(pre-trained)된 BERT 모델을 불러와 그 위에 1 hidden layer와 binary classifier layer를 쌓아 fine-tunning 모델을 생성

Fine-tuning 모델 = CustomClassifier모델 --> BERT모델의 파라미터가 업데이트 하지 못하도록 설정하는 것을 freeze, 업데이트하도록 설정하는 것을 unfreeze 로 나눕니다.

*freeze되는 경우 BERT의 파라미터를 사용하지 않기 때문에 성능이 좋지 않습니다.

 

 

확인해야 할 것

 

  • 클래스 정의
    • 생성자 입력 매개변수
      • hidden_size : BERT의 embedding size
      • n_label : class(label) 개수
    • 생성자에서 생성할 변수
      • bert : BERT 모델 인스턴스
      • classifier : 1 hidden layer + relu + dropout + classifier layer를 stack한 nn.Sequential 모델
        • 첫번재 히든 레이어 (첫번째 nn.Linear)
          • input: BERT의 마지막 layer의 1번재 token ([CLS] 토큰) (shape: hidden_size)
          • output: (shape: linear_layer_hidden_size)
        • 아웃풋 레이어 (두번째 nn.Linear)
          • input: 첫번째 히든 레이어의 아웃풋 (shape: linear_layer_hidden_size)
          • output: target/label의 개수 (shape:2)
    • 메소드
      • forward()
        • BERT output에서 마지막 레이어의 첫번째 토큰 ('[CLS]')의 embedding을 가져와 self.classifier에 입력해 아웃풋으로 logits를 출력합니다.
    • 주의 사항
      • CustomClassifier 클래스는 부모 클래스로 nn.Module을 상속 받습니다.

- Unfreeze 모델

import torch.nn as nn
import torch.nn.functional as F

# classifier 구현 (Unfreezed)
class CustomClassifier(nn.Module):

  def __init__(self, hidden_size: int, n_label: int):
    super(CustomClassifier, self).__init__()

    self.bert = BertModel.from_pretrained("klue/bert-base")

    dropout_rate = 0.1
    linear_layer_hidden_size = 32

    self.classifier = nn.Sequential(
          nn.Linear(hidden_size, linear_layer_hidden_size),
          nn.ReLU(),
          nn.Dropout(dropout_rate),
          nn.Linear(linear_layer_hidden_size,n_label)
        ) # torch.nn에서 제공되는 Sequential, Linear, ReLU, Dropout 함수 활용


  def forward(self, input_ids=None, attention_mask=None, token_type_ids=None):

    outputs = self.bert(
        input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
    )

    # BERT 모델의 마지막 레이어의 첫번재 토큰을 인덱싱
    cls_token_last_hidden_states = outputs['pooler_output'] # 마지막 layer의 첫 번째 토큰 ("[CLS]") 벡터를 가져오기, shape = (1, hidden_size)

    logits = self.classifier(cls_token_last_hidden_states)

    return logits

 

- Freeze 모델

class CustomClassifierFreezed(nn.Module):

  def __init__(self, hidden_size: int, n_label: int):
    super(CustomClassifierFreezed, self).__init__()

    self.bert = BertModel.from_pretrained("klue/bert-base")
    # freeze BERT parameter
    # BERT의 파라미터는 고정값으로 두고 BERT 위에 씌운 linear layer의 파라미터만 학습하려고 한다. 
    # 이 경우, BERT의 파라미터의 'requires_grad' 값을 False로 변경해줘야 학습 시 해당 파라미터의 미분값이 계산되지 않는다.
    for param in self.bert.parameters():
        param.requires_grad = False

    dropout_rate = 0.1
    linear_layer_hidden_size = 32

    self.classifier = nn.Sequential(
            nn.Linear(hidden_size, linear_layer_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(linear_layer_hidden_size, n_label)
        )

  def forward(self, input_ids=None, attention_mask=None, token_type_ids=None):

    outputs = self.bert(
        input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
    )
    
    # BERT 모델의 마지막 레이어의 첫번재 토큰을 인덱싱
    cls_token_last_hidden_states = outputs['pooler_output'] # 마지막 layer의 첫 번째 토큰 ("[CLS]") 벡터를 가져오기, shape = (1, hidden_size)
    logits = self.classifier(cls_token_last_hidden_states)

    return logits

 

4. Iterator 구현 ( Data_iterator, DataLoader )

 

목표 : 데이터 프레임을 입력 받아 text를 토큰 id로 변환하고 label은 텐서로 변환해 배치만큼 잘라 (input, target) 튜플 형태의 이터레이터를 생성하는 data_iterator 함수를 구현합니다.

 

Iterator를 구현하여 사용할 수 도 있고, DataLoader 클래스를 사용할 수 도 있습니다.

 

확인해야 할 것

 

  • 함수 정의
    • 입력 매개변수
      • input_column : text 데이터 column 명
      • target_column : label 데이터 column 명
      • batch_size : 배치 사이즈
    • 조건
      • 함수는 다음을 수행해야 함
        • 데이터 프레임 랜덤 셔플링
        • tokenizer_bert로 text를 token_id로 변환 + 텐서화
        • target(label)을 텐서화
    • 반환값
      • (input, target) 튜플 형태의 이터레이터를 반환

 

- Data_iterator

from transformers import BertTokenizer, BertModel

def data_iterator(df, input_column, target_column, batch_size):
  """
  데이터 프레임을 셔플한 후 
  데이터 프레임의 input_column을 batch_size만큼 잘라 토크나이즈 + 텐서화하고, target_column을 batch_size만큼 잘라 텐서화 하여
  (input, output) 튜플 형태의 이터레이터를 생성
  """

  global tokenizer_bert

  # 1. 데이터 프레임 셔플
  #    pandas의 sample 함수 사용
  df = df.sample(frac=1).reset_index(drop=True)

  # 2. 이터레이터 생성
  for idx in range(0, df.shape[0], batch_size):
    batch_df = df.loc[idx:idx+31] # batch_size만큼 데이터 추출
	
    #BERT 토크나이저를 사용하여 텐서화
    tensorized_input = tokenizer_bert(list(batch_df[input_column].values),
          add_special_tokens=True,
          return_tensors='pt',
          padding="longest" # 가장 긴 문장을 기준으로 token개수를 맞춤. 모자란 토큰 위치는 "[PAD]" 토큰을 추가
          ) # df의 text를 토크나이징 + token id로 변환 + 텐서화 (df의 input_column 사용)
    
    tensorized_target = torch.Tensor(list(batch_df[target_column].values)) # target(label)을 텐서화 (df의 target_column 사용)
    
    yield tensorized_input, tensorized_target # 튜플 형태로 yield

 

- DataLoader 클래스

 

DataLoader 클래스 파라미터에서 이터레이터와 같이 batch_size에 맞게 데이터 사이즈를 맞출 때 사용하는 collate_fn 파라미터를 직접 구현해보겠습니다.

  • 함수 정의
    • 입력 매개변수
      • batch : (input(string), target(int)) 튜플을 담고 있는 리스트. 만약 batch_size가 32라면 리스트는 32개의 튜플을 갖고 있다.
    • 조건
      • input
        • BERT Tokenizer 클래스의 __call__() 메소드 사용 방법을 읽고, __call__() 파라미터를 조정해 dynamic padding을 구현한다.
        • 토크나이즈할 때 한 배치내 인풋들의 토큰 개수는 모두 동일할 수 있도록하라. 이때, 가장 긴 토큰을 가지고 있는 인풋을 기준으로 토큰 개수를 맞춘다. 즉, 토큰 개수가 모자란 인풋은 [PAD] 토큰을 추가한다. (이를 dynamic padding이라고 한다.)
        • 토크나이저에서 반환된 값은 Tensor 타입이어야 한다.
      • target
        • target은 Tensor 타입으로 변형한다.
    • 반환값
      • (tensorized_input, tensorized_label) 튜플
dataloader = DataLoader(dataset,batch_size=32, sampler=RandomSampler(dataset), collate_fn = custom_collate_fn)

def custom_collate_fn(batch):
  """
  - batch: list of tuples (input_data(string), target_data(int))
  
  한 배치 내 문장들을 tokenizing 한 후 텐서로 변환함. 
  이때, dynamic padding (즉, 같은 배치 내 토큰의 개수가 동일할 수 있도록, 부족한 문장에 [PAD] 토큰을 추가하는 작업)을 적용
  토큰 개수는 배치 내 가장 긴 문장으로 해야함.
  또한 최대 길이를 넘는 문장은 최대 길이 이후의 토큰을 제거하도록 해야 함
  토크나이즈된 결과 값은 텐서 형태로 반환하도록 해야 함
  
  한 배치 내 레이블(target)은 텐서화 함.
  
  (input, target) 튜플 형태를 반환.
  """
  global tokenizer_bert
  batch=list(batch)

  input_list=[]
  target_list=[]
  
  for k,v in batch:
    input_list.append(k)
    target_list.append(v)
  
  tensorized_input = tokenizer_bert(
        text = input_list,
        padding='longest',
        truncation = True,
        return_tensors = 'pt'
        )
  
  tensorized_label = torch.tensor(target_list)
  
  return (tensorized_input, tensorized_label)

 

5. 모델 구현

 

  • 함수 정의
    • 입력 매개변수
      • model : BERT + 1 hidden layer classifier 모델
      • data_iterator : train data iterator
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
from numpy.core.fromnumeric import nonzero

# 모델 클래스 정의 (Unfreezed 모델)
model = CustomClassifier(hidden_size=768, n_label=2)
# 모델 클래스(Freezed 모델)
model = CustomClassifierFreezed(hidden_size=768, n_label=2)

model.to(device)

batch_size = 32

# 데이터 이터레이터 정의 
train_iterator = data_iterator(sample_df, 'document', 'label', batch_size)

# 데이터 loader 정의
dataloader = DataLoader(dataset,batch_size=32, sampler=RandomSampler(dataset), collate_fn = custom_collate_fn)

# 로스 및 옵티마이저
loss_fct = CrossEntropyLoss()
optimizer = AdamW(
    model.parameters(),
    lr=2e-5,
    eps=1e-8
)

 

def train(model, data_iterator):

  global loss_fct # 위에서 정의한 loss 함수

  # 배치 단위 평균 loss와 총 평균 loss 계산하기위해 변수 생성
  total_loss, batch_loss, batch_count = 0,0,0
  
  # model을 train 모드로 설정 & device 할당
  model.train()
  model.to(device)
  
  # data iterator를 돌면서 하나씩 학습
  for step, batch in enumerate(data_iterator):
    batch_count+=1
    
    # tensor 연산 전, 각 tensor에 device 할당
    batch = tuple(item.to(device) for item in batch)
    
    batch_input, batch_label = batch
    
    batch_label=batch_label.to(torch.int64)
    # batch마다 모델이 갖고 있는 기존 gradient를 초기화
    model.zero_grad()
    
    # forward
    logits = model(batch_input['input_ids'],batch_input['attention_mask'],batch_input['token_type_ids'])
    
    # loss
    loss = loss_fct(logits, batch_label)
    batch_loss += loss.item()
    total_loss += loss.item()
    
    # backward -> 파라미터의 미분(gradient)를 자동으로 계산
    loss.backward()
    
    # optimizer 업데이트
    optimizer.step()
      
    # 배치 10개씩 처리할 때마다 평균 loss를 출력
    if (step % 10 == 0 and step != 0):
      print(f"Step : {step}, Avg Loss : {batch_loss / batch_count:.4f}")
      
      # 변수 초기화 
      batch_loss, batch_count = 0,0
  
    print(f"Mean Loss : {total_loss/(step+1):.4f}")
  print("Train Finished")