235 lines
9.3 KiB
Python
235 lines
9.3 KiB
Python
# import tomllib
|
|
import argparse
|
|
import json
|
|
import logging
|
|
from typing import List
|
|
|
|
import requests
|
|
import os
|
|
import toml
|
|
import validators
|
|
|
|
from fediverse_blocklist_tool.models import Instance
|
|
from fediverse_blocklist_tool.helpers import blocklist_to_markdown, blocklist_to_toml, blocklist_to_csv, \
|
|
blocklist_to_json
|
|
|
|
SUPPORTED_LOADING_FORMATS = ["toml", "csv", "json"]
|
|
|
|
def determine_location(location: str) -> (bool, str, str):
|
|
"""
|
|
Determines whether a location is remote and splits off an access token if necessary
|
|
|
|
:return: A set of (is_remote, location, access_token) where access token can be None
|
|
"""
|
|
# If the file extension is in supported formats the location is assumed to be local
|
|
if location.split(".")[-1] in SUPPORTED_LOADING_FORMATS:
|
|
return False, location, None
|
|
elif "@" in location:
|
|
access_token, remote_server = location.split("@")
|
|
else:
|
|
remote_server = location
|
|
access_token = None
|
|
# Validate that the domain is valid
|
|
if not validators.domain(remote_server):
|
|
error_message = f"Could not parse blocklist location: {location}. Local blocklists must have one of the following formats: {SUPPORTED_LOADING_FORMATS} or a valid domain"
|
|
logging.error(error_message)
|
|
raise ValueError(error_message)
|
|
return True, remote_server, access_token
|
|
|
|
def load_blocklist(location: str) -> List[Instance]:
|
|
"""Load blocklist from the given location
|
|
|
|
:param location: location of the blocklist. Remote locations are allowed to contain an access token. Examples: local_blocklist.toml, ACCESS_TOKEN@fediserver1.org
|
|
"""
|
|
|
|
is_remote, location, access_token = determine_location(location)
|
|
if not is_remote:
|
|
return load_blocklist_file(location)
|
|
else:
|
|
if access_token is None:
|
|
access_token = os.getenv('FBT_TOKEN')
|
|
return load_blocklist_from_instance(location, access_token)
|
|
|
|
def load_blocklist_file(filename: str) -> [Instance]:
|
|
if filename.endswith("json"):
|
|
with open(filename) as f:
|
|
instances = blocklist_json_to_instances(json.load(f))
|
|
else:
|
|
with open(filename, "r") as f:
|
|
data = toml.load(f)
|
|
instances = []
|
|
for instance_dict in data["instances"]:
|
|
instance = Instance(instance_dict)
|
|
instances.append(instance)
|
|
return instances
|
|
|
|
|
|
def blocklist_json_to_instances(blocklist_json: str) -> [Instance]:
|
|
instances = []
|
|
for i in blocklist_json:
|
|
instances.append(Instance(i))
|
|
return instances
|
|
|
|
|
|
def load_blocklist_from_instance(server: str, token: str) -> [Instance]:
|
|
headers = {
|
|
f'Authorization': f'Bearer {token}',
|
|
}
|
|
|
|
response = requests.get(f'https://{server}/api/v1/admin/domain_blocks', headers=headers)
|
|
if response.status_code == 200:
|
|
blocklist_json = json.loads(response.content)
|
|
return blocklist_json_to_instances(blocklist_json)
|
|
else:
|
|
raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
|
|
|
|
|
|
def remove_key_from_dict(dict, key):
|
|
del dict[key]
|
|
return dict
|
|
|
|
|
|
def exporter(blocklist, output=None, format: str = "toml", private: bool = False):
|
|
if format == "toml":
|
|
exported_text = blocklist_to_toml(blocklist, private)
|
|
if format == "csv":
|
|
exported_text = blocklist_to_csv(blocklist, private)
|
|
if format == "md":
|
|
exported_text = blocklist_to_markdown(blocklist, private)
|
|
if format == "json":
|
|
exported_text = blocklist_to_json(blocklist, private)
|
|
|
|
# Output the text
|
|
if output is not None:
|
|
with open(output, "w") as f:
|
|
f.write(exported_text)
|
|
else:
|
|
print(exported_text)
|
|
|
|
|
|
def merge_lists(input_blocklist, merge_target_blocklist, overwrite=False):
|
|
"""Shows a table in the CLI comparing the local and remote blocklist"""
|
|
from rich.table import Table
|
|
from rich.console import Console
|
|
for input_instance in input_blocklist:
|
|
# If the block is already there with the same parameters we do nothing
|
|
if input_instance in merge_target_blocklist:
|
|
continue
|
|
# Check if there is a domain in the merge target where the input domain is similar
|
|
try:
|
|
merge_target_instance = [merge_target_instance for merge_target_instance in merge_target_blocklist if
|
|
input_instance.domain == merge_target_instance.domain][0]
|
|
|
|
key_input = ""
|
|
if not overwrite:
|
|
while key_input not in ("i", "O"):
|
|
print(f"Different settings for {input_instance.domain} detected.")
|
|
Instance.show_diff(input_instance, merge_target_instance)
|
|
key_input = input("Keep input (i) or original (O) [i/O]")
|
|
elif key_input == "i":
|
|
merge_target_blocklist.append(merge_target_instance)
|
|
else:
|
|
merge_target_blocklist.append(input_instance)
|
|
except KeyError:
|
|
pass
|
|
return merge_target_blocklist
|
|
|
|
|
|
def get_format_from_filename(filename: str) -> str:
|
|
detected_format = filename.split(".")[-1]
|
|
if detected_format not in ["toml", "csv", "md", "json"]:
|
|
detected_format = "toml"
|
|
return detected_format
|
|
|
|
|
|
def diff(args):
|
|
if len(args.blocklist) > 2:
|
|
raise NotImplemented("Comparing more than two blocklists is not yet supported")
|
|
blocklists_to_compare = args.blocklist
|
|
blocklist_a = load_blocklist_file(blocklists_to_compare[0])
|
|
blocklist_b = load_blocklist_file(blocklists_to_compare[1])
|
|
Instance.show_diffs(blocklist_a, blocklist_b)
|
|
|
|
|
|
def export(args):
|
|
blocklist_to_export = load_blocklist_file(args.blocklist_to_export)
|
|
export_format = get_format_from_filename(args.target_blocklist)
|
|
exporter(blocklist_to_export, args.target, export_format, args.private)
|
|
|
|
|
|
def deploy(args):
|
|
blocklist_to_deploy = load_blocklist_file(args.blocklist_to_deploy)
|
|
target_blocklist = load_blocklist_file(args.target_blocklist)
|
|
diffs = Instance.list_diffs(blocklist_to_deploy, target_blocklist)
|
|
Instance.apply_blocks_from_diff(diffs, args.server, token, args.no_delete)
|
|
|
|
|
|
def merge(args):
|
|
blocklist_to_merge = load_blocklist_file(args.blocklist_to_merge)
|
|
target_blocklist = load_blocklist_file(args.target)
|
|
export_format = get_format_from_filename(args.target)
|
|
merge_lists(blocklist_to_merge, target_blocklist, export_format)
|
|
|
|
|
|
def cli():
|
|
parser = argparse.ArgumentParser(description='Compare, merge, export and deploy blocklist of a fediverse server')
|
|
parser.add_argument('-v', '--verbose', action='store_true')
|
|
subparsers = parser.add_subparsers(required=True)
|
|
|
|
diff_parser = subparsers.add_parser('diff')
|
|
diff_parser.add_argument('blocklist',
|
|
nargs="+",
|
|
type=str,
|
|
help="The blocklists you want to compare. Provide at least two. Example: "
|
|
"lockalblocklist.toml ACCESS_TOKEN@fediserver.org")
|
|
diff_parser.set_defaults(func=diff)
|
|
|
|
export_parser = subparsers.add_parser('export')
|
|
export_parser.add_argument('blocklist_to_export',
|
|
type=str,
|
|
help="The blocklist you want to export. Example: ACCESS_TOKEN@fediserver.org")
|
|
export_parser.add_argument('target',
|
|
type=str,
|
|
help="Filename where to save the blocklist. The extension will determine the format, "
|
|
"if none ist given it'll default to toml")
|
|
|
|
export_parser.add_argument('--private',
|
|
action='store_true',
|
|
default=False,
|
|
help="When the flag is set, private comment will also be exported.")
|
|
export_parser.set_defaults(func=export)
|
|
|
|
merge_parser = subparsers.add_parser('merge')
|
|
merge_parser.add_argument('blocklist_to_merge',
|
|
type=str,
|
|
help="The blocklist you want to merge to the target. Example: lockalblocklist.toml "
|
|
"ACCESS_TOKEN@fediserver.org")
|
|
merge_parser.add_argument('target',
|
|
type=str,
|
|
help="Merge target that will hold the merged blocklist. Example: lockalblocklist.toml "
|
|
"ACCESS_TOKEN@fediserver.org")
|
|
merge_parser.set_defaults(func=merge)
|
|
|
|
deploy_parser = subparsers.add_parser('deploy')
|
|
deploy_parser.add_argument('blocklist_to_deploy',
|
|
type=str,
|
|
help="The blocklist you want to deploy to the target. Example: lockalblocklist.toml "
|
|
"ACCESS_TOKEN@fediserver.org")
|
|
deploy_parser.add_argument('target',
|
|
type=str,
|
|
help="Deploy target that will hold the merged blocklist. Example: lockalblocklist.toml "
|
|
"ACCESS_TOKEN@fediserver.org")
|
|
deploy_parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks")
|
|
deploy_parser.set_defaults(func=deploy)
|
|
|
|
args = parser.parse_args()
|
|
if args.verbose:
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
else:
|
|
logging.basicConfig(level=logging.WARN)
|
|
args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|