Byte-lingua-code / distributed_run_opencoder_compress.sh
2ira's picture
offline_compression_graph_code
72c0672 verified
#!/bin/bash
MODE=${1:-"split"}
OUTPUTWINDOW=${2:-16}
ITERATIVE_COMPRESS=${3:-"true"}
splits_dir=${4:-"ocpython_subsampled_50G_entropy90_splits_line"}
FORCE_PADDING=${5:-"true"}
SPLIT_CHUNK_SIZE=${6:-"lines"}
GLOBAL_NODE_OFFSET=${7:-0}
ENTROPY_QUANTILE=0.90
NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU
NODE_ID=$(( ARNOLD_ID + GLOBAL_NODE_OFFSET ))
if [ "$MODE" == "split" ]; then
JOBS_PER_GPU=1
elif [ "$MODE" == "compress" ]; then
JOBS_PER_GPU=2
else
echo "Error: Unknown mode '$MODE'."
echo "Available modes: split, compress"
exit 1
fi
# Total number of JSONL files to process
TOTAL_JSONL_FILES=16
TOTAL_JOBS_PER_FILE=64 # JOBS_PER_GPU = 2 so 32 GPU process one file
TOTAL_JOBS=$(( TOTAL_JOBS_PER_FILE * TOTAL_JSONL_FILES ))
JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
GLOBAL_TOTAL_NODES=$(( (TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE) / JOBS_PER_NODE ))
# Calculate the start and end job indices for THIS specific node.
# These are global indices ranging from 0 to (TOTAL_JOBS_PER_FILE - 1).
START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
# Ensure the last node doesn't try to run more jobs than exist.
if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then
END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
fi
if [[ $SPLIT_CHUNK_SIZE == "lines" ]]; then
SPLIT_ARGS="--chunk_size 128 --apply_line_split"
else
SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE"
fi
if [[ $ITERATIVE_COMPRESS == "false" ]]; then
ADDITIONAL_ARG=""
elif [[ $ITERATIVE_COMPRESS == "true" ]]; then
ADDITIONAL_ARG="--iterative_compress"
else
echo "Error: Unknown arg '$ITERATIVE_COMPRESS'."
echo "Available values: false, true"
exit 1
fi
if [[ $FORCE_PADDING == "false" ]]; then
ADDITIONAL_ARG=${ADDITIONAL_ARG}" "
elif [[ $FORCE_PADDING == "true" ]]; then
ADDITIONAL_ARG=${ADDITIONAL_ARG}" --force_padding_to_threshold"
else
echo "Error: Unknown arg '$FORCE_PADDING'."
echo "Available values: false, true"
exit 1
fi
compress_dir=${splits_dir}"_ow${OUTPUTWINDOW}_iterative-${ITERATIVE_COMPRESS}_forcepadding-${FORCE_PADDING}_ac"
# Directory and model paths
input_dir="opencoder"
entropy_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
compression_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
echo "=================================================="
echo "Starting processing on Node ${NODE_ID} of ${GLOBAL_TOTAL_NODES}"
echo "Node GPU Count: ${NUM_GPUS_PER_NODE}"
echo "Total Jobs per File: ${TOTAL_JOBS_PER_FILE}"
echo "Jobs handled per Node: ${JOBS_PER_NODE}"
echo "This node will handle global job indices: ${START_JOB_IDX} to ${END_JOB_IDX}"
echo "=================================================="
# Create a directory for log files if it doesn't exist
mkdir -p logs
GLOBAL_JOB_COUNTER=0
echo "--> Processing JSONL file: ${input_dir}/chunk.${JSONL_IDX}.jsonl"
for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do
JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
echo " Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."
if [ "$MODE" == "split" ]; then
CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
--input_file /mnt/hdfs/user/linzheng/data/${input_dir}/chunk.${JSONL_IDX}.jsonl \
--output_dir /mnt/hdfs/user/linzheng/data/${splits_dir} \
--entropy_model_path $entropy_model_path \
--compression_model_path $compression_model_path \
--data_batch_size 256 \
--max_entropy_batch_size 256 \
--num_workers 1 \
--process_id ${job_index} \
--num_processes ${TOTAL_JOBS_PER_FILE} \
--base_global_quantile ${ENTROPY_QUANTILE} \
--base_monotonic_quantile ${ENTROPY_QUANTILE} \
$SPLIT_ARGS > "logs/split_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
elif [ "$MODE" == "compress" ]; then
CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
--input_file /mnt/hdfs/user/linzheng/data/${splits_dir}/chunk.${JSONL_IDX}.jsonl \
--output_dir /mnt/hdfs/user/linzheng/data/${compress_dir} \
--entropy_model_path $entropy_model_path \
--compression_model_path $compression_model_path \
--firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \
--data_batch_size 512 --max_compression_batch_size 256 \
--output_window_size ${OUTPUTWINDOW} ${ADDITIONAL_ARG} \
--num_workers 3 --process_id $job_index --num_processes $TOTAL_JOBS_PER_FILE > "logs/compress_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
else
echo "Error: Unknown mode '$MODE'."
echo "Available modes: split, compress"
exit 1
fi
# Increment the global counter for the next job
GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
done
wait
cat logs/compress_node${NODE_ID}_jsonl${START_JSONL_IDX}_process0.log
echo "=================================================="
echo "Final check for errors in all log files on Node ${NODE_ID}..."
if grep -q -E 'Error|Traceback|failed|error' logs/*.log; then
echo "❌ Failure: An error keyword was found in one or more log files on this node."
grep -E 'Error|Traceback|failed|error' logs/*.log
exit 1
else
echo "✅ Success: No errors found in logs on Node ${NODE_ID}."
fi
echo ""
echo "All jobs on Node ${NODE_ID} have successfully finished."
echo "=================================================="