#!/usr/bin/env python from contextlib import suppress from http import HTTPStatus from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from random import choices from signal import SIGTERM, signal from socket import AF_INET6, IPPROTO_IPV6, IPV6_V6ONLY, socket from sqlite3 import connect from string import ascii_lowercase from sys import stderr from tempfile import gettempdir from threading import Thread from types import FrameType from typing import NamedTuple _F2B_DBPATH = Path('/', 'var', 'db', 'fail2ban', 'fail2ban.sqlite3') _CONFIG_RESULT = """\ graph_category fail2ban graph_title {db_table} DB table statistics graph_vlabel Database record count by jail name graph_printf %3.0lf """ def _log_message(message: str) -> None: 'Do a simple log on `stderr`.' stderr.write(f'=== {message}\n') class _JailInfo(NamedTuple): id: int name: str class F2bRemoteRequestHandler(SimpleHTTPRequestHandler): server_version = 'F2bRemoteRequestHandler' _randompath = Path( gettempdir(), ''.join(choices(population=ascii_lowercase, k=15))) def __init__( self, request: socket, client_address: tuple[str, int], server: ThreadingHTTPServer): super().__init__( request=request, client_address=client_address, server=server, directory=str(self._randompath)) def __send_403(self) -> None: 'Send a HTTP 403.' self.send_error(code=HTTPStatus.FORBIDDEN) def __get_jails(self) -> list[_JailInfo]: 'Return the list of jails.' cursor = self.__connection.cursor() cursor.execute('SELECT `ROWID`, `name` FROM `jails` ORDER BY `ROWID`') result = list[_JailInfo]() while row := cursor.fetchone(): result.append(_JailInfo(*row)) cursor.close() return result def __send_data_response(self, body: str) -> None: 'Encode and send a response with a HTTP 200.' body_bytes = body.encode(encoding='utf-8', errors='surrogateescape') self.send_response(code=HTTPStatus.OK) self.send_header( keyword='Content-Type', value='text/plain; charset=utf-8') self.send_header(keyword='Content-Length', value=str(len(body_bytes))) self.send_header(keyword='Connection', value='close') self.end_headers() self.wfile.write(body_bytes) def __get_config(self, db_table: str) -> None: 'Send the current plugin configuration for `db_table`.' message = _CONFIG_RESULT.format(db_table=db_table) for jail in self.__get_jails(): message += f'usercount{jail.id}.label {jail.name}\n' message += f'usercount{jail.id}.draw LINE1\n' self.__send_data_response(body=message) def __get_data(self, db_table: str) -> None: 'Return organized data for the requested `db_table`.' output = '' cursor = self.__connection.cursor() jails = self.__get_jails() in_clause = ', '.join(f'\'{jail.name}\'' for jail in jails) db_query = ( f'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `jail` IN ' f'({in_clause}) GROUP BY `jail`') cursor.execute(db_query) result = dict[str, int]() while row := cursor.fetchone(): jail_name, count = row result[jail_name] = count cursor.close() for jail_id, jail_name in jails: count = result.get(jail_name) or 0 output += f'usercount{jail_id}.value {count}\n' self.__send_data_response(body=output) def do_GET(self) -> None: 'Overwriting `do_GET`.' self.__connection = connect(database=_F2B_DBPATH) match(self.path): case '/db/bans/config/': self.__get_config(db_table='bans') case '/db/bips/config/': self.__get_config(db_table='bips') case '/db/bans/data/': self.__get_data(db_table='bans') case '/db/bips/data/': self.__get_data(db_table='bips') case _: self.__send_403() self.__connection.close() do_HEAD = do_POST = __send_403 class DualStackThreadingHttpServer(ThreadingHTTPServer): address_family = AF_INET6 def server_bind(self): # suppress exception when protocol is IPv4-only with suppress(OSError): self.socket.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0) return super().server_bind() server = DualStackThreadingHttpServer( server_address=('', 8000), RequestHandlerClass=F2bRemoteRequestHandler, bind_and_activate=True) def shutdown(signalnum: int, frame: FrameType | None): 'Shut down upon SIGTERM.' _log_message('Shutting down.') server.shutdown() if __name__ == '__main__': signal(signalnum=SIGTERM, handler=shutdown) run_thread = Thread( target=server.serve_forever, kwargs=dict(poll_interval=1)) _log_message('Serving.') run_thread.start() try: run_thread.join() except KeyboardInterrupt: pass _log_message('Server is shut down.')