Add optional verbosity

This commit is contained in:
László Károlyi 2021-11-17 17:58:12 +01:00
parent 5f8330ed3a
commit 298d5b576a
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE

View file

@@ -6,13 +6,20 @@ from hashlib import sha256
from http.client import HTTPResponse
from io import BytesIO
from json import loads
from logging import DEBUG, basicConfig, getLogger
from pathlib import Path
from subprocess import check_output
from sys import exit
from sys import exit, stderr
from tarfile import open as tar_open
from typing import List, Set
from typing import DefaultDict, Dict, List, Set, Union
from urllib.request import Request, urlopen
_WorkedOnPkgInfo = List[Dict[str, Union[str, int]]]
_LoadedPackages = \
Dict[str, DefaultDict[str, Dict[str, Dict[str, Union[str, int]]]]]
# result['names'][name][version] = loaded
logger = getLogger(__name__)
parser = ArgumentParser()
parser.add_argument(
dest='pkgmirror_url', type=str, help='URL of the pkg mirror')
@@ -28,16 +35,18 @@ parser.add_argument(
def _get_abi(args: Namespace) -> str:
'Return the used `$ABI` in the future jail.'
output = check_output(['pkg', '--chroot', args.jail_root, 'config', 'abi'])
abi = output.strip().decode(encoding='utf-8')
if args.verbose:
print('')
return output.strip().decode('utf-8')
logger.info(msg=f'ABI is {abi}')
return abi
def _revalidate_packagesite(abi: str, pkgmirror_url: str) -> List[bytes]:
def _revalidate_packagesite(abi: str, args: Namespace) -> List[bytes]:
"""
Revalidate packagesite before fetching and return the new
`ExFileObject` that is the `packagesite.txz`.
"""
pkgmirror_url: str = args.pkgmirror_url
url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
headers = {'Cache-Bypass': 'true'}
request = Request(url='/'.join((url_prefix, 'meta.txz')), headers=headers)
@@ -46,15 +55,21 @@ def _revalidate_packagesite(abi: str, pkgmirror_url: str) -> List[bytes]:
url='/'.join((url_prefix, 'packagesite.txz')), headers=headers)
response = urlopen(url=request) # type: HTTPResponse
archive = tar_open(mode='r:xz', fileobj=BytesIO(response.read()))
exfile = archive.extractfile('packagesite.yaml')
return exfile.read().splitlines()
exfile = archive.extractfile(member='packagesite.yaml')
if exfile is None:
raise FileNotFoundError('packagesite.yaml not found in packagesite.txz')
splitted_lines = exfile.read().splitlines()
if args.verbose:
logger.info(f'Read {len(splitted_lines)} lines from packagesite.yaml')
return splitted_lines
def _load_packages(lines: List[bytes]) -> dict:
def _load_packages(lines: List[bytes], args: Namespace) -> _LoadedPackages:
"""
Load and return the packages from the passed JSON structured lines.
"""
result = dict(names=defaultdict(dict), origins=defaultdict(dict))
result: _LoadedPackages = \
dict(names=defaultdict(dict), origins=defaultdict(dict))
for line in lines:
# print(f'{line}\n')
loaded = loads(line)
@@ -63,6 +78,12 @@ def _load_packages(lines: List[bytes]) -> dict:
version = loaded['version']
result['names'][name][version] = loaded
result['origins'][origin][version] = loaded
if args.verbose:
count_names = len(result['names'])
count_origins = len(result['origins'])
logger.info(
msg=f'Read info about {count_names} names and {count_origins} ' +
'origins')
return result
@@ -87,10 +108,11 @@ def _extract_deps(loaded_packages: dict, passed_packages: dict) -> dict:
def _get_packages_to_check(
pkgmirror_url: str, abi: str, loaded_packages: dict,
args: Namespace, abi: str, loaded_packages: _LoadedPackages,
passed_packages: Set[str],
) -> List[dict]:
) -> _WorkedOnPkgInfo:
'Compile and return the packages to check.'
pkgmirror_url: str = args.pkgmirror_url
unified_dict = \
dict(**loaded_packages['names'], **loaded_packages['origins'])
set_not_in_packages = passed_packages - set(unified_dict)
@@ -104,7 +126,7 @@ def _get_packages_to_check(
dict_pkgs[name] = set()
dict_pkgs[name].update(versions)
url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
result = list()
result: _WorkedOnPkgInfo = list()
for name_or_origin, versions in dict_pkgs.items():
for version in versions:
dict_version = unified_dict[name_or_origin][version]
@@ -112,10 +134,12 @@ def _get_packages_to_check(
name_or_origin=name_or_origin, version=version,
url='/'.join((url_prefix, dict_version['repopath'])),
pkgsize=dict_version['pkgsize'], sha256=dict_version['sum']))
if args.verbose:
logger.info(msg=f'Will check {len(result)} package(s)')
return result
def _fetch_and_get_info(request: Request) -> dict:
def _fetch_and_get_info(request: Request) -> Dict[str, Union[str, int]]:
'Fetch the package and return size and SHA256 sum.'
response = urlopen(url=request) # type: HTTPResponse
content = response.read()
@@ -124,24 +148,28 @@ def _fetch_and_get_info(request: Request) -> dict:
return dict(size=len(content), digest=hasher.hexdigest())
def _get_to_revalidate(packages_to_check: List[dict]) -> List[dict]:
def _get_to_revalidate(
packages_to_check: _WorkedOnPkgInfo, args: Namespace
) -> _WorkedOnPkgInfo:
"""
Download the packages in the dict return the mismatched ones in a
new `dict`.
"""
to_revalidate = dict()
to_revalidate: _WorkedOnPkgInfo = list()
validated = []
for dict_info in packages_to_check:
name_or_origin = dict_info['name_or_origin']
url = dict_info['url']
request = Request(url=url)
request = Request(url=url) # type: ignore
dl_info = _fetch_and_get_info(request=request)
if dict_info['pkgsize'] != dl_info['size']:
print(f'Size mismatch: {name_or_origin}')
if args.verbose:
logger.info(msg=f'Size mismatch: {name_or_origin}')
to_revalidate.append(dict_info)
continue
if dict_info['sha256'] != dl_info['digest']:
print(f'SHA256 sum mismatch: {name_or_origin}')
if args.verbose:
logger.info(msg=f'SHA256 sum mismatch: {name_or_origin}')
to_revalidate.append(dict_info)
continue
validated.append(name_or_origin)
@@ -149,15 +177,18 @@ def _get_to_revalidate(packages_to_check: List[dict]) -> List[dict]:
return to_revalidate
def _revalidate_packages(to_revalidate: List[dict]) -> bool:
def _revalidate_packages(
to_revalidate: _WorkedOnPkgInfo, args: Namespace) -> bool:
'Revalidate the packages that are mismatched.'
if args.verbose:
logger.info(msg=f'Will revalidate {len(to_revalidate)} package(s)')
headers = {'Cache-Bypass': 'true'}
success = True
for dict_item in to_revalidate:
name = dict_item['name']
url = dict_item['url']
print(f'Revalidating {name} ... ', end='')
request = Request(url=url, headers=headers)
request = Request(url=url, headers=headers) # type: ignore
dl_info = _fetch_and_get_info(request=request)
if dict_item['pkgsize'] != dl_info['size']:
print('Size mismatch!')
@@ -171,31 +202,39 @@ def _revalidate_packages(to_revalidate: List[dict]) -> bool:
return success
def _check_pkgmirror_url(url: str):
def _check_pkgmirror_url(args: Namespace):
'Check the passed URL for format validity.'
if not url.startswith(('http://', 'https://')):
raise ValueError(f'Invalid pkgmirror_url {url}')
pkgmirror_url: str = args.pkgmirror_url
if not pkgmirror_url.startswith(('http://', 'https://')):
raise ValueError(f'Invalid pkgmirror_url {pkgmirror_url}')
if args.verbose:
logger.info(msg=f'pkgmirror url is {pkgmirror_url}')
def run():
args = parser.parse_args()
if args.verbose:
basicConfig(stream=stderr, level=DEBUG)
path_jailroot = Path(args.jail_root)
if not path_jailroot.is_dir():
raise FileNotFoundError(path_jailroot)
passed_packages = set(args.packages.split())
if args.verbose:
logger.info(msg=f'Passed {len(passed_packages)}: {args.packages}')
abi = _get_abi(args=args)
_check_pkgmirror_url(url=args.pkgmirror_url)
lines = _revalidate_packagesite(abi=abi, pkgmirror_url=args.pkgmirror_url)
loaded_packages = _load_packages(lines=lines)
_check_pkgmirror_url(args=args)
lines = _revalidate_packagesite(abi=abi, args=args)
loaded_packages = _load_packages(lines=lines, args=args)
packages_to_check = _get_packages_to_check(
pkgmirror_url=args.pkgmirror_url, abi=abi,
loaded_packages=loaded_packages, passed_packages=passed_packages)
to_revalidate = _get_to_revalidate(packages_to_check=packages_to_check)
args=args, abi=abi, loaded_packages=loaded_packages,
passed_packages=passed_packages)
to_revalidate = _get_to_revalidate(
packages_to_check=packages_to_check, args=args)
if to_revalidate:
if not _revalidate_packages(to_revalidate=to_revalidate):
exit(1)
if _revalidate_packages(to_revalidate=to_revalidate, args=args):
return
exit(1)
if __name__ == '__main__':
run()