| #!/bin/bash |
|
|
# Positional arguments (all optional):
#   $1  MODE                "split" (default) or "compress"
#   $2  OUTPUTWINDOW        value for --output_window_size in compress mode (default 16)
#   $3  ITERATIVE_COMPRESS  "true" (default) or "false"
#   $4  splits_dir          directory name of the split output / compress input
#   $5  FORCE_PADDING       "true" (default) or "false"
#   $6  SPLIT_CHUNK_SIZE    "lines" (default) or an explicit --chunk_size value
#   $7  GLOBAL_NODE_OFFSET  added to ARNOLD_ID to form NODE_ID (default 0)
MODE="${1:-split}"
OUTPUTWINDOW="${2:-16}"
ITERATIVE_COMPRESS="${3:-true}"
splits_dir="${4:-ocpython_subsampled_50G_entropy90_splits_line}"
FORCE_PADDING="${5:-true}"
SPLIT_CHUNK_SIZE="${6:-lines}"
GLOBAL_NODE_OFFSET="${7:-0}"

# Passed as both --base_global_quantile and --base_monotonic_quantile.
ENTROPY_QUANTILE=0.90

# Worker topology from the ARNOLD_* environment (presumably exported by the
# cluster launcher — confirm). An unset ARNOLD_ID evaluates to 0 here.
NUM_GPUS_PER_NODE="${ARNOLD_WORKER_GPU}"
NODE_ID=$(( ARNOLD_ID + GLOBAL_NODE_OFFSET ))
|
|
# Validate MODE and pick how many worker processes share a single GPU:
# one per GPU for "split", two per GPU for "compress".
# Diagnostics go to stderr so they are not mixed into captured stdout.
case "$MODE" in
  split)
    JOBS_PER_GPU=1
    ;;
  compress)
    JOBS_PER_GPU=2
    ;;
  *)
    echo "Error: Unknown mode '$MODE'." >&2
    echo "Available modes: split, compress" >&2
    exit 1
    ;;
esac
|
|
| |
# Work decomposition: each of the TOTAL_JSONL_FILES input files is processed
# by TOTAL_JOBS_PER_FILE independent jobs. The resulting global job indices
# [0, TOTAL_JOBS) are handed to nodes in contiguous slices of JOBS_PER_NODE.
TOTAL_JSONL_FILES=16
TOTAL_JOBS_PER_FILE=64
TOTAL_JOBS=$(( TOTAL_JOBS_PER_FILE * TOTAL_JSONL_FILES ))

# NUM_GPUS_PER_NODE comes from the environment. Without a positive integer,
# JOBS_PER_NODE would be 0 and the divisions below (and the per-GPU modulo
# used when launching jobs) would abort with "division by 0".
if ! [[ "$NUM_GPUS_PER_NODE" =~ ^[1-9][0-9]*$ ]]; then
  echo "Error: invalid GPU count '${NUM_GPUS_PER_NODE}' (is ARNOLD_WORKER_GPU set?)." >&2
  exit 1
fi

JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
# Ceiling division: a partially filled last node still counts as a node.
GLOBAL_TOTAL_NODES=$(( (TOTAL_JOBS + JOBS_PER_NODE - 1) / JOBS_PER_NODE ))

# Inclusive range of global job indices owned by this node.
START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))

# A node beyond the last slice has nothing to do; stop cleanly instead of
# reporting an inverted range and running the launch/check stages empty.
if (( START_JOB_IDX >= TOTAL_JOBS )); then
  echo "Node ${NODE_ID}: no jobs to run (${GLOBAL_TOTAL_NODES} node(s) cover all ${TOTAL_JOBS} jobs)."
  exit 0
fi

# The last node's slice may run past the end; clamp to the final valid index.
if (( END_JOB_IDX >= TOTAL_JOBS )); then
  END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
fi
|
|
# Chunking flags for the split step. The value "lines" selects line-based
# splitting with a chunk size of 128. Any other value is used verbatim as the
# chunk size. The result is a flag string that is word-split when expanded.
case "$SPLIT_CHUNK_SIZE" in
  lines) SPLIT_ARGS="--chunk_size 128 --apply_line_split" ;;
  *)     SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE" ;;
esac
|
|
# Build the optional flags for the compress step from two true/false switches.
# ADDITIONAL_ARG is a space-separated flag string meant to be expanded
# unquoted so it word-splits into separate arguments (an empty string vanishes).
# Diagnostics go to stderr, matching the MODE validation.
case "$ITERATIVE_COMPRESS" in
  true)
    ADDITIONAL_ARG="--iterative_compress"
    ;;
  false)
    ADDITIONAL_ARG=""
    ;;
  *)
    echo "Error: Unknown arg '$ITERATIVE_COMPRESS'." >&2
    echo "Available values: false, true" >&2
    exit 1
    ;;
esac

case "$FORCE_PADDING" in
  true)
    # Join with a single space only when something is already present,
    # so no stray leading/trailing blanks accumulate.
    ADDITIONAL_ARG="${ADDITIONAL_ARG:+${ADDITIONAL_ARG} }--force_padding_to_threshold"
    ;;
  false)
    ;;
  *)
    echo "Error: Unknown arg '$FORCE_PADDING'." >&2
    echo "Available values: false, true" >&2
    exit 1
    ;;
esac
|
|
# Output directory name for the compress step; it encodes the split source and
# every compression setting so runs with different options do not collide.
printf -v compress_dir '%s_ow%s_iterative-%s_forcepadding-%s_ac' \
  "$splits_dir" "$OUTPUTWINDOW" "$ITERATIVE_COMPRESS" "$FORCE_PADDING"
|
|
| |
# Name of the raw corpus directory read by the split step.
input_dir="opencoder"

# One checkpoint directory serves as both the entropy model and the
# compression model, so it is spelled out once and reused.
entropy_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
compression_model_path=$entropy_model_path
|
|
# Print a summary of this node's share of the work, then make sure the
# per-job log directory (relative to the current directory) exists.
printf '%s\n' \
  "==================================================" \
  "Starting processing on Node ${NODE_ID} of ${GLOBAL_TOTAL_NODES}" \
  "Node GPU Count: ${NUM_GPUS_PER_NODE}" \
  "Total Jobs per File: ${TOTAL_JOBS_PER_FILE}" \
  "Jobs handled per Node: ${JOBS_PER_NODE}" \
  "This node will handle global job indices: ${START_JOB_IDX} to ${END_JOB_IDX}" \
  "=================================================="

mkdir -p logs
|
|
# Launch this node's jobs in the background, wait for all of them, and fail
# if any worker exited non-zero or logged an error keyword.
#
# Global job indices are file-major: index g belongs to JSONL file
# g / TOTAL_JOBS_PER_FILE + 1 (files are numbered from 1), and
# g % TOTAL_JOBS_PER_FILE is the worker's process id within that file.
# Jobs are assigned to the node's GPUs round-robin.
DATA_ROOT=/mnt/hdfs/user/linzheng/data

# The split step reads the raw corpus; the compress step reads split output.
if [[ "$MODE" == "compress" ]]; then
  SOURCE_DIR=$splits_dir
else
  SOURCE_DIR=$input_dir
fi

GLOBAL_JOB_COUNTER=0
PIDS=()       # PID of each launched worker, in launch order
LOG_FILES=()  # log file of each launched worker, parallel to PIDS
prev_jsonl_idx=""

for (( global_job_idx = START_JOB_IDX; global_job_idx <= END_JOB_IDX; global_job_idx++ )); do
  JSONL_IDX=$(( global_job_idx / TOTAL_JOBS_PER_FILE + 1 ))
  job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))
  GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
  input_file="${DATA_ROOT}/${SOURCE_DIR}/chunk.${JSONL_IDX}.jsonl"
  # Same names as before: split_node… / compress_node… (MODE is one of the two).
  log_file="logs/${MODE}_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log"

  # Announce each input file once, when its first job on this node starts.
  if [[ "$JSONL_IDX" != "$prev_jsonl_idx" ]]; then
    echo "--> Processing JSONL file: ${SOURCE_DIR}/chunk.${JSONL_IDX}.jsonl"
    prev_jsonl_idx=$JSONL_IDX
  fi

  echo " Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."

  # SPLIT_ARGS / ADDITIONAL_ARG hold several flags and are expanded unquoted
  # on purpose so they word-split into separate arguments.
  case "$MODE" in
    split)
      # shellcheck disable=SC2086
      CUDA_VISIBLE_DEVICES="$GPU_IDX" python3 offline_entropy_window_split.py \
        --input_file "$input_file" \
        --output_dir "${DATA_ROOT}/${splits_dir}" \
        --entropy_model_path "$entropy_model_path" \
        --compression_model_path "$compression_model_path" \
        --data_batch_size 256 \
        --max_entropy_batch_size 256 \
        --num_workers 1 \
        --process_id "$job_index" \
        --num_processes "$TOTAL_JOBS_PER_FILE" \
        --base_global_quantile "$ENTROPY_QUANTILE" \
        --base_monotonic_quantile "$ENTROPY_QUANTILE" \
        $SPLIT_ARGS >"$log_file" 2>&1 &
      ;;
    compress)
      # shellcheck disable=SC2086
      CUDA_VISIBLE_DEVICES="$GPU_IDX" python3 offline_entropy_window_compress_ac.py \
        --input_file "$input_file" \
        --output_dir "${DATA_ROOT}/${compress_dir}" \
        --entropy_model_path "$entropy_model_path" \
        --compression_model_path "$compression_model_path" \
        --firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \
        --data_batch_size 512 --max_compression_batch_size 256 \
        --output_window_size "$OUTPUTWINDOW" $ADDITIONAL_ARG \
        --num_workers 3 --process_id "$job_index" --num_processes "$TOTAL_JOBS_PER_FILE" \
        >"$log_file" 2>&1 &
      ;;
    *)
      echo "Error: Unknown mode '$MODE'." >&2
      echo "Available modes: split, compress" >&2
      exit 1
      ;;
  esac

  PIDS+=("$!")
  LOG_FILES+=("$log_file")
  GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))
done

# Wait on each worker individually: a bare `wait` always returns 0, which
# would hide workers that crashed without writing an error keyword.
failed_jobs=0
for i in "${!PIDS[@]}"; do
  if ! wait "${PIDS[i]}"; then
    echo "Job with log ${LOG_FILES[i]} exited with a non-zero status." >&2
    failed_jobs=$(( failed_jobs + 1 ))
  fi
done

# Show the first log produced by this node as a sample of the run's output.
if (( ${#LOG_FILES[@]} > 0 )); then
  cat -- "${LOG_FILES[0]}"
fi

echo "=================================================="
echo "Final check for errors in all log files on Node ${NODE_ID}..."
# Scan only the logs written by this run on this node. logs/ may also hold
# files from earlier runs (or other nodes, if the directory is shared), and
# those would otherwise cause false failures. Guarding on a non-empty list
# also keeps grep from falling back to reading stdin.
error_pattern='Error|Traceback|failed|error'
has_errors=0
if (( ${#LOG_FILES[@]} > 0 )) && grep -q -E -- "$error_pattern" "${LOG_FILES[@]}"; then
  echo "❌ Failure: An error keyword was found in one or more log files on this node."
  grep -H -E -- "$error_pattern" "${LOG_FILES[@]}"
  has_errors=1
fi
if (( failed_jobs > 0 )); then
  echo "❌ Failure: ${failed_jobs} job(s) on Node ${NODE_ID} exited with a non-zero status."
  has_errors=1
fi
if (( has_errors )); then
  exit 1
fi

echo "✅ Success: No errors found in logs on Node ${NODE_ID}."
echo ""
echo "All jobs on Node ${NODE_ID} have successfully finished."
echo "=================================================="
|
|