munin-f2b-remote/f2b-responder.py

147 lines
4.9 KiB
Python
Executable file

#!/usr/bin/env python
from contextlib import suppress
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from random import choices
from signal import SIGTERM, signal
from socket import AF_INET6, IPPROTO_IPV6, IPV6_V6ONLY, socket
from sqlite3 import connect
from string import ascii_lowercase
from tempfile import gettempdir
from threading import Thread
from types import FrameType
from typing import NamedTuple
# Location of the fail2ban SQLite database that the handler queries.
_F2B_DBPATH = Path('/') / 'var' / 'db' / 'fail2ban' / 'fail2ban.sqlite3'

# Header of a Munin "config" response; `{db_table}` is filled in with the
# name of the table being graphed via `str.format`.
_CONFIG_RESULT = (
    'graph_category fail2ban\n'
    'graph_title {db_table} DB table statistics\n'
    'graph_vlabel Database record count by jail name\n'
    'graph_printf %3.0lf\n'
)
class _JailInfo(NamedTuple):
    'One row of the `jails` table: its `ROWID` and its name.'

    # `ROWID` of the jail; used to build the Munin field name.
    id: int
    # Jail name; used as the Munin label and to match per-jail counts.
    name: str
class F2bRemoteRequestHandler(SimpleHTTPRequestHandler):
server_version = 'F2bRemoteRequestHandler'
_randompath = Path(
gettempdir(),
''.join(choices(population=ascii_lowercase, k=15)))
def __init__(
self, request: socket, client_address: tuple[str, int],
server: ThreadingHTTPServer):
super().__init__(
request=request, client_address=client_address, server=server,
directory=str(self._randompath))
def __send_403(self) -> None:
'Send a HTTP 403.'
self.send_error(code=HTTPStatus.FORBIDDEN)
def __get_jails(self) -> list[_JailInfo]:
'Return the list of jails.'
cursor = self.__connection.cursor()
cursor.execute('SELECT `ROWID`, `name` FROM `jails` ORDER BY `ROWID`')
result = list[_JailInfo]()
while row := cursor.fetchone():
result.append(_JailInfo(*row))
cursor.close()
return result
def __send_data_response(self, body: str) -> None:
'Encode and send a response with a HTTP 200.'
body_bytes = body.encode(encoding='utf-8', errors='surrogateescape')
self.send_response(code=HTTPStatus.OK)
self.send_header(
keyword='Content-Type', value='text/plain; charset=utf-8')
self.send_header(keyword='Content-Length', value=str(len(body_bytes)))
self.send_header(keyword='Connection', value='close')
self.end_headers()
self.wfile.write(body_bytes)
def __get_config(self, db_table: str) -> None:
'Send the current plugin configuration for `db_table`.'
message = _CONFIG_RESULT.format(db_table=db_table)
for jail in self.__get_jails():
message += f'usercount{jail.id}.label {jail.name}\n'
message += f'usercount{jail.id}.draw LINE1\n'
self.__send_data_response(body=message)
def __get_data(self, db_table: str) -> None:
'Return organized data for the requested `db_table`.'
output = ''
cursor = self.__connection.cursor()
jails = self.__get_jails()
in_clause = ', '.join(f'\'{jail.name}\'' for jail in jails)
db_query = (
f'SELECT `jail`, COUNT(*) FROM `{db_table}` WHERE `jail` IN '
f'({in_clause}) GROUP BY `jail`')
cursor.execute(db_query)
result = dict[str, int]()
while row := cursor.fetchone():
jail_name, count = row
result[jail_name] = count
cursor.close()
for jail_id, jail_name in jails:
count = result.get(jail_name) or 0
output += f'usercount{jail_id}.value {count}\n'
self.__send_data_response(body=output)
def do_GET(self) -> None:
'Overwriting `do_GET`.'
self.__connection = connect(database=_F2B_DBPATH)
match(self.path):
case '/db/bans/config/':
self.__get_config(db_table='bans')
case '/db/bips/config/':
self.__get_config(db_table='bips')
case '/db/bans/data/':
self.__get_data(db_table='bans')
case '/db/bips/data/':
self.__get_data(db_table='bips')
case _:
self.__send_403()
self.__connection.close()
do_HEAD = do_POST = __send_403
class DualStackThreadingHttpServer(ThreadingHTTPServer):
    'Threading HTTP server on an IPv6 socket that also accepts IPv4.'

    address_family = AF_INET6

    def server_bind(self):
        'Clear `IPV6_V6ONLY` on the socket if possible, then bind as usual.'
        # suppress exception when protocol is IPv4-only
        with suppress(OSError):
            self.socket.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0)
        return super().server_bind()
# Module-level server instance on port 8000 with an empty host string;
# `bind_and_activate=True` makes the constructor bind and listen right away.
server = DualStackThreadingHttpServer(
    ('', 8000),
    F2bRemoteRequestHandler,
    bind_and_activate=True,
)
def shutdown(signalnum: int, frame: FrameType | None) -> None:
    'Signal handler for SIGTERM: announce the shutdown, then stop the server.'
    print('Shutting down.')
    # makes the `serve_forever` loop return
    server.shutdown()
if __name__ == '__main__':
    # Serve from a worker thread so that the main thread stays free to
    # receive SIGTERM / Ctrl-C and trigger the shutdown.
    signal(signalnum=SIGTERM, handler=shutdown)
    run_thread = Thread(
        target=server.serve_forever, kwargs=dict(poll_interval=1))
    print('Serving.')
    run_thread.start()
    try:
        run_thread.join()
    except KeyboardInterrupt:
        # Ctrl-C only interrupts the join; the serving loop has to be
        # stopped explicitly, otherwise the non-daemon thread keeps the
        # process alive and serving.
        server.shutdown()
        run_thread.join()
    finally:
        # release the listening socket
        server.server_close()
    print('Server is shut down.')