From 1122ab984366132b9fb5db250fc8675968e83817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20K=C3=A1rolyi?= Date: Tue, 6 Aug 2024 23:57:08 +0200 Subject: [PATCH] Be able to show current bans --- f2b-responder.py | 60 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/f2b-responder.py b/f2b-responder.py index 625213a..ebb2f89 100755 --- a/f2b-responder.py +++ b/f2b-responder.py @@ -13,15 +13,27 @@ from sys import stderr from tempfile import gettempdir from threading import Thread from types import FrameType -from typing import NamedTuple +from typing import Literal, NamedTuple _F2B_DBPATH = Path('/', 'var', 'db', 'fail2ban', 'fail2ban.sqlite3') -_CONFIG_RESULT = """\ +_CONFIG_RESULT_ALL = """\ graph_category fail2ban -graph_title {db_table} DB table statistics +graph_title {db_table} DB table statistics - ALL RECORDS graph_vlabel Database record count by jail name graph_printf %3.0lf """ +_CONFIG_RESULT_CURRENT = """\ +graph_category fail2ban +graph_title {db_table} DB table statistics - CURRENTLY BANNED +graph_vlabel Currently banned IPs by jail name +graph_printf %3.0lf +""" +_DB_QUERY_ALL = ( + 'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `jail` IN ({in_clause}) ' + 'GROUP BY `jail`') +_DB_QUERY_CURRENT = ( + 'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `timeofban` + `bantime` ' + '> UNIXEPOCH() AND `jail` IN ({in_clause}) GROUP BY `jail`') def _log_message(message: str) -> None: @@ -72,24 +84,26 @@ class F2bRemoteRequestHandler(SimpleHTTPRequestHandler): self.end_headers() self.wfile.write(body_bytes) - def __get_config(self, db_table: str) -> None: + def __get_config( + self, db_table: str, qtype: Literal['all', 'current']) -> None: 'Send the current plugin configuration for `db_table`.' - message = _CONFIG_RESULT.format(db_table=db_table) + template = \ + _CONFIG_RESULT_ALL if qtype == 'all' else _CONFIG_RESULT_CURRENT + message = template.format(db_table=db_table) for jail in self.__get_jails(): message += f'usercount{jail.id}.label {jail.name}\n' message += f'usercount{jail.id}.draw LINE1\n' self.__send_data_response(body=message) - def __get_data(self, db_table: str) -> None: + def __get_data( + self, db_table: str, qtype: Literal['all', 'current']) -> None: 'Return organized data for the requested `db_table`.' output = '' cursor = self.__connection.cursor() jails = self.__get_jails() in_clause = ', '.join(f'\'{jail.name}\'' for jail in jails) - db_query = ( - f'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `jail` IN ' - f'({in_clause}) GROUP BY `jail`') - cursor.execute(db_query) + template = _DB_QUERY_ALL if qtype == 'all' else _DB_QUERY_CURRENT + cursor.execute(template.format(db_table=db_table, in_clause=in_clause)) result = dict[str, int]() while row := cursor.fetchone(): jail_name, count = row @@ -103,15 +117,23 @@ class F2bRemoteRequestHandler(SimpleHTTPRequestHandler): def do_GET(self) -> None: 'Overwriting `do_GET`.' self.__connection = connect(database=_F2B_DBPATH) - match(self.path): - case '/db/bans/config/': - self.__get_config(db_table='bans') - case '/db/bips/config/': - self.__get_config(db_table='bips') - case '/db/bans/data/': - self.__get_data(db_table='bans') - case '/db/bips/data/': - self.__get_data(db_table='bips') + match self.path: + case '/db-all/bans/config/': + self.__get_config(db_table='bans', qtype='all') + case '/db-all/bips/config/': + self.__get_config(db_table='bips', qtype='all') + case '/db-all/bans/data/': + self.__get_data(db_table='bans', qtype='all') + case '/db-all/bips/data/': + self.__get_data(db_table='bips', qtype='all') + case '/db-current/bans/config/': + self.__get_config(db_table='bans', qtype='current') + case '/db-current/bips/config/': + self.__get_config(db_table='bips', qtype='current') + case '/db-current/bans/data/': + self.__get_data(db_table='bans', qtype='current') + case '/db-current/bips/data/': + self.__get_data(db_table='bips', qtype='current') case _: self.__send_403() self.__connection.close()