| # Adjusted run: force-overwrite existing data, 2M records, 4 files x 4 parallel tasks. | |
| # Core idea: overwrite without checking first, use less data, lower the parallelism to 4. | |
| # ============================================== | |
| # Core variables (4 files, 4 parallel tasks per file, 2M records) | |
| # ============================================== | |
| # Positional arguments $1..$6 with their defaults. | |
| # The dataset name is a new one so that this experiment stays separate from earlier ones. | |
| DATA_NAME="${1:-ocpython_subsampled_2M}" | |
| ENTROPY_QUANTILE="${2:-0.90}" | |
| CHUNK_SIZE="${3:-512}" | |
| OUTPUT_WINDOW="${4:-20}" | |
| ITERATIVE="${5:-true}" | |
| FORCE_PADDING="${6:-true}" | |
| # Key adjustment: the number of files and the number of parallel tasks per file are both 4. | |
| # Number of JSONL shards the data is split into. | |
| TOTAL_JSONL_FILES=4 | |
| # Number of parallel tasks per shard. | |
| TOTAL_JOBS_PER_FILE=4 | |
| # GPUs per node: taken from ARNOLD_WORKER_GPU when set, otherwise 4. | |
| NUM_GPUS_PER_NODE="${ARNOLD_WORKER_GPU:-4}" | |
| # Index of this node: taken from ARNOLD_ID when set, otherwise 0. | |
| NODE_ID="${ARNOLD_ID:-0}" | |
| # ============================================== | |
| # Path definitions (outputs are force-overwritten; old data is not kept) | |
| # ============================================== | |
| # Input shards live under a dataset-specific directory so that runs do not collide. | |
| HDFS_INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" | |
| # The split and compressed output paths extend the input path with the run parameters. | |
| # ${ENTROPY_QUANTILE//./} removes every dot from the quantile (0.90 becomes 090). | |
| HDFS_SPLITS_DIR="${HDFS_INPUT_DIR}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}" | |
| HDFS_COMPRESS_DIR="${HDFS_SPLITS_DIR}_ow${OUTPUT_WINDOW}_iterative-${ITERATIVE}_forcepadding-${FORCE_PADDING}_merged_ac" | |
| # Local temporary paths | |
| LOCAL_TEMP_DIR='./local_temp_2M' | |
| LOCAL_INPUT_DIR="${LOCAL_TEMP_DIR}/raw" | |
| # mkdir -p ${LOCAL_INPUT_DIR} | |
| # Model paths (unchanged) | |
| model_path='/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000' | |
| firstbyte_prob_path='/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json' | |
| # Log directories (a new root keeps these logs apart from those of earlier runs) | |
| LOG_ROOT='logs_2M_fixed' | |
| # rm -rf ${LOG_ROOT} # force-clean old logs | |
| # mkdir -p ${LOG_ROOT}/split_stage/node${NODE_ID} | |
| # mkdir -p ${LOG_ROOT}/compress_stage/node${NODE_ID} | |
| # The mkdir calls above are commented out, so the two assignments below only build path strings. | |
| SPLIT_LOG_DIR="${LOG_ROOT}/split_stage/node${NODE_ID}" | |
| COMPRESS_LOG_DIR="${LOG_ROOT}/compress_stage/node${NODE_ID}" | |
| # ============================================== | |
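| # Step 1: data preparation (force overwrite, take the first 2M records) -- currently disabled: every command of this step below is commented out, only the final status echo still runs | |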
| # ============================================== | |
| # echo "===== Step 1/3: 准备200万条数据(强制覆盖) =====" | |
| # # 强制删除HDFS旧目录(确保覆盖) | |
| # rm -rf "${HDFS_INPUT_DIR}" >/dev/null 2>&1 | |
| # mkdir -p "${HDFS_INPUT_DIR}" | |
| # # 检查原始数据源 | |
| # RAW_DATA_SOURCE="/mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl" | |
| # if [ ! -f "${RAW_DATA_SOURCE}" ] || [ ! -r "${RAW_DATA_SOURCE}" ]; then | |
| # echo "错误:原始数据源不存在或无法读取!${RAW_DATA_SOURCE}" | |
| # exit 1 | |
| # fi | |
| # # 本地抽取200万条数据(核心调整) | |
| # echo "从原始文件抽取200万条数据到本地..." | |
| # rm -rf ${LOCAL_TEMP_DIR}/temp.jsonl # 清理旧临时文件 | |
| # head -n 2000000 "${RAW_DATA_SOURCE}" > "${LOCAL_TEMP_DIR}/temp.jsonl" | |
| # # 检查抽取数据有效性 | |
| # if [ ! -s "${LOCAL_TEMP_DIR}/temp.jsonl" ]; then | |
| # echo "错误:抽取的200万条数据为空!" | |
| # exit 1 | |
| # fi | |
| # # 本地分割为4个文件(核心调整) | |
| # echo "本地分割为${TOTAL_JSONL_FILES}个文件..." | |
| # split -n r/${TOTAL_JSONL_FILES} \ | |
| # --suffix-length=1 \ | |
| # --numeric-suffixes=1 \ | |
| # --additional-suffix=.jsonl \ | |
| # "${LOCAL_TEMP_DIR}/temp.jsonl" \ | |
| # "${LOCAL_INPUT_DIR}/ocp.chunk." | |
| # # 检查本地分割文件 | |
| # for i in $(seq 1 ${TOTAL_JSONL_FILES}); do | |
| # local_file="${LOCAL_INPUT_DIR}/ocp.chunk.${i}.jsonl" | |
| # if [ ! -f "${local_file}" ] || [ ! -s "${local_file}" ]; then | |
| # echo "错误:本地分割文件无效!${local_file}" | |
| # exit 1 | |
| # fi | |
| # done | |
| # # 强制复制到HDFS(覆盖旧数据) | |
| # echo "强制复制到HDFS..." | |
| # cp -f ${LOCAL_INPUT_DIR}/ocp.chunk.*.jsonl "${HDFS_INPUT_DIR}/" | |
| # # 检查HDFS文件 | |
| # for i in $(seq 1 ${TOTAL_JSONL_FILES}); do | |
| # hdfs_file="${HDFS_INPUT_DIR}/ocp.chunk.${i}.jsonl" | |
| # if [ ! -f "${hdfs_file}" ] || [ ! -s "${hdfs_file}" ]; then | |
| # echo "错误:HDFS文件无效!${hdfs_file}" | |
| # exit 1 | |
| # fi | |
| # done | |
| # Status line for step 1; prints the HDFS input path that the later stages build on. | |
| # NOTE(review): the preparation commands above are all commented out, so this message is | |
| # printed without any data having been prepared or verified in this run -- confirm that is intended. | |
| printf '%s\n' "200万条数据准备完成,HDFS路径: ${HDFS_INPUT_DIR}" | |
| # ============================================== | |
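| # Step 2: entropy-window splitting (4 parallel tasks) -- currently disabled: every command of this step below is commented out | |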
| # ============================================== | |
| # echo -e "\n===== Step 2/3: 窗口分割(4并行) =====" | |
| # # 强制清理旧分割结果 | |
| # rm -rf "${HDFS_SPLITS_DIR}" >/dev/null 2>&1 | |
| # mkdir -p "${HDFS_SPLITS_DIR}" | |
| # 计算任务范围(4文件×4任务=16总任务) | |
| # JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) # 4GPU各跑1任务 | |
| # TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) | |
| # START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) | |
| # END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) | |
| # [ $END_JOB_IDX -ge $TOTAL_JOBS ] && END_JOB_IDX=$(( TOTAL_JOBS - 1 )) | |
| # echo "分割阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}" | |
| # echo "分割结果输出到:${HDFS_SPLITS_DIR}" | |
| # 启动分割任务 | |
| # GLOBAL_JOB_COUNTER=0 | |
| # for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do | |
| # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) # 1-4 | |
| # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) # 0-3 | |
| # GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) | |
| # input_file="${HDFS_INPUT_DIR}/ocp.chunk.${JSONL_IDX}.jsonl" | |
| # echo "启动分割任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" | |
| # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \ | |
| # --input_file "${input_file}" \ | |
| # --output_dir "${HDFS_SPLITS_DIR}" \ | |
| # --entropy_model_path "${model_path}" \ | |
| # --compression_model_path "${model_path}" \ | |
| # --data_batch_size 256 \ | |
| # --max_entropy_batch_size 256 \ | |
| # --num_workers 1 \ | |
| # --process_id ${job_index} \ | |
| # --num_processes ${TOTAL_JOBS_PER_FILE} \ | |
| # --base_global_quantile ${ENTROPY_QUANTILE} \ | |
| # --base_monotonic_quantile ${ENTROPY_QUANTILE} \ | |
| # --chunk_size ${CHUNK_SIZE} > "${SPLIT_LOG_DIR}/split_file${JSONL_IDX}_task${job_index}.log" 2>&1 & | |
| # GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) | |
| # done | |
| # 等待并检查分割结果 | |
| # wait | |
| # echo "分割阶段节点${NODE_ID}任务结束,检查输出文件..." | |
| # for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do | |
| # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) | |
| # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) | |
| # split_output_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl" | |
| # if [ ! -f "${split_output_file}" ] || [ ! -s "${split_output_file}" ]; then | |
| # echo "错误:分割输出文件无效!${split_output_file}" | |
| # exit 1 | |
| # fi | |
| # done | |
| # echo "✅ 分割阶段节点${NODE_ID}成功完成" | |
| # ============================================== | |
| # 步骤3:数据压缩(4并行任务) | |
| # ============================================== | |
| # Step 3: compress this node's slice of the split outputs, one background job per global | |
| # job index, verify the jobs, then (on node 0 only) concatenate the compressed shards. | |
| # Reads: HDFS_SPLITS_DIR, HDFS_COMPRESS_DIR, COMPRESS_LOG_DIR, LOCAL_TEMP_DIR, model_path, | |
| # firstbyte_prob_path, NODE_ID, NUM_GPUS_PER_NODE, TOTAL_JSONL_FILES, TOTAL_JOBS_PER_FILE, | |
| # OUTPUT_WINDOW, ITERATIVE, FORCE_PADDING. | |
| # Exits with status 1 when an input is missing, a job fails, or a job log contains an error marker. | |
| echo -e "\n===== Step 3/3: 数据压缩(4并行) =====" | |
| # Only node 0 wipes the shared output directory. When every node ran rm -rf on it, a node | |
| # that started late deleted shards that other nodes had already written. | |
| # NOTE(review): there is still no cross-node barrier here, so node 0 starting after another | |
| # node has produced output would remove that output -- confirm the scheduler's start order. | |
| if [ "${NODE_ID}" -eq 0 ]; then | |
|   # ":?" aborts instead of expanding to an empty path if the variable is unset. | |
|   rm -rf -- "${HDFS_COMPRESS_DIR:?}" >/dev/null 2>&1 | |
| fi | |
| mkdir -p "${HDFS_COMPRESS_DIR}" || { echo "错误:无法创建输出目录!${HDFS_COMPRESS_DIR}" >&2; exit 1; } | |
| # The log directory has to exist before any job is launched: nothing earlier in the script | |
| # creates it, and without it the log redirection fails and the job never starts. | |
| mkdir -p "${COMPRESS_LOG_DIR}" || { echo "错误:无法创建日志目录!${COMPRESS_LOG_DIR}" >&2; exit 1; } | |
| # Job range of this node (kept identical to the split stage): one job per GPU. | |
| JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) | |
| TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) | |
| START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) | |
| END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) | |
| # Clamp the last index so a node near the end of the range does not run past the job list. | |
| if [ "${END_JOB_IDX}" -ge "${TOTAL_JOBS}" ]; then | |
|   END_JOB_IDX=$(( TOTAL_JOBS - 1 )) | |
| fi | |
| echo "压缩阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}" | |
| echo "压缩结果输出到:${HDFS_COMPRESS_DIR}" | |
| # Optional compression flags | |
| ADDITIONAL_ARGS="" | |
| if [ "${ITERATIVE}" == "true" ]; then | |
|   ADDITIONAL_ARGS="--iterative_compress" | |
| fi | |
| if [ "${FORCE_PADDING}" == "true" ]; then | |
|   ADDITIONAL_ARGS="${ADDITIONAL_ARGS} --force_padding_to_threshold" | |
| fi | |
| # Launch the compression jobs, remembering each PID and log file so that the result check | |
| # below looks at exactly the jobs started by this run. | |
| GLOBAL_JOB_COUNTER=0 | |
| compress_failed=0 | |
| compress_pids=() | |
| compress_logs=() | |
| for (( global_job_idx = START_JOB_IDX; global_job_idx <= END_JOB_IDX; global_job_idx++ )); do | |
|   # Global index -> 1-based file index and 0-based task index within that file. | |
|   JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) | |
|   job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) | |
|   # Round-robin GPU assignment over the jobs of this node. | |
|   GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) | |
|   GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) | |
|   input_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl" | |
|   log_file="${COMPRESS_LOG_DIR}/compress_file${JSONL_IDX}_task${job_index}.log" | |
|   # Fail fast on a missing or empty split output instead of launching a job that cannot succeed. | |
|   if [ ! -f "${input_file}" ] || [ ! -s "${input_file}" ]; then | |
|     echo "错误:压缩输入文件无效!${input_file}" >&2 | |
|     compress_failed=1 | |
|     continue | |
|   fi | |
|   echo "启动压缩任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" | |
|   # ADDITIONAL_ARGS is intentionally left unquoted so that it splits into separate flags. | |
|   CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \ | |
|     --input_file "${input_file}" \ | |
|     --output_dir "${HDFS_COMPRESS_DIR}" \ | |
|     --entropy_model_path "${model_path}" \ | |
|     --compression_model_path "${model_path}" \ | |
|     --firstbyte_prob_path "${firstbyte_prob_path}" \ | |
|     --data_batch_size 512 \ | |
|     --max_compression_batch_size 256 \ | |
|     --output_window_size ${OUTPUT_WINDOW} \ | |
|     --num_workers 3 \ | |
|     --process_id ${job_index} \ | |
|     --num_processes ${TOTAL_JOBS_PER_FILE} \ | |
|     --debug \ | |
|     ${ADDITIONAL_ARGS} > "${log_file}" 2>&1 & | |
|   compress_pids+=("$!") | |
|   compress_logs+=("${log_file}") | |
| done | |
| # Wait for every job individually: a bare "wait" discards the exit statuses, so a crashed | |
| # job could only be noticed through its log text. | |
| for compress_pid in "${compress_pids[@]}"; do | |
|   if ! wait "${compress_pid}"; then | |
|     compress_failed=1 | |
|   fi | |
| done | |
| echo "压缩阶段节点${NODE_ID}任务结束,检查日志..." | |
| # Scan only the logs written by this run; a glob over the directory also matched stale logs | |
| # and made grep itself fail (reported as success) when no log file existed. | |
| if [ "${#compress_logs[@]}" -gt 0 ] && grep -q -E 'Error|Traceback|failed' -- "${compress_logs[@]}"; then | |
|   compress_failed=1 | |
| fi | |
| if [ "${compress_failed}" -ne 0 ]; then | |
|   echo "❌ 压缩阶段节点${NODE_ID}出现错误!" | |
|   exit 1 | |
| else | |
|   echo "✅ 压缩阶段节点${NODE_ID}成功完成" | |
| fi | |
| # ============================================== | |
| # Step 4: the main node merges the results | |
| # ============================================== | |
| if [ "${NODE_ID}" -eq 0 ]; then | |
|   echo -e "\n===== 合并所有压缩结果 =====" | |
|   merged_file="${HDFS_COMPRESS_DIR}/merged_final.jsonl" | |
|   # NOTE(review): this runs as soon as node 0's own jobs have finished; nothing here waits | |
|   # for the other nodes, so their shards may be missing from the merge -- confirm. | |
|   # Shards are concatenated in sorted path order so that the merged file is reproducible; | |
|   # "xargs -r" skips cat entirely when no shard was found. | |
|   find "${HDFS_COMPRESS_DIR}" -name "ocp.chunk.*.jsonl" -print0 \ | |
|     | LC_ALL=C sort -z \ | |
|     | xargs -0 -r cat -- > "${merged_file}" | |
|   if [ ! -s "${merged_file}" ]; then | |
|     echo "警告:合并结果为空!${merged_file}" >&2 | |
|   fi | |
|   echo "✅ 所有结果已合并到:${merged_file}" | |
| fi | |
| # Remove the local temporary files; quoted and guarded so an empty variable cannot widen the delete. | |
| rm -rf -- "${LOCAL_TEMP_DIR:?}" | |
| echo -e "\n===== 全流程完成 =====" | |
| # #!/bin/bash | |
| # DATA_NAME=${1:-"ocpython_subsampled_7G"} # data name | |
| # # for split | |
| # ENTROPY_QUANTILE=${2:-0.90} # entropy9 | |
| # CHUNK_SIZE=${3:-512} # chunk512 | |
| # INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # checkout data path | |
| # # data preparing - split to 8 cuts | |
| # if [[ ! -d "$INPUT_DIR" || -z "$(ls -A $INPUT_DIR)" ]]; then | |
| # echo "Preparing 7G dataset..." | |
| # # acquire about 7g data | |
| # head -n 3675000 /mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl > temp.jsonl | |
| # split -n r/$TOTAL_JSONLS --suffix-length=1 --numeric-suffixes=1 --additional-suffix=.jsonl temp.jsonl ${INPUT_DIR}/ocp.chunk. | |
| # rm temp.jsonl | |
| # echo "7G data preparation completed." | |
| # else | |
| # echo "Directory '$INPUT_DIR' already exists, using existing data." | |
| # fi | |
| # # 输出目录命名:包含数据名、熵分位数、chunk大小 | |
| # SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}" | |
| # OUTPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}" | |
| # # NUM_GPUS=4 # same gpu | |
| # # TOTAL_JOBS=8 # more task | |
| # # TOTAL_JSONLS=8 # split for 8 jsonl | |
| # # # set dir | |
| # # mkdir -p $INPUT_DIR | |
| # # mkdir -p $OUTPUT_DIR | |
| # # mkdir -p $LOG_DIR | |
| # # # model path | |
| # # entropy_model_path=/mnt/hdfs/checkpoints/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_python/checkpoints/0000200000 | |
| # # compression_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_python/checkpoints/0000200000 | |
| # # # Step 1:offline_entropy_window_split.py | |
| # # echo "Starting window splitting for 7G data..." | |
| # # for JSONL_IDX in $(seq 1 $TOTAL_JSONLS); do | |
| # # for index in $(seq 0 $((TOTAL_JOBS - 1))); do | |
| # # echo "Starting split job $index for chunk $JSONL_IDX..." | |
| # # GPU_IDX=$(( (JSONL_IDX + index) % NUM_GPUS )) | |
| # # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \ | |
| # # --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}.jsonl" \ | |
| # # --output_dir "${OUTPUT_DIR}" \ | |
| # # --entropy_model_path $entropy_model_path \ | |
| # # --compression_model_path $compression_model_path \ | |
| # # --data_batch_size 256 \ | |
| # # --max_entropy_batch_size 256 \ | |
| # # --num_workers 2 \ | |
| # # --process_id $index \ | |
| # # --num_processes $TOTAL_JOBS \ | |
| # # --base_global_quantile ${ENTROPY_QUANTILE} \ | |
| # # --base_monotonic_quantile ${ENTROPY_QUANTILE} \ | |
| # # --chunk_size ${CHUNK_SIZE} > "${LOG_DIR}/split_file${JSONL_IDX}_task${index}.log" 2>&1 & | |
| # # # 每启动等于GPU数量的任务就等待一次,避免资源竞争 | |
| # # if (( (index + 1) % NUM_GPUS == 0 )); then | |
| # # wait | |
| # # fi | |
| # # done | |
| # # done | |
| # # # wait for split task | |
| # # wait | |
| # # echo "Window splitting for 7G data completed." | |
| # DATA_NAME=${1:-"ocpython_subsampled_7G"} | |
| # ENTROPY_QUANTILE=${2:-0.90} # 熵分位数(对应命名中的entropy90) | |
| # CHUNK_SIZE=${3:-512} # 分割块大小(对应命名中的chunk512) | |
| # INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # 原始数据路径 | |
| # # 输出目录命名:包含数据名、熵分位数、chunk大小 | |
| # SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}" | |
| # OUTPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}" | |
| # # 集群环境参数(适配ARNOLD调度) | |
| # NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU | |
| # NODE_ID=$ARNOLD_ID | |
| # JOBS_PER_GPU=1 # 分割阶段每个GPU跑1个任务 | |
| # TOTAL_JOBS_PER_FILE=8 # 每个文件的并行任务数 | |
| # TOTAL_JSONL_FILES=8 # 总文件数(根据数据量调整) | |
| # # 计算当前节点任务范围 | |
| # JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE )) | |
| # TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) | |
| # START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) | |
| # END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) | |
| # if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then | |
| # END_JOB_IDX=$(( TOTAL_JOBS - 1 )) | |
| # fi | |
| # mkdir -p logs/split_stage/node${NODE_ID} | |
| # LOG_DIR="logs/split_stage/node${NODE_ID}" | |
| # echo "==================================================" | |
| # echo "分割阶段 - 节点${NODE_ID}" | |
| # echo "数据名称: ${DATA_NAME}, 熵分位数: ${ENTROPY_QUANTILE}" | |
| # echo "Chunk大小: ${CHUNK_SIZE}, 输出目录: ${SPLITS_DIR}" | |
| # echo "处理任务范围: 全局任务${START_JOB_IDX}~${END_JOB_IDX}" | |
| # echo "==================================================" | |
| # model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000 | |
| # # 启动分割任务 | |
| # GLOBAL_JOB_COUNTER=0 | |
| # for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do | |
| # # 计算文件索引和任务索引 | |
| # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) | |
| # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) | |
| # GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) # 轮询分配GPU | |
| # echo "启动分割任务: 文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" | |
| # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \ | |
| # --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}.jsonl" \ | |
| # --output_dir "${OUTPUT_DIR}" \ | |
| # --entropy_model_path "${model_path}" \ | |
| # --compression_model_path "${model_path}" \ | |
| # --data_batch_size 256 \ | |
| # --max_entropy_batch_size 256 \ | |
| # --num_workers 1 \ | |
| # --process_id ${job_index} \ | |
| # --num_processes ${TOTAL_JOBS_PER_FILE} \ | |
| # --base_global_quantile ${ENTROPY_QUANTILE} \ | |
| # --base_monotonic_quantile ${ENTROPY_QUANTILE} \ | |
| # --chunk_size ${CHUNK_SIZE} > "${LOG_DIR}/split_file${JSONL_IDX}_task${job_index}.log" 2>&1 & | |
| # GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) | |
| # done | |
| # wait | |
| # echo "分割阶段节点${NODE_ID}任务结束,检查日志..." | |
| # if grep -q -E 'Error|Traceback|failed' ${LOG_DIR}/*.log; then | |
| # echo "❌ 分割阶段节点${NODE_ID}出现错误!" | |
| # exit 1 | |
| # else | |
| # echo "✅ 分割阶段节点${NODE_ID}成功完成" | |
| # fi | |
| # ## stage2: compress data | |
| # DATA_NAME=${1:-"ocpython_subsampled_50G"} | |
| # ENTROPY_QUANTILE=${2:-0.90} | |
| # CHUNK_SIZE=${3:-512} | |
| # OUTPUT_WINDOW=${4:-20} | |
| # ITERATIVE=${5:-"true"} | |
| # FORCE_PADDING=${6:-"true"} | |
| # SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}" | |
| # INPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}" | |
| # COMPRESS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}_ow${OUTPUT_WINDOW}_iterative-${ITERATIVE}_forcepadding-${FORCE_PADDING}_merged_ac" | |
| # OUTPUT_DIR="/mnt/hdfs/linzheng/data/${COMPRESS_DIR}" | |
| # # 集群环境参数 | |
| # NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU | |
| # NODE_ID=$ARNOLD_ID | |
| # JOBS_PER_GPU=2 # 压缩阶段每个GPU跑2个任务(提高利用率) | |
| # TOTAL_JOBS_PER_FILE=8 # 与分割阶段保持一致 | |
| # TOTAL_JSONL_FILES=8 | |
| # # 计算当前节点任务范围 | |
| # JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE )) | |
| # TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) | |
| # START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) | |
| # END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) | |
| # if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then | |
| # END_JOB_IDX=$(( TOTAL_JOBS - 1 )) | |
| # fi | |
| # mkdir -p logs/compress_stage/node${NODE_ID} | |
| # LOG_DIR="logs/compress_stage/node${NODE_ID}" | |
| # echo "==================================================" | |
| # echo "压缩阶段 - 节点${NODE_ID}" | |
| # echo "输入分割目录: ${SPLITS_DIR}" | |
| # echo "输出压缩目录: ${COMPRESS_DIR}" | |
| # echo "窗口大小: ${OUTPUT_WINDOW}, 迭代压缩: ${ITERATIVE}, 强制填充: ${FORCE_PADDING}" | |
| # echo "处理任务范围: 全局任务${START_JOB_IDX}~${END_JOB_IDX}" | |
| # echo "==================================================" | |
| # model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000 | |
| # firstbyte_prob_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json | |
| # # 压缩参数拼接(迭代压缩和强制填充) | |
| # ADDITIONAL_ARGS="" | |
| # if [ "$ITERATIVE" == "true" ]; then | |
| # ADDITIONAL_ARGS="--iterative_compress" | |
| # fi | |
| # if [ "$FORCE_PADDING" == "true" ]; then | |
| # ADDITIONAL_ARGS="${ADDITIONAL_ARGS} --force_padding_to_threshold" | |
| # fi | |
| # # 启动压缩任务 | |
| # GLOBAL_JOB_COUNTER=0 | |
| # for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do | |
| # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) | |
| # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) | |
| # GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) | |
| # echo "启动压缩任务: 文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" | |
| # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \ | |
| # --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}_out_${job_index}.jsonl" \ # 对应分割阶段的输出 | |
| # --output_dir "${OUTPUT_DIR}" \ | |
| # --entropy_model_path "${model_path}" \ | |
| # --compression_model_path "${model_path}" \ | |
| # --firstbyte_prob_path "${firstbyte_prob_path}" \ | |
| # --data_batch_size 512 \ | |
| # --max_compression_batch_size 256 \ | |
| # --output_window_size ${OUTPUT_WINDOW} \ | |
| # --num_workers 3 \ | |
| # --process_id ${job_index} \ | |
| # --num_processes ${TOTAL_JOBS_PER_FILE} \ | |
| # ${ADDITIONAL_ARGS} > "${LOG_DIR}/compress_file${JSONL_IDX}_task${job_index}.log" 2>&1 & | |
| # GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) | |
| # done | |
| # # 等待任务完成并检查错误 | |
| # wait | |
| # echo "压缩阶段节点${NODE_ID}任务结束,检查日志..." | |
| # if grep -q -E 'Error|Traceback|failed' ${LOG_DIR}/*.log; then | |
| # echo "❌ 压缩阶段节点${NODE_ID}出现错误!" | |
| # exit 1 | |
| # else | |
| # echo "✅ 压缩阶段节点${NODE_ID}成功完成" | |
| # fi | |
| # # 主节点负责合并结果(仅让节点0执行合并) | |
| # if [ $NODE_ID -eq 0 ]; then | |
| # echo "开始合并所有压缩结果到${OUTPUT_DIR}..." | |
| # # 合并逻辑(根据你的merge_output函数实现,这里简化为示例) | |
| # find "${OUTPUT_DIR}" -name "*.jsonl" -exec cat {} \; > "${OUTPUT_DIR}/merged_final.jsonl" | |
| # echo "✅ 所有结果已合并到${OUTPUT_DIR}/merged_final.jsonl" | |
| # fi |