# ansible-freebsd-jailhost-tools/tools/pkgmirror-jailrevalidator.py
# (repository listing metadata: 443 lines, 18 KiB, Python, executable file)
#!/usr/bin/env python3
from argparse import ArgumentParser, FileType, Namespace
from collections import defaultdict, namedtuple
from functools import cached_property
from hashlib import sha256
from io import BufferedReader, BytesIO
from json import load, loads
from logging import DEBUG, INFO, basicConfig, getLogger
from subprocess import check_output
from sys import exit, stderr
from tarfile import open as tar_open
from typing import Optional, TypedDict, Union
from urllib.request import Request, urlopen
# All logging goes to stderr (stdout carries the result output).
# NOTE(review): configuring level=DEBUG at import time installs a handler,
# which makes any later basicConfig() call a silent no-op — the effective
# level is therefore fixed before the CLI flags are parsed; confirm intent.
basicConfig(stream=stderr, level=DEBUG)
logger = getLogger(__name__)
# Top-level CLI parser; the chosen subcommand decides the run mode.
parser = ArgumentParser(
    description='Revalidating files on local FreeBSD pkg mirror')
parser.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose logging')
subparsers = parser.add_subparsers(
    title='subcommands',
    description='Subcommands direct in which mode the revalidator will run in')
# Parser for live functionality
# NOTE: add_argument(dest=...) with no flag string creates a POSITIONAL
# argument; dests containing a dash ('pkgmirror-url', 'jail-root') are not
# valid attribute names, which is why the handlers read them back with
# getattr(args, 'pkgmirror-url') / getattr(args, 'jail-root').
parser_live = subparsers.add_parser(name='live', help='Live mode')
parser_live.add_argument(
    dest='pkgmirror-url', type=str, help='URL of the pkg mirror')
parser_live.add_argument(
    dest='jail-root', type=str, help='Path of the jail (chroot)')
parser_live.add_argument(
    dest='packages', type=str, help='Space separated list of packages')
# The lambda defers the name lookup: live_run() is defined further down.
parser_live.set_defaults(func=lambda x: live_run(x))
# Parser for the test functionality, without pkgmirror availability
parser_test_wo_pkgmirror_chroot = subparsers.add_parser(
    name='wo-pkg-chroot', help=(
        'Testing without pkgmirror and chroot availability. Complete ' +
        'dry-run.'))
parser_test_wo_pkgmirror_chroot.add_argument(
    dest='abi', type=str, help='FreeBSD ABI version (e.g. FreeBSD:13:amd64)')
parser_test_wo_pkgmirror_chroot.add_argument(
    dest='pkgtxz', type=FileType(mode='rb'),
    help='Path to the packagesite.txz file')
parser_test_wo_pkgmirror_chroot.add_argument(
    dest='packages', type=str, help='Space separated list of packages')
parser_test_wo_pkgmirror_chroot.set_defaults(
    func=lambda x: test_wo_pkgmirror_chroot(x))
# Parser for the test functionality, WITH pkgmirror availability
parser_test_wo_chroot = subparsers.add_parser(
    name='wo-chroot', help=(
        'Testing WITH pkgmirror but without chroot availability. Will ' +
        '(re)download and refresh packages when necessary.'))
parser_test_wo_chroot.add_argument(
    dest='pkgmirror-url', type=str, help='Pkg mirror repo URL')
parser_test_wo_chroot.add_argument(
    dest='abi', type=str, help='FreeBSD ABI version (e.g. FreeBSD:13:amd64)')
parser_test_wo_chroot.add_argument(
    dest='packages', type=str, help='Space separated list of packages')
parser_test_wo_chroot.set_defaults(func=lambda x: test_wo_chroot(x))
# One resolved package: catalogue name, port origin, version and flavor
# (flavor is None when the package has no flavor annotation).
CollectedPackage = namedtuple(
    typename='CollectedPackage',
    field_names=['name', 'origin', 'version', 'flavor'])
# One downloadable package: its URL (repopath, later expanded to a full
# mirror URL) plus the expected SHA256 checksum and size from the catalogue.
UrlItem = namedtuple(
    typename='UrlItem',
    field_names=['name', 'origin', 'version', 'url', 'sha256', 'pkgsize'])
# The body of one fetched URL and its SHA256 hex digest (None if not hashed).
FetchedUrl = namedtuple(
    typename='FetchedUrl', field_names=['content', 'sha256'])
# flavor (or None) -> raw packagesite catalogue entry
OriginFlavorDict = defaultdict[Optional[str], dict]
# version -> flavor -> raw packagesite catalogue entry
OriginVersionsDict = defaultdict[str, OriginFlavorDict]
# origin -> version -> flavor -> raw packagesite catalogue entry
OriginDict = defaultdict[str, OriginVersionsDict]
class DepsItemDict(TypedDict):
    'A dictionary in the JSON depicting dependencies for one package.'
    origin: str
    version: str
    flavor: Optional[str]
# dependency package name -> its origin/version/flavor details
DepsDict = dict[str, DepsItemDict]
class DependencyResolver(object):
    """Resolve the requested package names/origins plus their transitive
    dependencies from a repository catalogue (`packagesite.txz`) into a
    list of `UrlItem`s that can be revalidated against the mirror.
    """
    # name -> version -> raw catalogue entry
    _names: defaultdict[str, dict[str, dict]]
    # origin -> version -> flavor -> raw catalogue entry
    _origins: OriginDict
    # lowercased name -> canonical name, for case-insensitive lookups
    _lower_names: dict[str, str]
    # every package resolved so far (requested roots and their dependencies)
    _found_packages: set[CollectedPackage]
    # number of dependency-resolution calls, reported in verbose mode
    _iterations = 0

    def __init__(
            self, packages_txz_fd: Union[BufferedReader, BytesIO],
            verbose: bool, passed_packages: str):
        """
        :param packages_txz_fd: Open binary file object of packagesite.txz
        :param verbose: Emit debug logging when `True`
        :param passed_packages: Whitespace separated package names/origins
        """
        self._packages_txz_fd = packages_txz_fd
        self._verbose = verbose
        self._passed_packages = set(passed_packages.strip().split())
        self._names = defaultdict(dict)
        self._origins = \
            defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
        self._found_packages = set()
        self._lower_names = dict()

    def _load_definitions(self, splitted_lines: list[bytes]):
        'Load package definitions from the splitted JSON lines.'
        for line in splitted_lines:
            loaded = loads(line)
            name = loaded['name']
            origin = loaded['origin']
            version = loaded['version']
            # The flavor lives in the optional `annotations` sub-dict
            flavor = loaded.get('annotations', {}).get('flavor')
            self._names[name][version] = loaded
            self._origins[origin][version][flavor] = loaded
        self._lower_names = {x.lower(): x for x in self._names}
        if self._verbose:
            count_names = len(self._names)
            count_origins = len(self._origins)
            logger.debug(
                msg=f'Read info about {count_names} names and {count_origins} ' +
                'origins')

    def _load_packagefile(self):
        'Extract packagesite.yaml from the txz archive and load it.'
        with self._packages_txz_fd as fd:
            archive = tar_open(mode='r:xz', fileobj=fd)
            exfile = archive.extractfile(member='packagesite.yaml')
            if exfile is None:
                # FIX: the message used to name 'packagesite.yml' although
                # the member looked up above is 'packagesite.yaml'
                raise FileNotFoundError(
                    'packagesite.yaml not found in packagesite.txz')
            splitted_lines = exfile.read().splitlines()
            if self._verbose:
                logger.debug(
                    f'Read {len(splitted_lines)} lines from packagesite.yaml')
            self._load_definitions(splitted_lines=splitted_lines)

    def _get_origins_with_flavor(
            self, versions: OriginVersionsDict, origin: str,
            flavor: Optional[str]) -> set[CollectedPackage]:
        """
        When a `@` is not passed but an origin is found for the package
        name, evaluate if it is with a `None` flavor or should we
        revalidate all flavors.

        :raises KeyError: when a specific flavor is not in the catalogue.
        """
        found_packages = set()
        for version, flavors in versions.items():
            if flavor in flavors:
                # Take a found `None` or specific flavor
                loaded = flavors[flavor]
                found_packages.add(CollectedPackage(
                    name=loaded.get('name'), origin=origin, version=version,
                    flavor=flavor))
            elif flavor is None:
                # Take ALL flavors
                found_packages.update(CollectedPackage(
                    name=loaded.get('name'), origin=origin, version=version,
                    flavor=x) for x, loaded in flavors.items())
            else:
                raise KeyError(f'{flavor!r} of {origin!r} is not found.')
        return found_packages

    def _resolve_pkgnames_by_name(self):
        # NOTE(review): unused empty stub, kept for compatibility; its
        # documented intent is implemented in _resolve_one_passed_packagename
        'Lower all the packagenames and look for the passed name.'

    def _resolve_one_passed_packagename(
            self, passed_name: str, missing: list[str]):
        """Resolve one packagename to one or more versions.

        Unresolvable names are appended to `missing`.
        """
        # FIX: lower the passed name before consulting the lowercase map,
        # otherwise a differently-cased argument never matched by name
        if name := self._lower_names.get(passed_name.lower()):
            versions = self._names[name]
            for version, loaded in versions.items():
                origin = loaded.get('origin')
                flavor = loaded.get('annotations', {}).get('flavor')
                # FIX: store the canonical catalogue name (not the passed
                # spelling) so later `self._names[item.name]` lookups in
                # _add_resolved_dependencies/_resolve_as_urls cannot fail
                self._found_packages.add(CollectedPackage(
                    name=name, origin=origin, version=version,
                    flavor=flavor))
        elif '@' in passed_name:
            # Search origins for an explicit origin@flavor request
            origin, flavor = passed_name.split(sep='@', maxsplit=1)
            is_found = False
            if origin in self._origins:
                for version, flavors in self._origins[origin].items():
                    if flavor in flavors:
                        loaded = flavors[flavor]
                        self._found_packages.add(CollectedPackage(
                            name=loaded.get('name'), origin=origin,
                            version=version, flavor=flavor))
                        is_found = True
            if not is_found:
                missing.append(passed_name)
        elif versions := self._origins.get(passed_name):
            # passed_name is an origin without a flavor, add all flavors
            self._found_packages.update(self._get_origins_with_flavor(
                versions=versions, origin=passed_name, flavor=None))
        else:
            missing.append(passed_name)

    def _resolve_requested_package_origins(self):
        """Resolve the root packages to a set of `CollectedPackage`s.

        :raises KeyError: when any requested package cannot be resolved.
        """
        missing: list[str] = list()
        for passed_name in self._passed_packages:
            self._resolve_one_passed_packagename(
                passed_name=passed_name, missing=missing)
        if missing:
            # FIX: the original mixed an f-string with .format(), so the
            # join was never applied and the list repr was printed instead
            raise KeyError(
                'Packages not found: {missing}'.format(
                    missing=' '.join(missing)))
        if self._verbose:
            logger.info(
                msg='Found {count} requested packages: {origins}'.format(
                    count=len(self._found_packages),
                    origins=self._found_packages))

    def _resolve_dependencies_in_deps_section(
            self, loaded_deps: DepsDict, result_deps: set[CollectedPackage],
            missing: list):
        'Resolve dependencies from one package\'s `deps` section'
        for pkgname, details in loaded_deps.items():
            origin = details.get('origin')
            version = details.get('version')
            flavor = details.get('flavor')
            collected = CollectedPackage(
                name=pkgname, origin=origin, version=version, flavor=flavor)
            if collected in self._found_packages:
                # Already resolved through another package
                continue
            if pkgname not in self._names or \
                    version not in self._names[pkgname]:
                missing.append(collected)
                continue
            result_deps.add(collected)

    def _add_resolved_dependencies(self, package: CollectedPackage):
        """Recursively resolve the dependencies of one collected package
        into `_found_packages`.

        :raises KeyError: when a dependency is not in the catalogue.
        """
        self._iterations += 1
        loaded = self._names[package.name][package.version]
        loaded_deps: Optional[DepsDict] = loaded.get('deps')
        result_deps: set[CollectedPackage] = set()
        if not loaded_deps:
            # Normalized to a bare return; the value was never used
            return
        missing = []
        self._resolve_dependencies_in_deps_section(
            loaded_deps=loaded_deps, result_deps=result_deps, missing=missing)
        if missing:
            raise KeyError(
                'Dependencies not found for {package}: {missing}'.format(
                    package=package, missing=' '.join(map(str, missing))))
        # Mark packages found BEFORE recursing so cycles and shared
        # dependencies are not examined twice
        self._found_packages.update(result_deps)
        for item in result_deps:
            self._add_resolved_dependencies(package=item)

    def _resolve_as_urls(self) -> list[UrlItem]:
        'Compile the result list into URLs to refetch.'
        result = []
        for item in self._found_packages:
            loaded = self._names[item.name][item.version]
            sha256sum = loaded.get('sum')
            pkgsize = loaded.get('pkgsize')
            repopath = loaded.get('repopath')
            # Re-attach the flavor so the origin reads as origin@flavor
            origin = \
                f'{item.origin}@{item.flavor}' if item.flavor else item.origin
            result.append(UrlItem(
                name=item.name, origin=origin, version=item.version,
                url=repopath, sha256=sha256sum, pkgsize=pkgsize))
        return result

    def process(self) -> list[UrlItem]:
        """Start processing.

        :return: The resolved packages as `UrlItem`s (repopath-relative).
        """
        self._load_packagefile()
        self._resolve_requested_package_origins()
        # Copy: _add_resolved_dependencies() mutates _found_packages
        root_origins = self._found_packages.copy()
        for item in root_origins:
            self._add_resolved_dependencies(package=item)
        if self._verbose:
            logger.debug(msg=(
                'Resolved {count} packages with {iter} iterations: {pkgs}'
            ).format(
                count=len(self._found_packages), iter=self._iterations,
                pkgs=self._found_packages))
        url_items = self._resolve_as_urls()
        if self._verbose:
            logger.debug(msg='Resolved {count} UrlItems: {url_items}'.format(
                count=len(url_items), url_items=url_items))
        return url_items
def _get_fetched_url(url: str, do_hash: bool, cachebust: bool) -> FetchedUrl:
    """Fetch `url` and return its body, optionally with a SHA256 digest.

    :param url: The URL to fetch.
    :param do_hash: When `True`, compute the SHA256 hex digest of the body.
    :param cachebust: When `True`, send a `Cache-Bypass: true` header so an
        intermediate cache serves/refreshes a fresh copy.
    :return: A `FetchedUrl` with the body and the digest (`None` digest
        when `do_hash` is `False`).
    """
    headers = {'Cache-Bypass': 'true'} if cachebust else {}
    request = Request(url=url, headers=headers)
    # FIX: close the response; the original never closed it and leaked
    # the underlying connection for every fetch
    with urlopen(url=request) as response:
        content = response.read()
    if not do_hash:
        return FetchedUrl(content=content, sha256=None)
    hasher = sha256()
    hasher.update(content)
    return FetchedUrl(content=content, sha256=hasher.hexdigest())
class Revalidator(object):
    'Class to handle the refetching part of the script.'
    # Set to True when a package still mismatches after a refetch
    _errors_found = False

    def __init__(
            self, url_items: list[UrlItem], abi: str,
            verbose: bool, pkgmirror_url: Optional[str] = None):
        """
        :param url_items: Items with repopath-relative URLs to process
        :param abi: FreeBSD ABI string used in the mirror URL path
        :param verbose: Emit debug logging when `True`
        :param pkgmirror_url: Base URL of the mirror (optional)
        """
        self._verbose = verbose
        self._abi = abi
        self._url_items = url_items
        # Normalize so URL joining never produces a double slash
        self._pkgmirror_url = (pkgmirror_url or '').rstrip('/')

    def print_items(self):
        'Print every processed `UrlItem` to stdout.'
        for item in self.processed_items:
            print(item)

    @cached_property
    def processed_items(self) -> list[UrlItem]:
        'The input items with their URLs expanded to full mirror URLs.'
        return [
            UrlItem(
                name=item.name, origin=item.origin, version=item.version,
                url='/'.join([
                    self._pkgmirror_url, self._abi, 'latest',
                    item.url.lstrip('/')]),
                sha256=item.sha256, pkgsize=item.pkgsize)
            for item in self._url_items]

    def _do_revalidate(
            self, fetched_url: FetchedUrl, url_item: UrlItem, idx: int):
        'Revalidate one package.'
        if self._verbose:
            logger.debug(msg=(
                f'NOT OK ({idx}): {url_item.origin} ({url_item.version}), '
                f'size difference is '
                f'{url_item.pkgsize - len(fetched_url.content)}, '
                f'hash diff is {fetched_url.sha256 != url_item.sha256}'))
        # Refetch with cache busting so the mirror refreshes its copy
        refetched = _get_fetched_url(
            url=url_item.url, do_hash=True, cachebust=True)
        hash_ok = refetched.sha256 == url_item.sha256
        size_ok = url_item.pkgsize == len(refetched.content)
        if hash_ok and size_ok:
            print(f'{url_item.name} ({url_item.version}) refetched OK.')
            return
        self._errors_found = True
        logger.error(msg=(
            f'{url_item.origin} ({url_item.version}) does not match ' +
            'after refetching!'))

    def process(self):
        'Start processing'
        for idx, item in enumerate(self.processed_items, start=1):
            fetched = _get_fetched_url(
                url=item.url, do_hash=True, cachebust=False)
            matches = (
                fetched.sha256 == item.sha256
                and len(fetched.content) == item.pkgsize)
            if not matches:
                self._do_revalidate(
                    fetched_url=fetched, url_item=item, idx=idx)
                continue
            if self._verbose:
                logger.debug(
                    msg=f'OK ({idx}): {item.origin} ({item.version})')
        if self._errors_found:
            raise Exception(
                'Errors found, please rerun with verbose settings to see ' +
                'them.')
def test_wo_pkgmirror_chroot(args: Namespace):
    'Testing without pkgmirror and chroot availability.'
    resolver = DependencyResolver(
        packages_txz_fd=args.pkgtxz, verbose=args.verbose,
        passed_packages=args.packages)
    revalidator = Revalidator(
        url_items=resolver.process(), abi=args.abi, verbose=args.verbose)
    # Dry run: only show what would be revalidated, fetch nothing
    if args.verbose:
        revalidator.print_items()
def half_liverun(pkgmirror_url: str, abi: str, verbose: bool, packages: str):
    'Do a half liverun.'
    base_url = '/'.join((pkgmirror_url, abi, 'latest'))
    url_meta_txz = '/'.join((base_url, 'meta.txz'))
    url_pkgsite_txz = '/'.join((base_url, 'packagesite.txz'))
    if verbose:
        logger.debug(msg=f'Fetching {url_meta_txz}')
    # Cache-busting fetches so the mirror refreshes its catalogue copies
    meta_txz = _get_fetched_url(
        url=url_meta_txz, do_hash=False, cachebust=True)
    if verbose:
        logger.debug(msg=f'{url_meta_txz} size is {len(meta_txz.content)}')
        logger.debug(msg=f'Fetching {url_pkgsite_txz}')
    pkgsite_txz = _get_fetched_url(
        url=url_pkgsite_txz, do_hash=False, cachebust=True)
    if verbose:
        logger.debug(
            msg=f'{url_pkgsite_txz} size is {len(pkgsite_txz.content)}')
    resolver = DependencyResolver(
        packages_txz_fd=BytesIO(initial_bytes=pkgsite_txz.content),
        verbose=verbose, passed_packages=packages)
    revalidator = Revalidator(
        url_items=resolver.process(), abi=abi, pkgmirror_url=pkgmirror_url,
        verbose=verbose)
    revalidator.process()
def test_wo_chroot(args: Namespace):
    'Testing without chroot but WITH pkgmirror URL.'
    # The dest contains a dash, so it is read back via getattr()
    half_liverun(
        pkgmirror_url=getattr(args, 'pkgmirror-url'), abi=args.abi,
        verbose=args.verbose, packages=args.packages)
def _get_abi(jail_chroot: str, verbose: bool) -> str:
    'Return the used `$ABI` in the future jail.'
    # Ask pkg inside the chroot for its configured ABI string
    raw_output = check_output(
        ['pkg', '--chroot', jail_chroot, 'config', 'abi'])
    abi = raw_output.decode(encoding='utf-8').strip()
    if verbose:
        logger.info(msg=f'ABI in {jail_chroot!r} is {abi}')
    return abi
def live_run(args: Namespace):
    'Do a live run, with chroot and with pkgmirror URL.'
    # Dashed dests are only reachable via getattr()
    jail_chroot = getattr(args, 'jail-root')
    half_liverun(
        pkgmirror_url=getattr(args, 'pkgmirror-url'),
        abi=_get_abi(jail_chroot=jail_chroot, verbose=args.verbose),
        verbose=args.verbose, packages=args.packages)
if __name__ == '__main__':
    args = parser.parse_args()
    # FIX: basicConfig() already ran at import time and attached a handler,
    # so calling it again here was a silent no-op and --verbose had no
    # effect (everything logged at DEBUG). Set the root logger level
    # directly instead.
    getLogger().setLevel(DEBUG if args.verbose else INFO)
    if not hasattr(args, 'func'):
        # No subcommand given: show usage and exit successfully
        parser.print_help()
        exit(0)
    args.func(args)