#!/bin/bash
# Positional parameters (all optional; defaults shown):
#   $1 MODE                which stage to run: "split" or "compress"
#   $2 OUTPUTWINDOW        output window size forwarded to the compressor
#   $3 ITERATIVE_COMPRESS  "true"/"false" — enable iterative compression
#   $4 splits_dir          directory name holding the split output
#   $5 FORCE_PADDING       "true"/"false" — pad outputs up to the threshold
#   $6 SPLIT_CHUNK_SIZE    "lines" for line-based splitting, else a chunk size
#   $7 GLOBAL_NODE_OFFSET  added to this worker's rank for multi-launch runs
MODE=${1:-split}
OUTPUTWINDOW=${2:-16}
ITERATIVE_COMPRESS=${3:-true}
splits_dir=${4:-ocpython_subsampled_50G_entropy90_splits_line}
FORCE_PADDING=${5:-true}
SPLIT_CHUNK_SIZE=${6:-lines}
GLOBAL_NODE_OFFSET=${7:-0}
# Quantile used for both the global and monotonic entropy thresholds.
ENTROPY_QUANTILE=0.90
# Arnold cluster environment: GPU count per worker and this worker's rank.
NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU
NODE_ID=$((ARNOLD_ID + GLOBAL_NODE_OFFSET))
# Per-GPU job fan-out depends on the mode: split runs 1 job/GPU, compress 2.
if [ "$MODE" == "split" ]; then
JOBS_PER_GPU=1
elif [ "$MODE" == "compress" ]; then
JOBS_PER_GPU=2
else
# Diagnostics belong on stderr so they are not mixed into captured stdout.
echo "Error: Unknown mode '$MODE'." >&2
echo "Available modes: split, compress" >&2
exit 1
fi
# Total number of JSONL files to process
TOTAL_JSONL_FILES=16
TOTAL_JOBS_PER_FILE=64 # JOBS_PER_GPU = 2 so 32 GPU process one file
TOTAL_JOBS=$(( TOTAL_JOBS_PER_FILE * TOTAL_JSONL_FILES ))
JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
# Ceiling division so a partially filled last node is still counted
# (the old floor division undercounted when TOTAL_JOBS % JOBS_PER_NODE != 0).
GLOBAL_TOTAL_NODES=$(( (TOTAL_JOBS + JOBS_PER_NODE - 1) / JOBS_PER_NODE ))
# Calculate the start and end job indices for THIS specific node.
# These are global indices ranging from 0 to (TOTAL_JOBS - 1).
START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))
# Ensure the last node doesn't try to run more jobs than exist.
if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then
END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
fi
# Build the splitter arguments: "lines" selects line-based splitting with a
# fixed chunk_size of 128; any other value is passed through as the chunk size.
case "$SPLIT_CHUNK_SIZE" in
lines) SPLIT_ARGS="--chunk_size 128 --apply_line_split" ;;
*)     SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE" ;;
esac
# Assemble extra compressor flags from the two boolean arguments.
# ADDITIONAL_ARG is expanded unquoted at the call site, so plain
# space-separated flags (and no trailing padding space) are fine.
if [[ $ITERATIVE_COMPRESS == "false" ]]; then
ADDITIONAL_ARG=""
elif [[ $ITERATIVE_COMPRESS == "true" ]]; then
ADDITIONAL_ARG="--iterative_compress"
else
# Diagnostics belong on stderr so they are not mixed into captured stdout.
echo "Error: Unknown arg '$ITERATIVE_COMPRESS'." >&2
echo "Available values: false, true" >&2
exit 1
fi
if [[ $FORCE_PADDING == "false" ]]; then
: # no extra flag needed
elif [[ $FORCE_PADDING == "true" ]]; then
ADDITIONAL_ARG=${ADDITIONAL_ARG}" --force_padding_to_threshold"
else
echo "Error: Unknown arg '$FORCE_PADDING'." >&2
echo "Available values: false, true" >&2
exit 1
fi
# Output directory name encodes the compression configuration so different
# parameter combinations never collide.
compress_dir="${splits_dir}_ow${OUTPUTWINDOW}_iterative-${ITERATIVE_COMPRESS}_forcepadding-${FORCE_PADDING}_ac"
# Input data directory and model checkpoints (entropy and compression models
# share the same checkpoint).
input_dir="opencoder"
ckpt_root=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
entropy_model_path=$ckpt_root
compression_model_path=$ckpt_root
# Announce this node's work assignment (printf emits one line per argument).
printf '%s\n' \
  "==================================================" \
  "Starting processing on Node ${NODE_ID} of ${GLOBAL_TOTAL_NODES}" \
  "Node GPU Count: ${NUM_GPUS_PER_NODE}" \
  "Total Jobs per File: ${TOTAL_JOBS_PER_FILE}" \
  "Jobs handled per Node: ${JOBS_PER_NODE}" \
  "This node will handle global job indices: ${START_JOB_IDX} to ${END_JOB_IDX}" \
  "=================================================="
# Per-job log files are written here.
mkdir -p logs
# Counts jobs launched on this node; used for round-robin GPU assignment.
# Bug fix: a "Processing JSONL file" echo used to sit here, before the loop,
# referencing ${JSONL_IDX} while it was still unset; the per-job echo inside
# the loop already reports the file, so the premature line is dropped.
GLOBAL_JOB_COUNTER=0
# Launch one background worker per global job index assigned to this node.
for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do
# Map the global index to its input file (chunk files are 1-based) ...
JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
# ... and to the per-file process id expected by the worker script.
job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
# Round-robin this node's GPUs over the jobs it launches.
GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
echo " Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."
if [ "$MODE" == "split" ]; then
# Entropy-window split worker: reads the raw chunk, writes splits into
# ${splits_dir}; runs in the background with its own log file.
CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
--input_file /mnt/hdfs/user/linzheng/data/${input_dir}/chunk.${JSONL_IDX}.jsonl \
--output_dir /mnt/hdfs/user/linzheng/data/${splits_dir} \
--entropy_model_path $entropy_model_path \
--compression_model_path $compression_model_path \
--data_batch_size 256 \
--max_entropy_batch_size 256 \
--num_workers 1 \
--process_id ${job_index} \
--num_processes ${TOTAL_JOBS_PER_FILE} \
--base_global_quantile ${ENTROPY_QUANTILE} \
--base_monotonic_quantile ${ENTROPY_QUANTILE} \
$SPLIT_ARGS > "logs/split_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
elif [ "$MODE" == "compress" ]; then
# Arithmetic-coding compression worker: reads the split output and writes
# compressed data into ${compress_dir}; ADDITIONAL_ARG and SPLIT_ARGS are
# intentionally unquoted so their flags word-split into separate arguments.
CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
--input_file /mnt/hdfs/user/linzheng/data/${splits_dir}/chunk.${JSONL_IDX}.jsonl \
--output_dir /mnt/hdfs/user/linzheng/data/${compress_dir} \
--entropy_model_path $entropy_model_path \
--compression_model_path $compression_model_path \
--firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \
--data_batch_size 512 --max_compression_batch_size 256 \
--output_window_size ${OUTPUTWINDOW} ${ADDITIONAL_ARG} \
--num_workers 3 --process_id $job_index --num_processes $TOTAL_JOBS_PER_FILE > "logs/compress_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
else
echo "Error: Unknown mode '$MODE'."
echo "Available modes: split, compress"
exit 1
fi
# Increment the global counter for the next job
GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
done
# Barrier: block until every background worker launched above has exited.
# NOTE(review): a bare `wait` does not surface worker failures; the log scan
# below is the only failure detection.
wait
# Dump the log of this node's first job as a quick sanity check.
# Bug fix: this used an undefined START_JSONL_IDX variable and a hard-coded
# "compress_" prefix (wrong in split mode); derive the first job's file and
# process indices from START_JOB_IDX and use the actual MODE prefix.
FIRST_JSONL_IDX=$(( (START_JOB_IDX / TOTAL_JOBS_PER_FILE) + 1 ))
FIRST_PROC_IDX=$(( START_JOB_IDX % TOTAL_JOBS_PER_FILE ))
cat "logs/${MODE}_node${NODE_ID}_jsonl${FIRST_JSONL_IDX}_process${FIRST_PROC_IDX}.log"
echo "=================================================="
echo "Final check for errors in all log files on Node ${NODE_ID}..."
# Scan the logs once and keep the matching lines, instead of grepping the
# same files twice (once with -q, once to print).
error_lines=$(grep -E 'Error|Traceback|failed|error' logs/*.log)
if [ -n "$error_lines" ]; then
echo "❌ Failure: An error keyword was found in one or more log files on this node." >&2
printf '%s\n' "$error_lines" >&2
exit 1
else
echo "✅ Success: No errors found in logs on Node ${NODE_ID}."
fi
echo ""
echo "All jobs on Node ${NODE_ID} have successfully finished."
echo "=================================================="