File size: 6,103 Bytes
72c0672
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/bin/bash

# Distributed driver for entropy-window splitting / compression of JSONL
# shards across multiple GPU nodes. Positional args (all optional):
#   $1 MODE               - "split" or "compress"
#   $2 OUTPUTWINDOW       - output window size passed to the compressor
#   $3 ITERATIVE_COMPRESS - "true"/"false": add --iterative_compress
#   $4 splits_dir         - directory name holding/receiving split output
#   $5 FORCE_PADDING      - "true"/"false": add --force_padding_to_threshold
#   $6 SPLIT_CHUNK_SIZE   - "lines" for line-based splitting, else a numeric chunk size
#   $7 GLOBAL_NODE_OFFSET - added to ARNOLD_ID so several clusters can share one job space
MODE=${1:-"split"}
OUTPUTWINDOW=${2:-16}
ITERATIVE_COMPRESS=${3:-"true"}
splits_dir=${4:-"ocpython_subsampled_50G_entropy90_splits_line"}
FORCE_PADDING=${5:-"true"}
SPLIT_CHUNK_SIZE=${6:-"lines"}
GLOBAL_NODE_OFFSET=${7:-0}

# Quantile threshold used for entropy-based window boundaries (split mode).
ENTROPY_QUANTILE=0.90
# ARNOLD_WORKER_GPU / ARNOLD_ID appear to be cluster-scheduler env vars
# (GPUs per worker, worker rank) — assumed set by the launcher; TODO confirm.
NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU
NODE_ID=$(( ARNOLD_ID + GLOBAL_NODE_OFFSET ))

# Map the selected mode to how many concurrent jobs each GPU runs.
case "$MODE" in
    split)
        JOBS_PER_GPU=1
        ;;
    compress)
        JOBS_PER_GPU=2
        ;;
    *)
        echo "Error: Unknown mode '$MODE'."
        echo "Available modes: split, compress"
        exit 1
        ;;
esac

# Total number of JSONL files to process
TOTAL_JSONL_FILES=16
TOTAL_JOBS_PER_FILE=64 # shards per file; in compress mode (JOBS_PER_GPU=2) 32 GPUs cover one file
TOTAL_JOBS=$(( TOTAL_JOBS_PER_FILE * TOTAL_JSONL_FILES ))
JOBS_PER_NODE=$(( JOBS_PER_GPU * NUM_GPUS_PER_NODE ))
# Number of nodes needed to cover the whole job space (integer division:
# assumes JOBS_PER_NODE divides the total evenly — TODO confirm).
GLOBAL_TOTAL_NODES=$(( (TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE) / JOBS_PER_NODE ))

# Calculate the start and end job indices for THIS specific node.
# These are global indices ranging from 0 to (TOTAL_JOBS - 1).
START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE ))
END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 ))

# Ensure the last node doesn't try to run more jobs than exist.
if [ $END_JOB_IDX -ge $TOTAL_JOBS ]; then
    END_JOB_IDX=$(( TOTAL_JOBS - 1 ))
fi

# Translate the chunking choice into CLI flags for the split script:
# "lines" enables line-aware splitting with a fixed 128 chunk size,
# anything else is passed straight through as the chunk size.
case "$SPLIT_CHUNK_SIZE" in
    lines) SPLIT_ARGS="--chunk_size 128 --apply_line_split" ;;
    *)     SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE" ;;
esac

# Resolve the iterative-compression toggle into an optional CLI flag.
case "$ITERATIVE_COMPRESS" in
    false) ADDITIONAL_ARG="" ;;
    true)  ADDITIONAL_ARG="--iterative_compress" ;;
    *)
        echo "Error: Unknown arg '$ITERATIVE_COMPRESS'."
        echo "Available values: false, true"
        exit 1
        ;;
esac

# Optionally append the force-padding flag (extends ADDITIONAL_ARG built above).
case "$FORCE_PADDING" in
    false) ADDITIONAL_ARG="${ADDITIONAL_ARG} " ;;
    true)  ADDITIONAL_ARG="${ADDITIONAL_ARG} --force_padding_to_threshold" ;;
    *)
        echo "Error: Unknown arg '$FORCE_PADDING'."
        echo "Available values: false, true"
        exit 1
        ;;
esac

# Output directory name encodes every knob that affects compression results,
# so different configurations never overwrite each other.
compress_dir="${splits_dir}_ow${OUTPUTWINDOW}_iterative-${ITERATIVE_COMPRESS}_forcepadding-${FORCE_PADDING}_ac"

# Input corpus name and model checkpoints. Entropy and compression models
# currently point at the same 40M-parameter checkpoint.
input_dir="opencoder"
entropy_model_path="/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000"
compression_model_path="/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000"

# Announce this node's share of the work before launching anything.
printf '%s\n' \
    "==================================================" \
    "Starting processing on Node ${NODE_ID} of ${GLOBAL_TOTAL_NODES}" \
    "Node GPU Count: ${NUM_GPUS_PER_NODE}" \
    "Total Jobs per File: ${TOTAL_JOBS_PER_FILE}" \
    "Jobs handled per Node: ${JOBS_PER_NODE}" \
    "This node will handle global job indices: ${START_JOB_IDX} to ${END_JOB_IDX}" \
    "=================================================="

# Per-job stdout/stderr is captured under logs/; create it up front.
mkdir -p logs

# Launch this node's slice of jobs, round-robining them across local GPUs.
# Each global job index maps to a (file, shard) pair:
#   JSONL_IDX - 1-based chunk file number (chunk.<N>.jsonl)
#   job_index - shard id within that file, 0..TOTAL_JOBS_PER_FILE-1
# BUG FIX: the per-file announcement previously sat BEFORE the loop (a
# leftover of a removed outer per-file loop) and printed an unset
# ${JSONL_IDX}; it now fires inside the loop, once per file.
GLOBAL_JOB_COUNTER=0
LAST_JSONL_IDX=-1
for global_job_idx in $(seq $START_JOB_IDX $END_JOB_IDX); do
    JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 ))
    job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE ))

    # Announce each input file once, when we first reach it.
    if [ $JSONL_IDX -ne $LAST_JSONL_IDX ]; then
        echo "--> Processing JSONL file: ${input_dir}/chunk.${JSONL_IDX}.jsonl"
        LAST_JSONL_IDX=$JSONL_IDX
    fi

    GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
    echo "    Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."
    if [ "$MODE" == "split" ]; then
        # Entropy-window split: shard job_index of chunk.JSONL_IDX.jsonl,
        # backgrounded with output captured to a per-job log.
        CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
            --input_file /mnt/hdfs/user/linzheng/data/${input_dir}/chunk.${JSONL_IDX}.jsonl \
            --output_dir /mnt/hdfs/user/linzheng/data/${splits_dir} \
            --entropy_model_path $entropy_model_path \
            --compression_model_path $compression_model_path \
            --data_batch_size 256 \
            --max_entropy_batch_size 256 \
            --num_workers 1 \
            --process_id ${job_index} \
            --num_processes ${TOTAL_JOBS_PER_FILE} \
            --base_global_quantile ${ENTROPY_QUANTILE} \
            --base_monotonic_quantile ${ENTROPY_QUANTILE} \
            $SPLIT_ARGS > "logs/split_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
    elif [ "$MODE" == "compress" ]; then
        # Arithmetic-coding compression over previously split data.
        CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
            --input_file /mnt/hdfs/user/linzheng/data/${splits_dir}/chunk.${JSONL_IDX}.jsonl \
            --output_dir /mnt/hdfs/user/linzheng/data/${compress_dir} \
            --entropy_model_path $entropy_model_path \
            --compression_model_path $compression_model_path \
            --firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \
            --data_batch_size 512 --max_compression_batch_size 256 \
            --output_window_size ${OUTPUTWINDOW} ${ADDITIONAL_ARG} \
            --num_workers 3 --process_id $job_index --num_processes $TOTAL_JOBS_PER_FILE > "logs/compress_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
    else
        echo "Error: Unknown mode '$MODE'."
        echo "Available modes: split, compress"
        exit 1
    fi

    # Increment the global counter for the next job
    GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))

done

wait
cat logs/compress_node${NODE_ID}_jsonl${START_JSONL_IDX}_process0.log
echo "=================================================="
echo "Final check for errors in all log files on Node ${NODE_ID}..."
if grep -q -E 'Error|Traceback|failed|error' logs/*.log; then
    echo "❌ Failure: An error keyword was found in one or more log files on this node."
    grep -E 'Error|Traceback|failed|error' logs/*.log
    exit 1
else
    echo "✅ Success: No errors found in logs on Node ${NODE_ID}."
fi
echo ""
echo "All jobs on Node ${NODE_ID} have successfully finished."
echo "=================================================="