Rearrange configuration data

This commit is contained in:
László Károlyi 2024-04-08 19:59:48 +02:00
parent eda18d8611
commit dd1c152ca0
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE
8 changed files with 223 additions and 159 deletions

View file

@ -1,29 +1,28 @@
defaults:
tlsa:
# Default ports for which to set up the TLSA records.
# e.g: _443._tcp.host.example.com
records:
# HTTPS
- protocol: tcp
portnumber: 443
# Default TLSA TTLs ...
ttl: 86400 # 1 day in seconds
# ... and rollover periods: the time until a process will recognize
# and reload the new certificate
# and reload the new certificate.
# use -1 for instant TLSA record update after TTL passes
time_to_reload: 86400 # 1 day in seconds
# Default ports for which to set up the TLSA records.
# e.g: _443._tcp.host.example.com
ports:
# HTTPS
- protocol: tcp
number: 443
usage:
# Certificate usage.
# For e-mail (SMTP) applications, this must be either 2 (DANE-TA) or 3
# (DANE-EE). Values usage=0 and usage=1 should not be used for SMTP
# see rfc7672 section 3.1.3:
# https://www.rfc-editor.org/rfc/rfc7672.html#section-3.1.3
- 3
selector:
# see https://en.wikipedia.org/wiki/DNS-based_Authentication_of_Named_Entities
- 1
matching_type:
# see https://en.wikipedia.org/wiki/DNS-based_Authentication_of_Named_Entities
- 1
rrtypes:
# Certificate usage.
# For e-mail (SMTP) applications, this must be either 2 (DANE-TA) or 3
# (DANE-EE). Values usage=0 and usage=1 should not be used for SMTP
# see rfc7672 section 3.1.3:
# https://www.rfc-editor.org/rfc/rfc7672.html#section-3.1.3
- usage: 3 # DANE-EE
# see https://en.wikipedia.org/wiki/DNS-based_Authentication_of_Named_Entities
selector: 0 # Entire cert, not just its publickey
# see https://en.wikipedia.org/wiki/DNS-based_Authentication_of_Named_Entities
matching_type: 2 # SHA512
# Default record update method, currently only 'dns-update'
method: dns-update

View file

@ -216,15 +216,18 @@ class RenewHandler(HandlerBase):
run on them. If a certificate was newly fetched and put into the
upcoming directory, its domain name is not returned.
"""
if cert.all_upcomings_in_place:
domains = self._put_in_place_when_ttl_passed(
certconfig_path=certconfig_path, cert=cert)
return domains
config = deepcopy(x=self._certbot_config)
config.certname = cert.lineagename
tlsa_updater = TlsaUpdater(
verbose=self._args.verbose, dry_run=self._args.dry_run,
config=self._config, cert=cert, certconfig_path=certconfig_path)
if cert.all_upcomings_in_place:
domains = self._put_in_place_when_ttl_passed(
certconfig_path=certconfig_path, cert=cert)
if domains:
# In case time_to_reload < 0, we need to update TLSAs
tlsa_updater.process()
return domains
config = deepcopy(x=self._certbot_config)
config.certname = cert.lineagename
if not should_renew(config=config, lineage=cert):
tlsa_updater.process()
return []

View file

@ -19,7 +19,7 @@ from cryptography.x509.base import Certificate, load_pem_x509_certificate
from ktools.pathlib import Path
from .config_types.common import TlsaMatchingType, TlsaSelector, TlsaUsage
from .tlsa.record_type import TlsaRecordType
from .tlsa.record_type import TlsaResourceRecord
KindType = Literal['cert', 'privkey', 'chain', 'fullchain']
_TargetDictType = dict[KindType, Path]
@ -273,27 +273,27 @@ class DaneCert(RenewableCert):
def get_tlsa_recordtypes(
self, usage: TlsaUsage, selector: TlsaSelector,
matching_type: TlsaMatchingType,
time_to_reload: int) -> list[TlsaRecordType]:
time_to_reload: int) -> list[TlsaResourceRecord]:
'Return a list of generated `TlsaRecordType`s.'
result = list[TlsaRecordType]()
result = list[TlsaResourceRecord]()
data = get_tlsa_bytes(
cert=self.__latest_pubcert, selector=selector,
matching_type=matching_type)
result.append(TlsaRecordType(
result.append(TlsaResourceRecord(
usage=usage, selector=selector, matching_type=matching_type,
data=data))
if self.__upcoming_pubcert:
data = get_tlsa_bytes(
cert=self.__upcoming_pubcert, selector=selector,
matching_type=matching_type)
result.append(TlsaRecordType(
result.append(TlsaResourceRecord(
usage=usage, selector=selector, matching_type=matching_type,
data=data))
if self.__is_oldcert_tlsa_needed(time_to_reload=time_to_reload):
data = get_tlsa_bytes(
cert=self.__second_latest_pubcert, selector=selector,
matching_type=matching_type)
result.append(TlsaRecordType(
result.append(TlsaResourceRecord(
usage=usage, selector=selector, matching_type=matching_type,
data=data))
return result

View file

@ -13,7 +13,7 @@ from typing import Any, Literal, Optional, Union, get_origin
from dns.name import Name
from dns.tsig import Key
from dns.tsigkeyring import from_text as tsig_from_text
from typing_extensions import TypedDict, is_typeddict
from typing_extensions import Self, TypedDict, is_typeddict
from .config_types.common import (
LITERAL_TCP_OR_UDP, LITERAL_TSIG_ALGORITHMS, TlsaMatchingType,
@ -29,11 +29,12 @@ from yaml import dump, load
from .config_types.configyaml import (
ConfDict, DefaultsDict, DefaultsTlsaDict, DefaultsTlsaPortDict,
DnsUpdateDict, DnsUpdateServerDict, DnsUpdateServerTsigDict, PathsDict,
PathsNsupdateDict)
DefaultTlsaRrtypeDict, DnsUpdateDict, DnsUpdateServerDict,
DnsUpdateServerTsigDict, PathsDict, PathsNsupdateDict)
from .config_types.runtimeyaml import (
RUNTIMECONF_DOC, RUNTIMECONF_TEMPLATE, RuntimeAdopteditemDict,
RuntimeAdopteditemHostDict, RuntimeAdopteditemHostitemRecordsitemDict,
RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict,
RuntimeAdopteditemServeritemDict, RuntimeAdopteditemServeritemTsigDict,
RuntimeDict)
@ -87,23 +88,47 @@ def _check_write_access(path: Path) -> None:
@dataclass
class _DefaultsTlsaPortConf(object):
protocol: LITERAL_TCP_OR_UDP
number: int
portnumber: int
@staticmethod
def parse(data: DefaultsTlsaPortDict) -> _DefaultsTlsaPortConf:
check_required_keys(data=data, datatype=DefaultsTlsaPortDict)
return _DefaultsTlsaPortConf(
protocol=data['protocol'], number=data['number'])
protocol=data['protocol'], portnumber=data['portnumber'])
@dataclass
class DefaultsTlsaRrtypeConf(object):
usage: TlsaUsage
selector: TlsaSelector
matching_type: TlsaMatchingType
_singletons = dict[tuple[int, int, int], Self]()
@staticmethod
def parse(data: DefaultTlsaRrtypeDict) -> DefaultsTlsaRrtypeConf:
key = (data['usage'], data['selector'], data['matching_type'])
if result := DefaultsTlsaRrtypeConf._singletons.get(key):
return result
result = DefaultsTlsaRrtypeConf(
usage=TlsaUsage(value=data['usage']),
selector=TlsaSelector(value=data['selector']),
matching_type=TlsaMatchingType(value=data['matching_type']))
DefaultsTlsaRrtypeConf._singletons[key] = result
return result
@cached_property
def adopted_type(self) -> AdoptedCertHostitemRecordsitemRrtypeitemConfig:
return AdoptedCertHostitemRecordsitemRrtypeitemConfig(
usage=self.usage, selector=self.selector,
matching_type=self.matching_type)
@dataclass
class _DefaultsTlsaConf(object):
ttl: int
time_to_reload: int
ports: list[_DefaultsTlsaPortConf]
usage: list[TlsaUsage]
selector: list[TlsaSelector]
matching_type: list[TlsaMatchingType]
records: list[_DefaultsTlsaPortConf]
rrtypes: list[DefaultsTlsaRrtypeConf]
@staticmethod
def parse(data: DefaultsTlsaDict) -> _DefaultsTlsaConf:
@ -111,11 +136,11 @@ class _DefaultsTlsaConf(object):
check_required_keys(data=data, datatype=DefaultsTlsaDict)
return _DefaultsTlsaConf(
ttl=data['ttl'], time_to_reload=data['time_to_reload'],
ports=[_DefaultsTlsaPortConf.parse(data=x) for x in data['ports']],
usage=TlsaUsage.from_ints(data=data['usage']),
selector=TlsaSelector.from_ints(data=data['selector']),
matching_type=TlsaMatchingType.from_ints(
data=data['matching_type']))
records=[
_DefaultsTlsaPortConf.parse(data=x) for x in data['records']],
rrtypes=[
DefaultsTlsaRrtypeConf.parse(data=x) for x in data['rrtypes']]
)
@dataclass
@ -301,13 +326,34 @@ class AdoptedCertHostitemServeritemConfig(object):
pformat(list(config.dnsupdate.servers.values())))
@dataclass
class AdoptedCertHostitemRecordsitemRrtypeitemConfig(object):
usage: TlsaUsage
selector: TlsaSelector
matching_type: TlsaMatchingType
@staticmethod
def parse(
item: RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict
) -> AdoptedCertHostitemRecordsitemRrtypeitemConfig:
return AdoptedCertHostitemRecordsitemRrtypeitemConfig(
usage=TlsaUsage(value=item['usage']),
selector=TlsaSelector(value=item['selector']),
matching_type=TlsaMatchingType(value=item['matching_type']))
def dump(self) -> RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict:
return RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict(
usage=self.usage.value, selector=self.selector.value,
matching_type=self.matching_type.value)
@dataclass
class AdoptedCertHostitemRecordsitemConfig(object):
protocol: TlsaProtocol
number: int
usage: Optional[list[TlsaUsage]]
selector: Optional[list[TlsaSelector]]
matching_type: Optional[list[TlsaMatchingType]]
portnumber: int
ttl: Optional[int]
time_to_reload: Optional[int]
rrtypes: Optional[list[AdoptedCertHostitemRecordsitemRrtypeitemConfig]]
@staticmethod
def parse(
@ -325,21 +371,26 @@ class AdoptedCertHostitemRecordsitemConfig(object):
matching_type = item.get('matching_type')
if isinstance(matching_type, list):
matching_type = TlsaMatchingType.from_ints(data=matching_type)
rrtypes = item.get('rrtypes')
if isinstance(rrtypes, list):
rrtypes = [
AdoptedCertHostitemRecordsitemRrtypeitemConfig.parse(item=x)
for x in rrtypes]
return AdoptedCertHostitemRecordsitemConfig(
protocol=TlsaProtocol.from_config_literal(data=item['protocol']),
number=item['number'], usage=usage, selector=selector,
matching_type=matching_type)
portnumber=item['portnumber'], ttl=item.get('ttl'),
time_to_reload=item.get('time_to_reload'), rrtypes=rrtypes)
def dump(self) -> RuntimeAdopteditemHostitemRecordsitemDict:
'Return a dumped `AdoptedCertHostitemPortsitemConfig`.'
result = RuntimeAdopteditemHostitemRecordsitemDict(
protocol=self.protocol.to_literal(), number=self.number)
if self.usage:
result['usage'] = [x.value for x in self.usage]
if self.selector:
result['selector'] = [x.value for x in self.selector]
if self.matching_type:
result['matching_type'] = [x.value for x in self.matching_type]
protocol=self.protocol.to_literal(), portnumber=self.portnumber)
if isinstance(self.ttl, int):
result['ttl'] = self.ttl
if isinstance(self.time_to_reload, int):
result['time_to_reload'] = self.time_to_reload
if self.rrtypes:
result['rrtypes'] = [x.dump() for x in self.rrtypes]
return result
@ -349,8 +400,6 @@ class AdoptedCertHostitemConfig(object):
name: str
zone: Optional[str]
records: Optional[list[AdoptedCertHostitemRecordsitemConfig]]
ttl: Optional[int]
time_to_reload: Optional[int]
servers: Optional[list[AdoptedCertHostitemServeritemConfig]]
@staticmethod
@ -369,7 +418,6 @@ class AdoptedCertHostitemConfig(object):
for x in records]
return AdoptedCertHostitemConfig(
name=item['name'], zone=item.get('zone'), records=records,
ttl=item.get('ttl'), time_to_reload=item.get('time_to_reload'),
servers=servers)
def get_serverinfo(self, config: Configuration) -> list[_DnsServerInfo]:
@ -385,10 +433,6 @@ class AdoptedCertHostitemConfig(object):
result['zone'] = self.zone
if isinstance(self.records, list):
result['records'] = [x.dump() for x in self.records]
if isinstance(self.ttl, int):
result['ttl'] = self.ttl
if isinstance(self.time_to_reload, int):
result['time_to_reload'] = self.time_to_reload
if isinstance(self.servers, list):
result['servers'] = [x.dump() for x in self.servers]
return result

View file

@ -8,19 +8,27 @@ from .common import LITERAL_TCP_OR_UDP, LITERAL_TSIG_ALGORITHMS
class DefaultsTlsaPortDict(TypedDict):
'`defaults["tlsa"]["ports"]` section of the raw configuration.'
'`defaults["tlsa"]["records"]` section of the raw configuration.'
protocol: LITERAL_TCP_OR_UDP
number: int
portnumber: int
class DefaultTlsaRrtypeDict(TypedDict):
"""
One `defaults["tlsa"]["rrtypes"]` section item of the raw
configuration.
"""
usage: int
selector: int
matching_type: int
class DefaultsTlsaDict(TypedDict):
'`defaults["tlsa"]` section of the raw configuration.'
records: list[DefaultsTlsaPortDict]
ttl: int
time_to_reload: int
ports: list[DefaultsTlsaPortDict]
usage: list[int]
selector: list[int]
matching_type: list[int]
rrtypes: list[DefaultTlsaRrtypeDict]
class DefaultsDict(TypedDict):

View file

@ -20,23 +20,27 @@ RUNTIMECONF_DOC = """\
# zone: example.com
# records:
# - protocol: tcp
# number: 443
# usage:
# - 3
# selector:
# - 1
# matching_type:
# - 1
# portnumber: 443
# ttl: 123
# time_to_reload: 456
# rrtypes:
# - usage: 3
# selector: 1
# matching_type: 1
# - usage: 3
# selector: 1
# matching_type: 2
# - protocol: tcp
# number: 25
# usage:
# - 3
# selector:
# - 1
# matching_type:
# - 1
# ttl: 123
# time_to_reload: 456
# portnumber: 25
# ttl: 123
# time_to_reload: 456
# rrtypes:
# - usage: 3
# selector: 1
# matching_type: 1
# - usage: 3
# selector: 1
# matching_type: 2
# servers:
# - ip: 127.0.0.1
# tsig:
@ -54,21 +58,27 @@ RUNTIMECONF_DOC = """\
# zone: example.com
# records:
# - protocol: tcp
# number: 443
# usage:
# - 3
# selector:
# - 1
# matching_type:
# - 1
# portnumber: 443
# ttl: 123
# time_to_reload: 456
# rrtypes:
# - usage: 3
# selector: 1
# matching_type: 1
# - usage: 3
# selector: 1
# matching_type: 2
# - protocol: tcp
# number: 25
# usage:
# - 3
# selector:
# - 1
# matching_type:
# - 1
# portnumber: 25
# ttl: 123
# time_to_reload: 456
# rrtypes:
# - usage: 3
# selector: 1
# matching_type: 1
# - usage: 3
# selector: 1
# matching_type: 2
# ttl: 789
# time_to_reload: 123
# servers:
@ -107,16 +117,27 @@ class RuntimeAdopteditemServeritemDict(TypedDict):
tsig: NotRequired[RuntimeAdopteditemServeritemTsigDict]
class RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict(TypedDict):
"""
`adopted["hosts"]["records"]["rrtypes"]` section item of
the runtime configuration.
"""
usage: int
selector: int
matching_type: int
class RuntimeAdopteditemHostitemRecordsitemDict(TypedDict):
"""
`adopted["hosts"]["records"]` section item of the runtime
configuration.
"""
protocol: LITERAL_TCP_OR_UDP
number: int
usage: NotRequired[list[int]]
selector: NotRequired[list[int]]
matching_type: NotRequired[list[int]]
portnumber: int
ttl: NotRequired[int]
time_to_reload: NotRequired[int]
rrtypes: NotRequired[
list[RuntimeAdopteditemHostitemRecordsitemRrtypeitemDict]]
class RuntimeAdopteditemHostDict(TypedDict):
@ -126,8 +147,6 @@ class RuntimeAdopteditemHostDict(TypedDict):
name: str
zone: NotRequired[str]
records: NotRequired[list[RuntimeAdopteditemHostitemRecordsitemDict]]
ttl: NotRequired[int]
time_to_reload: NotRequired[int]
servers: NotRequired[list[RuntimeAdopteditemServeritemDict]]

View file

@ -1,10 +1,8 @@
from __future__ import annotations
from argparse import Namespace
from dataclasses import dataclass
from functools import cached_property
from ipaddress import IPv4Address, IPv6Address
from itertools import product
from logging import getLogger
from typing import Optional, Union
@ -27,10 +25,11 @@ from dns.update import UpdateMessage
from ..cert import DaneCert
from ..config import (
AdoptedCertHostitemConfig, AdoptedCertHostitemRecordsitemConfig,
AdoptedCertHostitemRecordsitemRrtypeitemConfig,
AdoptedCertHostitemServeritemConfig, Configuration, get_host_and_domain)
from ..config_types.common import (
TlsaMatchingType, TlsaProtocol, TlsaSelector, TlsaUsage)
from .record_type import TlsaRecordType
from .record_type import TlsaResourceRecord
_LOGGER = getLogger(name=__name__)
@ -104,16 +103,16 @@ class _DnsServerInfo(object):
def str_ip(self) -> str:
return str(self.ip)
def _resolve(self, fqdn: _FqdnHostname) -> set[TlsaRecordType]:
def _resolve(self, fqdn: _FqdnHostname) -> set[TlsaResourceRecord]:
'Return resolved `TlsaRecordType` for the passed FQDN.'
response_set = set[TlsaRecordType]()
response_set = set[TlsaResourceRecord]()
try:
answer = self.resolver.resolve(qname=fqdn.as_dnsname, rdtype=TLSA)
except NXDOMAIN:
return response_set
item: RdataTlsa
for item in answer: # type: ignore
record = TlsaRecordType.from_rdtype(item=item)
record = TlsaResourceRecord.from_rdtype(item=item)
response_set.add(record)
return response_set
@ -137,8 +136,8 @@ class _DnsServerInfo(object):
notify(msg='-- Removing: -')
def _get_zoneupdate(
self, fqdn: _FqdnHostname, records_existing: set[TlsaRecordType],
records_required: set[TlsaRecordType], ttl: int, verbose: bool
self, fqdn: _FqdnHostname, records_existing: set[TlsaResourceRecord],
records_required: set[TlsaResourceRecord], ttl: int, verbose: bool
) -> UpdateMessage:
"""
Return an `UpdateMessage` for updating this FQDN. The assumption
@ -167,8 +166,8 @@ class _DnsServerInfo(object):
return update
def _are_records_same(
self, fqdn: _FqdnHostname, records_existing: set[TlsaRecordType],
records_required: set[TlsaRecordType], verbose: bool
self, fqdn: _FqdnHostname, records_existing: set[TlsaResourceRecord],
records_required: set[TlsaResourceRecord], verbose: bool
) -> bool:
'Return trueness of similarity while logging.'
if records_existing != records_required:
@ -214,11 +213,11 @@ _serverinfo_cache = dict[Configuration, list[_DnsServerInfo]]()
class _TlsaHostPrefix(object):
'The TLSA prefix to a hostname.'
protocol: TlsaProtocol
number: int
portnumber: int
@cached_property
def as_str(self) -> str:
return f'_{self.number}.{self.protocol.value}'
return f'_{self.portnumber}.{self.protocol.value}'
def __hash__(self) -> int:
return hash(self.as_str)
@ -231,9 +230,9 @@ class _TlsaRecordsConfig(object):
for a hostname.
"""
prefix: _TlsaHostPrefix
usage: list[TlsaUsage]
selector: list[TlsaSelector]
matching_type: list[TlsaMatchingType]
ttl: int
time_to_reload: int
rrtypes: list[AdoptedCertHostitemRecordsitemRrtypeitemConfig]
@staticmethod
def from_adopted(
@ -247,14 +246,17 @@ class _TlsaRecordsConfig(object):
result = list[_TlsaRecordsConfig]()
def_tlsa = config.defaults.tlsa
for item in records:
usage = item.usage or def_tlsa.usage
selector = item.selector or def_tlsa.selector
matching_type = item.matching_type or def_tlsa.matching_type
ttl = item.ttl if isinstance(item.ttl, int) else def_tlsa.ttl
time_to_reload = item.time_to_reload \
if isinstance(item.time_to_reload, int) \
else def_tlsa.time_to_reload
rrtypes = item.rrtypes if isinstance(item.rrtypes, list) else [
x.adopted_type for x in def_tlsa.rrtypes]
prefix = _TlsaHostPrefix(
protocol=item.protocol, number=item.number)
protocol=item.protocol, portnumber=item.portnumber)
result.append(_TlsaRecordsConfig(
prefix=prefix, usage=usage, selector=selector,
matching_type=matching_type))
prefix=prefix, ttl=ttl, time_to_reload=time_to_reload,
rrtypes=rrtypes))
return result
@staticmethod
@ -262,28 +264,28 @@ class _TlsaRecordsConfig(object):
'Return configuration defaults.'
result = list[_TlsaRecordsConfig]()
def_tlsa = config.defaults.tlsa
for item in def_tlsa.ports:
for item in def_tlsa.records:
prefix = _TlsaHostPrefix(
protocol=TlsaProtocol.from_config_literal(data=item.protocol),
number=item.number)
portnumber=item.portnumber)
result.append(_TlsaRecordsConfig(
prefix=prefix, usage=def_tlsa.usage,
selector=def_tlsa.selector,
matching_type=def_tlsa.matching_type))
prefix=prefix, ttl=def_tlsa.ttl,
time_to_reload=def_tlsa.time_to_reload,
rrtypes=[x.adopted_type for x in def_tlsa.rrtypes]))
return result
def required_records_for_cert(
self, cert: DaneCert, time_to_reload: int) -> set[TlsaRecordType]:
self, cert: DaneCert) -> set[TlsaResourceRecord]:
"""
Return the currently required `TlsaRecordType`s for the passed
`DaneCert`.
"""
result = set[TlsaRecordType]()
iterator = product(self.usage, self.selector, self.matching_type)
for usage, selector, matching_type in iterator:
result = set[TlsaResourceRecord]()
for rrtype in self.rrtypes:
result.update(cert.get_tlsa_recordtypes(
usage=usage, selector=selector, matching_type=matching_type,
time_to_reload=time_to_reload))
usage=rrtype.usage, selector=rrtype.selector,
matching_type=rrtype.matching_type,
time_to_reload=self.time_to_reload))
return result
@ -328,7 +330,7 @@ class _FqdnHostname(object):
return hash(self._as_fqdn)
_TlsaRecordsDict = dict[_FqdnHostname, set[TlsaRecordType]]
_TlsaRecordsDict = dict[_FqdnHostname, set[TlsaResourceRecord]]
@dataclass
@ -339,8 +341,6 @@ class HostnameConfig(object):
"""
name: str
zone: str
ttl: int
time_to_reload: int
servers: list[_DnsServerInfo]
records: list[_TlsaRecordsConfig]
@ -353,8 +353,7 @@ class HostnameConfig(object):
"""
host, zone = get_host_and_domain(hostname=hostname)
return HostnameConfig(
name=host, zone=zone, ttl=config.defaults.tlsa.ttl,
time_to_reload=config.defaults.tlsa.time_to_reload,
name=host, zone=zone,
servers=_DnsServerInfo.get_defaults(config=config),
records=_TlsaRecordsConfig.get_defaults(config=config))
@ -369,15 +368,10 @@ class HostnameConfig(object):
"""
result = list[HostnameConfig]()
def_servers = _DnsServerInfo.get_defaults(config=config)
def_ttl = config.defaults.tlsa.ttl
def_time_to_reload = config.defaults.tlsa.time_to_reload
def_records = _TlsaRecordsConfig.get_defaults(config=config)
for item in hosts:
zone = item.zone or \
get_host_and_domain(hostname=cert.common_names[0])[1]
ttl = def_ttl if item.ttl is None else item.ttl
time_to_reload = def_time_to_reload \
if item.time_to_reload is None else item.time_to_reload
servers = def_servers if item.servers is None \
else _DnsServerInfo.from_adopted(
servers=item.servers, config=config)
@ -385,9 +379,7 @@ class HostnameConfig(object):
else _TlsaRecordsConfig.from_adopted(
records=item.records, config=config)
result.append(HostnameConfig(
name=item.name, zone=zone, ttl=ttl,
time_to_reload=time_to_reload, servers=servers,
records=records))
name=item.name, zone=zone, servers=servers, records=records))
return result
def get_tlsa_records(self, cert: DaneCert) -> _TlsaRecordsDict:
@ -396,6 +388,5 @@ class HostnameConfig(object):
for record in self.records:
key = _FqdnHostname.construct(
prefix=record.prefix, name=self.name, zone=self.zone)
result[key] = record.required_records_for_cert(
cert=cert, time_to_reload=self.time_to_reload)
result[key] = record.required_records_for_cert(cert=cert)
return result

View file

@ -11,9 +11,9 @@ from ..config_types.common import TlsaMatchingType, TlsaSelector, TlsaUsage
@dataclass
class TlsaRecordType(object):
class TlsaResourceRecord(object):
"""
A generated TLSA record type. See:
A generated TLSA resource record type. See:
https://en.wikipedia.org/wiki/DNS-based_Authentication_of_Named_Entities
"""
usage: TlsaUsage
@ -23,7 +23,7 @@ class TlsaRecordType(object):
@staticmethod
def from_rdtype(item: TLSA):
return TlsaRecordType(
return TlsaResourceRecord(
usage=TlsaUsage(value=item.usage),
selector=TlsaSelector(value=item.selector),
matching_type=TlsaMatchingType(value=item.mtype), data=item.cert)
@ -36,7 +36,7 @@ class TlsaRecordType(object):
mtype=self.matching_type.value, cert=self.data)
def __eq__(self, other: object) -> bool:
if isinstance(other, TlsaRecordType):
if isinstance(other, TlsaResourceRecord):
return self.usage == other.usage \
and self.selector == other.selector \
and self.matching_type == other.matching_type \