- 14일차 - 야너두 작사가 될 수 있어
23년 이전 글/모두의연구소 아이펠

- 14일차 - 야너두 작사가 될 수 있어

너두?

[E-4] RNN, LSTM 모델로 작사가 만들어보기

해당 과제는 깃허브에 있습니다

 

GitHub - dlfrnaos19/rock_scissors_paper_classifier: task 1

task 1. Contribute to dlfrnaos19/rock_scissors_paper_classifier development by creating an account on GitHub.

github.com

본운동에 앞서서 몸을 풀어봐야죠 먼저 다른 데이터셋을 통해 예제를 따라가봅니다

import os, re
import numpy as np
import tensorflow as tf

file_path = '파일 경로를 적어주세요'(셰익스피어 데이터)
with open(file_path, "r") as f:
    raw_corpus = f.read().splitlines()

print(raw_corpus[:9])
len(raw_corpus)
#result
['First Citizen:', 'Before we proceed any further, hear me speak.', '', 'All:', 'Speak, speak.', '', 'First Citizen:', 'You are all resolved rather to die than to famish?', '']
40000

데이터셋에 앞서서 누구나 한번 쯤 들어봤을 소설이죠

읽어보진 않았는데 데이터를 살짝 봤더니 각 종 특수문자들이 많습니다

컴퓨터가 문자를 이해할 때에는 단어 각각의 의미를 숫자로 맵핑하여 이해를 하는데

각종 특수문자의 존재는 단어를 이해하는데 있어서 방해가 되는 요소일테니 전처리를 해야 합니다

 

Data Preprocess

전처리는 다음과 같은 방식으로도 설정할 수 있지만

# remove empty space, ":" 
for idx, sentence in enumerate(raw_corpus):
    if len(sentence) == 0:
        continue
    if sentence[-1] == ":":
        continue
    if idx > 9:
        break
    print(sentence)
#result
Before we proceed any further, hear me speak.
Speak, speak.
You are all resolved rather to die than to famish?

역시 정규식이 훨씬 강력합니다 예제에서는 간간히 특수문자를 남겨두었는데 이 부분에 대해서는 어느 부분이

효과적인지 확인 할 필요가 있어보였습니다, 다른 예제 참고시에 왠만한 특수문자는 전부 제거하는걸로 봤기 때문입니다

 

def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 1 소문자로 바꾸고, 양쪽 공백을 지웁니다
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2 특수문자 양쪽에 공백을 넣고
    sentence = re.sub(r'[" "]+', " ", sentence) # 3 여러개의 공백은 하나의 공백으로 바꿉니다
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # 4 a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 공백으로 바꿉니다
    sentence = sentence.strip() # 5 다시 양쪽 공백을 지웁니다
    sentence = '<start> ' + sentence + ' <end>' # 6 문장 시작에는 <start>, 끝에는 <end>를 추가합니다
    return sentence

# how function work
print(preprocess_sentence("This @_is ;;;sample        sentence."))
#result
<start> this is sample sentence . <end>

여기서 문장의 앞뒤로 <start>와 <end>의 표시는 모델이 시작과 끝을 말해주기 위해서

입력해줍니다

필터할 함수가 완료되었으니, for문을 통해 필요한 문장만 corpus에 모아줍니다

corpus = []

for sentence in raw_corpus:
    if len(sentence) == 0: continue
    if sentence[-1] == ":": continue
        
    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)

corpus[:10]
#result
['<start> before we proceed any further , hear me speak . <end>',
 '<start> speak , speak . <end>',
 '<start> you are all resolved rather to die than to famish ? <end>',
 '<start> resolved . resolved . <end>',
 '<start> first , you know caius marcius is chief enemy to the people . <end>',
 '<start> we know t , we know t . <end>',
 '<start> let us kill him , and we ll have corn at our own price . <end>',
 '<start> is t a verdict ? <end>',
 '<start> no more talking on t let it be done away , away ! <end>',
 '<start> one word , good citizens . <end>']

 

Tokenizing

단어를 매핑하기에 앞서서 아직 문장의 구조로 되어있기 때문에 단어 단위로 분리가 필요합니다

토크나이저는 Tensorflow에 있는 것을 사용할 예정입니다

 

tf.keras.preprocessing.text.Tokenizer  |  TensorFlow Core v2.7.0

Text tokenization utility class.

www.tensorflow.org

 

대체로 토크나이저로서 따로 라이브러리를 활용하기만 했었는데

텐서플로우에 있는 Tokenizer를 처음 사용하게 되었습니다

해당 API를 모두 둘러보진 못했지만 용도에 맞게 다른 토크나이저들과 비교할 거리는 생겼습니다

def tokenize(corpus):
    # 7000단어를 기억할 수 있는 tokenizer를 만들겁니다
    # 우리는 이미 문장을 정제했으니 filters가 필요없어요
    # 7000단어에 포함되지 못한 단어는 '<unk>'로 바꿀거에요
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=7000, 
        filters=' ',
        oov_token="<unk>"
    )
    # corpus를 이용해 tokenizer 내부의 단어장을 완성합니다
    tokenizer.fit_on_texts(corpus)
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환합니다
    tensor = tokenizer.texts_to_sequences(corpus)   
    # 입력 데이터의 시퀀스 길이를 일정하게 맞춰줍니다
    # 만약 시퀀스가 짧다면 문장 뒤에 패딩을 붙여 길이를 맞춰줍니다.
    # 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'를 사용합니다
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)
#result
[[   2  143   40 ...    0    0    0]
 [   2  110    4 ...    0    0    0]
 [   2   11   50 ...    0    0    0]
 ...
 [   2  149 4553 ...    0    0    0]
 [   2   34   71 ...    0    0    0]
 [   2  945   34 ...    0    0    0]] <keras_preprocessing.text.Tokenizer object at 0x7fa790f85520>

숫자에 어떻게 매핑되어있는지 확인 해봅니다

for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])
    
    if idx >= 10: break
#result
1 : <unk>
2 : <start>
3 : <end>
4 : ,
5 : .
6 : the
7 : and
8 : i
9 : to
10 : of

이제 전처리가 완료된 데이터를 train, test용으로 분리합니다

여기에서 제가 transformer 라이브러리를 주로 활용했어서 그런지 모르겠는데

train과 test를 분리하는 부분 또한 몰랐던 부분 중 하나였습니다

결국 딥러닝도 지도학습이었다는 걸 다시 한번 깨달은 부분입니다

# target input과 tensor의 크기가 맞아야 합니다, 공백(padding)을 뒤에 오도록 설정했으므로 padding중 하나를 제거합니다
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])
#result
[  2 143  40 933 140 591   4 124  24 110   5   3   0   0   0   0   0   0
   0   0]
[143  40 933 140 591   4 124  24 110   5   3   0   0   0   0   0   0   0
   0   0]

준비된 데이터를 Tensorflow의 Dataset 형태로 만듭니다 

Dataset으로 구성하면 GPU를 활용하거나 데이터셋에 대한 기능을 활용할 수 있는 장점이 있습니다

BUFFER_SIZE = len(src_input)
BATCH_SIZE = 256
steps_per_epoch = len(src_input) // BATCH_SIZE
# tokenizer가 구축한 단어사전 내 7000개와, 여기 포함되지 않은 0:<pad>를 포함하여 7001개
VOCAB_SIZE = tokenizer.num_words + 1

dataset = tf.data.Dataset.from_tensor_slices((src_input, tgt_input))
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset
#result
<BatchDataset shapes: ((256, 20), (256, 20)), types: (tf.int32, tf.int32)>

 

Set Model

텐서플로우 또한 파이써닉하게!

# Model
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
    
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256
hidden_size = 1024
model = TextGenerator(tokenizer.num_words + 1, embedding_size, hidden_size)

하나의 데이터세트를 넣어서 확인 하는 과정을 거칩니다

# 데이터셋에서 데이터 한 배치만 불러오는 방법입니다.
for src_sample, tgt_sample in dataset.take(1): break

# 한 배치만 불러온 데이터를 모델에 넣어봅니다
model(src_sample)
#result
array([[[-1.84470282e-05, -8.64091708e-05,  9.17484649e-05, ...,
         -8.70071890e-05, -1.59231466e-04,  1.35392518e-04],
        [-2.76876690e-05, -1.68445622e-04,  2.08218145e-04, ...,
         -3.97569966e-04, -1.72723867e-05, -6.12717777e-05],
        [-7.77352398e-05, -3.93499504e-05,  3.23454966e-04, ...,
         -5.76642517e-04, -1.27173829e-04, -6.93226466e-05],

모델의 구조

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
embedding (Embedding)        multiple                  1792256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  7176025   
=================================================================
Total params: 22,607,961
Trainable params: 22,607,961
Non-trainable params: 0

 

Train Model(Set Hyperparameters)

만들었던 dataset을 통해 모델을 훈련합니다

Optimizer와 Loss선택은 다음의 링크를 참고했습니다

 

Module: tf.keras.optimizers  |  TensorFlow Core v2.7.0

Public API for tf.keras.optimizers namespace.

www.tensorflow.org

 

 

Module: tf.keras.losses  |  TensorFlow Core v2.7.0

Public API for tf.keras.losses namespace.

www.tensorflow.org

코랩 기준으로 대략 10~20분 걸리는 듯 합니다 커피한잔 하세요

import time
start_time = time.time()
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset, epochs=30)
print(time.time() - start_time)
#result
Epoch 30/30
93/93 [==============================] - 18s 195ms/step - loss: 1.0919
544.0572500228882

마지막으로 문장 생성을 위한 간단한 함수구현

def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어 하나씩 예측해 문장을 만듭니다
    #    1. 입력받은 문장의 텐서를 입력합니다
    #    2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
    #    3. 2에서 예측된 word index를 문장 뒤에 붙입니다
    #    4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

너는 뭐냐는 말에 되돌려치는 여유를 가진 인공지능이 나왔습니다 예제 끝

generate_text(model, tokenizer, init_sentence="<start> what are you")
#result
'<start> what are you , sir ? <end> '

 

Project Cool Lyricist

데이터를 로드합니다

import glob
import os

txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])
#result
데이터 크기: 187088
Examples:
 ["Now I've heard there was a secret chord", 'That David played, and it pleased the Lord', "But you don't really care for music, do you?"]

파일의 제목을 보면 아시겠지만 화려한 라인업을 보실 수 있습니다 hello.. its me..

예제에서 했던 것과 똑같은 전처리를 합니다

여기에서 추가한 내용은 한 문장에 15 Token 이상인 문장을 배제하는 코드 입니다

# check if sentence over 15 tokens
list(map(lambda x: len(x.split()),corpus))

사용하는 데이터가 커진 만큼 저장할 워드의 숫자도 늘어야 겠죠

저는 이전에 사용했던 데이터의 절대량이 늘어난 만큼 4배를 해주었습니다

def tokenize(corpus):
    # 28000단어를 기억할 수 있는 tokenizer를 만들겁니다
    # 우리는 이미 문장을 정제했으니 filters가 필요없어요
    # 28000단어에 포함되지 못한 단어는 '<unk>'로 바꿀거에요
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=28000, # <- 데이터수만큼 늘린 숫자
        filters=' ',
        oov_token="<unk>"
    )
    # corpus를 이용해 tokenizer 내부의 단어장을 완성합니다
    tokenizer.fit_on_texts(corpus)
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환합니다
    tensor = tokenizer.texts_to_sequences(corpus)   
    # 입력 데이터의 시퀀스 길이를 일정하게 맞춰줍니다
    # 만약 시퀀스가 짧다면 문장 뒤에 패딩을 붙여 길이를 맞춰줍니다.
    # 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'를 사용합니다
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

이상없는지 Tensor의 형태를 확인해봅니다

tensor.shape
#result
(156227, 15)

숫자로 임베딩이 잘 되었는지 확인하고..

# Target sentence create
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])
    
    if idx >= 10: break
# tensor에서 마지막 토큰을 잘라내서 소스 문장을 생성합니다
# 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높습니다.
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

Split Train Test set

예제에서도 했었지만 여태 딥러닝을 할때 train과 test셋을 나누는 예제를 따라하지 않았었다보니

굉장히 낯선경험이었습니다, 기존의 머신러닝 알고리즘 위주로 이러한 방법을 썼었기 때문인데요

역시 바닥부터 해볼수록 배우는게 많다는 걸 느낀 부분이었습니다. 데이터셋 분리

# tensor에서 마지막 토큰을 잘라내서 소스 문장을 생성합니다
# 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높습니다.
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

이런 분리는 사이킷런이 제일 잘하죠 train 80%, test 20%의 비율로 조절합니다

from sklearn.model_selection import train_test_split
enc_train, enc_val, dec_train, dec_val = train_test_split(
    src_input,
    tgt_input,
    test_size=0.2,
    random_state=42,
)

쓰레기가 들어가면 쓰레기가 나온다.. 확인.. 또 확인..

print("Source Train:", enc_train.shape)
print("Target Train:", dec_train.shape)
#result
Source Train: (124981, 14)
Target Train: (124981, 14)

변수명만 바꿨을 뿐 위의 코드와 동일합니다(enc_train, dec_train)

# Create Tensorflow Dataset
BUFFER_SIZE = len(enc_train)
BATCH_SIZE = 256
steps_per_epoch = len(enc_train) // BATCH_SIZE
# padding에 대한 토큰만 1개 추가합니다
VOCAB_SIZE = tokenizer.num_words + 1

dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train))
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

동일한 모델 형태에서 hidden_size만 바꿨습니다(1024, epoch=10에서 성능이 조금 아쉽게 나와서)

# Model
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
    
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256
hidden_size = 2048
model = TextGenerator(tokenizer.num_words + 1, embedding_size, hidden_size)

시간은 1.5~2배 정도 더 드는 듯 합니다 고성능의 PC가 그리워지는 순간입니다

시간이 제일 금이라고 하는데 고성능의 PC를 가진 분이라면 더 빨리 끝내고 더 많은 공부나 시험을 할 수 있을테니..

그래픽카드 가격이 얼른 내렸으면 좋겠습니다

 

start_time = time.time()
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none',
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset, epochs=10)
print(time.time() - start_time)

저는 총 3398초로 대략 56분 걸렸습니다 그래도 손실을 0.4정도 줄였네요 감사하게도..

테스트 하기전 마지막 관문입니다  

def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어 하나씩 예측해 문장을 만듭니다
    #    1. 입력받은 문장의 텐서를 입력합니다
    #    2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
    #    3. 2에서 예측된 word index를 문장 뒤에 붙입니다
    #    4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

뭘 배운거니 너?

 

반성

여태 다른 예제를 따라하면서 자연어처리에 대해서 어느정도 안다고 생각하고 있었는데

train과 test를 나누는 부분에서 뜬금없이 나와 벙쪄서 퍼실님을 찾아가서 질문을 했었습니다

이제껏 저는 만들어진 pretrained 모델을 사용해서 사용하기만 했던 것이고, 이전에 kobart라는

모델을 테스트할 때도 편하게 train 셋팅이 되어있던 코드를 사용했었기 때문에 인지하기 못했던 부분이었습니다

곧 논문을 통해서 정확하게 내용을 파고들터이지만 다시한번 기본에 충실하게 바닥부터 구현하는 연습이

필요함을 느꼈습니다

반응형