👾1D CNN
1D-CNN을 이용한 IMDB 리뷰분류
# ========================================
# Import Library
# ========================================
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from tqdm import tqdm
import urllib.request
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from nltk.tokenize import word_tokenize
import nltk
nltk.download('punkt')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ========================================
# Dataset Load
# ========================================
urllib.request.urlretrieve("https://raw.githubusercontent.com/ukairia777/pytorch-nlp-tutorial/main/10.%20RNN%20Text%20Classification/dataset/IMDB%20Dataset.csv", filename="IMDB Dataset.csv")
df = pd.read_csv('IMDB Dataset.csv')
# ========================================
# Dataset Preprocessing
# ========================================
# 결측값 파악 및 데이터 분포파악
def preprocessing(data):
if not data.isnull().values.any():
data['sentiment'].value_counts().plot(kind='bar')
print(data.groupby('sentiment').size().reset_index(name='count'))
print("깔끔하고 균형적인 데이터!")
else:
print("결측값 존재")
preprocessing(df)
X_data = df['review']
y_data = df['sentiment']
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.5, random_state=42, stratify=y_data)
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=.2, random_state=42, stratify=y_train)
# ========================================
# Tokenizing
# ========================================
def tokenize(sentences):
tokenized_sentences = []
for sent in tqdm(sentences):
tokenized_sent = word_tokenize(sent)
tokenized_sent = [word.lower() for word in tokenized_sent]
tokenized_sentences.append(tokenized_sent)
return tokenized_sentences
tokenized_X_train = tokenize(X_train)
tokenized_X_valid = tokenize(X_valid)
tokenized_X_test = tokenize(X_test)
# ========================================
# Vocab 정제
# ========================================
word_list = []
for sent in tokenized_X_train:
for word in sent:
word_list.append(word)
word_counts = Counter(word_list)
vocab = sorted(word_counts, key=word_counts.get, reverse=True)
# 빈도수가 낮은 단어 배제
threshold = 3
total_cnt = len(word_counts) # 단어의 수
rare_cnt = 0 # 등장 빈도수가 threshold보다 작은 단어의 개수를 카운트
total_freq = 0 # 훈련 데이터의 전체 단어 빈도수 총 합
rare_freq = 0 # 등장 빈도수가 threshold보다 작은 단어의 등장 빈도수의 총 합
# 단어와 빈도수의 쌍(pair)을 key와 value로 받는다.
for key, value in word_counts.items():
total_freq = total_freq + value
# 단어의 등장 빈도수가 threshold보다 작으면
if(value < threshold):
rare_cnt = rare_cnt + 1
rare_freq = rare_freq + value
vocab_size = total_cnt - rare_cnt
vocab = vocab[:vocab_size]
word_to_index = {}
word_to_index['<PAD>'] = 0
word_to_index['<UNK>'] = 1
for index, word in enumerate(vocab) :
word_to_index[word] = index + 2
vocab_size = len(word_to_index)
def texts_to_sequences(tokenized_X_data, word_to_index):
encoded_X_data = []
for sent in tokenized_X_data:
index_sequences = []
for word in sent:
try:
index_sequences.append(word_to_index[word])
except KeyError:
index_sequences.append(word_to_index['<UNK>'])
encoded_X_data.append(index_sequences)
return encoded_X_data
# ========================================
# Text Encoding
# ========================================
encoded_X_train = texts_to_sequences(tokenized_X_train, word_to_index)
encoded_X_valid = texts_to_sequences(tokenized_X_valid, word_to_index)
encoded_X_test = texts_to_sequences(tokenized_X_test, word_to_index)
# ========================================
# Padding
# ========================================
def below_threshold_len(max_len, nested_list):
count = 0
for sentence in nested_list:
if(len(sentence) <= max_len):
count = count + 1
print('전체 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(max_len, (count / len(nested_list))*100))
def pad_sequences(sentences, max_len):
features = np.zeros((len(sentences), max_len), dtype=int)
for index, sentence in enumerate(sentences):
if len(sentence) != 0:
features[index, :len(sentence)] = np.array(sentence)[:max_len]
return features
padded_X_train = pad_sequences(encoded_X_train, max_len=max_len)
padded_X_valid = pad_sequences(encoded_X_valid, max_len=max_len)
padded_X_test = pad_sequences(encoded_X_test, max_len=max_len)
# ========================================
# Modeling
# ========================================
train_label_tensor = torch.tensor(np.array(y_train))
valid_label_tensor = torch.tensor(np.array(y_valid))
test_label_tensor = torch.tensor(np.array(y_test))
class CNN(torch.nn.Module):
def __init__(self, vocab_size, num_labels):
super(CNN, self).__init__()
self.num_filter_sizes = 1
self.num_filters = 256
self.word_embed = torch.nn.Embedding(num_embeddings=vocab_size, embedding_dim=128, padding_idx=0)
self.conv1 = torch.nn.Conv1d(128, self.num_filters, 5, stride=1)
self.dropout = torch.nn.Dropout(0.5)
self.fc1 = torch.nn.Linear(1 * self.num_filters, num_labels, bias=True)
def forward(self, inputs):
embedded = self.word_embed(inputs).permute(0, 2, 1)
x = F.relu(self.conv1(embedded).permute(0, 2, 1).max(1)[0])
y_pred = self.fc1(self.dropout(x))
return y_pred
model = CNN(vocab_size, num_labels = len(set(y_train)))
model.to(device)
# ========================================
# Dataloader
# ========================================
encoded_train = torch.tensor(padded_X_train).to(torch.int64)
train_dataset = torch.utils.data.TensorDataset(encoded_train, train_label_tensor)
train_dataloader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=32)
encoded_test = torch.tensor(padded_X_test).to(torch.int64)
test_dataset = torch.utils.data.TensorDataset(encoded_test, test_label_tensor)
test_dataloader = torch.utils.data.DataLoader(test_dataset, shuffle=True, batch_size=1)
encoded_valid = torch.tensor(padded_X_valid).to(torch.int64)
valid_dataset = torch.utils.data.TensorDataset(encoded_valid, valid_label_tensor)
valid_dataloader = torch.utils.data.DataLoader(valid_dataset, shuffle=True, batch_size=1)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# ========================================
# Train
# ========================================
def calculate_accuracy(logits, labels):
predicted = torch.argmax(logits, dim=1)
correct = (predicted == labels).sum().item()
total = labels.size(0)
accuracy = correct / total
return accuracy
def evaluate(model, valid_dataloader, criterion, device):
val_loss = 0
val_correct = 0
val_total = 0
model.eval()
with torch.no_grad():
# 데이터로더로부터 배치 크기만큼의 데이터를 연속으로 로드
for batch_X, batch_y in valid_dataloader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
# 모델의 예측값
logits = model(batch_X)
# 손실을 계산
loss = criterion(logits, batch_y)
# 정확도와 손실을 계산함
val_loss += loss.item()
val_correct += calculate_accuracy(logits, batch_y) * batch_y.size(0)
val_total += batch_y.size(0)
val_accuracy = val_correct / val_total
val_loss /= len(valid_dataloader)
return val_loss, val_accuracy
num_epochs = 5
# Training loop
best_val_loss = float('inf')
# Training loop
for epoch in range(num_epochs):
# Training
train_loss = 0
train_correct = 0
train_total = 0
model.train()
for batch_X, batch_y in train_dataloader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
logits = model(batch_X)
# Compute loss
loss = criterion(logits, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
train_correct += calculate_accuracy(logits, batch_y) * batch_y.size(0)
train_total += batch_y.size(0)
train_accuracy = train_correct / train_total
train_loss /= len(train_dataloader)
# Validation
val_loss, val_accuracy = evaluate(model, valid_dataloader, criterion, device)
print(f'Epoch {epoch+1}/{num_epochs}:')
print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
# 검증 손실이 최소일 때 체크포인트 저장
if val_loss < best_val_loss:
print(f'Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}. 체크포인트를 저장합니다.')
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_model_checkpoint.pth')
# ========================================
# Evaluate
# ========================================
model.load_state_dict(torch.load('best_model_checkpoint.pth'))
model.to(device)
val_loss, val_accuracy = evaluate(model, valid_dataloader, criterion, device)
print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')
test_loss, test_accuracy = evaluate(model, test_dataloader, criterion, device)
print(f'Best model test loss: {test_loss:.4f}')
print(f'Best model test accuracy: {test_accuracy:.4f}')
# ========================================
# Prediction
# ========================================
index_to_tag = {0 : '부정', 1 : '긍정'}
def predict(text, model, word_to_index, index_to_tag):
model.eval()
# 토큰화 및 정수 인코딩. OOV 문제 발생 시 <UNK> 토큰에 해당하는 인덱스 1 할당
tokens = word_tokenize(text)
token_indices = [word_to_index.get(token.lower(), 1) for token in tokens]
# 리스트를 텐서로 변경
input_tensor = torch.tensor([token_indices], dtype=torch.long).to(device) # (1, seq_length)
# 모델의 예측
with torch.no_grad():
logits = model(input_tensor) # (1, output_dim)
# 레이블 인덱스 예측
_, predicted_index = torch.max(logits, dim=1) # (1,)
# 인덱스와 매칭되는 카테고리 문자열로 변경
predicted_tag = index_to_tag[predicted_index.item()]
return predicted_tag
Test_sample = "This is Wonderful movie!!, wow... I afraid to show this mood...!"
predict(Test_sample, model, word_to_index, index_to_tag)
긍정
Last updated