import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import tensorflow.keras.backend as K
from tensorflow.keras import mixed_precision
import sentencepiece as spm
import os, json
import requests

tf.get_logger().setLevel("ERROR")

SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

max_len = 128  # was set to 200 in the original code
batch_size = 64

# TPU initialization (same as the original code)
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU initialized:", resolver.cluster_spec().as_dict())
    on_tpu = True
except Exception as e:
    print("⚠️ TPU not available, falling back to GPU/CPU:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False

# Mixed precision (same as the original code)
policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32")
mixed_precision.set_global_policy(policy)
print("✅ Mixed precision:", policy)

# =======================
# 1) File download and tokenizer initialization (same as the original code)
# =======================
def download_file(url, save_path):
    r = requests.get(url, stream=True)
    r.raise_for_status()
    with open(save_path, "wb") as f:
        for chunk in r.iter_content(8192 * 2):
            f.write(chunk)
    print(f"✅ Saved {save_path}")

DATA_PATH = "converted.jsonl"
TOKENIZER_PATH = "ko_unigram.model"
TOKENIZER_PATH1 = "en_bpe.model"

if not os.path.exists(DATA_PATH):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Translation-set/resolve/main/shuffled.jsonl?download=true",
        DATA_PATH
    )
if not os.path.exists(TOKENIZER_PATH):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Translation-set/resolve/main/unigram.model?download=true",
        TOKENIZER_PATH
    )
if not os.path.exists(TOKENIZER_PATH1):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Translation-set/resolve/main/bpe.model?download=true",
        TOKENIZER_PATH1
    )

sp = spm.SentencePieceProcessor(TOKENIZER_PATH)      # Korean (source) tokenizer
sp_en = spm.SentencePieceProcessor(TOKENIZER_PATH1)  # English (target) tokenizer

# Special-token IDs for the Korean tokenizer
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
start_id = sp.piece_to_id("<start>")
sep_id = sp.piece_to_id("<sep>")
end_id = sp.piece_to_id("<end>")
unk_id = sp.piece_to_id("<unk>")
vocab_size = sp.get_piece_size()
print(f"✅ Vocabulary size: {vocab_size}")

# Special-token IDs for the English tokenizer
epad_id = sp_en.piece_to_id("<pad>") if sp_en.piece_to_id("<pad>") != -1 else 0
estart_id = sp_en.piece_to_id("<start>")
esep_id = sp_en.piece_to_id("<sep>")
eend_id = sp_en.piece_to_id("<end>")
eunk_id = sp_en.piece_to_id("<unk>")
evocab_size = sp_en.get_piece_size()
print(f"✅ Vocabulary size: {evocab_size}")

def text_to_ids(text):
    return sp.encode(text, out_type=int)

def ids_to_text(ids):
    return sp.decode(ids)

def etext_to_ids(text):
    return sp_en.encode(text, out_type=int)

def eids_to_text(ids):
    return sp_en.decode(ids)

# =======================
# Load JSONL → TF Dataset (with ID-level special tokens)
# =======================
def jsonl_stream(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            data = json.loads(line)
            prompt = data["ko"]
            answer = data["en"]

            # Encoder input: special tokens handled explicitly at the ID level
            enc_ids = text_to_ids(prompt)
            enc_ids = enc_ids[:max_len]  # truncate to max_len

            # Decoder input: <start> + answer (English tokenizer)
            dec_input_ids = [estart_id] + etext_to_ids(answer)
            dec_input_ids = dec_input_ids[:max_len]

            # Target: answer + <end>
            target_ids = etext_to_ids(answer) + [eend_id]
            target_ids = target_ids[:max_len]

            # Padding (the decoder side reuses pad_id, matching the mask in the loss below)
            enc_ids += [pad_id] * (max_len - len(enc_ids))
            dec_input_ids += [pad_id] * (max_len - len(dec_input_ids))
            target_ids += [pad_id] * (max_len - len(target_ids))

            yield (
                tf.convert_to_tensor(enc_ids, dtype=tf.int32),
                tf.convert_to_tensor(dec_input_ids, dtype=tf.int32),
                tf.convert_to_tensor(target_ids, dtype=tf.int32),
            )

# =======================
# Build the TF Dataset
# =======================
dataset = tf.data.Dataset.from_generator(
    lambda: jsonl_stream(DATA_PATH),
    output_signature=(
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # enc_inputs
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # dec_inputs
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # target
    )
)

# Map to the dict form expected by the model during training
def map_fn(enc_input, dec_input, dec_target):
    return {"enc_inputs": enc_input, "dec_inputs": dec_input}, dec_target

dataset = dataset.map(map_fn, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(1000, seed=SEED).batch(batch_size, drop_remainder=True).prefetch(tf.data.AUTOTUNE)

with strategy.scope():
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

print("✅ Dataset with ID-level special tokens loaded:", dist_dataset)

# =======================
# 3) Model layers (kept from the original code)
# =======================
class SwiGLU(layers.Layer):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.proj = layers.Dense(d_ff)
        self.out = layers.Dense(d_model)

    def call(self, x):
        x_proj = self.proj(x)
        x_val, x_gate = tf.split(x_proj, 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))


class EncoderBlock(layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model // num_heads)
        self.ffn = SwiGLU(d_model, dff)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, x, mask=None, training=False):
        attn_out = self.dropout1(self.mha(x, x, x, attention_mask=mask), training=training)
        out1 = self.norm1(attn_out + x)
        ffn_out = self.dropout2(self.ffn(out1), training=training)
        return self.norm2(out1 + ffn_out)


class DecoderBlock(layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.self_mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model // num_heads)
        self.cross_mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model // num_heads)
        self.ffn = SwiGLU(d_model, dff)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.norm3 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)
        self.dropout3 = layers.Dropout(dropout)

    def call(self, x, enc_out, training=False):
        attn1 = self.dropout1(self.self_mha(x, x, x, use_causal_mask=True), training=training)
        out1 = self.norm1(attn1 + x)
        attn2 = self.dropout2(self.cross_mha(out1, enc_out, enc_out), training=training)
        out2 = self.norm2(out1 + attn2)
        ffn_out = self.dropout3(self.ffn(out2), training=training)
        return self.norm3(out2 + ffn_out)


class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, max_len=128, dropout=0.1):
        super().__init__()
        self.max_len = max_len
        self.d_model = d_model
        self.enc_embedding = layers.Embedding(input_vocab_size, d_model)
        self.enc_pos_embedding = layers.Embedding(max_len, d_model)
        self.dec_embedding = layers.Embedding(target_vocab_size, d_model)
        self.dec_pos_embedding = layers.Embedding(max_len, d_model)
        self.enc_layers = [EncoderBlock(d_model, num_heads, dff, dropout) for _ in range(num_layers)]
        self.dec_layers = [DecoderBlock(d_model, num_heads, dff, dropout) for _ in range(num_layers)]
        self.final_layer = layers.Dense(target_vocab_size, use_bias=False)

    def call(self, inputs, training=False):
        enc_inputs = inputs["enc_inputs"]
        dec_inputs = inputs["dec_inputs"]
        enc_pos = tf.range(tf.shape(enc_inputs)[1])[tf.newaxis, :]
        dec_pos = tf.range(tf.shape(dec_inputs)[1])[tf.newaxis, :]

        x = self.enc_embedding(enc_inputs) + self.enc_pos_embedding(enc_pos)
        for layer in self.enc_layers:
            x = layer(x, training=training)
        enc_out = x

        y = self.dec_embedding(dec_inputs) + self.dec_pos_embedding(dec_pos)
        for layer in self.dec_layers:
            y = layer(y, enc_out, training=training)

        return self.final_layer(y)

# =======================
# 5) Training setup and execution
# =======================
def smoothed_loss_keras(y_true, y_pred, eps=0.1):
    y_true = tf.cast(y_true, tf.int32)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    vocab = tf.shape(y_pred)[-1]
    y_true_oh = tf.one_hot(y_true, depth=vocab, dtype=tf.float32)
    y_true_ls = (1.0 - eps) * y_true_oh + eps / tf.cast(vocab, tf.float32)
    log_probs = tf.nn.log_softmax(y_pred, axis=-1)
    per_tok = -tf.reduce_sum(y_true_ls * log_probs, axis=-1)
    per_tok = per_tok * mask
    return tf.reduce_sum(per_tok) / (tf.reduce_sum(mask) + 1e-8)

def masked_perplexity(y_true, y_pred, eps=0.1):
    y_true = tf.cast(y_true, tf.int32)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    vocab = tf.shape(y_pred)[-1]
    y_true_oh = tf.one_hot(y_true, depth=vocab, dtype=tf.float32)
    y_true_ls = (1.0 - eps) * y_true_oh + eps / tf.cast(vocab, tf.float32)
    log_probs = tf.nn.log_softmax(y_pred, axis=-1)
    per_tok = -tf.reduce_sum(y_true_ls * log_probs, axis=-1)
    per_tok = per_tok * mask
    mean_loss = tf.reduce_sum(per_tok) / (tf.reduce_sum(mask) + 1e-8)
    return tf.exp(mean_loss)

def create_lr_schedule(initial_lr=5e-5, decay_steps=10000, decay_rate=0.9):
    return tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=decay_steps,
        decay_rate=decay_rate,
        staircase=False
    )

with strategy.scope():
    # ⚠️ Fix: use the defined vocab_size / evocab_size instead of chat_vocab_size
    chat_model = Transformer(num_layers=6, d_model=256, num_heads=4, dff=768,
                             input_vocab_size=vocab_size,
                             target_vocab_size=evocab_size,
                             max_len=128, dropout=0.1)

    dummy_input = {
        "enc_inputs": tf.zeros((1, max_len), dtype=tf.int32),
        "dec_inputs": tf.zeros((1, max_len), dtype=tf.int32)
    }
    _ = chat_model(dummy_input)

    # Optimizer setup
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=create_lr_schedule(),
        beta_1=0.9,
        beta_2=0.95,
        epsilon=1e-8,
        clipnorm=1.0
    )

    chat_model.compile(optimizer=optimizer, loss=smoothed_loss_keras, metrics=[masked_perplexity])
    chat_model.summary()

print("✅ Model compiled, starting training...")

# ⚠️ Run training (Keras fit distributes the dataset under the active strategy)
history = chat_model.fit(dataset, epochs=1, verbose=1)

# Save weights
chat_model.save_weights("chat_model.weights.h5")
print("\n✅ Model weights saved!")

def generate_translation_beam(model, input_text, max_len=128, beam_width=5):
    # Encoder input
    enc_ids = text_to_ids(input_text)
    enc_ids = enc_ids[-max_len:]
    enc_tensor = tf.convert_to_tensor(
        [np.pad(enc_ids, (0, max_len - len(enc_ids)), constant_values=pad_id)],
        dtype=tf.int32
    )

    # Beam initialization: the target side uses the English <start>/<end> IDs
    beams = [([estart_id], 0.0)]  # (generated_ids, log_prob)

    for _ in range(max_len):
        all_candidates = []
        for seq, score in beams:
            if seq[-1] == eend_id:
                all_candidates.append((seq, score))
                continue

            dec_input = seq[-max_len:]
            dec_tensor = tf.convert_to_tensor(
                [np.pad(dec_input, (0, max_len - len(dec_input)), constant_values=pad_id)],
                dtype=tf.int32
            )

            logits = model({"enc_inputs": enc_tensor, "dec_inputs": dec_tensor}, training=False)
            # Cast to float32 in case the model outputs bfloat16 under mixed precision
            next_logits = logits[0, len(dec_input) - 1].numpy().astype(np.float32)
            next_logits[epad_id] = -1e9  # suppress the (English) pad token

            # Pick the top beam_width candidates
            top_indices = np.argsort(next_logits)[-beam_width:][::-1]
            top_probs = tf.nn.softmax(next_logits[top_indices]).numpy()

            for token_id, prob in zip(top_indices, top_probs):
                candidate = (seq + [int(token_id)], score + np.log(prob + 1e-9))
                all_candidates.append(candidate)

        # Keep the top beam_width beams by score
        beams = sorted(all_candidates, key=lambda x: x[1], reverse=True)[:beam_width]

        # Stop once every beam has ended
        if all(seq[-1] == eend_id for seq, _ in beams):
            break

    # Pick the highest-scoring beam
    best_seq = beams[0][0]

    # Drop the leading <start> token, then decode
    return eids_to_text(best_seq[1:])

# Usage example
src_text = "안녕하세요! 오늘 날씨는 어때요?"
translation = generate_translation_beam(chat_model, src_text, max_len=128, beam_width=5)
print("Translation:", translation)
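
# =======================
# (Optional) Greedy decoding sketch
# =======================
# A minimal greedy-decoding sketch as a lighter-weight alternative to the beam
# search above. It reuses the model, tokenizers, and special-token IDs already
# defined in this script; the function name generate_translation_greedy is new
# and only illustrative, not part of the original code.
def generate_translation_greedy(model, input_text, max_len=128):
    # Encode and pad the source sentence, same as in generate_translation_beam
    enc_ids = text_to_ids(input_text)[:max_len]
    enc_tensor = tf.convert_to_tensor(
        [np.pad(enc_ids, (0, max_len - len(enc_ids)), constant_values=pad_id)],
        dtype=tf.int32
    )

    seq = [estart_id]
    for _ in range(max_len):
        dec_input = seq[-max_len:]
        dec_tensor = tf.convert_to_tensor(
            [np.pad(dec_input, (0, max_len - len(dec_input)), constant_values=pad_id)],
            dtype=tf.int32
        )
        logits = model({"enc_inputs": enc_tensor, "dec_inputs": dec_tensor}, training=False)
        # Take the argmax of the next-token distribution at the last real position
        next_id = int(tf.argmax(logits[0, len(dec_input) - 1]).numpy())
        if next_id == eend_id:
            break
        seq.append(next_id)

    # Drop the leading <start> token, then decode
    return eids_to_text(seq[1:])

# Example (greedy): usually slightly worse than beam search, but faster
print("Greedy translation:", generate_translation_greedy(chat_model, src_text, max_len=128))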