#!/bin/bash
# After adjustment: force-overwrite data + 2M records + 4 files with 4-way parallelism
# Key changes: overwrite without checking, reduce the data volume, lower parallelism to 4
# ==============================================
# Core variables (changed to 4 files x 4 parallel tasks, 2M records)
# ==============================================
DATA_NAME=${1:-"ocpython_subsampled_2M"} # new name to distinguish this experiment
ENTROPY_QUANTILE=${2:-0.90}
CHUNK_SIZE=${3:-512}
OUTPUT_WINDOW=${4:-20}
ITERATIVE=${5:-"true"}
FORCE_PADDING=${6:-"true"}
# Key adjustment: both the number of files and the number of parallel tasks per file are 4
TOTAL_JSONL_FILES=4 # split into 4 files
TOTAL_JOBS_PER_FILE=4 # 4 parallel tasks per file
NUM_GPUS_PER_NODE=${ARNOLD_WORKER_GPU:-4} # keep 4 GPUs
NODE_ID=${ARNOLD_ID:-0}
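# Note (derived from the settings above): 4 files x 4 tasks per file = 16 global jobs;
# each node runs one job per GPU, so with 4 GPUs per node it takes 4 nodes to cover all jobs.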
# ==============================================
# Path definitions (force overwrite, do not keep old data)
# ==============================================
HDFS_INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # new path to avoid conflicts
HDFS_SPLITS_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}"
HDFS_COMPRESS_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}_ow${OUTPUT_WINDOW}_iterative-${ITERATIVE}_forcepadding-${FORCE_PADDING}_merged_ac"
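# Illustrative expansion with the default arguments (not computed by the script):
# "${ENTROPY_QUANTILE//./}" strips the dot from 0.90, giving "090", so
#   HDFS_SPLITS_DIR   = /mnt/hdfs/linzheng/data/ocpython_subsampled_2M_entropy090_splits_chunk512
#   HDFS_COMPRESS_DIR = /mnt/hdfs/linzheng/data/ocpython_subsampled_2M_entropy090_splits_chunk512_ow20_iterative-true_forcepadding-true_merged_ac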
# Local temporary paths
LOCAL_TEMP_DIR="./local_temp_2M"
LOCAL_INPUT_DIR="${LOCAL_TEMP_DIR}/raw"
# mkdir -p ${LOCAL_INPUT_DIR}
# Model paths (unchanged)
model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
firstbyte_prob_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json
# Log directories (new directory to avoid confusion with earlier runs)
LOG_ROOT="logs_2M_fixed"
# rm -rf ${LOG_ROOT} # force-clean old logs
# mkdir -p ${LOG_ROOT}/split_stage/node${NODE_ID}
mkdir -p ${LOG_ROOT}/compress_stage/node${NODE_ID} # Step 3 below redirects logs here, so this directory must exist
SPLIT_LOG_DIR="${LOG_ROOT}/split_stage/node${NODE_ID}"
COMPRESS_LOG_DIR="${LOG_ROOT}/compress_stage/node${NODE_ID}"
# ==============================================
# Step 1: data preparation (force overwrite, extract 2M records)
# ==============================================
# echo "===== Step 1/3: 准备200万条数据(强制覆盖) ====="
# # 强制删除HDFS旧目录(确保覆盖)
# rm -rf "${HDFS_INPUT_DIR}" >/dev/null 2>&1
# mkdir -p "${HDFS_INPUT_DIR}"
# # Check the raw data source
# RAW_DATA_SOURCE="/mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl"
# if [ ! -f "${RAW_DATA_SOURCE}" ] || [ ! -r "${RAW_DATA_SOURCE}" ]; then
# echo "错误:原始数据源不存在或无法读取!${RAW_DATA_SOURCE}"
# exit 1
# fi
# # Extract 2M records locally (core adjustment)
# echo "Extracting 2M records from the raw file to local storage..."
# rm -rf ${LOCAL_TEMP_DIR}/temp.jsonl # clean up the old temporary file
# head -n 2000000 "${RAW_DATA_SOURCE}" > "${LOCAL_TEMP_DIR}/temp.jsonl"
# # Validate the extracted data
# if [ ! -s "${LOCAL_TEMP_DIR}/temp.jsonl" ]; then
# echo "Error: the extracted 2M records are empty!"
# exit 1
# fi
# # Split locally into 4 files (core adjustment)
# echo "Splitting locally into ${TOTAL_JSONL_FILES} files..."
# split -n r/${TOTAL_JSONL_FILES} \
# --suffix-length=1 \
# --numeric-suffixes=1 \
# --additional-suffix=.jsonl \
# "${LOCAL_TEMP_DIR}/temp.jsonl" \
# "${LOCAL_INPUT_DIR}/ocp.chunk."
# # Check the local split files
# for i in $(seq 1 ${TOTAL_JSONL_FILES}); do
# local_file="${LOCAL_INPUT_DIR}/ocp.chunk.${i}.jsonl"
# if [ ! -f "${local_file}" ] || [ ! -s "${local_file}" ]; then
# echo "错误:本地分割文件无效!${local_file}"
# exit 1
# fi
# done
# # Force-copy to HDFS (overwriting old data)
# echo "Force-copying to HDFS..."
# cp -f ${LOCAL_INPUT_DIR}/ocp.chunk.*.jsonl "${HDFS_INPUT_DIR}/"
# # Check the HDFS files
# for i in $(seq 1 ${TOTAL_JSONL_FILES}); do
# hdfs_file="${HDFS_INPUT_DIR}/ocp.chunk.${i}.jsonl"
# if [ ! -f "${hdfs_file}" ] || [ ! -s "${hdfs_file}" ]; then
# echo "错误:HDFS文件无效!${hdfs_file}"
# exit 1
# fi
# done
echo "200万条数据准备完成,HDFS路径: ${HDFS_INPUT_DIR}"
# ==============================================
# Step 2: data splitting (4 parallel tasks)
# ==============================================
# echo -e "\n===== Step 2/3: 窗口分割(4并行) ====="
# # 强制清理旧分割结果
# rm -rf "${HDFS_SPLITS_DIR}" >/dev/null 2>&1
# mkdir -p "${HDFS_SPLITS_DIR}"
# Compute the task range (4 files x 4 tasks = 16 global jobs)
# JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) # each of the 4 GPUs runs 1 task
# TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE ))
# START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
# END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
# [ $END_JOB_IDX -ge $TOTAL_JOBS ] && END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
# echo "分割阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}"
# echo "分割结果输出到:${HDFS_SPLITS_DIR}"
# 启动分割任务
# GLOBAL_JOB_COUNTER=0
# for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do
# JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) # 1-4
# job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) # 0-3
# GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
# input_file="${HDFS_INPUT_DIR}/ocp.chunk.${JSONL_IDX}.jsonl"
# echo "启动分割任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})"
# CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
# --input_file "${input_file}" \
# --output_dir "${HDFS_SPLITS_DIR}" \
# --entropy_model_path "${model_path}" \
# --compression_model_path "${model_path}" \
# --data_batch_size 256 \
# --max_entropy_batch_size 256 \
# --num_workers 1 \
# --process_id ${job_index} \
# --num_processes ${TOTAL_JOBS_PER_FILE} \
# --base_global_quantile ${ENTROPY_QUANTILE} \
# --base_monotonic_quantile ${ENTROPY_QUANTILE} \
# --chunk_size ${CHUNK_SIZE} > "${SPLIT_LOG_DIR}/split_file${JSONL_IDX}_task${job_index}.log" 2>&1 &
# GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
# done
# Wait for the split tasks and check their outputs
# wait
# echo "Split stage on node ${NODE_ID} finished, checking output files..."
# for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do
# JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
# job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
# split_output_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl"
# if [ ! -f "${split_output_file}" ] || [ ! -s "${split_output_file}" ]; then
# echo "错误:分割输出文件无效!${split_output_file}"
# exit 1
# fi
# done
# echo "✅ 分割阶段节点${NODE_ID}成功完成"
# ==============================================
# Step 3: data compression (4 parallel tasks)
# ==============================================
echo -e "\n===== Step 3/3: 数据压缩(4并行) ====="
# 强制清理旧压缩结果
rm -rf "${HDFS_COMPRESS_DIR}" >/dev/null 2>&1
mkdir -p "${HDFS_COMPRESS_DIR}"
# Compute the compression task range (must match the split tasks)
JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) # each of the 4 GPUs runs 1 task (avoids index overflow)
TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE ))
START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
[ $END_JOB_IDX -ge $TOTAL_JOBS ] && END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
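# Worked example (assuming 4 GPUs per node): node 0 gets global jobs 0-3, which map to file 1,
# tasks 0-3; node 1 gets jobs 4-7 (file 2), and so on up to node 3 (file 4). Because
# JOBS_PER_NODE equals NUM_GPUS_PER_NODE, the round-robin assignment below gives each job its own GPU.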
echo "压缩阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}"
echo "压缩结果输出到:${HDFS_COMPRESS_DIR}"
# Compression parameters
ADDITIONAL_ARGS=""
[ "${ITERATIVE}" == "true" ] && ADDITIONAL_ARGS="--iterative_compress"
[ "${FORCE_PADDING}" == "true" ] && ADDITIONAL_ARGS="${ADDITIONAL_ARGS} --force_padding_to_threshold"
# Launch the compression tasks
GLOBAL_JOB_COUNTER=0
for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do
JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
input_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl"
echo "启动压缩任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})"
CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
--input_file "${input_file}" \
--output_dir "${HDFS_COMPRESS_DIR}" \
--entropy_model_path "${model_path}" \
--compression_model_path "${model_path}" \
--firstbyte_prob_path "${firstbyte_prob_path}" \
--data_batch_size 512 \
--max_compression_batch_size 256 \
--output_window_size ${OUTPUT_WINDOW} \
--num_workers 3 \
--process_id ${job_index} \
--num_processes ${TOTAL_JOBS_PER_FILE} \
--debug \
${ADDITIONAL_ARGS} > "${COMPRESS_LOG_DIR}/compress_file${JSONL_IDX}_task${job_index}.log" 2>&1 &
GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
done
# Wait for the compression tasks and check the logs
wait
echo "Compression stage on node ${NODE_ID} finished, checking logs..."
if grep -q -E 'Error|Traceback|failed' ${COMPRESS_LOG_DIR}/*.log; then
echo "❌ 压缩阶段节点${NODE_ID}出现错误!"
exit 1
else
echo "✅ 压缩阶段节点${NODE_ID}成功完成"
fi
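# Note: the log scan above is case-sensitive ('Error|Traceback|failed'); add -i to grep if the
# compression tool may log "ERROR" or "Failed" in other capitalisations.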
# ==============================================
# Step 4: merge results on the master node
# ==============================================
if [ ${NODE_ID} -eq 0 ]; then
echo -e "\n===== 合并所有压缩结果 ====="
merged_file="${HDFS_COMPRESS_DIR}/merged_final.jsonl"
find "${HDFS_COMPRESS_DIR}" -name "ocp.chunk.*.jsonl" -exec cat {} \; > "${merged_file}"
echo "✅ 所有结果已合并到:${merged_file}"
fi
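# Optional sanity check (illustrative addition, not part of the original pipeline): compare the
# merged line count against the per-task outputs before relying on the merged file.
# wc -l "${merged_file}" "${HDFS_COMPRESS_DIR}"/ocp.chunk.*.jsonl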
# Clean up local temporary files
rm -rf "${LOCAL_TEMP_DIR}"
echo -e "\n===== Full pipeline complete ====="
# #!/bin/bash
# DATA_NAME=${1:-"ocpython_subsampled_7G"} # data name
# # for split
# ENTROPY_QUANTILE=${2:-0.90} # entropy90
# CHUNK_SIZE=${3:-512} # chunk512
# INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # input data path
# # data preparation - split into 8 chunks
# if [[ ! -d "$INPUT_DIR" || -z "$(ls -A $INPUT_DIR)" ]]; then
# echo "Preparing 7G dataset..."
# # take roughly 7 GB of data
# head -n 3675000 /mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl > temp.jsonl
# split -n r/$TOTAL_JSONLS --suffix-length=1 --numeric-suffixes=1 --additional-suffix=.jsonl temp.jsonl ${INPUT_DIR}/ocp.chunk.
# rm temp.jsonl
# echo "7G data preparation completed."
# else
# echo "Directory '$INPUT_DIR' already exists, using existing data."
# fi
# # Output directory name encodes the data name, entropy quantile, and chunk size
# SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}"
# OUTPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}"
# # NUM_GPUS=4 # same number of GPUs
# # TOTAL_JOBS=8 # more tasks
# # TOTAL_JSONLS=8 # split into 8 jsonl files
# # # set dir
# # mkdir -p $INPUT_DIR
# # mkdir -p $OUTPUT_DIR
# # mkdir -p $LOG_DIR
# # # model path
# # entropy_model_path=/mnt/hdfs/checkpoints/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_python/checkpoints/0000200000
# # compression_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_python/checkpoints/0000200000
# # # Step 1:offline_entropy_window_split.py
# # echo "Starting window splitting for 7G data..."
# # for JSONL_IDX in $(seq 1 $TOTAL_JSONLS); do
# # for index in $(seq 0 $((TOTAL_JOBS - 1))); do
# # echo "Starting split job $index for chunk $JSONL_IDX..."
# # GPU_IDX=$(( (JSONL_IDX + index) % NUM_GPUS ))
# # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
# # --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}.jsonl" \
# # --output_dir "${OUTPUT_DIR}" \
# # --entropy_model_path $entropy_model_path \
# # --compression_model_path $compression_model_path \
# # --data_batch_size 256 \
# # --max_entropy_batch_size 256 \
# # --num_workers 2 \
# # --process_id $index \
# # --num_processes $TOTAL_JOBS \
# # --base_global_quantile ${ENTROPY_QUANTILE} \
# # --base_monotonic_quantile ${ENTROPY_QUANTILE} \
# # --chunk_size ${CHUNK_SIZE} > "${LOG_DIR}/split_file${JSONL_IDX}_task${index}.log" 2>&1 &
# # # Wait after launching a batch of tasks equal to the GPU count, to avoid resource contention
# # if (( (index + 1) % NUM_GPUS == 0 )); then
# # wait
# # fi
# # done
# # done
# # # wait for split task
# # wait
# # echo "Window splitting for 7G data completed."
# DATA_NAME=${1:-"ocpython_subsampled_7G"}
# ENTROPY_QUANTILE=${2:-0.90} # entropy quantile (the "entropy90" part of the name)
# CHUNK_SIZE=${3:-512} # split chunk size (the "chunk512" part of the name)
# INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # raw data path
# # Output directory name encodes the data name, entropy quantile, and chunk size
# SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}"
# OUTPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}"
# # Cluster environment parameters (for ARNOLD scheduling)
# NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU
# NODE_ID=$ARNOLD_ID
# JOBS_PER_GPU=1 # the split stage runs 1 task per GPU
# TOTAL_JOBS_PER_FILE=8 # parallel tasks per file
# TOTAL_JSONL_FILES=8 # total number of files (adjust to the data volume)
# # Compute this node's task range
# JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
# TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE ))
# START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
# END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
# if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then
# END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
# fi
# mkdir -p logs/split_stage/node${NODE_ID}
# LOG_DIR="logs/split_stage/node${NODE_ID}"
# echo "=================================================="
# echo "分割阶段 - 节点${NODE_ID}"
# echo "数据名称: ${DATA_NAME}, 熵分位数: ${ENTROPY_QUANTILE}"
# echo "Chunk大小: ${CHUNK_SIZE}, 输出目录: ${SPLITS_DIR}"
# echo "处理任务范围: 全局任务${START_JOB_IDX}~${END_JOB_IDX}"
# echo "=================================================="
# model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
# # Launch the split tasks
# GLOBAL_JOB_COUNTER=0
# for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do
# # Compute the file index and task index
# JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
# job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
# GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) # assign GPUs round-robin
# echo "Launching split task: file ${JSONL_IDX}, task ${job_index} (GPU ${GPU_IDX})"
# CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
# --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}.jsonl" \
# --output_dir "${OUTPUT_DIR}" \
# --entropy_model_path "${model_path}" \
# --compression_model_path "${model_path}" \
# --data_batch_size 256 \
# --max_entropy_batch_size 256 \
# --num_workers 1 \
# --process_id ${job_index} \
# --num_processes ${TOTAL_JOBS_PER_FILE} \
# --base_global_quantile ${ENTROPY_QUANTILE} \
# --base_monotonic_quantile ${ENTROPY_QUANTILE} \
# --chunk_size ${CHUNK_SIZE} > "${LOG_DIR}/split_file${JSONL_IDX}_task${job_index}.log" 2>&1 &
# GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
# done
# wait
# echo "分割阶段节点${NODE_ID}任务结束,检查日志..."
# if grep -q -E 'Error|Traceback|failed' ${LOG_DIR}/*.log; then
# echo "❌ 分割阶段节点${NODE_ID}出现错误!"
# exit 1
# else
# echo "✅ 分割阶段节点${NODE_ID}成功完成"
# fi
# ## stage2: compress data
# DATA_NAME=${1:-"ocpython_subsampled_50G"}
# ENTROPY_QUANTILE=${2:-0.90}
# CHUNK_SIZE=${3:-512}
# OUTPUT_WINDOW=${4:-20}
# ITERATIVE=${5:-"true"}
# FORCE_PADDING=${6:-"true"}
# SPLITS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}"
# INPUT_DIR="/mnt/hdfs/linzheng/data/${SPLITS_DIR}"
# COMPRESS_DIR="${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}_ow${OUTPUT_WINDOW}_iterative-${ITERATIVE}_forcepadding-${FORCE_PADDING}_merged_ac"
# OUTPUT_DIR="/mnt/hdfs/linzheng/data/${COMPRESS_DIR}"
# # Cluster environment parameters
# NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU
# NODE_ID=$ARNOLD_ID
# JOBS_PER_GPU=2 # the compression stage runs 2 tasks per GPU (better utilisation)
# TOTAL_JOBS_PER_FILE=8 # keep consistent with the split stage
# TOTAL_JSONL_FILES=8
# # Compute this node's task range
# JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
# TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE ))
# START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
# END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
# if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then
# END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
# fi
# mkdir -p logs/compress_stage/node${NODE_ID}
# LOG_DIR="logs/compress_stage/node${NODE_ID}"
# echo "=================================================="
# echo "压缩阶段 - 节点${NODE_ID}"
# echo "输入分割目录: ${SPLITS_DIR}"
# echo "输出压缩目录: ${COMPRESS_DIR}"
# echo "窗口大小: ${OUTPUT_WINDOW}, 迭代压缩: ${ITERATIVE}, 强制填充: ${FORCE_PADDING}"
# echo "处理任务范围: 全局任务${START_JOB_IDX}~${END_JOB_IDX}"
# echo "=================================================="
# model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
# firstbyte_prob_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json
# # Assemble compression flags (iterative compression and forced padding)
# ADDITIONAL_ARGS=""
# if [ "$ITERATIVE" == "true" ]; then
# ADDITIONAL_ARGS="--iterative_compress"
# fi
# if [ "$FORCE_PADDING" == "true" ]; then
# ADDITIONAL_ARGS="${ADDITIONAL_ARGS} --force_padding_to_threshold"
# fi
# # Launch the compression tasks
# GLOBAL_JOB_COUNTER=0
# for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do
# JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
# job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
# GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
# echo "启动压缩任务: 文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})"
# CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
# --input_file "${INPUT_DIR}/chunk.${JSONL_IDX}_out_${job_index}.jsonl" \ # 对应分割阶段的输出
# --output_dir "${OUTPUT_DIR}" \
# --entropy_model_path "${model_path}" \
# --compression_model_path "${model_path}" \
# --firstbyte_prob_path "${firstbyte_prob_path}" \
# --data_batch_size 512 \
# --max_compression_batch_size 256 \
# --output_window_size ${OUTPUT_WINDOW} \
# --num_workers 3 \
# --process_id ${job_index} \
# --num_processes ${TOTAL_JOBS_PER_FILE} \
# ${ADDITIONAL_ARGS} > "${LOG_DIR}/compress_file${JSONL_IDX}_task${job_index}.log" 2>&1 &
# GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
# done
# # Wait for the tasks to finish and check for errors
# wait
# echo "Compression stage on node ${NODE_ID} finished, checking logs..."
# if grep -q -E 'Error|Traceback|failed' ${LOG_DIR}/*.log; then
# echo "❌ 压缩阶段节点${NODE_ID}出现错误!"
# exit 1
# else
# echo "✅ 压缩阶段节点${NODE_ID}成功完成"
# fi
# # The master node merges the results (only node 0 performs the merge)
# if [ $NODE_ID -eq 0 ]; then
# echo "开始合并所有压缩结果到${OUTPUT_DIR}..."
# # 合并逻辑(根据你的merge_output函数实现,这里简化为示例)
# find "${OUTPUT_DIR}" -name "*.jsonl" -exec cat {} \; > "${OUTPUT_DIR}/merged_final.jsonl"
# echo "✅ 所有结果已合并到${OUTPUT_DIR}/merged_final.jsonl"
# fi