diff --git a/src/antsibull_docs/ansible_output/__init__.py b/src/antsibull_docs/ansible_output/__init__.py new file mode 100644 index 00000000..6d1bb5e2 --- /dev/null +++ b/src/antsibull_docs/ansible_output/__init__.py @@ -0,0 +1,4 @@ +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or +# https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2025, Ansible Project diff --git a/src/antsibull_docs/ansible_output/load.py b/src/antsibull_docs/ansible_output/load.py new file mode 100644 index 00000000..b09ff355 --- /dev/null +++ b/src/antsibull_docs/ansible_output/load.py @@ -0,0 +1,331 @@ +# Author: Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or +# https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2025, Ansible Project +"""Ansible-output data loading.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +import pydantic +from antsibull_core.logging import log +from antsibull_core.pydantic import get_formatted_error_messages +from antsibull_docutils.rst_code_finder import ( + CodeBlockInfo, + find_code_blocks, + mark_antsibull_code_block, +) +from docutils import nodes +from yaml import MarkedYAMLError + +from sphinx_antsibull_ext.directive_helper import YAMLDirective +from sphinx_antsibull_ext.schemas.ansible_output_data import ( + AnsibleOutputData, + NonRefPostprocessor, + PostprocessorNameRef, +) + +from ..schemas.collection_config import CollectionConfig + +mlog = log.fields(mod=__name__) + + +@dataclass +class Error: + path: Path + line: int | None + col: int | None + message: str + + +@dataclass +class Block: + codeblock: CodeBlockInfo + data: AnsibleOutputData + data_line: int + data_col: int + previous_blocks: list[CodeBlockInfo] + + merged_env: dict[str, str] + merged_postprocessors: list[NonRefPostprocessor] + + +@dataclass +class Environment: + env: dict[str, str] + global_postprocessors: dict[str, NonRefPostprocessor] + + +@dataclass +class FileData: + path: Path + blocks: list[Block] + + +_ANSIBLE_OUTPUT_DATA_IDENTIFIER = "{}[]XXXXXX" + + +class AnsibleOutputDataDirective(YAMLDirective[AnsibleOutputData]): + wrap_as_data = False + schema = AnsibleOutputData + + def _handle_error(self, message: str, from_exc: Exception) -> list[nodes.Node]: + literal = nodes.literal_block("", "") + mark_antsibull_code_block( + literal, + language=_ANSIBLE_OUTPUT_DATA_IDENTIFIER, + line=self.lineno, + other={ + "error": message, + "exception": from_exc, + }, + ) + return [literal] + + def _run(self, content_str: str, content: AnsibleOutputData) -> list[nodes.Node]: + literal = nodes.literal_block(content_str, "") + mark_antsibull_code_block( + literal, + language=_ANSIBLE_OUTPUT_DATA_IDENTIFIER, + line=self.lineno, + other={ + "data": content, + }, + ) + return [literal] + + +_DIRECTIVES = { + "ansible-output-data": AnsibleOutputDataDirective, +} + + +@dataclass +class _AnsibleOutputDataExt: + data: AnsibleOutputData + line: int + col: int + + +def _get_ansible_output_data_error(block: CodeBlockInfo) -> tuple[int, int, str]: + message = block.attributes["antsibull-other-error"] + exc = block.attributes.get("antsibull-other-exception") + line = block.row_offset + 1 + col = block.col_offset + 1 + if isinstance(exc, MarkedYAMLError) and exc.problem_mark: + # YAML's line/column are 0-based + if isinstance(exc.problem_mark.line, int): + line += exc.problem_mark.line + if isinstance(exc.problem_mark.column, int): + col += exc.problem_mark.column + if isinstance(exc, pydantic.ValidationError): + message = ( + "Error while validating ansible-output-data directive's contents:\n" + + "\n".join(get_formatted_error_messages(exc)) + ) + return line, col, message + + +def _compose_block( + codeblock: CodeBlockInfo, + *, + path: Path, + data: _AnsibleOutputDataExt, + previous_blocks: list[CodeBlockInfo], + errors: list[Error], + environment: Environment, +) -> Block: + env = environment.env.copy() + env.update(data.data.env) + postprocessors = [] + for postprocessor in data.data.postprocessors: + if isinstance(postprocessor, PostprocessorNameRef): + ref = postprocessor.name + try: + postprocessor = environment.global_postprocessors[ref] + except KeyError: + errors.append( + Error( + path, + data.line, + data.col, + f"No global postprocessor of name {ref!r} defined", + ) + ) + continue + postprocessors.append(postprocessor) + return Block( + codeblock=codeblock, + data=data.data, + data_line=data.line, + data_col=data.col, + previous_blocks=previous_blocks, + merged_env=env, + merged_postprocessors=postprocessors, + ) + + +def _find_blocks( + *, + content: str, + path: Path, + root: Path | None = None, + errors: list[Error], + environment: Environment, +) -> list[Block]: + blocks: list[Block] = [] + data: _AnsibleOutputDataExt | None = None + previous_blocks: list[CodeBlockInfo] = [] + for block in find_code_blocks( + content, path=path, root_prefix=root, extra_directives=_DIRECTIVES + ): + if block.language == _ANSIBLE_OUTPUT_DATA_IDENTIFIER: + if data is not None: + errors.append( + Error( + path, + data.line, + data.col, + "ansible-output-data directive not used", + ) + ) + if "antsibull-other-data" in block.attributes: + data = _AnsibleOutputDataExt( + data=block.attributes["antsibull-other-data"], + line=block.row_offset + 1, + col=block.col_offset + 1, + ) + if "antsibull-other-error" in block.attributes: + line, col, message = _get_ansible_output_data_error(block) + errors.append(Error(path, line, col, message)) + continue + previous_blocks.append(block) + if data is None: + continue + if block.language != data.data.language: + continue + blocks.append( + _compose_block( + block, + path=path, + data=data, + previous_blocks=previous_blocks[:-1], + errors=errors, + environment=environment, + ) + ) + data = None + if data is not None: + errors.append( + Error( + path, + data.line, + data.col, + "ansible-output-data directive not used", + ) + ) + return blocks + + +def load_blocks_from_content( + content: str, + *, + path: Path, + root: Path | None = None, + errors: list[Error], + environment: Environment, +) -> FileData | None: + """ + Extract blocks from loaded RST file. + + Returns ``FileData`` object, or ``None`` in case a fatal error happened. + + **Note**: must not be used in parallel. + """ + flog = mlog.fields(func="load_blocks_from_content") + flog.notice("Find code blocks in file") + try: + blocks = _find_blocks( + content=content, + path=path, + root=root, + errors=errors, + environment=environment, + ) + except Exception as exc: # pylint: disable=broad-exception-caught + flog.notice("Error while finding code blocks: {}", exc) + errors.append( + Error(path, None, None, f"Error while finding code blocks: {exc}") + ) + return None + + flog.notice("Found {} blocks", len(blocks)) + + return FileData( + path=path, + blocks=blocks, + ) + + +def load_blocks_from_file( + path: Path, + *, + root: Path | None = None, + errors: list[Error], + environment: Environment, +) -> FileData | None: + """ + Extract blocks from RST file on disk. + + Returns ``FileData`` object, or ``None`` in case a fatal error happened. + + **Note**: must not be used in parallel. + """ + flog = mlog.fields(func="load_blocks_from_file") + + flog.notice("Load {}", path) + try: + with open(path, "r", encoding="utf-8") as f: + content = f.read() + except Exception as exc: # pylint: disable=broad-exception-caught + flog.notice("Error while reading content: {}", exc) + errors.append(Error(path, None, None, f"Error while reading content: {exc}")) + return None + + return load_blocks_from_content( + content, + path=path, + root=root, + errors=errors, + environment=environment, + ) + + +def get_environment( + collection_path: Path | None = None, + collection_config: CollectionConfig | None = None, +) -> Environment: + """ + Load/create environment. + """ + flog = mlog.fields(func="get_environment") + env = os.environ.copy() + env.pop("ANSIBLE_FORCE_COLOR", None) + env["NO_COLOR"] = "true" + if collection_path is not None: + collections_path = env.get("ANSIBLE_COLLECTIONS_PATH") or "" + if collections_path: + collections_path = f"{collection_path}:{collections_path}" + else: + collections_path = f"{collection_path}" + env["ANSIBLE_COLLECTIONS_PATH"] = collections_path + postprocessors = {} + if collection_config is not None: + env.update(collection_config.ansible_output.global_env) + postprocessors.update(collection_config.ansible_output.global_postprocessors) + flog.notice("Environment template: {}", env) + return Environment(env=env, global_postprocessors=postprocessors) diff --git a/src/antsibull_docs/ansible_output/process.py b/src/antsibull_docs/ansible_output/process.py new file mode 100644 index 00000000..aab271d7 --- /dev/null +++ b/src/antsibull_docs/ansible_output/process.py @@ -0,0 +1,261 @@ +# Author: Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or +# https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2025, Ansible Project +"""Ansible-output data processing.""" + +from __future__ import annotations + +import subprocess +from dataclasses import dataclass +from pathlib import Path + +import jinja2 +from antsibull_core.logging import log +from antsibull_docutils.rst_code_finder import ( + CodeBlockInfo, +) +from antsibull_fileutils.tempfile import AnsibleTemporaryDirectory + +from sphinx_antsibull_ext.schemas.ansible_output_data import ( + AnsibleOutputData, + NonRefPostprocessor, + PostprocessorCLI, + VariableSource, + VariableSourceCodeBlock, + VariableSourceValue, +) + +from .load import Block, Error + +mlog = log.fields(mod=__name__) + + +def _get_variable_value( + *, key: str, value: VariableSource, previous_blocks: list[CodeBlockInfo] +) -> str: + if isinstance(value, VariableSourceValue): + return value.value + if not isinstance(value, VariableSourceCodeBlock): + raise AssertionError( # pragma: no cover + "Implementation error: cannot handle {value!r}" + ) + + candidates = [ + block + for block in previous_blocks + if block.language == value.previous_code_block + ] + try: + return candidates[value.previous_code_block_index].content + except IndexError: + raise ValueError( # pylint: disable=raise-missing-from + f"Found {len(candidates)} previous code block(s) of" + f" language {value.previous_code_block!r} for variable {key!r}," + f" which does not allow index {value.previous_code_block_index}" + ) + + +def _compose_playbook( + data: AnsibleOutputData, *, previous_blocks: list[CodeBlockInfo] +) -> str: + if all(s not in data.playbook for s in ("@{%", "@{{", "@{#")): + return data.playbook + + variables = {} + for key, value in data.variables.items(): + variables[key] = _get_variable_value( + key=key, value=value, previous_blocks=previous_blocks + ) + + env = jinja2.Environment( + block_start_string="@{%", + block_end_string="%}@", + variable_start_string="@{{", + variable_end_string="}}@", + comment_start_string="@{#", + comment_end_string="#}@", + trim_blocks=True, + optimized=False, # we use every template once + ) + template = env.from_string(data.playbook) + return template.render(**variables) + + +def _strip_empty_lines(lines: list[str]) -> list[str]: + first = 0 + last = len(lines) + while first < last and not lines[first]: + first += 1 + while first < last and not lines[last - 1]: + last -= 1 + return lines[first:last] + + +def _strip_common_indent(lines: list[str]) -> list[str]: + indent = None + for line in lines: + line_strip = line.lstrip() + if not line_strip: + continue + li = len(line) - len(line_strip) + if indent is None or indent > li: + indent = li + if indent is None: + raise ValueError("Output is empty") + return [line[indent:] for line in lines] + + +def _massage_stdout( + stdout: str, + *, + skip_first_lines: int = 0, + skip_last_lines: int = 0, + prepend_lines: str | None = None, +) -> list[str]: + # Compute result lines + lines = [line.rstrip() for line in stdout.split("\n")] + lines = _strip_empty_lines(lines) + + # Skip lines + if skip_first_lines > 0: + lines = lines[skip_first_lines:] + if skip_last_lines > 0: + lines = lines[:-skip_last_lines] + + # Prepend lines + if prepend_lines: + lines = prepend_lines.split("\n") + lines + + return _strip_common_indent(_strip_empty_lines(lines)) + + +def _apply_postprocessor( + lines: list[str], + *, + env: dict[str, str], + postprocessor: NonRefPostprocessor, +) -> list[str]: + flog = mlog.fields(func="_apply_postprocessor") + + if isinstance(postprocessor, PostprocessorCLI): + flog.notice("Run postprocessor command: {}", postprocessor.command) + try: + result = subprocess.run( + postprocessor.command, + capture_output=True, + input="\n".join(lines) + "\n", + env=env, + check=True, + encoding="utf-8", + ) + except subprocess.CalledProcessError as exc: + raise ValueError( + f"{exc}\nError output:\n{exc.stderr}\n\nStandard output:\n{exc.stdout}" + ) from exc + lines = _massage_stdout(result.stdout) + + return lines + + +def _compute_code_block_content( + block: Block, +) -> list[str]: + flog = mlog.fields(func="_compute_code_block_content") + + flog.notice("Prepare environment") + flog.notice("Environment: {}", block.merged_env) + + flog.notice("Prepare temporary directory") + with AnsibleTemporaryDirectory(prefix="antsibull-docs-output") as directory: + file = directory / "playbook.yml" + flog.notice("Directory: {}; playbook: {}", directory, file) + with open(file, "w", encoding="utf-8") as f: + f.write( + _compose_playbook(block.data, previous_blocks=block.previous_blocks) + ) + + command = ["ansible-playbook", "playbook.yml"] + flog.notice("Run ansible-playbook: {}", command) + try: + result = subprocess.run( + command, + capture_output=True, + cwd=directory, + env=block.merged_env, + check=True, + encoding="utf-8", + ) + except subprocess.CalledProcessError as exc: + raise ValueError( + f"{exc}\nError output:\n{exc.stderr}\n\nStandard output:\n{exc.stdout}" + ) from exc + + flog.notice("Post-process result") + lines = _massage_stdout( + result.stdout, + skip_first_lines=block.data.skip_first_lines, + skip_last_lines=block.data.skip_last_lines, + prepend_lines=block.data.prepend_lines, + ) + for postprocessor in block.merged_postprocessors: + flog.notice("Run post-processor {}", postprocessor) + try: + lines = _apply_postprocessor( + lines, + env=block.merged_env, + postprocessor=postprocessor, + ) + except ValueError as exc: + raise ValueError( + f"Error while running post-processor {postprocessor}:\n{exc}" + ) from exc + return lines + + +@dataclass +class Replacement: + path: Path + codeblock: CodeBlockInfo + new_content: list[str] + + +def compute_replacement( + block: Block, + *, + path: Path, +) -> Replacement | Error | None: + """ + Compute replacement for a block. + + Returns either a ``Replacement`` object, + ``None`` in case the replacement is identical to the current content, + or an ``Error`` object in case the computation failed. + """ + flog = mlog.fields(func="compute_replacement") + + try: + new_content = _compute_code_block_content(block) + except Exception as exc: # pylint: disable=broad-exception-caught + flog.notice("Error while computing replacement: {}", exc) + return Error( + path, + block.data_line, + block.data_col, + f"Error while computing code block's expected contents:\n{exc}", + ) + + flog.notice("Computed replacement: {}", new_content) + + old_content = block.codeblock.content.split("\n") + if old_content and old_content[-1] == "": + old_content.pop() + if new_content == old_content: + return None + + return Replacement( + path=path, + codeblock=block.codeblock, + new_content=new_content, + ) diff --git a/src/antsibull_docs/ansible_output/replace.py b/src/antsibull_docs/ansible_output/replace.py new file mode 100644 index 00000000..1a8c3432 --- /dev/null +++ b/src/antsibull_docs/ansible_output/replace.py @@ -0,0 +1,243 @@ +# Author: Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or +# https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2025, Ansible Project +"""Ansible-output data replacing.""" + +from __future__ import annotations + +import difflib +import sys +import typing as t +from pathlib import Path + +from antsibull_core.logging import log +from antsibull_docutils.rst_code_finder import ( + CodeBlockInfo, +) + +from .load import Error +from .process import Replacement + +mlog = log.fields(mod=__name__) + + +_COLORS = { + # Regular + "faint": "\033[2m", # faint + "error": "\033[0;31m", # red + "bold": "\033[1m", # bold + # Diff + "unchanged": "\033[2m", # faint + "remove": "\033[0;31m", # red + "add": "\033[0;32m", # green + "hint": "\033[1m", # bold + "omit": "\033[1m", # bold + # Reset + "reset": "\033[0m", +} + +ColorCode = t.Literal[ + "faint", "error", "bold", "unchanged", "remove", "add", "hint", "omit", "reset" +] + + +def colorize(text: str, *, color_code: ColorCode, color: bool) -> str: + if not color: + return text + return f"{_COLORS[color_code]}{text}{_COLORS['reset']}" + + +def _colorize_diff(diff_line: str, *, color: bool) -> str: + if not color: + return diff_line + col: ColorCode | None = None + skip = 0 + if diff_line.startswith("+"): + col = "add" + skip = 1 + if diff_line.startswith("-"): + col = "remove" + skip = 1 + if diff_line.startswith("?"): + col = "hint" + skip = 1 + if diff_line.startswith(" "): + col = "unchanged" + skip = 1 + if diff_line.startswith("["): + col = "omit" + if col is None: + return diff_line + return ( + f"{diff_line[:skip]}{colorize(diff_line[skip:], color_code=col, color=color)}" + ) + + +def convert_replacements_to_errors( + *, + replacements: list[Replacement], + environment_lines: int = 2, + color: bool, +) -> list[Error]: + """ + Given a list of replacements, convert them to ``Error`` objects. + """ + if not replacements: + return [] + d = difflib.Differ() + errors = [] + for replacement in replacements: + old_content = replacement.codeblock.content.rstrip().split("\n") + changes: list[str] = [] + no_changes = 0 + last_change_add_lines = 0 + for change in d.compare(old_content, replacement.new_content): + if change.startswith("?") and change.endswith("\n"): + change = change[:-1] + changes.append(_colorize_diff(change, color=color)) + if change.startswith(" "): + no_changes += 1 + else: + if no_changes > environment_lines + last_change_add_lines + 1: + lines_to_skip = ( + no_changes - environment_lines - last_change_add_lines + ) + changes[ + -lines_to_skip - environment_lines - 1 : -environment_lines - 1 + ] = [ + _colorize_diff( + f"[... {lines_to_skip} lines skipped ...]", color=color + ) + ] + no_changes = 0 + last_change_add_lines = environment_lines + if no_changes > environment_lines + 1: + lines_to_skip = no_changes - environment_lines + changes[-lines_to_skip:] = [ + _colorize_diff(f"[... {lines_to_skip} lines skipped ...]", color=color) + ] + message = "Output would differ:\n" + "\n".join(changes) + errors.append( + Error( + replacement.path, + replacement.codeblock.row_offset + 1, + replacement.codeblock.col_offset + 1, + message, + ) + ) + return errors + + +def _replace( + content_lines: list[str], *, block: CodeBlockInfo, new_content: list[str] +) -> list[str]: + first_line = block.row_offset + last_line = first_line + block.content.count("\n") - 1 + before = content_lines[:first_line] + after = content_lines[last_line + 1 :] + indent = " " * block.col_offset + new_lines = [f"{indent}{line}" if line else "" for line in new_content] + return before + new_lines + after + + +def _apply_replacements_in_content( + content: str, + replacements: list[Replacement], + *, + path: Path, + errors: list[Error], +) -> tuple[str, bool]: + content_lines = content.split("\n") + changed = False + + # Apply replacements sorted back to front + for replacement in sorted( + replacements, key=lambda replacement: -replacement.codeblock.row_offset + ): + if not replacement.codeblock.directly_replacable_in_content: + errors.append( + Error( + path, + replacement.codeblock.row_offset + 1, + replacement.codeblock.col_offset + 1, + "Code block is not replacable", + ) + ) + continue + content_lines = _replace( + content_lines, + block=replacement.codeblock, + new_content=replacement.new_content, + ) + changed = True + + # Ensure trailing newline + if content_lines and content_lines[-1]: + content_lines.append("") + return "\n".join(content_lines), changed + + +def _apply_replacements_in_file( + *, + path: Path, + replacements: list[Replacement], + errors: list[Error], +) -> None: + flog = mlog.fields(func="_apply_replacements_in_file") + + flog.notice("Load {}", path) + try: + with open(path, "r", encoding="utf-8") as f: + content = f.read() + except Exception as exc: # pylint: disable=broad-exception-caught + flog.notice("Error while reading content: {}", exc) + errors.append(Error(path, None, None, f"Error while reading content: {exc}")) + return + + content, changed = _apply_replacements_in_content( + content, replacements, path=path, errors=errors + ) + if not changed: + return + + flog.notice("Write {}", path) + print(f"Write {path}...") + try: + with open(path, "w", encoding="utf-8") as f: + f.write(content) + except Exception as exc: # pylint: disable=broad-exception-caught + flog.notice("Error while writing content: {}", exc) + errors.append(Error(path, None, None, f"Error while writing content: {exc}")) + + +def apply_replacements( + replacements: list[Replacement], + *, + errors: list[Error], +) -> None: + """ + Apply replacements to RST files. + """ + flog = mlog.fields(func="apply_replacements") + + flog.notice("Group replacements by file") + replacements_by_file: dict[Path, list[Replacement]] = {} + for replacement in replacements: + if replacement.path not in replacements_by_file: + replacements_by_file[replacement.path] = [] + replacements_by_file[replacement.path].append(replacement) + + flog.notice("Process {} files", len(replacements_by_file)) + for path, reps in sorted(replacements_by_file.items()): + _apply_replacements_in_file(path=path, replacements=reps, errors=errors) + + +def detect_color(*, force: bool | None = None) -> bool: + """ + Detect whether the output should be colorized. + """ + if force is not None: + return force + return sys.stdout.isatty() diff --git a/src/antsibull_docs/cli/doc_commands/ansible_output.py b/src/antsibull_docs/cli/doc_commands/ansible_output.py index d7ba6679..dcef19c0 100644 --- a/src/antsibull_docs/cli/doc_commands/ansible_output.py +++ b/src/antsibull_docs/cli/doc_commands/ansible_output.py @@ -8,555 +8,38 @@ from __future__ import annotations import asyncio -import difflib import os -import subprocess -import sys from collections.abc import Sequence -from dataclasses import dataclass from pathlib import Path -import jinja2 -import pydantic from antsibull_core.logging import log -from antsibull_core.pydantic import get_formatted_error_messages -from antsibull_docutils.rst_code_finder import ( - CodeBlockInfo, - find_code_blocks, - mark_antsibull_code_block, -) -from antsibull_fileutils.tempfile import AnsibleTemporaryDirectory -from docutils import nodes -from yaml import MarkedYAMLError - -from sphinx_antsibull_ext.directive_helper import YAMLDirective -from sphinx_antsibull_ext.schemas.ansible_output_data import ( - AnsibleOutputData, - Postprocessor, - PostprocessorCLI, - PostprocessorNameRef, - VariableSource, - VariableSourceCodeBlock, - VariableSourceValue, -) from ... import app_context +from ...ansible_output.load import ( + Environment, + Error, + get_environment, + load_blocks_from_file, +) +from ...ansible_output.process import compute_replacement +from ...ansible_output.replace import ( + apply_replacements, + colorize, + convert_replacements_to_errors, + detect_color, +) from ...collection_config import load_collection_config from ...lint_helpers import load_collection_info -from ...schemas.collection_config import CollectionConfig from ...utils.collection_copier import CollectionCopier mlog = log.fields(mod=__name__) -_ANSIBLE_OUTPUT_DATA_IDENTIFIER = "{}[]XXXXXX" - - -class AnsibleOutputDataDirective(YAMLDirective[AnsibleOutputData]): - wrap_as_data = False - schema = AnsibleOutputData - - def _handle_error(self, message: str, from_exc: Exception) -> list[nodes.Node]: - literal = nodes.literal_block("", "") - mark_antsibull_code_block( - literal, - language=_ANSIBLE_OUTPUT_DATA_IDENTIFIER, - line=self.lineno, - other={ - "error": message, - "exception": from_exc, - }, - ) - return [literal] - - def _run(self, content_str: str, content: AnsibleOutputData) -> list[nodes.Node]: - literal = nodes.literal_block(content_str, "") - mark_antsibull_code_block( - literal, - language=_ANSIBLE_OUTPUT_DATA_IDENTIFIER, - line=self.lineno, - other={ - "data": content, - }, - ) - return [literal] - - -_DIRECTIVES = { - "ansible-output-data": AnsibleOutputDataDirective, -} - - -@dataclass -class _AnsibleOutputDataExt: - data: AnsibleOutputData - line: int - col: int - - -def _get_ansible_output_data_error(block: CodeBlockInfo) -> tuple[int, int, str]: - message = block.attributes["antsibull-other-error"] - exc = block.attributes.get("antsibull-other-exception") - line = block.row_offset + 1 - col = block.col_offset + 1 - if isinstance(exc, MarkedYAMLError) and exc.problem_mark: - # YAML's line/column are 0-based - if isinstance(exc.problem_mark.line, int): - line += exc.problem_mark.line - if isinstance(exc.problem_mark.column, int): - col += exc.problem_mark.column - if isinstance(exc, pydantic.ValidationError): - message = ( - "Error while validating ansible-output-data directive's contents:\n" - + "\n".join(get_formatted_error_messages(exc)) - ) - return line, col, message - - -def _find_blocks( - *, - content: str, - path: Path, - root: Path | None = None, - errors: list[tuple[Path, int | None, int | None, str]], -) -> list[tuple[CodeBlockInfo, _AnsibleOutputDataExt, list[CodeBlockInfo]]]: - blocks: list[tuple[CodeBlockInfo, _AnsibleOutputDataExt, list[CodeBlockInfo]]] = [] - data: _AnsibleOutputDataExt | None = None - previous_blocks: list[CodeBlockInfo] = [] - for block in find_code_blocks( - content, path=path, root_prefix=root, extra_directives=_DIRECTIVES - ): - if block.language == _ANSIBLE_OUTPUT_DATA_IDENTIFIER: - if data is not None: - errors.append( - ( - path, - data.line, - data.col, - "ansible-output-data directive not used", - ) - ) - if "antsibull-other-data" in block.attributes: - data = _AnsibleOutputDataExt( - data=block.attributes["antsibull-other-data"], - line=block.row_offset + 1, - col=block.col_offset + 1, - ) - if "antsibull-other-error" in block.attributes: - line, col, message = _get_ansible_output_data_error(block) - errors.append((path, line, col, message)) - continue - previous_blocks.append(block) - if data is None: - continue - if block.language != data.data.language: - continue - blocks.append((block, data, previous_blocks[:-1])) - data = None - if data is not None: - errors.append( - ( - path, - data.line, - data.col, - "ansible-output-data directive not used", - ) - ) - return blocks - - -@dataclass -class Environment: - env: dict[str, str] - global_postprocessors: dict[str, Postprocessor] - - -def _get_variable_value( - *, key: str, value: VariableSource, previous_blocks: list[CodeBlockInfo] -) -> str: - if isinstance(value, VariableSourceValue): - return value.value - if not isinstance(value, VariableSourceCodeBlock): - raise AssertionError( # pragma: no cover - "Implementation error: cannot handle {value!r}" - ) - - candidates = [ - block - for block in previous_blocks - if block.language == value.previous_code_block - ] - try: - return candidates[value.previous_code_block_index].content - except IndexError: - raise ValueError( # pylint: disable=raise-missing-from - f"Found {len(candidates)} previous code block(s) of" - f" language {value.previous_code_block!r} for variable {key!r}," - f" which does not allow index {value.previous_code_block_index}" - ) - - -def _compose_playbook( - data: AnsibleOutputData, *, previous_blocks: list[CodeBlockInfo] -) -> str: - if all(s not in data.playbook for s in ("@{%", "@{{", "@{#")): - return data.playbook - - variables = {} - for key, value in data.variables.items(): - variables[key] = _get_variable_value( - key=key, value=value, previous_blocks=previous_blocks - ) - - env = jinja2.Environment( - block_start_string="@{%", - block_end_string="%}@", - variable_start_string="@{{", - variable_end_string="}}@", - comment_start_string="@{#", - comment_end_string="#}@", - trim_blocks=True, - optimized=False, # we use every template once - ) - template = env.from_string(data.playbook) - return template.render(**variables) - - -def _strip_empty_lines(lines: list[str]) -> list[str]: - first = 0 - last = len(lines) - while first < last and not lines[first]: - first += 1 - while first < last and not lines[last - 1]: - last -= 1 - return lines[first:last] - - -def _strip_common_indent(lines: list[str]) -> list[str]: - indent = None - for line in lines: - line_strip = line.lstrip() - if not line_strip: - continue - li = len(line) - len(line_strip) - if indent is None or indent > li: - indent = li - if indent is None: - raise ValueError("Output is empty") - return [line[indent:] for line in lines] - - -def _massage_stdout( - stdout: str, - *, - skip_first_lines: int = 0, - skip_last_lines: int = 0, - prepend_lines: str | None = None, -) -> list[str]: - # Compute result lines - lines = [line.rstrip() for line in stdout.split("\n")] - lines = _strip_empty_lines(lines) - - # Skip lines - if skip_first_lines > 0: - lines = lines[skip_first_lines:] - if skip_last_lines > 0: - lines = lines[:-skip_last_lines] - - # Prepend lines - if prepend_lines: - lines = prepend_lines.split("\n") + lines - - return _strip_common_indent(_strip_empty_lines(lines)) - - -def _apply_postprocessor( - lines: list[str], - *, - env: dict[str, str], - postprocessor: Postprocessor, - environment: Environment, -) -> list[str]: - flog = mlog.fields(func="_apply_postprocessor") - - if isinstance(postprocessor, PostprocessorNameRef): - ref = postprocessor.name - try: - postprocessor = environment.global_postprocessors[ref] - except KeyError: - raise ValueError( # pylint: disable=raise-missing-from - f"No global postprocessor of name {ref!r} defined" - ) - - if isinstance(postprocessor, PostprocessorCLI): - flog.notice("Run postprocessor command: {}", postprocessor.command) - try: - result = subprocess.run( - postprocessor.command, - capture_output=True, - input="\n".join(lines) + "\n", - env=env, - check=True, - encoding="utf-8", - ) - except subprocess.CalledProcessError as exc: - raise ValueError( - f"{exc}\nError output:\n{exc.stderr}\n\nStandard output:\n{exc.stdout}" - ) from exc - lines = _massage_stdout(result.stdout) - - return lines - - -def _compute_code_block_content( - data: _AnsibleOutputDataExt, - *, - environment: Environment, - previous_blocks: list[CodeBlockInfo], -) -> list[str]: - flog = mlog.fields(func="_compute_code_block_content") - - flog.notice("Prepare environment") - env = environment.env.copy() - env.update(data.data.env) - flog.notice("Environment: {}", env) - - flog.notice("Prepare temporary directory") - with AnsibleTemporaryDirectory(prefix="antsibull-docs-output") as directory: - file = directory / "playbook.yml" - flog.notice("Directory: {}; playbook: {}", directory, file) - with open(file, "w", encoding="utf-8") as f: - f.write(_compose_playbook(data.data, previous_blocks=previous_blocks)) - - command = ["ansible-playbook", "playbook.yml"] - flog.notice("Run ansible-playbook: {}", command) - try: - result = subprocess.run( - command, - capture_output=True, - cwd=directory, - env=env, - check=True, - encoding="utf-8", - ) - except subprocess.CalledProcessError as exc: - raise ValueError( - f"{exc}\nError output:\n{exc.stderr}\n\nStandard output:\n{exc.stdout}" - ) from exc - - flog.notice("Post-process result") - lines = _massage_stdout( - result.stdout, - skip_first_lines=data.data.skip_first_lines, - skip_last_lines=data.data.skip_last_lines, - prepend_lines=data.data.prepend_lines, - ) - for postprocessor in data.data.postprocessors: - flog.notice("Run post-processor {}", postprocessor) - try: - lines = _apply_postprocessor( - lines, - env=env, - postprocessor=postprocessor, - environment=environment, - ) - except ValueError as exc: - raise ValueError( - f"Error while running post-processor {postprocessor}:\n{exc}" - ) from exc - return lines - - -def _replace( - content_lines: list[str], *, block: CodeBlockInfo, new_content: list[str] -) -> list[str]: - first_line = block.row_offset - last_line = first_line + block.content.count("\n") - 1 - before = content_lines[:first_line] - after = content_lines[last_line + 1 :] - indent = " " * block.col_offset - new_lines = [f"{indent}{line}" if line else "" for line in new_content] - return before + new_lines + after - - -def _apply_replacements( - content: str, - replacements: list[tuple[CodeBlockInfo, list[str]]], - *, - path: Path, - errors: list[tuple[Path, int | None, int | None, str]], -) -> tuple[str, bool]: - content_lines = content.split("\n") - changed = False - - # Apply replacements sorted back to front - for block, new_content in sorted( - replacements, key=lambda entry: -entry[0].row_offset - ): - if not block.directly_replacable_in_content: - errors.append( - ( - path, - block.row_offset + 1, - block.col_offset + 1, - "Code block is not replacable", - ) - ) - continue - content_lines = _replace(content_lines, block=block, new_content=new_content) - changed = True - - # Ensure trailing newline - if content_lines and content_lines[-1]: - content_lines.append("") - return "\n".join(content_lines), changed - - -def _compute_replacements( - content: str, - *, - path: Path, - root: Path | None = None, - errors: list[tuple[Path, int | None, int | None, str]], - environment: Environment, -) -> list[tuple[CodeBlockInfo, list[str]]]: - flog = mlog.fields(func="_compute_replacements") - flog.notice("Find code blocks in file") - try: - blocks = _find_blocks(content=content, path=path, root=root, errors=errors) - except Exception as exc: # pylint: disable=broad-exception-caught - flog.notice("Error while finding code blocks: {}", exc) - errors.append((path, None, None, f"Error while finding code blocks: {exc}")) - return [] - - flog.notice("Found {} blocks", len(blocks)) - - replacements: list[tuple[CodeBlockInfo, list[str]]] = [] - for block, block_data, previous_blocks in blocks: - flog.notice("Processing block at line {}", block.row_offset + 1) - try: - new_content = _compute_code_block_content( - block_data, previous_blocks=previous_blocks, environment=environment - ) - except Exception as exc: # pylint: disable=broad-exception-caught - flog.notice("Error while computing code block's expected contents: {}", exc) - errors.append( - ( - path, - block_data.line, - block_data.col, - f"Error while computing code block's expected contents:\n{exc}", - ) - ) - continue - old_content = block.content.split("\n") - if old_content and old_content[-1] == "": - old_content.pop() - if new_content != old_content: - replacements.append((block, new_content)) - return replacements - - -_COLORS = { - # Regular - "faint": "\033[2m", # faint - "error": "\033[0;31m", # red - "bold": "\033[1m", # bold - # Diff - "unchanged": "\033[2m", # faint - "remove": "\033[0;31m", # red - "add": "\033[0;32m", # green - "hint": "\033[1m", # bold - "omit": "\033[1m", # bold - # Reset - "reset": "\033[0m", -} - - -def _colorize(text: str, *, color_code: str, color: bool) -> str: - if not color: - return text - return f"{_COLORS[color_code]}{text}{_COLORS['reset']}" - - -def _colorize_diff(diff_line: str, *, color: bool) -> str: - if not color: - return diff_line - col = None - skip = 0 - if diff_line.startswith("+"): - col = "add" - skip = 1 - if diff_line.startswith("-"): - col = "remove" - skip = 1 - if diff_line.startswith("?"): - col = "hint" - skip = 1 - if diff_line.startswith(" "): - col = "unchanged" - skip = 1 - if diff_line.startswith("["): - col = "omit" - if col is None: - return diff_line - return ( - f"{diff_line[:skip]}{_colorize(diff_line[skip:], color_code=col, color=color)}" - ) - - -def _add_replacements_as_errors( - errors: list[tuple[Path, int | None, int | None, str]], - *, - path: Path, - replacements: list[tuple[CodeBlockInfo, list[str]]], - environment_lines: int = 2, - color: bool, -) -> None: - if not replacements: - return - d = difflib.Differ() - for code_block, new_content in replacements: - old_content = code_block.content.rstrip().split("\n") - changes: list[str] = [] - no_changes = 0 - last_change_add_lines = 0 - for change in d.compare(old_content, new_content): - if change.startswith("?") and change.endswith("\n"): - change = change[:-1] - changes.append(_colorize_diff(change, color=color)) - if change.startswith(" "): - no_changes += 1 - else: - if no_changes > environment_lines + last_change_add_lines + 1: - lines_to_skip = ( - no_changes - environment_lines - last_change_add_lines - ) - changes[ - -lines_to_skip - environment_lines - 1 : -environment_lines - 1 - ] = [ - _colorize_diff( - f"[... {lines_to_skip} lines skipped ...]", color=color - ) - ] - no_changes = 0 - last_change_add_lines = environment_lines - if no_changes > environment_lines + 1: - lines_to_skip = no_changes - environment_lines - changes[-lines_to_skip:] = [ - _colorize_diff(f"[... {lines_to_skip} lines skipped ...]", color=color) - ] - message = "Output would differ:\n" + "\n".join(changes) - errors.append( - (path, code_block.row_offset + 1, code_block.col_offset + 1, message) - ) - - def process_file( path: Path, *, root: Path | None = None, - errors: list[tuple[Path, int | None, int | None, str]], + errors: list[Error], environment: Environment, check: bool, color: bool, @@ -568,53 +51,36 @@ def process_file( """ flog = mlog.fields(func="process_file") - flog.notice("Load {}", path) - try: - with open(path, "r", encoding="utf-8") as f: - content = f.read() - except Exception as exc: # pylint: disable=broad-exception-caught - flog.notice("Error while reading content: {}", exc) - errors.append((path, None, None, f"Error while reading content: {exc}")) + data = load_blocks_from_file( + path, root=root, errors=errors, environment=environment + ) + if not data: return flog.notice("Compute replacements") - replacements = _compute_replacements( - content, - path=path, - root=root, - errors=errors, - environment=environment, - ) - if not replacements: - return + replacements = [] + for block in data.blocks: + replacement = compute_replacement(block, path=path) + if replacement is None: + continue + if isinstance(replacement, Error): + errors.append(replacement) + else: + replacements.append(replacement) if check: - _add_replacements_as_errors( - errors, path=path, replacements=replacements, color=color + errors.extend( + convert_replacements_to_errors(replacements=replacements, color=color) ) return - flog.notice("Do replacements for {}", path) - content, changed = _apply_replacements( - content, replacements, path=path, errors=errors - ) - if not changed: - return - - flog.notice("Write {}", path) - print(f"Write {path}...") - try: - with open(path, "w", encoding="utf-8") as f: - f.write(content) - except Exception as exc: # pylint: disable=broad-exception-caught - flog.notice("Error while writing content: {}", exc) - errors.append((path, None, None, f"Error while writing content: {exc}")) + apply_replacements(replacements, errors=errors) def process_directory( path: Path, *, - errors: list[tuple[Path, int | None, int | None, str]], + errors: list[Error], environment: Environment, check: bool, color: bool, @@ -635,36 +101,7 @@ def process_directory( color=color, ) except Exception as exc: # pylint: disable=broad-exception-caught - errors.append((path, None, None, f"Error while listing files: {exc}")) - - -def get_environment( - collection_path: Path | None = None, - collection_config: CollectionConfig | None = None, -) -> Environment: - flog = mlog.fields(func="get_environment") - env = os.environ.copy() - env.pop("ANSIBLE_FORCE_COLOR", None) - env["NO_COLOR"] = "true" - if collection_path is not None: - collections_path = env.get("ANSIBLE_COLLECTIONS_PATH") or "" - if collections_path: - collections_path = f"{collection_path}:{collections_path}" - else: - collections_path = f"{collection_path}" - env["ANSIBLE_COLLECTIONS_PATH"] = collections_path - postprocessors = {} - if collection_config is not None: - env.update(collection_config.ansible_output.global_env) - postprocessors.update(collection_config.ansible_output.global_postprocessors) - flog.notice("Environment template: {}", env) - return Environment(env=env, global_postprocessors=postprocessors) - - -def detect_color(*, force: bool | None = None) -> bool: - if force is not None: - return force - return sys.stdout.isatty() + errors.append(Error(path, None, None, f"Error while listing files: {exc}")) def check_rst_files( @@ -673,12 +110,12 @@ def check_rst_files( environment: Environment | None = None, check: bool = False, color: bool | None = None, -) -> list[tuple[Path, int | None, int | None, str]]: +) -> list[Error]: if environment is None: environment = get_environment() if color is None: color = detect_color() - errors: list[tuple[Path, int | None, int | None, str]] = [] + errors: list[Error] = [] for path in paths: path_obj = Path(path) if path_obj.is_dir(): @@ -698,12 +135,12 @@ def check_rst_files( color=color, ) else: - errors.append((path_obj, None, None, "Does not exist")) + errors.append(Error(path_obj, None, None, "Does not exist")) return errors def print_errors( - errors: list[tuple[Path, int | None, int | None, str]], + errors: list[Error], *, with_header: bool, color: bool, @@ -711,21 +148,23 @@ def print_errors( if with_header and errors: print() print( - _colorize( + colorize( f"Found {len(errors)} error{'' if len(errors) == 1 else 's'}:", color_code="bold", color=color, ) ) - for error_path, line, col, error in sorted( + for error in sorted( errors, - key=lambda entry: (str(entry[0]), entry[1] or 0, entry[2] or 0, entry[3]), + key=lambda error: (error.path, error.line or 0, error.col or 0, error.message), ): - prefix = _colorize( - f"{error_path}:{line or '-'}:{col or '-'}: ", color_code="bold", color=color + prefix = colorize( + f"{error.path}:{error.line or '-'}:{error.col or '-'}: ", + color_code="bold", + color=color, ) - error_lines = [error_line.rstrip() for error_line in error.split("\n")] - print(f"{prefix}{_colorize(error_lines[0], color_code='error', color=color)}") + error_lines = [error_line.rstrip() for error_line in error.message.split("\n")] + print(f"{prefix}{colorize(error_lines[0], color_code='error', color=color)}") if len(error_lines) > 1: prefix = " " for error_line in error_lines[1:]: @@ -777,9 +216,9 @@ def run_ansible_output() -> int: namespace = info["namespace"] name = info["name"] except Exception: # pylint: disable=broad-exception-caught - errors: list[tuple[Path, int | None, int | None, str]] = [] + errors: list[Error] = [] errors.append( - ( + Error( Path(path_to_collection), None, None, diff --git a/src/antsibull_docs/schemas/collection_config.py b/src/antsibull_docs/schemas/collection_config.py index 8e26a04c..7f6a6207 100644 --- a/src/antsibull_docs/schemas/collection_config.py +++ b/src/antsibull_docs/schemas/collection_config.py @@ -5,24 +5,13 @@ # SPDX-FileCopyrightText: 2023, Ansible Project """Schemas for collection config files.""" -import typing as t - import pydantic as p from sphinx_antsibull_ext.schemas.ansible_output_data import ( - Postprocessor, - PostprocessorNameRef, + NonRefPostprocessor, ) -def _is_not_name_ref(value: Postprocessor) -> Postprocessor: - if isinstance(value, PostprocessorNameRef): - raise ValueError( - "Cannot define name reference postprocessors in collection config" - ) - return value - - class ChangelogConfig(p.BaseModel): # Whether to write the changelog write_changelog: bool = False @@ -33,9 +22,7 @@ class AnsibleOutputConfig(p.BaseModel): global_env: dict[str, str] = {} # Named postprocessors - global_postprocessors: dict[ - str, t.Annotated[Postprocessor, p.AfterValidator(_is_not_name_ref)] - ] = {} + global_postprocessors: dict[str, NonRefPostprocessor] = {} @p.field_validator("global_env", mode="before") @classmethod diff --git a/src/sphinx_antsibull_ext/schemas/ansible_output_data.py b/src/sphinx_antsibull_ext/schemas/ansible_output_data.py index 82347e2b..dee759db 100644 --- a/src/sphinx_antsibull_ext/schemas/ansible_output_data.py +++ b/src/sphinx_antsibull_ext/schemas/ansible_output_data.py @@ -47,6 +47,7 @@ class PostprocessorNameRef(p.BaseModel): Postprocessor = t.Union[PostprocessorCLI, PostprocessorNameRef] +NonRefPostprocessor = t.Union[PostprocessorCLI] class AnsibleOutputData(p.BaseModel): diff --git a/tests/functional/test_ansible_output.py b/tests/functional/test_ansible_output.py index ce4a7c4d..6263fb31 100644 --- a/tests/functional/test_ansible_output.py +++ b/tests/functional/test_ansible_output.py @@ -100,7 +100,7 @@ def subprocess_run( raise AssertionError("should not happen") # pragma: no cover with mock.patch( - "antsibull_docs.cli.doc_commands.ansible_output.subprocess.run", + "antsibull_docs.ansible_output.process.subprocess.run", subprocess_run, ): yield