munin-f2b-remote/f2b-responder.py

175 lines
6.2 KiB
Python
Executable file

#!/usr/bin/env python
from contextlib import suppress
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from random import choices
from signal import SIGTERM, signal
from socket import AF_INET6, IPPROTO_IPV6, IPV6_V6ONLY, socket
from sqlite3 import connect
from string import ascii_lowercase
from sys import stderr
from tempfile import gettempdir
from threading import Thread
from types import FrameType
from typing import Literal, NamedTuple
_F2B_DBPATH = Path('/', 'var', 'db', 'fail2ban', 'fail2ban.sqlite3')
_CONFIG_RESULT_ALL = """\
graph_category fail2ban
graph_title {db_table} DB table statistics - ALL RECORDS
graph_vlabel Database record count by jail name
graph_printf %3.0lf
"""
_CONFIG_RESULT_CURRENT = """\
graph_category fail2ban
graph_title {db_table} DB table statistics - CURRENTLY BANNED
graph_vlabel Currently banned IPs by jail name
graph_printf %3.0lf
"""
_DB_QUERY_ALL = (
'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `jail` IN ({in_clause}) '
'GROUP BY `jail`')
_DB_QUERY_CURRENT = (
'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `timeofban` + `bantime` '
'> UNIXEPOCH() AND `jail` IN ({in_clause}) GROUP BY `jail`')
def _log_message(message: str) -> None:
'Do a simple log on `stderr`.'
stderr.write(f'=== {message}\n')
class _JailInfo(NamedTuple):
id: int
name: str
class F2bRemoteRequestHandler(SimpleHTTPRequestHandler):
server_version = 'F2bRemoteRequestHandler'
_randompath = Path(
gettempdir(),
''.join(choices(population=ascii_lowercase, k=15)))
def __init__(
self, request: socket, client_address: tuple[str, int],
server: ThreadingHTTPServer):
super().__init__(
request=request, client_address=client_address, server=server,
directory=str(self._randompath))
def __send_403(self) -> None:
'Send a HTTP 403.'
self.send_error(code=HTTPStatus.FORBIDDEN)
def __get_jails(self) -> list[_JailInfo]:
'Return the list of jails.'
cursor = self.__connection.cursor()
cursor.execute('SELECT `ROWID`, `name` FROM `jails` ORDER BY `ROWID`')
result = list[_JailInfo]()
while row := cursor.fetchone():
result.append(_JailInfo(*row))
cursor.close()
return result
def __send_data_response(self, body: str) -> None:
'Encode and send a response with a HTTP 200.'
body_bytes = body.encode(encoding='utf-8', errors='surrogateescape')
self.send_response(code=HTTPStatus.OK)
self.send_header(
keyword='Content-Type', value='text/plain; charset=utf-8')
self.send_header(keyword='Content-Length', value=str(len(body_bytes)))
self.send_header(keyword='Connection', value='close')
self.end_headers()
self.wfile.write(body_bytes)
def __get_config(
self, db_table: str, qtype: Literal['all', 'current']) -> None:
'Send the current plugin configuration for `db_table`.'
template = \
_CONFIG_RESULT_ALL if qtype == 'all' else _CONFIG_RESULT_CURRENT
message = template.format(db_table=db_table)
for jail in self.__get_jails():
message += f'usercount{jail.id}.label {jail.name}\n'
message += f'usercount{jail.id}.draw LINE1\n'
self.__send_data_response(body=message)
def __get_data(
self, db_table: str, qtype: Literal['all', 'current']) -> None:
'Return organized data for the requested `db_table`.'
output = ''
cursor = self.__connection.cursor()
jails = self.__get_jails()
in_clause = ', '.join(f'\'{jail.name}\'' for jail in jails)
template = _DB_QUERY_ALL if qtype == 'all' else _DB_QUERY_CURRENT
cursor.execute(template.format(db_table=db_table, in_clause=in_clause))
result = dict[str, int]()
while row := cursor.fetchone():
jail_name, count = row
result[jail_name] = count
cursor.close()
for jail_id, jail_name in jails:
count = result.get(jail_name) or 0
output += f'usercount{jail_id}.value {count}\n'
self.__send_data_response(body=output)
def do_GET(self) -> None:
'Overwriting `do_GET`.'
self.__connection = connect(database=_F2B_DBPATH)
match self.path:
case '/db-all/bans/config/':
self.__get_config(db_table='bans', qtype='all')
case '/db-all/bips/config/':
self.__get_config(db_table='bips', qtype='all')
case '/db-all/bans/data/':
self.__get_data(db_table='bans', qtype='all')
case '/db-all/bips/data/':
self.__get_data(db_table='bips', qtype='all')
case '/db-current/bans/config/':
self.__get_config(db_table='bans', qtype='current')
case '/db-current/bips/config/':
self.__get_config(db_table='bips', qtype='current')
case '/db-current/bans/data/':
self.__get_data(db_table='bans', qtype='current')
case '/db-current/bips/data/':
self.__get_data(db_table='bips', qtype='current')
case _:
self.__send_403()
self.__connection.close()
do_HEAD = do_POST = __send_403
class DualStackThreadingHttpServer(ThreadingHTTPServer):
address_family = AF_INET6
def server_bind(self):
# suppress exception when protocol is IPv4-only
with suppress(OSError):
self.socket.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0)
return super().server_bind()
server = DualStackThreadingHttpServer(
server_address=('', 8000), RequestHandlerClass=F2bRemoteRequestHandler,
bind_and_activate=True)
def shutdown(signalnum: int, frame: FrameType | None):
'Shut down upon SIGTERM.'
_log_message('Shutting down.')
server.shutdown()
if __name__ == '__main__':
signal(signalnum=SIGTERM, handler=shutdown)
run_thread = Thread(
target=server.serve_forever, kwargs=dict(poll_interval=1))
_log_message('Serving.')
run_thread.start()
try:
run_thread.join()
except KeyboardInterrupt:
pass
_log_message('Server is shut down.')