Wholly revamping pkgvalidator script

This commit is contained in:
László Károlyi 2022-08-07 15:35:25 +02:00
parent e894091f49
commit b080814327
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE
2 changed files with 429 additions and 222 deletions

View file

@ -1,15 +1,23 @@
- name: "Installing packages: {{ install_packages }}"
pkgng:
community.general.pkgng:
chroot: '{{ vars["jail_" + jail_name + "_new_path"] }}'
state: latest
name: "{{ install_packages }}"
register: install_result
ignore_errors: true
# - name: "Installing packages: {{ install_packages }}"
# ansible.builtin.command:
# pkg
# -c '{{ vars["jail_" + jail_name + "_new_path"] }}'
# install -y {{ install_packages }}
# register: install_result
# ignore_errors: true
- name: Refresh pkg mirror when package install failed
ansible.builtin.command: |
'{{ ansible_roles_path }}/karolyi.ansible-freebsd-jailhost-tools/tools/pkgmirror-jailrevalidator.py'
'{{ pkgmirror_url }}' '{{ vars["jail_" + jail_name + "_new_path"] }}' '{{ install_packages }}'
live '{{ pkgmirror_url }}' '{{ vars["jail_" + jail_name + "_new_path"] }}' '{{ install_packages }}'
register: revalidate_command
when: install_result.failed

View file

@ -1,244 +1,443 @@
#!/usr/bin/env python3
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from argparse import ArgumentParser, FileType, Namespace
from collections import defaultdict, namedtuple
from functools import cached_property
from hashlib import sha256
from http.client import HTTPResponse
from io import BytesIO
from json import loads
from io import BufferedReader, BytesIO
from json import load, loads
from logging import DEBUG, basicConfig, getLogger
from pathlib import Path
from subprocess import check_output
from sys import exit, stderr
from tarfile import open as tar_open
from typing import DefaultDict, Dict, List, Set, Union
from typing import Optional, TypedDict, Union
from urllib.request import Request, urlopen
_WorkedOnPkgInfo = List[Dict[str, Union[str, int]]]
_LoadedPackages = \
Dict[str, DefaultDict[str, Dict[str, Dict[str, Union[str, int]]]]]
# result['names'][name][version] = loaded
basicConfig(stream=stderr, level=DEBUG)
logger = getLogger(__name__)
parser = ArgumentParser(
description='Revalidating files on local FreeBSD pkg mirror')
parser.add_argument(
dest='pkgmirror_url', type=str, help='URL of the pkg mirror')
parser.add_argument(
dest='jail_root', type=str, help='Path of the jail (chroot)')
parser.add_argument(
dest='packages', type=str, help='Space separated list of packages')
parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='Verbose logging')
subparsers = parser.add_subparsers(
title='subcommands',
description='Subcommands direct in which mode the revalidator will run in')
# Parser for live functionality
parser_live = subparsers.add_parser(name='live', help='Live mode')
parser_live.add_argument(
dest='pkgmirror-url', type=str, help='URL of the pkg mirror')
parser_live.add_argument(
dest='jail-root', type=str, help='Path of the jail (chroot)')
parser_live.add_argument(
dest='packages', type=str, help='Space separated list of packages')
parser_live.set_defaults(func=lambda x: live_run(x))
# Parser for the test functionality, without pkgmirror availability
parser_test_wo_pkgmirror_chroot = subparsers.add_parser(
name='wo-pkg-chroot', help=(
'Testing without pkgmirror and chroot availability. Complete ' +
'dry-run.'))
parser_test_wo_pkgmirror_chroot.add_argument(
dest='abi', type=str, help='FreeBSD ABI version (e.g. FreeBSD:13:amd64)')
parser_test_wo_pkgmirror_chroot.add_argument(
dest='pkgtxz', type=FileType(mode='rb'),
help='Path to the packagesite.txz file')
parser_test_wo_pkgmirror_chroot.add_argument(
dest='packages', type=str, help='Space separated list of packages')
parser_test_wo_pkgmirror_chroot.set_defaults(
func=lambda x: test_wo_pkgmirror_chroot(x))
# Parser for the test functionality, WITH pkgmirror availability
parser_test_wo_chroot = subparsers.add_parser(
name='wo-chroot', help=(
'Testing WITH pkgmirror but without chroot availability. Will ' +
'(re)download and refresh packages when necessary.'))
parser_test_wo_chroot.add_argument(
dest='pkgmirror-url', type=str, help='Pkg mirror repo URL')
parser_test_wo_chroot.add_argument(
dest='abi', type=str, help='FreeBSD ABI version (e.g. FreeBSD:13:amd64)')
parser_test_wo_chroot.add_argument(
dest='packages', type=str, help='Space separated list of packages')
parser_test_wo_chroot.set_defaults(func=lambda x: test_wo_chroot(x))
CollectedPackage = namedtuple(
typename='CollectedPackage',
field_names=['name', 'origin', 'version', 'flavor'])
UrlItem = namedtuple(
typename='UrlItem',
field_names=['name', 'origin', 'version', 'url', 'sha256', 'pkgsize'])
FetchedUrl = namedtuple(
typename='FetchedUrl', field_names=['content', 'sha256'])
OriginFlavorDict = defaultdict[Optional[str], dict]
OriginVersionsDict = defaultdict[str, OriginFlavorDict]
OriginDict = defaultdict[str, OriginVersionsDict]
def _get_abi(args: Namespace) -> str:
'Return the used `$ABI` in the future jail.'
output = check_output(['pkg', '--chroot', args.jail_root, 'config', 'abi'])
abi = output.strip().decode(encoding='utf-8')
class DepsItemDict(TypedDict):
'A dictionary in the JSON depicting dependencies for one package.'
origin: str
version: str
flavor: Optional[str]
DepsDict = dict[str, DepsItemDict]
class DependencyResolver(object):
'Resolving dependencies to requested packages.'
# name -> version -> dict
_names: defaultdict[str, dict[str, dict]]
# origin -> version -> flavor -> dict
_origins: OriginDict
_lower_names: dict[str, str]
_found_packages: set[CollectedPackage]
_iterations = 0
def __init__(
self, packages_txz_fd: Union[BufferedReader, BytesIO],
verbose: bool, passed_packages: str):
self._packages_txz_fd = packages_txz_fd
self._verbose = verbose
self._passed_packages = set(passed_packages.strip().split())
self._names = defaultdict(dict)
self._origins = \
defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
self._found_packages = set()
self._lower_names = dict()
def _load_definitions(self, splitted_lines: list[bytes]):
'Load definitions from the splitted lines.'
for line in splitted_lines:
loaded = loads(line)
name = loaded['name']
origin = loaded['origin']
version = loaded['version']
flavor = loaded.get('annotations', {}).get('flavor')
self._names[name][version] = loaded
self._origins[origin][version][flavor] = loaded
self._lower_names = {x.lower(): x for x in self._names}
if self._verbose:
count_names = len(self._names)
count_origins = len(self._origins)
logger.debug(
msg=f'Read info about {count_names} names and {count_origins} ' +
'origins')
def _load_packagefile(self):
'Load the packages from the tempfile.'
with self._packages_txz_fd as fd:
archive = tar_open(mode='r:xz', fileobj=fd)
exfile = archive.extractfile(member='packagesite.yaml')
if exfile is None:
raise FileNotFoundError(
'packagesite.yml not found in packagesite.txz')
splitted_lines = exfile.read().splitlines()
if self._verbose:
logger.debug(
f'Read {len(splitted_lines)} lines from packagesite.yml')
self._load_definitions(splitted_lines=splitted_lines)
def _get_origins_with_flavor(
self, versions: OriginVersionsDict, origin: str,
flavor: Optional[str]) -> set[CollectedPackage]:
"""
When a `@` is not passed but and origin is found for the package
name, evaluate if it is with a `None` flavor or should we
revalidate all flavors.
"""
found_packages = set()
for version, flavors in versions.items():
if flavor in flavors:
# Take a found `None` or specific flavor
loaded = flavors[flavor]
found_packages.add(CollectedPackage(
name=loaded.get('name'), origin=origin, version=version,
flavor=flavor))
elif flavor is None:
# Take ALL flavors
found_packages.update(CollectedPackage(
name=loaded.get('name'), origin=origin, version=version,
flavor=x) for x, loaded in flavors.items())
else:
raise KeyError(f'{flavor!r} of {origin!r} is not found.')
return found_packages
def _resolve_pkgnames_by_name(self):
'Lower all the packagenames and look for the passed name.'
def _resolve_one_passed_packagename(
self, passed_name: str, missing: list[str]):
'Resolve one packagename to one or more versions.'
if name := self._lower_names.get(passed_name):
versions = self._names[name]
for version, loaded in versions.items():
origin = loaded.get('origin')
flavor = loaded.get('annotations', {}).get('flavor')
self._found_packages.add(CollectedPackage(
name=passed_name, origin=origin, version=version,
flavor=flavor))
elif '@' in passed_name:
# Search origins
origin, flavor = passed_name.split(sep='@', maxsplit=1)
is_found = False
if origin in self._origins:
for version, flavors in self._origins[origin].items():
if flavor in flavors:
loaded = flavors[flavor]
self._found_packages.add(CollectedPackage(
name=loaded.get('name'), origin=origin,
version=version, flavor=flavor))
is_found = True
if not is_found:
missing.append(passed_name)
elif versions := self._origins.get(passed_name):
# passed_name is an origin without a flavor, add all flavors
self._found_packages.update(self._get_origins_with_flavor(
versions=versions, origin=passed_name, flavor=None))
else:
missing.append(passed_name)
def _resolve_requested_package_origins(self):
'Resolve the root packages to a set of `CollectedPackage`s.'
missing: list[str] = list()
for passed_name in self._passed_packages:
self._resolve_one_passed_packagename(
passed_name=passed_name, missing=missing)
if missing:
raise KeyError(f'Packages not found: {missing}'.format(
missing=' '.join(missing)))
if self._verbose:
logger.info(
msg='Found {count} requested packages: {origins}'.format(
count=len(self._found_packages),
origins=self._found_packages))
def _resolve_dependencies_in_deps_section(
self, loaded_deps: DepsDict, result_deps: set[CollectedPackage],
missing: list):
'Resolve dependencies from one package\'s `deps` section'
for pkgname, details in loaded_deps.items():
origin = details.get('origin')
version = details.get('version')
flavor = details.get('flavor')
collected = CollectedPackage(
name=pkgname, origin=origin, version=version, flavor=flavor)
if collected in self._found_packages:
continue
if pkgname not in self._names or \
version not in self._names[pkgname]:
missing.append(collected)
continue
result_deps.add(CollectedPackage(
name=pkgname, origin=origin, version=version,
flavor=flavor))
def _add_resolved_dependencies(self, package: CollectedPackage):
'Resolve the dependencies of one collected package.'
self._iterations += 1
loaded = self._names[package.name][package.version]
loaded_deps: Optional[DepsDict] = loaded.get('deps')
result_deps: set[CollectedPackage] = set()
if not loaded_deps:
return result_deps
missing = []
self._resolve_dependencies_in_deps_section(
loaded_deps=loaded_deps, result_deps=result_deps, missing=missing)
if missing:
raise KeyError(
'Dependencies not found for {package}: {missing}'.format(
package=package, missing=' '.join(map(str, missing))))
# Don't resolve any already examined dependent package
self._found_packages.update(result_deps)
for item in result_deps:
self._add_resolved_dependencies(package=item)
def _resolve_as_urls(self) -> list[UrlItem]:
'Compile the result list into URLs to refetch.'
result = []
for item in self._found_packages:
loaded = self._names[item.name][item.version]
sha256sum = loaded.get('sum')
pkgsize = loaded.get('pkgsize')
repopath = loaded.get('repopath')
origin = \
f'{item.origin}@{item.flavor}' if item.flavor else item.origin
result.append(UrlItem(
name=item.name, origin=origin, version=item.version,
url=repopath, sha256=sha256sum, pkgsize=pkgsize))
return result
def process(self) -> list[UrlItem]:
'Start processing.'
self._load_packagefile()
self._resolve_requested_package_origins()
root_origins = self._found_packages.copy()
for item in root_origins:
self._add_resolved_dependencies(package=item)
if self._verbose:
logger.debug(msg=(
'Resolved {count} packages with {iter} iterations: {pkgs}'
).format(
count=len(self._found_packages), iter=self._iterations,
pkgs=self._found_packages))
url_items = self._resolve_as_urls()
if self._verbose:
logger.debug(msg='Resolved {count} UrlItems: {url_items}'.format(
count=len(url_items), url_items=url_items))
return url_items
def _get_fetched_url(url: str, do_hash: bool, cachebust: bool) -> FetchedUrl:
headers = {'Cache-Bypass': 'true'} if cachebust else {}
request = Request(url=url, headers=headers)
response = urlopen(url=request)
content = response.read()
if not do_hash:
return FetchedUrl(content=content, sha256=None)
hasher = sha256()
hasher.update(content)
return FetchedUrl(content=content, sha256=hasher.hexdigest())
class Revalidator(object):
'Class to handle the refetching part of the script.'
_errors_found = False
def __init__(
self, url_items: list[UrlItem], abi: str,
verbose: bool, pkgmirror_url: Optional[str] = None):
self._url_items = url_items
self._abi = abi
self._verbose = verbose
self._pkgmirror_url = (pkgmirror_url or '').rstrip('/')
def print_items(self):
for item in self.processed_items:
print(item)
@cached_property
def processed_items(self) -> list[UrlItem]:
result = []
for item in self._url_items:
result.append(UrlItem(
name=item.name, origin=item.origin, version=item.version,
url='/'.join([
self._pkgmirror_url, self._abi, 'latest',
item.url.lstrip('/')]),
sha256=item.sha256, pkgsize=item.pkgsize))
return result
def _do_revalidate(
self, fetched_url: FetchedUrl, url_item: UrlItem, idx: int):
'Revalidate one package.'
if self._verbose:
size_diff = url_item.pkgsize - len(fetched_url.content)
hash_diffs = fetched_url.sha256 != url_item.sha256
logger.debug(msg=(
f'NOT OK ({idx}): {url_item.origin} ({url_item.version}), ' +
f'size difference is {size_diff}, hash diff is {hash_diffs}'))
new_fetched_url = _get_fetched_url(
url=url_item.url, do_hash=True, cachebust=True)
if new_fetched_url.sha256 != url_item.sha256 or \
url_item.pkgsize != len(new_fetched_url.content):
self._errors_found = True
logger.error(msg=(
f'{url_item.origin} ({url_item.version}) does not match ' +
'after refetching!'))
else:
print(f'{url_item.name} ({url_item.version}) refetched OK.')
def process(self):
'Start processing'
for idx, item in enumerate(self.processed_items, start=1):
fetched_url = _get_fetched_url(
url=item.url, do_hash=True, cachebust=False)
if fetched_url.sha256 == item.sha256 and \
len(fetched_url.content) == item.pkgsize:
if self._verbose:
logger.debug(
msg=f'OK ({idx}): {item.origin} ({item.version})')
continue
self._do_revalidate(
fetched_url=fetched_url, url_item=item, idx=idx)
if self._errors_found:
raise Exception(
'Errors found, please rerun with verbose settings to see ' +
'them.')
def test_wo_pkgmirror_chroot(args: Namespace):
'Testing without pkgmirror and chroot availability.'
resolver = DependencyResolver(
packages_txz_fd=args.pkgtxz, verbose=args.verbose,
passed_packages=args.packages)
url_items = resolver.process()
revalidator = Revalidator(
url_items=url_items, abi=args.abi, verbose=args.verbose)
if args.verbose:
logger.info(msg=f'ABI is {abi}')
for item in revalidator.processed_items:
print(item)
def half_liverun(pkgmirror_url: str, abi: str, verbose: bool, packages: str):
'Do a half liverun.'
url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
url_meta_txz = '/'.join((url_prefix, 'meta.txz'))
url_pkgsite_txz = '/'.join((url_prefix, 'packagesite.txz'))
if verbose:
logger.debug(msg=f'Fetching {url_meta_txz}')
meta_txz = \
_get_fetched_url(url=url_meta_txz, do_hash=False, cachebust=True)
if verbose:
logger.debug(msg=f'{url_meta_txz} size is {len(meta_txz.content)}')
logger.debug(msg=f'Fetching {url_pkgsite_txz}')
pkgsite_txz = \
_get_fetched_url(url=url_pkgsite_txz, do_hash=False, cachebust=True)
if verbose:
logger.debug(
msg=f'{url_pkgsite_txz} size is {len(pkgsite_txz.content)}')
io = BytesIO(initial_bytes=pkgsite_txz.content)
resolver = DependencyResolver(
packages_txz_fd=io, verbose=verbose, passed_packages=packages)
url_items = resolver.process()
revalidator = Revalidator(
url_items=url_items, abi=abi, pkgmirror_url=pkgmirror_url,
verbose=verbose)
revalidator.process()
def test_wo_chroot(args: Namespace):
'Testing without chroot but WITH pkgmirror URL.'
pkgmirror_url = getattr(args, 'pkgmirror-url')
half_liverun(
pkgmirror_url=pkgmirror_url, abi=args.abi, verbose=args.verbose,
packages=args.packages)
def _get_abi(jail_chroot: str, verbose: bool) -> str:
'Return the used `$ABI` in the future jail.'
output = check_output(['pkg', '--chroot', jail_chroot, 'config', 'abi'])
abi = output.strip().decode(encoding='utf-8')
if verbose:
logger.info(msg=f'ABI in {jail_chroot!r} is {abi}')
return abi
def _revalidate_packagesite(abi: str, args: Namespace) -> List[bytes]:
"""
Revalidate packagesite before fetching and return the new
`ExFileObject` that is the `packagesite.txz`.
"""
pkgmirror_url: str = args.pkgmirror_url
url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
headers = {'Cache-Bypass': 'true'}
request = Request(url='/'.join((url_prefix, 'meta.txz')), headers=headers)
response = urlopen(url=request) # type: HTTPResponse
request = Request(
url='/'.join((url_prefix, 'packagesite.txz')), headers=headers)
response = urlopen(url=request) # type: HTTPResponse
archive = tar_open(mode='r:xz', fileobj=BytesIO(response.read()))
exfile = archive.extractfile(member='packagesite.yaml')
if exfile is None:
raise FileNotFoundError('packagesite.yml not found in packagesite.txz')
splitted_lines = exfile.read().splitlines()
if args.verbose:
logger.info(f'Read {len(splitted_lines)} lines from packagesite.yml')
return splitted_lines
def _load_packages(lines: List[bytes], args: Namespace) -> _LoadedPackages:
"""
Load and return the packages from the passed JSON structured lines.
"""
result: _LoadedPackages = \
dict(names=defaultdict(dict), origins=defaultdict(dict))
for line in lines:
# print(f'{line}\n')
loaded = loads(line)
name = loaded['name']
origin = loaded['origin']
version = loaded['version']
result['names'][name][version] = loaded
result['origins'][origin][version] = loaded
if args.verbose:
count_names = len(result['names'])
count_origins = len(result['origins'])
logger.info(
msg=f'Read info about {count_names} names and {count_origins} ' +
'origins')
return result
def _extract_deps(loaded_packages: dict, passed_packages: dict) -> dict:
'Compile and return the packages to check, including dependencies.'
dependencies = defaultdict(set)
for name, versions in passed_packages.items():
for version in versions:
dict_version = loaded_packages[name][version]
if 'deps' not in dict_version:
continue
for depended_pkg, dict_depended_item in \
dict_version['deps'].items():
dependencies[depended_pkg].add(dict_depended_item['version'])
if not dependencies: # No dependencies
return dependencies
dict_deps = _extract_deps(
loaded_packages=loaded_packages, passed_packages=dependencies)
for name, versions in dict_deps.items():
dependencies[name].update(versions)
return dependencies
def _get_packages_to_check(
args: Namespace, abi: str, loaded_packages: _LoadedPackages,
passed_packages: Set[str],
) -> _WorkedOnPkgInfo:
'Compile and return the packages to check.'
pkgmirror_url: str = args.pkgmirror_url
unified_dict = \
dict(**loaded_packages['names'], **loaded_packages['origins'])
set_not_in_packages = passed_packages - set(unified_dict)
if set_not_in_packages:
raise KeyError(f'Packages not found: {set_not_in_packages}')
dict_pkgs = {name: set(unified_dict[name]) for name in passed_packages}
dict_deps = _extract_deps(
loaded_packages=unified_dict, passed_packages=dict_pkgs)
for name, versions in dict_deps.items():
if name not in dict_pkgs:
dict_pkgs[name] = set()
dict_pkgs[name].update(versions)
url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
result: _WorkedOnPkgInfo = list()
for name_or_origin, versions in dict_pkgs.items():
for version in versions:
dict_version = unified_dict[name_or_origin][version]
result.append(dict(
name_or_origin=name_or_origin, version=version,
url='/'.join((url_prefix, dict_version['repopath'])),
pkgsize=dict_version['pkgsize'], sha256=dict_version['sum']))
if args.verbose:
logger.info(msg=f'Will check {len(result)} package(s)')
return result
def _fetch_and_get_info(request: Request) -> Dict[str, Union[str, int]]:
'Fetch the package and return size and SHA256 sum.'
response = urlopen(url=request) # type: HTTPResponse
content = response.read()
hasher = sha256()
hasher.update(content)
return dict(size=len(content), digest=hasher.hexdigest())
def _get_to_revalidate(
packages_to_check: _WorkedOnPkgInfo, args: Namespace
) -> _WorkedOnPkgInfo:
"""
Download the packages in the dict return the mismatched ones in a
new `dict`.
"""
to_revalidate: _WorkedOnPkgInfo = list()
validated = []
for dict_info in packages_to_check:
name_or_origin = dict_info['name_or_origin']
url = dict_info['url']
request = Request(url=url) # type: ignore
dl_info = _fetch_and_get_info(request=request)
if dict_info['pkgsize'] != dl_info['size']:
if args.verbose:
logger.info(msg=f'Size mismatch: {name_or_origin}')
to_revalidate.append(dict_info)
continue
if dict_info['sha256'] != dl_info['digest']:
if args.verbose:
logger.info(msg=f'SHA256 sum mismatch: {name_or_origin}')
to_revalidate.append(dict_info)
continue
validated.append(name_or_origin)
# print('OK: {validated}'.format(validated=' '.join(validated)))
return to_revalidate
def _revalidate_packages(
to_revalidate: _WorkedOnPkgInfo, args: Namespace) -> bool:
'Revalidate the packages that are mismatched.'
if args.verbose:
logger.info(msg=f'Will revalidate {len(to_revalidate)} package(s)')
headers = {'Cache-Bypass': 'true'}
success = True
for dict_item in to_revalidate:
name = dict_item['name_or_origin']
url = dict_item['url']
print(f'Revalidating {name} ... ', end='')
request = Request(url=url, headers=headers) # type: ignore
dl_info = _fetch_and_get_info(request=request)
if dict_item['pkgsize'] != dl_info['size']:
print('Size mismatch!')
success = False
continue
if dict_item['sha256'] != dl_info['digest']:
print('SHA256 sum mismatch!')
success = False
continue
print('OK.')
return success
def _check_pkgmirror_url(args: Namespace):
'Check the passed URL for format validity.'
pkgmirror_url: str = args.pkgmirror_url
if not pkgmirror_url.startswith(('http://', 'https://')):
raise ValueError(f'Invalid pkgmirror_url {pkgmirror_url}')
if args.verbose:
logger.info(msg=f'pkgmirror url is {pkgmirror_url}')
def run():
args = parser.parse_args()
if args.verbose:
basicConfig(stream=stderr, level=DEBUG)
path_jailroot = Path(args.jail_root)
if not path_jailroot.is_dir():
raise FileNotFoundError(path_jailroot)
passed_packages = set(args.packages.split())
if args.verbose:
logger.info(
msg=f'Passed {len(passed_packages)} package(s): {args.packages}')
abi = _get_abi(args=args)
_check_pkgmirror_url(args=args)
lines = _revalidate_packagesite(abi=abi, args=args)
loaded_packages = _load_packages(lines=lines, args=args)
packages_to_check = _get_packages_to_check(
args=args, abi=abi, loaded_packages=loaded_packages,
passed_packages=passed_packages)
to_revalidate = _get_to_revalidate(
packages_to_check=packages_to_check, args=args)
if to_revalidate:
if _revalidate_packages(to_revalidate=to_revalidate, args=args):
return
elif args.verbose:
logger.info(msg='Nothing to revalidate.')
exit(1)
def live_run(args: Namespace):
'Do a live run, with chroot and with pkgmirror URL.'
pkgmirror_url = getattr(args, 'pkgmirror-url')
jail_chroot = getattr(args, 'jail-root')
abi = _get_abi(jail_chroot=jail_chroot, verbose=args.verbose)
half_liverun(
pkgmirror_url=pkgmirror_url, abi=abi, verbose=args.verbose,
packages=args.packages)
if __name__ == '__main__':
run()
args = parser.parse_args()
if args.verbose:
basicConfig(stream=stderr, level=DEBUG)
if not hasattr(args, 'func'):
parser.print_help()
exit(0)
args.func(args)