| import re | |
| PROMPT_UNIT_PREFIX = "#!PROMPT!:" | |
| ENHANCED_PROMPT_PREFIX = "!enhanced!\n" | |
| SPEAKER_OPTIONS_LINE_RE = re.compile(r"^\s*Speaker\s*\d+\s*\{[^{}\n]*\}\s*:", re.IGNORECASE) | |
| DEFAULT_MULTI_PROMPTS_MODE = "PG" | |
| def normalize_multi_prompts_mode(value, default="G"): | |
| if value is None: | |
| return default | |
| if isinstance(value, str): | |
| value = value.strip().upper() | |
| elif isinstance(value, (int, float)): | |
| value = int(value) | |
| else: | |
| return default | |
| value = {0: "G", 1: "W", 2: "FG", "": "FG", "0": "G", "1": "W", "2": "FG", "P": "PG"}.get(value, value) | |
| return value if value in {"FG", "G", "PG", "W", "PW"} else default | |
| def get_multi_prompts_gen_choices(medium="Video", include_sliding_window=True): | |
| choices = [ | |
| (f"Each New Line Will Add a new {medium} Request to the Generation Queue", "G"), | |
| (f"Each new Paragraph separated by an Empty Line Will Add a new {medium} Request to the Generation Queue", "PG"), | |
| ] | |
| if include_sliding_window: | |
| choices += [ | |
| ("Each Line Will be used for a new Sliding Window of the same Video Generation", "W"), | |
| ("Each Paragraph Separated by an Empty line will be used for a new Sliding Window of the same Video Generation", "PW"), | |
| ] | |
| choices += [("All the Lines are Part of the Same Prompt", "FG")] | |
| return choices | |
| def split_prompt_units(prompt_text, multi_prompts_gen_type, single_prompt=False, originals=False): | |
| multi_prompts_gen_type = multi_prompts_gen_type or "" | |
| prompt_text = prompt_text.replace("\r\n", "\n").replace("\r", "\n") | |
| if prompt_text.startswith(ENHANCED_PROMPT_PREFIX): | |
| prompt_text = prompt_text[len(ENHANCED_PROMPT_PREFIX):] | |
| if originals: | |
| return split_prompt_original_units(prompt_text, multi_prompts_gen_type, single_prompt=single_prompt) | |
| prompt_lines = [line.rstrip() for line in prompt_text.split("\n") if not line.strip().startswith("#")] | |
| prompt_text = "\n".join(prompt_lines).strip() | |
| if not prompt_text: | |
| return [] | |
| if single_prompt or multi_prompts_gen_type == "FG": | |
| return [prompt_text] | |
| if "P" in multi_prompts_gen_type: | |
| prompts, current_lines = [], [] | |
| for raw_line in prompt_text.split("\n"): | |
| if not raw_line.strip(): | |
| if current_lines: | |
| prompts.append("\n".join(current_lines).strip()) | |
| current_lines = [] | |
| continue | |
| current_lines.append(raw_line) | |
| if current_lines: | |
| prompts.append("\n".join(current_lines).strip()) | |
| return prompts | |
| return [one_line.strip() for one_line in prompt_text.split("\n") if one_line.strip()] | |
| def serialize_prompt_units(prompt_text, prompts, multi_prompts_gen_type): | |
| prompt_text = prompt_text.replace("\r\n", "\n").replace("\r", "\n") | |
| if prompt_text.startswith(ENHANCED_PROMPT_PREFIX): | |
| prompt_text = prompt_text[len(ENHANCED_PROMPT_PREFIX):] | |
| prompt_text = prompt_text.strip() | |
| prompts = [prompt.strip() for prompt in prompts if prompt.strip()] | |
| if not prompts: | |
| return "" | |
| return prompts[0] if multi_prompts_gen_type == "FG" else ("\n\n" if "P" in multi_prompts_gen_type else "\n").join(prompts) | |
| def split_prompt_original_units(prompt_text, multi_prompts_gen_type, single_prompt=False): | |
| multi_prompts_gen_type = multi_prompts_gen_type or "" | |
| prompt_text = prompt_text.replace("\r\n", "\n").replace("\r", "\n") | |
| if prompt_text.startswith(ENHANCED_PROMPT_PREFIX): | |
| prompt_text = prompt_text[len(ENHANCED_PROMPT_PREFIX):] | |
| def split_marker(line): | |
| return line[len(PROMPT_UNIT_PREFIX):].strip() if line.startswith(PROMPT_UNIT_PREFIX) else None | |
| if single_prompt or multi_prompts_gen_type == "FG": | |
| originals, visible_lines = [], [] | |
| for raw_line in prompt_text.split("\n"): | |
| marker = split_marker(raw_line) | |
| if marker is not None: | |
| if marker: | |
| originals.append(marker) | |
| continue | |
| if not raw_line.strip().startswith("#"): | |
| visible_lines.append(raw_line.rstrip()) | |
| visible_prompt = "\n".join(visible_lines).strip() | |
| prompt = "\n".join(originals) if originals else visible_prompt | |
| return [prompt] if prompt else [] | |
| if "P" in multi_prompts_gen_type: | |
| prompts, current_lines, current_original = [], [], None | |
| def flush_paragraph(): | |
| nonlocal current_lines, current_original | |
| visible_prompt = "\n".join(current_lines).strip() | |
| prompt = current_original or visible_prompt | |
| if prompt: | |
| prompts.append(prompt) | |
| current_lines, current_original = [], None | |
| for raw_line in prompt_text.split("\n"): | |
| marker = split_marker(raw_line) | |
| if marker is not None: | |
| if current_lines or current_original: | |
| flush_paragraph() | |
| current_original = marker or current_original | |
| continue | |
| if not raw_line.strip(): | |
| flush_paragraph() | |
| continue | |
| if raw_line.strip().startswith("#"): | |
| continue | |
| current_lines.append(raw_line.rstrip()) | |
| flush_paragraph() | |
| return prompts | |
| prompts, pending_original = [], None | |
| for raw_line in prompt_text.split("\n"): | |
| marker = split_marker(raw_line) | |
| if marker is not None: | |
| if pending_original: | |
| prompts.append(pending_original) | |
| pending_original = marker or pending_original | |
| continue | |
| if not raw_line.strip() or raw_line.strip().startswith("#"): | |
| continue | |
| prompts.append(pending_original or raw_line.rstrip().strip()) | |
| pending_original = None | |
| if pending_original: | |
| prompts.append(pending_original) | |
| return prompts | |
| def serialize_prompt_blocks_with_prefix(prompts, original_prompts=None): | |
| blocks = [] | |
| prompts = [prompt.strip() for prompt in prompts if prompt.strip()] | |
| if original_prompts is None: | |
| original_prompts = [] | |
| for idx, prompt in enumerate(prompts, start=1): | |
| original_prompt = original_prompts[idx - 1] if idx - 1 < len(original_prompts) else f"Prompt {idx}" | |
| original_prompt = re.sub(r"[\r\n]+", " ", str(original_prompt or "")).strip() | |
| blocks.append(f"{PROMPT_UNIT_PREFIX} {original_prompt}\n{prompt}") | |
| return "\n\n".join(blocks) | |
| def is_speaker_options_line(line): | |
| return SPEAKER_OPTIONS_LINE_RE.search(line or "") is not None | |
| def process_template(input_text, keep_comments=False, keep_empty_lines=False): | |
| """ | |
| Process a text template with macro instructions and variable substitution. | |
| Supports multiple values for variables to generate multiple output versions. | |
| Each section between macro lines is treated as a separate template. | |
| Args: | |
| input_text (str): The input template text | |
| Returns: | |
| tuple: (output_text, error_message) | |
| - output_text: Processed output with variables substituted, or empty string if error | |
| - error_message: Error description and problematic line, or empty string if no error | |
| """ | |
| normalized_input = str(input_text or "").replace("\r\n", "\n").replace("\r", "\n") | |
| lines = normalized_input.split("\n") if keep_empty_lines else normalized_input.strip().split("\n") | |
| current_variables = {} | |
| current_template_lines = [] | |
| all_output_lines = [] | |
| error_message = "" | |
| # Process the input line by line | |
| line_number = 0 | |
| while line_number < len(lines): | |
| orig_line = lines[line_number] | |
| line = orig_line.strip() | |
| line_number += 1 | |
| # Skip empty lines or comments | |
| if not line: | |
| if keep_empty_lines: | |
| current_template_lines.append("") | |
| continue | |
| if line.startswith('#') and not keep_comments: | |
| continue | |
| # Handle macro instructions | |
| if line.startswith('!'): | |
| # Process any accumulated template lines before starting a new macro | |
| if current_template_lines: | |
| # Process the current template with current variables | |
| template_output, err = process_current_template(current_template_lines, current_variables) | |
| if err: | |
| return "", err | |
| all_output_lines.extend(template_output) | |
| current_template_lines = [] # Reset template lines | |
| # Reset variables for the new macro | |
| current_variables = {} | |
| # Parse the macro line | |
| macro_line = line[1:].strip() | |
| # Check for unmatched braces in the whole line | |
| open_braces = macro_line.count('{') | |
| close_braces = macro_line.count('}') | |
| if open_braces != close_braces: | |
| error_message = f"Unmatched braces: {open_braces} opening '{{' and {close_braces} closing '}}' braces\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Check for unclosed quotes | |
| if macro_line.count('"') % 2 != 0: | |
| error_message = f"Unclosed double quotes\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Split by optional colon separator | |
| var_sections = re.split(r'\s*:\s*', macro_line) | |
| for section in var_sections: | |
| section = section.strip() | |
| if not section: | |
| continue | |
| # Extract variable name | |
| var_match = re.search(r'\{([^}]+)\}', section) | |
| if not var_match: | |
| if '{' in section or '}' in section: | |
| error_message = f"Malformed variable declaration\nLine: '{orig_line}'" | |
| return "", error_message | |
| continue | |
| var_name = var_match.group(1).strip() | |
| if not var_name: | |
| error_message = f"Empty variable name\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Check variable value format | |
| value_part = section[section.find('}')+1:].strip() | |
| if not value_part.startswith('='): | |
| error_message = f"Missing '=' after variable '{{{var_name}}}'\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Extract all quoted values | |
| var_values = re.findall(r'"([^"]*)"', value_part) | |
| # Check if there are values specified | |
| if not var_values: | |
| error_message = f"No quoted values found for variable '{{{var_name}}}'\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Check for missing commas between values | |
| # Look for patterns like "value""value" (missing comma) | |
| if re.search(r'"[^,]*"[^,]*"', value_part): | |
| error_message = f"Missing comma between values for variable '{{{var_name}}}'\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Store the variable values | |
| current_variables[var_name] = var_values | |
| # Handle template lines | |
| else: | |
| if not line.startswith('#') and not is_speaker_options_line(line): | |
| # Check for unknown variables in template line | |
| var_references = re.findall(r'\{([^}]+)\}', line) | |
| for var_ref in var_references: | |
| if var_ref not in current_variables: | |
| error_message = f"Unknown variable '{{{var_ref}}}' in template\nLine: '{orig_line}'" | |
| return "", error_message | |
| # Add to current template lines | |
| current_template_lines.append(line) | |
| # Process any remaining template lines | |
| if current_template_lines: | |
| template_output, err = process_current_template(current_template_lines, current_variables) | |
| if err: | |
| return "", err | |
| all_output_lines.extend(template_output) | |
| return '\n'.join(all_output_lines), "" | |
| def process_current_template(template_lines, variables): | |
| """ | |
| Process a set of template lines with the current variables. | |
| Args: | |
| template_lines (list): List of template lines to process | |
| variables (dict): Dictionary of variable names to lists of values | |
| Returns: | |
| tuple: (output_lines, error_message) | |
| """ | |
| if not variables or not template_lines: | |
| return template_lines, "" | |
| output_lines = [] | |
| # Find the maximum number of values for any variable | |
| max_values = max(len(values) for values in variables.values()) | |
| # Generate each combination | |
| for i in range(max_values): | |
| for template in template_lines: | |
| output_line = template | |
| for var_name, var_values in variables.items(): | |
| # Use modulo to cycle through values if needed | |
| value_index = i % len(var_values) | |
| var_value = var_values[value_index] | |
| output_line = output_line.replace(f"{{{var_name}}}", var_value) | |
| output_lines.append(output_line) | |
| return output_lines, "" | |
| def extract_variable_names(macro_line): | |
| """ | |
| Extract all variable names from a macro line. | |
| Args: | |
| macro_line (str): A macro line (with or without the leading '!') | |
| Returns: | |
| tuple: (variable_names, error_message) | |
| - variable_names: List of variable names found in the macro | |
| - error_message: Error description if any, empty string if no error | |
| """ | |
| # Remove leading '!' if present | |
| if macro_line.startswith('!'): | |
| macro_line = macro_line[1:].strip() | |
| variable_names = [] | |
| # Check for unmatched braces | |
| open_braces = macro_line.count('{') | |
| close_braces = macro_line.count('}') | |
| if open_braces != close_braces: | |
| return [], f"Unmatched braces: {open_braces} opening '{{' and {close_braces} closing '}}' braces" | |
| # Split by optional colon separator | |
| var_sections = re.split(r'\s*:\s*', macro_line) | |
| for section in var_sections: | |
| section = section.strip() | |
| if not section: | |
| continue | |
| # Extract variable name | |
| var_matches = re.findall(r'\{([^}]+)\}', section) | |
| for var_name in var_matches: | |
| new_var = var_name.strip() | |
| if not new_var in variable_names: | |
| variable_names.append(new_var) | |
| return variable_names, "" | |
| def extract_variable_values(macro_line): | |
| """ | |
| Extract all variable names and their values from a macro line. | |
| Args: | |
| macro_line (str): A macro line (with or without the leading '!') | |
| Returns: | |
| tuple: (variables_dict, error_message) | |
| - variables_dict: Dictionary mapping variable names to their values | |
| - error_message: Error description if any, empty string if no error | |
| """ | |
| # Remove leading '!' if present | |
| if macro_line.startswith('!'): | |
| macro_line = macro_line[1:].strip() | |
| variables = {} | |
| # Check for unmatched braces | |
| open_braces = macro_line.count('{') | |
| close_braces = macro_line.count('}') | |
| if open_braces != close_braces: | |
| return {}, f"Unmatched braces: {open_braces} opening '{{' and {close_braces} closing '}}' braces" | |
| # Check for unclosed quotes | |
| if macro_line.count('"') % 2 != 0: | |
| return {}, "Unclosed double quotes" | |
| # Split by optional colon separator | |
| var_sections = re.split(r'\s*:\s*', macro_line) | |
| for section in var_sections: | |
| section = section.strip() | |
| if not section: | |
| continue | |
| # Extract variable name | |
| var_match = re.search(r'\{([^}]+)\}', section) | |
| if not var_match: | |
| if '{' in section or '}' in section: | |
| return {}, "Malformed variable declaration" | |
| continue | |
| var_name = var_match.group(1).strip() | |
| if not var_name: | |
| return {}, "Empty variable name" | |
| # Check variable value format | |
| value_part = section[section.find('}')+1:].strip() | |
| if not value_part.startswith('='): | |
| return {}, f"Missing '=' after variable '{{{var_name}}}'" | |
| # Extract all quoted values | |
| var_values = re.findall(r'"([^"]*)"', value_part) | |
| # Check if there are values specified | |
| if not var_values: | |
| return {}, f"No quoted values found for variable '{{{var_name}}}'" | |
| # Check for missing commas between values | |
| if re.search(r'"[^,]*"[^,]*"', value_part): | |
| return {}, f"Missing comma between values for variable '{{{var_name}}}'" | |
| variables[var_name] = var_values | |
| return variables, "" | |
| def generate_macro_line(variables_dict): | |
| """ | |
| Generate a macro line from a dictionary of variable names and their values. | |
| Args: | |
| variables_dict (dict): Dictionary mapping variable names to lists of values | |
| Returns: | |
| str: A formatted macro line (including the leading '!') | |
| """ | |
| sections = [] | |
| for var_name, values in variables_dict.items(): | |
| # Format each value with quotes | |
| quoted_values = [f'"{value}"' for value in values] | |
| # Join values with commas | |
| values_str = ','.join(quoted_values) | |
| # Create the variable assignment | |
| section = f"{{{var_name}}}={values_str}" | |
| sections.append(section) | |
| # Join sections with a colon and space for readability | |
| return "! " + " : ".join(sections) | |
Xet Storage Details
- Size:
- 18 kB
- Xet hash:
- 17837df63cf9905d9b5306d82ff1af00fbbf257df8c51fa9d18fee9db8bb3d3d
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.