fediverse-blocklist-tool/fediverse_blocklist_tool/cli.py

235 lines
9.3 KiB
Python

# import tomllib
import argparse
import json
import logging
import os
from typing import List, Optional, Tuple

import requests
import toml
import validators

from fediverse_blocklist_tool.helpers import blocklist_to_markdown, blocklist_to_toml, blocklist_to_csv, \
    blocklist_to_json
from fediverse_blocklist_tool.models import Instance
# File extensions that determine_location() treats as "this is a local blocklist file".
# NOTE(review): "csv" is listed here, but load_blocklist_file() only branches on JSON and
# parses everything else as TOML - confirm whether CSV loading is actually supported.
SUPPORTED_LOADING_FORMATS = ["toml", "csv", "json"]
def determine_location(location: str) -> Tuple[bool, str, Optional[str]]:
    """
    Determines whether a location is remote and splits off an access token if necessary

    :param location: a local filename whose extension is one of SUPPORTED_LOADING_FORMATS, or a
        remote server written as ``domain`` or ``ACCESS_TOKEN@domain``
    :return: A tuple of (is_remote, location, access_token) where access token can be None
    :raises ValueError: if the location is neither a supported local file nor a valid domain
    """
    # If the file extension is in supported formats the location is assumed to be local
    if location.rpartition(".")[2] in SUPPORTED_LOADING_FORMATS:
        return False, location, None

    # Split on the LAST "@" only, so an unexpected extra "@" does not blow up tuple unpacking.
    # Without any "@" the whole string is the server and the token part is empty.
    token_part, _, remote_server = location.rpartition("@")
    # An empty token ("@server" or no "@" at all) is reported as None so that callers can fall
    # back to another token source.
    access_token = token_part or None

    # Validate that the domain is valid
    if not validators.domain(remote_server):
        # Only the server part is reported: the full location may contain the access token,
        # which must not end up in logs or tracebacks.
        error_message = f"Could not parse blocklist location: {remote_server}. Local blocklists must have one of the following formats: {SUPPORTED_LOADING_FORMATS} or a valid domain"
        logging.error(error_message)
        raise ValueError(error_message)
    return True, remote_server, access_token
def load_blocklist(location: str) -> List[Instance]:
    """Resolve a blocklist location and load the blocklist it points to

    :param location: location of the blocklist. Remote locations are allowed to contain an access token. Examples: local_blocklist.toml, ACCESS_TOKEN@fediserver1.org
    :return: the instances loaded from the local file or fetched from the remote server
    """
    remote, resolved_location, token = determine_location(location)
    if remote:
        # No token embedded in the location: use the FBT_TOKEN environment variable instead
        effective_token = os.getenv('FBT_TOKEN') if token is None else token
        return load_blocklist_from_instance(resolved_location, effective_token)
    return load_blocklist_file(resolved_location)
def load_blocklist_file(filename: str) -> List[Instance]:
    """Load a blocklist from a local file

    Files whose name ends in "json" are parsed as a JSON list of instance dictionaries. Every
    other file is parsed as TOML and must contain an ``instances`` array.

    :param filename: path of the blocklist file
    :return: one Instance per entry in the file
    """
    # Both formats are read as UTF-8 explicitly instead of relying on the platform default
    # encoding, so non-ASCII comments load the same way everywhere.
    if filename.endswith("json"):
        with open(filename, encoding="utf-8") as f:
            return blocklist_json_to_instances(json.load(f))

    # NOTE(review): there is no dedicated CSV branch; a ".csv" file ends up in the TOML parser.
    with open(filename, "r", encoding="utf-8") as f:
        data = toml.load(f)
    return [Instance(instance_dict) for instance_dict in data["instances"]]
def blocklist_json_to_instances(blocklist_json: List[dict]) -> List[Instance]:
    """Convert already-parsed blocklist JSON into Instance objects

    :param blocklist_json: the decoded JSON document, i.e. an iterable with one entry per block
        (not the raw JSON string)
    :return: one Instance per entry, in the same order
    """
    return [Instance(entry) for entry in blocklist_json]
def load_blocklist_from_instance(server: str, token: Optional[str], timeout: float = 30) -> List[Instance]:
    """Fetch the domain blocks of a remote server via its admin API

    :param server: domain of the server, without scheme
    :param token: access token sent as bearer token
    :param timeout: seconds to wait for each HTTP request before giving up
    :return: one Instance per domain block reported by the server
    :raises ConnectionError: if the server answers with anything but HTTP 200
    """
    headers = {
        'Authorization': f'Bearer {token}',
    }
    url = f'https://{server}/api/v1/admin/domain_blocks'
    blocklist_json = []
    # The endpoint is paginated: keep following the "next" relation of the Link header until
    # the server stops sending one, otherwise long blocklists would be silently truncated.
    while url:
        # Without a timeout a stalled server would block the CLI forever
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code != 200:
            raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
        blocklist_json.extend(response.json())
        url = response.links.get("next", {}).get("url")
    return blocklist_json_to_instances(blocklist_json)
def remove_key_from_dict(dict, key):
    """Remove ``key`` from the given dictionary in place and hand the same dictionary back

    :raises KeyError: if ``key`` is not present
    """
    dict.pop(key)  # no default given, so a missing key still raises KeyError
    return dict
def exporter(blocklist, output: Optional[str] = None, format: str = "toml", private: bool = False):
    """Serialize a blocklist and write it to a file or to stdout

    :param blocklist: the instances to export
    :param output: filename to write to; when None the text is printed instead
    :param format: one of "toml", "csv", "md" or "json"
    :param private: forwarded to the serializer
    :raises ValueError: if ``format`` is not one of the supported formats
    """
    if format == "toml":
        exported_text = blocklist_to_toml(blocklist, private)
    elif format == "csv":
        exported_text = blocklist_to_csv(blocklist, private)
    elif format == "md":
        exported_text = blocklist_to_markdown(blocklist, private)
    elif format == "json":
        exported_text = blocklist_to_json(blocklist, private)
    else:
        # Fail with a clear message instead of an UnboundLocalError further down
        raise ValueError(f"Unsupported export format: {format!r}. Supported formats: toml, csv, md, json")

    # Output the text
    if output is None:
        print(exported_text)
        return
    # Explicit encoding so the written file does not depend on the platform default
    with open(output, "w", encoding="utf-8") as f:
        f.write(exported_text)
def merge_lists(input_blocklist, merge_target_blocklist, overwrite=False):
    """Merge the blocks of the input blocklist into the merge target

    Blocks that already exist unchanged in the target are skipped, blocks for domains the
    target does not know yet are appended. When both lists contain the same domain with
    different settings, the input wins if ``overwrite`` is set; otherwise the user is shown
    the difference and asked which version to keep.

    :param input_blocklist: the instances to merge in
    :param merge_target_blocklist: the list that receives the blocks; it is modified in place
    :param overwrite: replace conflicting target entries without asking
    :return: the (modified) merge target list
    """
    for input_instance in input_blocklist:
        # If the block is already there with the same parameters we do nothing
        if input_instance in merge_target_blocklist:
            continue

        # Check if there is a domain in the merge target where the input domain is similar
        conflict_index = next(
            (index for index, target_instance in enumerate(merge_target_blocklist)
             if target_instance.domain == input_instance.domain),
            None,
        )
        if conflict_index is None:
            # Unknown domain: nothing to reconcile, just add the new block
            merge_target_blocklist.append(input_instance)
            continue

        keep_input = bool(overwrite)
        if not overwrite:
            key_input = ""
            while key_input not in ("i", "O"):
                print(f"Different settings for {input_instance.domain} detected.")
                Instance.show_diff(input_instance, merge_target_blocklist[conflict_index])
                key_input = input("Keep input (i) or original (O) [i/O]")
            keep_input = key_input == "i"

        if keep_input:
            # Replace in place so the domain never ends up in the list twice
            merge_target_blocklist[conflict_index] = input_instance
    return merge_target_blocklist
def get_format_from_filename(filename: str) -> str:
    """Derive the export format from the text after the last dot of a filename

    :param filename: name of the file the blocklist is written to
    :return: "toml", "csv", "md" or "json"; anything unrecognised falls back to "toml"
    """
    known_formats = ("toml", "csv", "md", "json")
    extension = filename.rpartition(".")[2]
    return extension if extension in known_formats else "toml"
def diff(args):
    """Handler of the ``diff`` sub-command: show the differences between two blocklists

    :param args: parsed CLI arguments; ``args.blocklist`` holds the locations to compare
    :raises NotImplementedError: if more than two blocklists are given
    :raises ValueError: if fewer than two blocklists are given
    """
    blocklists_to_compare = args.blocklist
    if len(blocklists_to_compare) > 2:
        # NotImplemented is a comparison sentinel and not callable; the exception is NotImplementedError
        raise NotImplementedError("Comparing more than two blocklists is not yet supported")
    if len(blocklists_to_compare) < 2:
        raise ValueError("Two blocklists are required for a comparison")

    # load_blocklist handles local files as well as (ACCESS_TOKEN@)server locations, both of
    # which the command line help advertises for this command
    blocklist_a = load_blocklist(blocklists_to_compare[0])
    blocklist_b = load_blocklist(blocklists_to_compare[1])
    Instance.show_diffs(blocklist_a, blocklist_b)
def export(args):
    """Handler of the ``export`` sub-command: write a blocklist to a file

    The export format is derived from the extension of ``args.target``.

    :param args: parsed CLI arguments (``blocklist_to_export``, ``target``, ``private``)
    """
    # load_blocklist also accepts (ACCESS_TOKEN@)server locations, which is the example given
    # in the command line help for this argument
    blocklist_to_export = load_blocklist(args.blocklist_to_export)
    # The parser defines "target"; there is no "target_blocklist" attribute on args
    export_format = get_format_from_filename(args.target)
    exporter(blocklist_to_export, args.target, export_format, args.private)
def deploy(args):
    """Handler of the ``deploy`` sub-command: apply a blocklist to a remote server

    The target's current blocks are fetched, compared with the blocklist to deploy, and the
    resulting differences are applied to the target.

    :param args: parsed CLI arguments (``blocklist_to_deploy``, ``target``, ``no_delete``)
    :raises ValueError: if the target is a local file instead of a server
    """
    blocklist_to_deploy = load_blocklist(args.blocklist_to_deploy)

    # Server and token both come from the "target" argument; the parser defines neither
    # "server" nor "target_blocklist"
    is_remote, server, token = determine_location(args.target)
    if not is_remote:
        raise ValueError(f"The deploy target must be a server, not a local file: {args.target}")
    if token is None:
        # Same fallback as load_blocklist uses for remote locations
        token = os.getenv('FBT_TOKEN')

    target_blocklist = load_blocklist_from_instance(server, token)
    diffs = Instance.list_diffs(blocklist_to_deploy, target_blocklist)
    Instance.apply_blocks_from_diff(diffs, server, token, args.no_delete)
def merge(args):
    """Handler of the ``merge`` sub-command: merge a blocklist into a target file

    The merged list is written back to ``args.target`` in the format given by its extension.

    :param args: parsed CLI arguments (``blocklist_to_merge``, ``target``)
    """
    blocklist_to_merge = load_blocklist(args.blocklist_to_merge)
    # The target is read and rewritten as a file, so it is loaded with the file loader
    target_blocklist = load_blocklist_file(args.target)
    export_format = get_format_from_filename(args.target)

    # The format must not be passed to merge_lists: its third parameter is the "overwrite"
    # switch, and a non-empty string there would silently overwrite every conflict.
    merged_blocklist = merge_lists(blocklist_to_merge, target_blocklist)

    # NOTE(review): exporter's default private=False is used here - confirm whether private
    # comments of the target should be written back as well.
    exporter(merged_blocklist, args.target, export_format)
def cli():
    """Command line entry point: parse the arguments and run the chosen sub-command

    Defines the ``diff``, ``export``, ``merge`` and ``deploy`` sub-commands, configures logging
    according to ``--verbose`` and calls the handler registered for the selected sub-command.
    """
    parser = argparse.ArgumentParser(description='Compare, merge, export and deploy blocklist of a fediverse server')
    parser.add_argument('-v', '--verbose', action='store_true')
    # A dest is needed for required sub-parsers: without it some Python versions fail with a
    # TypeError instead of a usage message when no sub-command is given.
    subparsers = parser.add_subparsers(dest='command', required=True)

    diff_parser = subparsers.add_parser('diff')
    diff_parser.add_argument('blocklist',
                             nargs="+",
                             type=str,
                             help="The blocklists you want to compare. Provide at least two. Example: "
                                  "localblocklist.toml ACCESS_TOKEN@fediserver.org")
    diff_parser.set_defaults(func=diff)

    export_parser = subparsers.add_parser('export')
    export_parser.add_argument('blocklist_to_export',
                               type=str,
                               help="The blocklist you want to export. Example: ACCESS_TOKEN@fediserver.org")
    export_parser.add_argument('target',
                               type=str,
                               help="Filename where to save the blocklist. The extension will determine the format, "
                                    "if none is given it'll default to toml")
    export_parser.add_argument('--private',
                               action='store_true',
                               default=False,
                               help="When the flag is set, private comment will also be exported.")
    export_parser.set_defaults(func=export)

    merge_parser = subparsers.add_parser('merge')
    merge_parser.add_argument('blocklist_to_merge',
                              type=str,
                              help="The blocklist you want to merge to the target. Example: localblocklist.toml "
                                   "ACCESS_TOKEN@fediserver.org")
    merge_parser.add_argument('target',
                              type=str,
                              help="Merge target that will hold the merged blocklist. Example: localblocklist.toml "
                                   "ACCESS_TOKEN@fediserver.org")
    merge_parser.set_defaults(func=merge)

    deploy_parser = subparsers.add_parser('deploy')
    deploy_parser.add_argument('blocklist_to_deploy',
                               type=str,
                               help="The blocklist you want to deploy to the target. Example: localblocklist.toml "
                                    "ACCESS_TOKEN@fediserver.org")
    deploy_parser.add_argument('target',
                               type=str,
                               help="Deploy target that will hold the merged blocklist. Example: localblocklist.toml "
                                    "ACCESS_TOKEN@fediserver.org")
    deploy_parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks")
    deploy_parser.set_defaults(func=deploy)

    args = parser.parse_args()
    # --verbose switches from warnings-only to full debug output
    log_level = logging.DEBUG if args.verbose else logging.WARNING
    logging.basicConfig(level=log_level)

    # Every sub-parser registered its handler as "func"
    args.func(args)


# Run the CLI only when the module is executed as a script, not when it is imported
if __name__ == "__main__":
    cli()