Handle package names AND origins

László Károlyi 2021-09-07 15:21:48 +02:00
parent 5b833fc7da
commit d54d634253
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE


@@ -45,68 +45,66 @@ def _revalidate_packagesite(abi: str, pkgmirror_url: str) -> List[bytes]:
         return exfile.read().splitlines()


-def _load_packages(lines: List[bytes]) -> defaultdict:
+def _load_packages(lines: List[bytes]) -> dict:
     """
     Load and return the packages from the passed JSON structured lines.
     """
-    result = defaultdict(dict)
+    result = dict(names=defaultdict(dict), origins=defaultdict(dict))
     for line in lines:
         # print(f'{line}\n')
         loaded = loads(line)
         name = loaded['name']
+        origin = loaded['origin']
         version = loaded['version']
-        result[name][version] = loaded
+        result['names'][name][version] = loaded
+        result['origins'][origin][version] = loaded
     return result


-def _extract_deps(
-        infodict_packages: defaultdict, passed_packages: dict) -> dict:
+def _extract_deps(loaded_packages: dict, passed_packages: dict) -> dict:
     'Compile and return the packages to check, including dependencies.'
-    result = dict()
+    dependencies = defaultdict(set)
     for name, versions in passed_packages.items():
         for version in versions:
-            dict_version = infodict_packages[name][version]
+            dict_version = loaded_packages[name][version]
             if 'deps' not in dict_version:
                 continue
             for depended_pkg, dict_depended_item in \
                     dict_version['deps'].items():
-                if depended_pkg not in result:
-                    result[depended_pkg] = set()
-                result[depended_pkg].add(dict_depended_item['version'])
-    if not result:
-        return result
+                dependencies[depended_pkg].add(dict_depended_item['version'])
+    if not dependencies:  # No dependencies
+        return dependencies
     dict_deps = _extract_deps(
-        infodict_packages=infodict_packages, passed_packages=result)
+        loaded_packages=loaded_packages, passed_packages=dependencies)
     for name, versions in dict_deps.items():
-        if name not in result:
-            result[name] = set()
-        result[name].update(versions)
-    return result
+        dependencies[name].update(versions)
+    return dependencies


 def _get_packages_to_check(
-        pkgmirror_url: str, abi: str, infodict_packages: defaultdict,
+        pkgmirror_url: str, abi: str, loaded_packages: dict,
         passed_packages: Set[str],
 ) -> List[dict]:
     'Compile and return the packages to check.'
-    set_not_in_packages = passed_packages - set(infodict_packages)
+    unified_dict = \
+        dict(**loaded_packages['names'], **loaded_packages['origins'])
+    set_not_in_packages = passed_packages - set(unified_dict)
     if set_not_in_packages:
         raise KeyError(f'Packages not found: {set_not_in_packages}')
-    dict_pkgs = {
-        name: set(infodict_packages[name]) for name in passed_packages}
+    dict_pkgs = {name: set(unified_dict[name]) for name in passed_packages}
     dict_deps = _extract_deps(
-        infodict_packages=infodict_packages, passed_packages=dict_pkgs)
+        loaded_packages=unified_dict, passed_packages=dict_pkgs)
     for name, versions in dict_deps.items():
         if name not in dict_pkgs:
             dict_pkgs[name] = set()
         dict_pkgs[name].update(versions)
     url_prefix = '/'.join((pkgmirror_url, abi, 'latest'))
     result = list()
-    for name, versions in dict_pkgs.items():
+    for name_or_origin, versions in dict_pkgs.items():
         for version in versions:
-            dict_version = infodict_packages[name][version]
+            dict_version = unified_dict[name_or_origin][version]
             result.append(dict(
-                name=name, version=version,
+                name_or_origin=name_or_origin, version=version,
                 url='/'.join((url_prefix, dict_version['repopath'])),
                 pkgsize=dict_version['pkgsize'], sha256=dict_version['sum']))
     return result
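
With this change, _load_packages indexes every packagesite entry twice: once under the package name and once under the port origin, so callers can later look packages up by either identifier. A minimal sketch of the resulting shape, using a made-up manifest line rather than a real packagesite entry:

    from collections import defaultdict
    from json import loads

    # Hypothetical manifest line; real entries carry many more fields.
    lines = [
        b'{"name": "curl", "origin": "ftp/curl", "version": "7.79.1",'
        b' "repopath": "All/curl-7.79.1.pkg", "pkgsize": 1, "sum": "ab"}',
    ]

    result = dict(names=defaultdict(dict), origins=defaultdict(dict))
    for line in lines:
        loaded = loads(line)
        # Register the same entry under both its name and its origin
        result['names'][loaded['name']][loaded['version']] = loaded
        result['origins'][loaded['origin']][loaded['version']] = loaded

    # Either key now resolves to the same entry:
    assert result['names']['curl']['7.79.1'] is \
        result['origins']['ftp/curl']['7.79.1']

Merging the two sub-dicts with dict(**names, **origins), as _get_packages_to_check does, works as long as names and origins never collide; origins carry a category prefix such as ftp/, while package names do not.
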
@@ -127,20 +125,22 @@ def _get_to_revalidate(packages_to_check: List[dict]) -> List[dict]:
     new `dict`.
     """
     to_revalidate = dict()
+    validated = []
     for dict_info in packages_to_check:
-        name = dict_info['name']
+        name_or_origin = dict_info['name_or_origin']
         url = dict_info['url']
         request = Request(url=url)
         dl_info = _fetch_and_get_info(request=request)
         if dict_info['pkgsize'] != dl_info['size']:
-            print(f'Size mismatch: {name}')
+            print(f'Size mismatch: {name_or_origin}')
             to_revalidate.append(dict_info)
             continue
         if dict_info['sha256'] != dl_info['digest']:
-            print(f'SHA256 sum mismatch: {name}')
+            print(f'SHA256 sum mismatch: {name_or_origin}')
             to_revalidate.append(dict_info)
             continue
-        print(f'OK: {name}')
+        validated.append(name_or_origin)
+    print('OK: {validated}'.format(validated=' '.join(validated)))
     return to_revalidate
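
The size and checksum comparisons above rely on _fetch_and_get_info, which is not part of this diff. A rough stand-in, assuming the helper streams the package over HTTP and reports its byte count plus SHA-256 hex digest (the name and return keys here only mirror how the function is used above, not its actual implementation):

    from hashlib import sha256
    from urllib.request import Request, urlopen

    def fetch_and_get_info(request: Request) -> dict:
        'Download the requested file, returning its size and SHA-256 digest.'
        digest = sha256()
        size = 0
        with urlopen(request) as response:
            while True:
                chunk = response.read(64 * 1024)
                if not chunk:
                    break
                size += len(chunk)
                digest.update(chunk)
        return dict(size=size, digest=digest.hexdigest())

Collecting the validated identifiers and printing them once at the end replaces the previous one-line-per-package 'OK' output with a single summary line.
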
@@ -181,11 +181,10 @@ def run():
     abi = _get_abi(jail_root=args.jail_root)
     _check_pkgmirror_url(url=args.pkgmirror_url)
     lines = _revalidate_packagesite(abi=abi, pkgmirror_url=args.pkgmirror_url)
-    infodict_packages = _load_packages(lines=lines)
+    loaded_packages = _load_packages(lines=lines)
     packages_to_check = _get_packages_to_check(
         pkgmirror_url=args.pkgmirror_url, abi=abi,
-        infodict_packages=infodict_packages,
-        passed_packages=passed_packages)
+        loaded_packages=loaded_packages, passed_packages=passed_packages)
     to_revalidate = _get_to_revalidate(packages_to_check=packages_to_check)
     if to_revalidate:
         if not _revalidate_packages(to_revalidate=to_revalidate):