From 9eeaed3b143fa998dd2a3b5b6d3ca9724bb93a49 Mon Sep 17 00:00:00 2001 From: moanos Date: Thu, 21 Dec 2023 10:30:22 +0100 Subject: [PATCH] refactor!: Start major cli rewrite #1 --- fediverse_blocklist_tool/cli.py | 189 ++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 57 deletions(-) diff --git a/fediverse_blocklist_tool/cli.py b/fediverse_blocklist_tool/cli.py index 7ab04f4..51ede91 100644 --- a/fediverse_blocklist_tool/cli.py +++ b/fediverse_blocklist_tool/cli.py @@ -2,14 +2,53 @@ import argparse import json import logging +from typing import List + import requests import os import toml +import validators from fediverse_blocklist_tool.models import Instance from fediverse_blocklist_tool.helpers import blocklist_to_markdown, blocklist_to_toml, blocklist_to_csv, \ blocklist_to_json +SUPPORTED_LOADING_FORMATS = ["toml", "csv", "json"] + +def determine_location(location: str) -> (bool, str, str): + """ + Determines whether a location is remote and splits off an access token if necessary + + :return: A set of (is_remote, location, access_token) where access token can be None + """ + # If the file extension is in supported formats the location is assumed to be local + if location.split(".")[-1] in SUPPORTED_LOADING_FORMATS: + return False, location, None + elif "@" in location: + access_token, remote_server = location.split("@") + else: + remote_server = location + access_token = None + # Validate that the domain is valid + if not validators.domain(remote_server): + error_message = f"Could not parse blocklist location: {location}. Local blocklists must have one of the following formats: {SUPPORTED_LOADING_FORMATS} or a valid domain" + logging.error(error_message) + raise ValueError(error_message) + return True, remote_server, access_token + +def load_blocklist(location: str) -> List[Instance]: + """Load blocklist from the given location + + :param location: location of the blocklist. Remote locations are allowed to contain an access token. Examples: local_blocklist.toml, ACCESS_TOKEN@fediserver1.org + """ + + is_remote, location, access_token = determine_location(location) + if not is_remote: + return load_blocklist_file(location) + else: + if access_token is None: + access_token = os.getenv('FBT_TOKEN') + return load_blocklist_from_instance(location, access_token) def load_blocklist_file(filename: str) -> [Instance]: if filename.endswith("json"): @@ -55,7 +94,7 @@ def exporter(blocklist, output=None, format: str = "toml", private: bool = False exported_text = blocklist_to_toml(blocklist, private) if format == "csv": exported_text = blocklist_to_csv(blocklist, private) - if format == "markdown": + if format == "md": exported_text = blocklist_to_markdown(blocklist, private) if format == "json": exported_text = blocklist_to_json(blocklist, private) @@ -68,91 +107,127 @@ def exporter(blocklist, output=None, format: str = "toml", private: bool = False print(exported_text) -def merge(input_file, merge_target, format: str = "toml", private: bool = False, overwrite=False): +def merge_lists(input_blocklist, merge_target_blocklist, overwrite=False): """Shows a table in the CLI comparing the local and remote blocklist""" from rich.table import Table from rich.console import Console - input_blocklist = load_blocklist_file(input_file) - merge_target_blocklist = load_blocklist_file(merge_target) for input_instance in input_blocklist: # If the block is already there with the same parameters we do nothing if input_instance in merge_target_blocklist: continue # Check if there is a domain in the merge target where the input domain is similar try: - merge_target_instance = [merge_target_instance for merge_target_instance in merge_target_blocklist if input_instance.domain == merge_target_instance.domain][0] + merge_target_instance = [merge_target_instance for merge_target_instance in merge_target_blocklist if + input_instance.domain == merge_target_instance.domain][0] + + key_input = "" if not overwrite: - key_input = "" while key_input not in ("i", "O"): print(f"Different settings for {input_instance.domain} detected.") Instance.show_diff(input_instance, merge_target_instance) - key_input = input("Keep input (i) or original (o) [i/O]") + key_input = input("Keep input (i) or original (O) [i/O]") elif key_input == "i": merge_target_blocklist.append(merge_target_instance) else: merge_target_blocklist.append(input_instance) except KeyError: pass + return merge_target_blocklist + + +def get_format_from_filename(filename: str) -> str: + detected_format = filename.split(".")[-1] + if detected_format not in ["toml", "csv", "md", "json"]: + detected_format = "toml" + return detected_format + + +def diff(args): + if len(args.blocklist) > 2: + raise NotImplemented("Comparing more than two blocklists is not yet supported") + blocklists_to_compare = args.blocklist + blocklist_a = load_blocklist_file(blocklists_to_compare[0]) + blocklist_b = load_blocklist_file(blocklists_to_compare[1]) + Instance.show_diffs(blocklist_a, blocklist_b) + + +def export(args): + blocklist_to_export = load_blocklist_file(args.blocklist_to_export) + export_format = get_format_from_filename(args.target_blocklist) + exporter(blocklist_to_export, args.target, export_format, args.private) + + +def deploy(args): + blocklist_to_deploy = load_blocklist_file(args.blocklist_to_deploy) + target_blocklist = load_blocklist_file(args.target_blocklist) + diffs = Instance.list_diffs(blocklist_to_deploy, target_blocklist) + Instance.apply_blocks_from_diff(diffs, args.server, token, args.no_delete) + + +def merge(args): + blocklist_to_merge = load_blocklist_file(args.blocklist_to_merge) + target_blocklist = load_blocklist_file(args.target) + export_format = get_format_from_filename(args.target) + merge_lists(blocklist_to_merge, target_blocklist, export_format) def cli(): parser = argparse.ArgumentParser(description='Compare, merge, export and deploy blocklist of a fediverse server') - parser.add_argument('action', choices=['diff', 'deploy', 'export', 'merge'], - help="Either use 'diff' to check the difference between local blockĺist and the blocklist on " - "the server, 'deploy' to apply the current local blocklist or 'export' to export the remote " - "blocklist into a local file. merge can be used to merge a blocklist (given by -i) into " - "another (-o)") - parser.add_argument('-s', '--server', help="The address of the server where you want to deploy (e.g. " - "mastodon.social)") - parser.add_argument('-t', '--token', help="Authorization token") - parser.add_argument('-i', '--input-file', help="The blocklist to use") - parser.add_argument('-r', '--remote-blocklist', help="The remote blocklist as json for debugging reasons") - parser.add_argument('-o', '--output', help="Filename where to export the blocklist") parser.add_argument('-v', '--verbose', action='store_true') - parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks") - parser.add_argument('--format', default="toml", type=str, help="Export format: toml|markdown|csv|json") - parser.add_argument('--private', action='store_true', help="When the flag is set, private comment will also be " - "exported.") + subparsers = parser.add_subparsers(required=True) + + diff_parser = subparsers.add_parser('diff') + diff_parser.add_argument('blocklist', + nargs="+", + type=str, + help="The blocklists you want to compare. Provide at least two. Example: " + "lockalblocklist.toml ACCESS_TOKEN@fediserver.org") + diff_parser.set_defaults(func=diff) + + export_parser = subparsers.add_parser('export') + export_parser.add_argument('blocklist_to_export', + type=str, + help="The blocklist you want to export. Example: ACCESS_TOKEN@fediserver.org") + export_parser.add_argument('target', + type=str, + help="Filename where to save the blocklist. The extension will determine the format, " + "if none ist given it'll default to toml") + + export_parser.add_argument('--private', + action='store_true', + default=False, + help="When the flag is set, private comment will also be exported.") + export_parser.set_defaults(func=export) + + merge_parser = subparsers.add_parser('merge') + merge_parser.add_argument('blocklist_to_merge', + type=str, + help="The blocklist you want to merge to the target. Example: lockalblocklist.toml " + "ACCESS_TOKEN@fediserver.org") + merge_parser.add_argument('target', + type=str, + help="Merge target that will hold the merged blocklist. Example: lockalblocklist.toml " + "ACCESS_TOKEN@fediserver.org") + merge_parser.set_defaults(func=merge) + + deploy_parser = subparsers.add_parser('deploy') + deploy_parser.add_argument('blocklist_to_deploy', + type=str, + help="The blocklist you want to deploy to the target. Example: lockalblocklist.toml " + "ACCESS_TOKEN@fediserver.org") + deploy_parser.add_argument('target', + type=str, + help="Deploy target that will hold the merged blocklist. Example: lockalblocklist.toml " + "ACCESS_TOKEN@fediserver.org") + deploy_parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks") + deploy_parser.set_defaults(func=deploy) + args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.WARN) - - if args.token: - token = args.token - else: - token = os.getenv('MBD_TOKEN') - - """Get a remote blocklist only when necessary""" - if args.action in ["diff", "deploy", "export"]: - """if there is a remote blocklist provided load this instead of fetching it from a server (for debugging reasons)""" - if args.remote_blocklist: - remote_blocklist = load_blocklist_file(args.remote_blocklist) - else: - remote_blocklist = load_blocklist_from_instance(server=args.server, token=token) - - """Load local blocklist only when needed""" - if args.action in ["diff", "deploy", "merge"]: - if args.input_file: - blocklist_filename = args.input_file - else: - blocklist_filename = "../blocklist.toml" - try: - local_blocklist = load_blocklist_file(blocklist_filename) - except FileNotFoundError: - print("Local blocklist file was not found. Make sure to specify it's location via -i") - exit() - - if args.action == "diff": - Instance.show_diffs(local_blocklist, remote_blocklist) - elif args.action == "deploy": - diffs = Instance.list_diffs(local_blocklist, remote_blocklist) - Instance.apply_blocks_from_diff(diffs, args.server, token, args.no_delete) - elif args.action == "export": - exporter(remote_blocklist, args.output, args.format, args.private) - elif args.action == "merge": - merge(args.input_file, args.output, args.format, args.private) + args.func(args) if __name__ == "__main__":