Working amazon collection & checking

This commit is contained in:
László Károlyi 2022-02-12 11:34:09 +01:00
parent f52cca7c65
commit 9b2c8c60b4
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE
10 changed files with 200 additions and 5 deletions

3
.gitignore vendored
View file

@ -138,3 +138,6 @@ dmypy.json
# Cython debug symbols
cython_debug/
icygov.yaml
pyrightconfig.json
data/

3
.isort.cfg Normal file
View file

@ -0,0 +1,3 @@
[settings]
multi_line_output=4
src_paths=backend

0
__init__.py Normal file
View file

20
config.py Normal file
View file

@ -0,0 +1,20 @@
from os import environ
from pathlib import Path
from yaml import load
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
_CONFPATH = environ.get('ICYGOV_CONFIG')
if not _CONFPATH:
raise FileNotFoundError('ICYGOV_CONFIG must be an environment variable!')
_PATH_CONF = Path(_CONFPATH)
if not _PATH_CONF.exists():
raise FileNotFoundError(_CONFPATH)
with _PATH_CONF.open() as fd:
CONFIG = load(stream=fd, Loader=Loader)

1
icygov-sample.yaml Normal file
View file

@ -0,0 +1 @@
path_datadir: /etc/icygov/data

0
ip_collector/__init__.py Normal file
View file

89
ip_collector/dl_amazon.py Normal file
View file

@ -0,0 +1,89 @@
__doc__ = 'Downloader for amazon IP ranges'
from functools import lru_cache
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from json import loads
from pathlib import Path
from tempfile import mkstemp
from typing import Optional
from urllib.request import urlopen
from ..config import CONFIG
from .typing import Ip4DictType, Ip6DictType
# https://docs.aws.amazon.com/general/latest/gr/aws-ip-ranges.html
URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json'
_SEP = '||'
_PATH_MY_DATA_V4 = Path(CONFIG['path_datadir'], 'amazon-ipv4.txt')
_PATH_MY_DATA_V6 = Path(CONFIG['path_datadir'], 'amazon-ipv6.txt')
_IP4_DICT: Ip4DictType = {}
_IP6_DICT: Ip6DictType = {}
if not _PATH_MY_DATA_V4.is_file():
_PATH_MY_DATA_V4.touch()
if not _PATH_MY_DATA_V6.is_file():
_PATH_MY_DATA_V6.touch()
@lru_cache(maxsize=50)
def _amazon_match_v4(address: IPv4Address) -> Optional[str]:
'Try to match an IPv4 address, return data if it matches.'
for network, description in _IP4_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
@lru_cache(maxsize=50)
def _amazon_match_v6(address: IPv6Address) -> Optional[str]:
'Try to match an IPv6 address, return data if it matches.'
for network, description in _IP6_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
def _amazon_load():
'Load data from files'
global _IP4_DICT, _IP6_DICT
my_ip4dict: Ip4DictType = {}
my_ip6dict: Ip6DictType = {}
with _PATH_MY_DATA_V4.open('r') as fd:
for line in fd:
line = line.strip()
if not line:
continue
address, description = line.split(_SEP)
my_ip4dict[IPv4Network(address=address)] = description
with _PATH_MY_DATA_V6.open('r') as fd:
for line in fd:
line = line.strip()
if not line:
continue
address, description = line.split(_SEP)
my_ip6dict[IPv6Network(address=address)] = description
_IP4_DICT = my_ip4dict
_IP6_DICT = my_ip6dict
def _amazon_download():
'Download, parse and save files.'
with urlopen(url=URL) as fd:
data = loads(s=fd.read())
my_tmp_fd_v4, my_tmp_path_v4 = mkstemp()
with open(my_tmp_fd_v4, 'w') as fd:
for ip4_items in data['prefixes']:
fd.write(
f'{ip4_items["ip_prefix"]}{_SEP}'
f'Amazon: {ip4_items["service"]} '
f'{ip4_items["network_border_group"]}\n')
my_tmp_fd_v6, my_tmp_path_v6 = mkstemp()
Path(_PATH_MY_DATA_V4).write_text(Path(my_tmp_path_v4).read_text())
Path(my_tmp_path_v4).unlink()
with open(my_tmp_fd_v4, 'w') as fd:
for ip6_items in data['ipv6_prefixes']:
fd.write(
f'{ip6_items["ipv6_prefix"]}{_SEP}'
f'Amazon: {ip6_items["service"]} '
f'{ip6_items["network_border_group"]}\n')
Path(_PATH_MY_DATA_V6).write_text(Path(my_tmp_path_v6).read_text())
Path(my_tmp_path_v6).unlink()

35
ip_collector/reader.py Normal file
View file

@ -0,0 +1,35 @@
from functools import lru_cache
from ipaddress import IPv4Address, IPv6Address
from typing import Optional, Union
from .dl_amazon import (
_amazon_download, _amazon_load, _amazon_match_v4, _amazon_match_v6)
@lru_cache(maxsize=50)
def _match_v4(address: IPv4Address) -> Optional[str]:
return _amazon_match_v4(address=address)
@lru_cache(maxsize=50)
def _match_v6(address: IPv6Address) -> Optional[str]:
return _amazon_match_v6(address=address)
def match(address: Union[IPv4Address, IPv6Address]) -> Optional[str]:
if type(address) is IPv4Address:
return _match_v4(address=address)
return _match_v6(address=address)
def redownload_ranges():
'Redownload the ranges from all available providers.'
_amazon_download()
def load():
'Load values from file.'
_amazon_load()
load()

5
ip_collector/typing.py Normal file
View file

@ -0,0 +1,5 @@
from ipaddress import IPv4Network, IPv6Network
from typing import Dict
Ip4DictType = Dict[IPv4Network, str]
Ip6DictType = Dict[IPv6Network, str]

49
main.py
View file

@ -1,11 +1,50 @@
#!/usr/bin/env python3
from time import sleep
from ipaddress import IPv4Network, ip_address
from flask.app import Flask
from flask.globals import request
from flask.helpers import make_response
from flask.wrappers import Response
from .ip_collector.reader import redownload_ranges, match
app = Flask(import_name="icy_governor")
BASE_PATH = '/icy-governor'
def main():
while True:
sleep(10)
@app.route(rule=f'{BASE_PATH}/reload-ranges/', methods=['GET'])
def reload_ranges() -> Response:
redownload_ranges()
response = make_response('', 200)
return response
@app.route(rule=f'{BASE_PATH}/listener-add/', methods=['POST'])
def listener_add() -> Response:
app.logger.debug('Add data received: %s', request.form)
response = make_response('alrighty', 200)
ip = request.form.get('ip')
if not ip:
response.headers['Icecast-Auth-Message'] = 'IP not specified'
return response
ip_addr = ip_address(address=ip)
result = match(address=ip_addr)
app.logger.debug('IP is: %s, result: %r', ip_addr, result)
if result:
response.headers['Icecast-Auth-Message'] = result
else:
response.headers['Icecast-Auth-User'] = 1
# response.headers['icecast-auth-timelimit'] = 10
return response
@app.route(rule=f'{BASE_PATH}/listener-remove/', methods=['POST'])
def listener_remove() -> Response:
app.logger.debug('Remove data received: %s', request.form)
response = make_response('alrighty', 200)
return response
if __name__ == '__main__':
main()
app.run()