Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 179 additions & 30 deletions src/pyinfra/facts/apt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pyinfra.api import FactBase

from .gpg import GpgFactBase
from .util import make_cat_files_command


def noninteractive_apt(command: str, force=False):
Expand Down Expand Up @@ -60,44 +59,178 @@ def parse_apt_repo(name):
}


class AptSources(FactBase):
def parse_deb822_stanza(lines: list[str]) -> list[dict[str, object]]:
"""Parse a deb822 style repository stanza.

deb822 sources are key/value pairs separated by blank lines, eg::

Types: deb
URIs: http://deb.debian.org/debian
Suites: bookworm
Components: main contrib
Architectures: amd64
Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg

Returns a list of dicts matching the legacy ``parse_apt_repo`` output so the
rest of pyinfra can remain backwards compatible. A stanza may define
multiple types/URIs/suites which we expand into individual repo dicts.
"""
Returns a list of installed apt sources:

.. code:: python
if not lines:
return []

data: dict[str, str] = {}
for line in lines:
if not line or line.startswith("#"):
continue
# Field-Name: value
try:
key, value = line.split(":", 1)
except ValueError: # malformed line
continue
data[key.strip()] = value.strip()

required = ("Types", "URIs", "Suites")
if not all(field in data for field in required): # not a valid stanza
return []

types = data.get("Types", "").split()
uris = data.get("URIs", "").split()
suites = data.get("Suites", "").split()
components = data.get("Components", "").split()

# Map deb822 specific fields to legacy option names
options: dict[str, object] = {}
if architectures := data.get("Architectures"):
archs = architectures.split()
if archs:
options["arch"] = archs if len(archs) > 1 else archs[0]
if signed_by := data.get("Signed-By"):
signed = signed_by.split()
options["signed-by"] = signed if len(signed) > 1 else signed[0]
if trusted := data.get("Trusted"):
options["trusted"] = trusted.lower()

repos = []
# Produce combinations – in most real-world cases these will each be one.
for _type in types or ["deb"]:
for uri in uris:
for suite in suites:
repos.append(
{
"options": dict(options), # copy per entry
"type": _type,
"url": uri,
"distribution": suite,
"components": components,
}
)
return repos


def parse_apt_list_file(lines: list[str]) -> list[dict[str, object]]:
"""Parse legacy .list style apt source file.

Each non-comment, non-empty line is a discrete repository definition in the
traditional ``deb http://... suite components`` syntax.
Returns a list of repo dicts (may be empty).
"""
repos = []
for raw in lines:
line = raw.strip()
if not line or line.startswith("#"):
continue
repo = parse_apt_repo(line)
if repo:
repos.append(repo)
return repos


def parse_deb822_sources_file(
lines: list[str],
) -> list[dict[str, object]]:
"""Parse a full deb822 ``.sources`` file.

Splits on blank lines into stanzas and uses ``parse_deb822_stanza`` for each
stanza. Returns a combined list of repo dicts for all stanzas.
"""
repos = []
stanza: list[str] = []
for raw in lines + [""]: # sentinel blank line to flush last stanza
line = raw.rstrip("\n")
if line.strip() == "":
if stanza:
repos.extend(parse_deb822_stanza(stanza))
stanza = []
continue
stanza.append(line)
return repos

[
{
"type": "deb",
"url": "http://archive.ubuntu.org",
"distribution": "trusty",
"components", ["main", "multiverse"],
},
]

class AptSources(FactBase):
"""Returns a list of installed apt sources (legacy .list + deb822 .sources).

Backwards compatible with historical output: a flat list of dicts:

{
"type": "deb",
"url": "http://archive.ubuntu.org",
"distribution": "bookworm",
"components": ["main", "contrib"],
"options": { ... },
}
"""

@override
def command(self) -> str:
return make_cat_files_command(
"/etc/apt/sources.list",
"/etc/apt/sources.list.d/*.list",
# We emit file boundary markers so the parser can select the correct
# parsing function based on filename extension.
return (
"sh -c '"
"for f in "
"/etc/apt/sources.list "
"/etc/apt/sources.list.d/*.list "
"/etc/apt/sources.list.d/*.sources; do "
'[ -e "$f" ] || continue; '
'echo "##FILE $f"; '
'cat "$f"; '
"echo; "
"done'"
)

@override
def requires_command(self) -> str:
return "apt" # if apt installed, above should exist
return "apt"

default = list

@override
def process(self, output):
repos = []

for line in output:
repo = parse_apt_repo(line)
if repo:
repos.append(repo)

def process(self, output): # type: ignore[override]
repos: list = []
current_file: str | None = None
buffer: list[str] = []

def flush():
nonlocal buffer, current_file, repos
if current_file is None or not buffer:
buffer = []
return
if current_file.endswith(".sources"):
repos.extend(parse_deb822_sources_file(buffer))
else: # treat anything else as legacy list syntax
repos.extend(parse_apt_list_file(buffer))
buffer = []

for raw_line in output:
if raw_line.startswith("##FILE "):
# New file marker
flush()
current_file = raw_line.split(" ", 1)[1].strip()
continue
buffer.append(raw_line)

# Flush last file
flush()
return repos


Expand All @@ -115,14 +248,30 @@ class AptKeys(GpgFactBase):
}
"""

# This requires both apt-key *and* apt-key itself requires gpg
@override
def command(self) -> str:
return "! command -v gpg || apt-key list --with-colons"

@override
def requires_command(self) -> str:
return "apt-key"
# Prefer not to use deprecated apt-key even if present. Iterate over keyrings
# directly. This maintains backwards compatibility of output with the
# previous implementation which fell back to this method.
return (
"for f in "
" /etc/apt/trusted.gpg "
" /etc/apt/trusted.gpg.d/*.gpg /etc/apt/trusted.gpg.d/*.asc "
" /etc/apt/keyrings/*.gpg /etc/apt/keyrings/*.asc "
" /usr/share/keyrings/*.gpg /usr/share/keyrings/*.asc "
"; do "
' [ -e "$f" ] || continue; '
' case "$f" in '
" *.asc) "
' gpg --batch --show-keys --with-colons --keyid-format LONG "$f" '
" ;; "
" *) "
' gpg --batch --no-default-keyring --keyring "$f" '
" --list-keys --with-colons --keyid-format LONG "
" ;; "
" esac; "
"done"
)


class AptSimulationDict(TypedDict):
Expand Down
Loading
Loading