#!/usr/bin/env python from contextlib import suppress from http import HTTPStatus from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from random import choices from signal import SIGTERM, signal from socket import AF_INET6, IPPROTO_IPV6, IPV6_V6ONLY, socket from sqlite3 import connect from string import ascii_lowercase from sys import stderr from tempfile import gettempdir from threading import Thread from types import FrameType from typing import Literal, NamedTuple _F2B_DBPATH = Path('/', 'var', 'db', 'fail2ban', 'fail2ban.sqlite3') _CONFIG_RESULT_ALL = """\ graph_category fail2ban graph_title ALL RECORDS in DB table '{db_table}' graph_vlabel Database record count by jail name graph_printf %3.0lf """ _CONFIG_RESULT_CURRENT = """\ graph_category fail2ban graph_title CURRENTLY BANNED in DB table '{db_table}' graph_vlabel Currently banned IPs by jail name graph_printf %3.0lf """ _DB_QUERY_ALL = ( 'SELECT `jails`.`rowid`, COUNT(`{db_table}`.`jail`) FROM `jails` LEFT ' 'JOIN `{db_table}` ON `jails`.`name` = `{db_table}`.`jail` GROUP BY ' '`jails`.`name` ORDER BY `jails`.`rowid`') _DB_QUERY_CURRENT = ( 'SELECT `jails`.`rowid`, (SELECT COUNT(`{db_table}`.`rowid`) FROM ' '`{db_table}` WHERE `{db_table}`.`jail` = `jails`.`name` AND `{db_table}`' '.`timeofban` + `{db_table}`.`bantime` > UNIXEPOCH()) FROM `jails` ORDER ' 'BY `jails`.`rowid`;') def _log_message(message: str) -> None: 'Do a simple log on `stderr`.' stderr.write(f'=== {message}\n') class _JailNameResult(NamedTuple): 'Result from listing jail names.' id: int name: str class _JailDataResult(NamedTuple): 'Result from getting jail data.' id: int value: int class F2bRemoteRequestHandler(SimpleHTTPRequestHandler): server_version = 'F2bRemoteRequestHandler' _randompath = Path( gettempdir(), ''.join(choices(population=ascii_lowercase, k=15))) def __init__( self, request: socket, client_address: tuple[str, int], server: ThreadingHTTPServer): super().__init__( request=request, client_address=client_address, server=server, directory=str(self._randompath)) def __send_403(self) -> None: 'Send a HTTP 403.' self.send_error(code=HTTPStatus.FORBIDDEN) def __get_jails(self) -> list[_JailNameResult]: 'Return the list of jails.' cursor = self.__connection.cursor() cursor.execute('SELECT `ROWID`, `name` FROM `jails` ORDER BY `ROWID`') result = list[_JailNameResult]() while row := cursor.fetchone(): result.append(_JailNameResult(*row)) cursor.close() return result def __send_data_response(self, body: str) -> None: 'Encode and send a response with a HTTP 200.' body_bytes = body.encode(encoding='utf-8', errors='surrogateescape') self.send_response(code=HTTPStatus.OK) self.send_header( keyword='Content-Type', value='text/plain; charset=utf-8') self.send_header(keyword='Content-Length', value=str(len(body_bytes))) self.send_header(keyword='Connection', value='close') self.end_headers() self.wfile.write(body_bytes) def __get_config( self, db_table: str, qtype: Literal['all', 'current']) -> None: 'Send the current plugin configuration for `db_table`.' template = \ _CONFIG_RESULT_ALL if qtype == 'all' else _CONFIG_RESULT_CURRENT message = template.format(db_table=db_table) for jail in self.__get_jails(): message += f'usercount{jail.id}.label {jail.name}\n' message += f'usercount{jail.id}.draw LINE1\n' self.__send_data_response(body=message) def __get_data( self, db_table: str, qtype: Literal['all', 'current']) -> None: 'Return organized data for the requested `db_table`.' output = '' cursor = self.__connection.cursor() template = _DB_QUERY_ALL if qtype == 'all' else _DB_QUERY_CURRENT cursor.execute(template.format(db_table=db_table)) while row := cursor.fetchone(): data = _JailDataResult(*row) output += f'usercount{data.id}.value {data.value}\n' cursor.close() self.__send_data_response(body=output) def do_GET(self) -> None: 'Overwriting `do_GET`.' self.__connection = connect(database=_F2B_DBPATH) match self.path: case '/db-all/bans/config/': self.__get_config(db_table='bans', qtype='all') case '/db-all/bips/config/': self.__get_config(db_table='bips', qtype='all') case '/db-all/bans/data/': self.__get_data(db_table='bans', qtype='all') case '/db-all/bips/data/': self.__get_data(db_table='bips', qtype='all') case '/db-current/bans/config/': self.__get_config(db_table='bans', qtype='current') case '/db-current/bips/config/': self.__get_config(db_table='bips', qtype='current') case '/db-current/bans/data/': self.__get_data(db_table='bans', qtype='current') case '/db-current/bips/data/': self.__get_data(db_table='bips', qtype='current') case _: self.__send_403() self.__connection.close() do_HEAD = do_POST = __send_403 class DualStackThreadingHttpServer(ThreadingHTTPServer): address_family = AF_INET6 def server_bind(self): # suppress exception when protocol is IPv4-only with suppress(OSError): self.socket.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0) return super().server_bind() server = DualStackThreadingHttpServer( server_address=('', 8000), RequestHandlerClass=F2bRemoteRequestHandler, bind_and_activate=True) def shutdown(signalnum: int, frame: FrameType | None): 'Shut down upon SIGTERM.' _log_message('Shutting down.') server.shutdown() if __name__ == '__main__': signal(signalnum=SIGTERM, handler=shutdown) run_thread = Thread( target=server.serve_forever, kwargs=dict(poll_interval=1)) _log_message('Serving.') run_thread.start() try: run_thread.join() except KeyboardInterrupt: pass _log_message('Server is shut down.')