import tensorflow as tf from tensorflow.keras import layers, Model import numpy as np import tensorflow.keras.backend as K from tensorflow.keras import mixed_precision import sentencepiece as spm import os, json import requests print('1') tf.get_logger().setLevel("ERROR") SEED = 42 tf.random.set_seed(SEED) np.random.seed(SEED) max_len = 150 # 기존 코드에서 200으로 설정됨 batch_size = 128 # TPU 초기화 (기존 코드와 동일) try: resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local") tf.tpu.experimental.initialize_tpu_system(resolver) strategy = tf.distribute.TPUStrategy(resolver) print("✅ TPU 초기화 완료:", resolver.cluster_spec().as_dict()) on_tpu = True except Exception as e: print("⚠️ TPU 미사용, GPU/CPU로 진행:", e) strategy = tf.distribute.get_strategy() on_tpu = False # Mixed precision (기존 코드와 동일) policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32") mixed_precision.set_global_policy(policy) print("✅ Mixed precision:", policy) # ======================= # 1) 파일 다운로드 및 토크나이저 초기화 (기존 코드와 동일) # ======================= def download_file(url, save_path): r = requests.get(url, stream=True) r.raise_for_status() with open(save_path, "wb") as f: for chunk in r.iter_content(8192*2): f.write(chunk) print(f"✅ {save_path} 저장됨") DATA_PATH = "converted.jsonl" TOKENIZER_PATH = "ko_unigram.model" if not os.path.exists(DATA_PATH): download_file( "https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/main/output.jsonl?download=true", DATA_PATH ) if not os.path.exists(TOKENIZER_PATH): download_file( "https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/main/ko_unigram.model?download=true", TOKENIZER_PATH ) sp = spm.SentencePieceProcessor(TOKENIZER_PATH) pad_id = sp.piece_to_id("") if sp.piece_to_id("") != -1 else 0 start_id = sp.piece_to_id("") sep_id = sp.piece_to_id("") end_id = sp.piece_to_id("") unk_id = sp.piece_to_id("") vocab_size = sp.get_piece_size() print(f"✅ Vocabulary size: {vocab_size}") def text_to_ids(text): return sp.encode(text, out_type=int) def ids_to_text(ids): return sp.decode(ids) # ======================= # 3) 모델 레이어 (기존 코드 유지) # ======================= class SwiGLU(layers.Layer): def __init__(self, d_model, d_ff): super().__init__() self.proj = layers.Dense(d_ff) self.out = layers.Dense(d_model) def call(self, x): x_proj = self.proj(x) x_val, x_gate = tf.split(x_proj, 2, axis=-1) return self.out(x_val * tf.nn.silu(x_gate)) class gMLPBlock(layers.Layer): def __init__(self, d_model, seq_len, dropout=0.1): super().__init__() self.d_model = d_model self.seq_len = seq_len self.norm = layers.LayerNormalization(epsilon=1e-6) # FFN: Channel Expansion # d_model * 4로 확장 self.channel_proj = layers.Dense(d_model * 4, use_bias=True) self.dropout = layers.Dropout(dropout) # Spatial Gating Unit (SGU) self.sgu_norm = layers.LayerNormalization(epsilon=1e-6) self.sgu_proj = layers.Dense(seq_len, use_bias=False) # 출력 차원을 d_model * 2 (U의 차원)로 설정 self.sgu_final = layers.Dense(d_model * 2, use_bias=True) self.out_proj = layers.Dense(d_model, use_bias=True) def call(self, x, training=False): # 1. Norm and Channel Expansion residual = x x_norm = self.norm(x) x_proj = self.channel_proj(x_norm) # Shape: (B, L, 4*D) # 2. Split (U and V streams) u, v = tf.split(x_proj, 2, axis=-1) # u, v Shape: (B, L, 2*D) # 3. Spatial Gating Unit (SGU) v_norm = self.sgu_norm(v) v_norm_T = tf.transpose(v_norm, perm=[0, 2, 1]) # (B, 2D, L) # 💡 토큰 믹싱 발생 (시퀀스 축으로 Dense 적용) v_proj = self.sgu_proj(v_norm_T) # (B, 2D, L) v_proj_T = tf.transpose(v_proj, perm=[0, 2, 1]) # (B, L, 2D) # 4. Activation and Gate Generation # 표준 gMLP는 U에 GELU를 적용하고 V는 선형 게이트로 사용 # 여기서는 U에 GELU를 적용 u_act = tf.nn.gelu(u) v_gate = self.sgu_final(v_proj_T) # Shape: (B, L, 2*D) # 5. Gating and Contraction z = u_act * v_gate # 게이팅 z = self.dropout(z, training=training) out = self.out_proj(z) # Shape: (B, L, D) # 6. Residual Connection return residual + out class CrossBlock(layers.Layer): def __init__(self, clip_value=5.0, eps=1e-6): # 💡 d_model 인자 추가 super().__init__() self.clip_value = clip_value self.eps = eps # 💡 수정: 출력 차원을 1에서 d_model로 변경 def call(self, x, z): # a의 shape: (Batch, Seq_len, D_model) g_q = (tf.nn.tanh(x) + 1.0) / 2.0 g_k = (tf.nn.tanh(z) + 1.0) / 2.0 score = (g_q * g_k) score = tf.cumsum(score, axis=1) seq_len = tf.shape(score)[1] # [1, 2, 3, ..., L]을 D_model 차원으로 확장 count_for_mean = tf.cast(tf.range(seq_len) + 1, score.dtype) count_for_mean = tf.reshape(count_for_mean, (1, seq_len, 1)) # 누적합을 현재까지의 토큰 개수로 나누어 평균 누적합 계산 (B, L, D) score_mean = score / count_for_mean # 정규화 분모 설정 denom = tf.maximum(score_mean, self.eps) score_norm = score / denom # ----------------------------------------------- score_clipped = tf.clip_by_value(score_norm, -self.clip_value, self.clip_value) y = score_clipped * z return y class LoU(layers.Layer): def __init__(self, d_model, clip_value=5.0, eps=1e-6): super().__init__() self.d_model = d_model self.clip_value = float(clip_value) self.mha = layers.MultiHeadAttention(8, 20) self.norm1 = layers.LayerNormalization(epsilon=1e-5, dtype='float32') self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32') self.glu = SwiGLU(d_model, 320) self.cross = CrossBlock() def call(self, x, z): x_f32 = tf.cast(x, tf.float32) residual = x_f32 x = self.norm1(x) x_comb = self.mha(x, x, x, use_causal_mask=True) out = self.norm(x_comb + residual) out = self.cross(out, z) out = self.glu(out) return tf.cast(out, x.dtype) # ======================= # 4) AlphaS2S 모델 (기존 코드 유지) # ======================= class AlphaS2S(tf.keras.Model): def __init__(self, num_layers, d_model, num_heads, input_vocab_size, target_vocab_size, max_len=200, dropout=0.1): super().__init__() self.max_len = max_len self.d_model = d_model # 인코더와 디코더 임베딩 및 위치 임베딩은 모두 max_len을 사용 self.enc_embedding = layers.Embedding(input_vocab_size, d_model) self.enc_pos_embedding = layers.Embedding(max_len, d_model) self.dec_embedding = layers.Embedding(target_vocab_size, d_model) self.dec_pos_embedding = layers.Embedding(max_len, d_model) # EncoderBlock과 LoU는 기존 코드와 동일한 구조 self.enc_layers = [gMLPBlock(d_model, seq_len=max_len) for _ in range(num_layers)] self.dec_layers = [LoU(d_model) for _ in range(num_layers)] self.final_layer = layers.Dense(target_vocab_size, use_bias=False) def call(self, inputs, training=False): # enc_inputs와 dec_inputs는 동일한 시퀀스 (Unified Input) enc_inputs = inputs["enc_inputs"] dec_inputs = inputs["dec_inputs"] enc_pos = tf.range(tf.shape(enc_inputs)[1])[tf.newaxis, :] dec_pos = tf.range(tf.shape(dec_inputs)[1])[tf.newaxis, :] # 인코더 실행 x = self.enc_embedding(enc_inputs) + self.enc_pos_embedding(enc_pos) # Note: 마스크 없음 -> Bi-directional (BERT-like Encoder) for layer in self.enc_layers: x = layer(x, training=training) enc_out = x # 인코더의 최종 출력 (디코더의 'z' 입력) # 디코더 실행 y = self.dec_embedding(dec_inputs) + self.dec_pos_embedding(dec_pos) # Note: LoU는 내부적으로 EMA를 사용하며, 일반적인 Cross-Attention 블록의 역할을 수행 for layer in self.dec_layers: y = layer(y, enc_out, training=training) return self.final_layer(y) # 가중치 저장 chat_model = AlphaS2S(num_layers=4, d_model=160, num_heads=8, input_vocab_size=vocab_size, target_vocab_size=vocab_size, max_len=max_len) dummy_input = { "enc_inputs": tf.zeros((1, max_len), dtype=tf.int32), "dec_inputs": tf.zeros((1, max_len), dtype=tf.int32) } _ = chat_model(dummy_input) chat_model.load_weights('/kaggle/working/chat_model.weights.h5') print("모델 가중치 로드 완료!") # ======================= # 6) 추론 함수 (기존 코드 유지) # ======================= def generate_text_topp(model, prompt, max_len=150, max_gen=100, p=0.9, temperature=0.8, min_len=20): # 인코더 입력은 Prompt 만 사용 model_input = text_to_ids(f" {prompt} ") model_input = model_input[:max_len] generated = list(model_input) for step in range(max_gen): current_len = len(generated) # 현재까지 생성된 시퀀스를 입력으로 사용 if current_len > max_len: input_seq = generated[-max_len:] else: input_seq = generated # 패딩 input_padded = np.pad(input_seq, (0, max_len - len(input_seq)), constant_values=pad_id) input_tensor = tf.convert_to_tensor([input_padded]) # 모델 추론 (enc_inputs, dec_inputs 모두 동일한 시퀀스를 사용) dummy_input = { "enc_inputs": input_tensor, "dec_inputs": input_tensor } logits = model(dummy_input, training=False) # 다음 토큰의 로짓은 시퀀스의 마지막 토큰 위치에서 가져옴 (0-based index: current_len - 1) # 하지만 패딩 후 input_tensor의 실제 시퀀스 길이는 len(input_seq) next_token_logits = logits[0, len(input_seq) - 1].numpy() # 특수 토큰 생성 억제 next_token_logits[end_id] -= 5.0 next_token_logits[pad_id] -= 10.0 probs = tf.nn.softmax(next_token_logits / temperature).numpy() sorted_indices = np.argsort(probs)[::-1] sorted_probs = probs[sorted_indices] # Top-p (Nucleus) Sampling cumulative_probs = np.cumsum(sorted_probs) cutoff = np.searchsorted(cumulative_probs, p) top_indices = sorted_indices[:cutoff + 1] top_probs = sorted_probs[:cutoff + 1] top_probs /= np.sum(top_probs) next_token_id = np.random.choice(top_indices, p=top_probs) if next_token_id == end_id and len(generated) >= min_len: break generated.append(int(next_token_id)) # 토큰 제거 및 이전 부분 제거 try: sep_index = generated.index(sep_id) # 이후부터 이전까지의 응답만 반환 result_ids = generated[sep_index + 1:] try: end_index = result_ids.index(end_id) result_ids = result_ids[:end_index] except ValueError: pass return ids_to_text(result_ids) except ValueError: return ids_to_text(generated) # 이 없으면 전체 반환 print("\n\n===== 생성 결과 =====") # 모델이 1 epoch만 학습되었으므로 의미 있는 결과가 아닐 수 있습니다. print(generate_text_topp(chat_model, "제가 이따가 버스를 타야 해서 준비 좀 해야겠어요. 재미있는 대화였습니다!", p=0.9))