Rename from mastodon->fediverse
This commit is contained in:
		
							
								
								
									
										151
									
								
								fediverse_blocklist_deploy/cli.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										151
									
								
								fediverse_blocklist_deploy/cli.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,151 @@
 | 
			
		||||
# import tomllib
 | 
			
		||||
import argparse
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import requests
 | 
			
		||||
import os
 | 
			
		||||
import toml
 | 
			
		||||
 | 
			
		||||
from fediverse_blocklist_deploy.models import Instance
 | 
			
		||||
from fediverse_blocklist_deploy.helpers import blocklist_to_markdown, blocklist_to_toml, blocklist_to_csv, \
 | 
			
		||||
    blocklist_to_json
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_blocklist_file(filename: str) -> [Instance]:
 | 
			
		||||
    with open(filename, "r") as f:
 | 
			
		||||
        data = toml.load(f)
 | 
			
		||||
    instances = []
 | 
			
		||||
    for instance_dict in data["instances"]:
 | 
			
		||||
        instance = Instance(instance_dict)
 | 
			
		||||
        instances.append(instance)
 | 
			
		||||
    return instances
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def blocklist_json_to_instances(blocklist_json: str) -> [Instance]:
 | 
			
		||||
    instances = []
 | 
			
		||||
    for i in blocklist_json:
 | 
			
		||||
        instances.append(Instance(i))
 | 
			
		||||
    return instances
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_blocklist_from_instance(server: str, token: str) -> [Instance]:
 | 
			
		||||
    headers = {
 | 
			
		||||
        f'Authorization': f'Bearer {token}',
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    response = requests.get(f'https://{server}/api/v1/admin/domain_blocks', headers=headers)
 | 
			
		||||
    if response.status_code == 200:
 | 
			
		||||
        blocklist_json = json.loads(response.content)
 | 
			
		||||
        return blocklist_json_to_instances(blocklist_json)
 | 
			
		||||
    else:
 | 
			
		||||
        raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def remove_key_from_dict(dict, key):
 | 
			
		||||
    del dict[key]
 | 
			
		||||
    return dict
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def exporter(blocklist, output=None, format: str = "toml", private: bool = False):
 | 
			
		||||
    if format == "toml":
 | 
			
		||||
        exported_text = blocklist_to_toml(blocklist, private)
 | 
			
		||||
    if format == "csv":
 | 
			
		||||
        exported_text = blocklist_to_csv(blocklist, private)
 | 
			
		||||
    if format == "markdown":
 | 
			
		||||
        exported_text = blocklist_to_markdown(blocklist, private)
 | 
			
		||||
    if format == "json":
 | 
			
		||||
        exported_text = blocklist_to_json(blocklist, private)
 | 
			
		||||
 | 
			
		||||
    # Output the text
 | 
			
		||||
    if output is not None:
 | 
			
		||||
        with open(output, "w") as f:
 | 
			
		||||
            f.write(exported_text)
 | 
			
		||||
    else:
 | 
			
		||||
        print(exported_text)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def merge(input_file, merge_target, format: str = "toml", private: bool = False, overwrite=False):
 | 
			
		||||
    input_blocklist = load_blocklist_file(input_file)
 | 
			
		||||
    merge_target_blocklist = load_blocklist_file(merge_target)
 | 
			
		||||
    for input_instance in input_blocklist:
 | 
			
		||||
        # If the block is already there with the same parameters we do nothing
 | 
			
		||||
        if input_instance in merge_target_blocklist:
 | 
			
		||||
            continue
 | 
			
		||||
        # Check if there is a domain in the merge target where the input domain is similar
 | 
			
		||||
        try:
 | 
			
		||||
            merge_target_instance = [merge_target_instance for merge_target_instance in merge_target if input_instance.domain == merge_target_instance.domain][0]
 | 
			
		||||
            if not overwrite:
 | 
			
		||||
                key_input = ""
 | 
			
		||||
                while key_input not in ("i", "O"):
 | 
			
		||||
                    print(f"Different settings for {input_instance.domain} detected.")
 | 
			
		||||
                    print(f"In the input blocklist the setting is\n{input_instance} whereas it's {merge_target_instance} in the merge target")
 | 
			
		||||
                    key_input = input("Keep input (i) or original (o) [i/O]")
 | 
			
		||||
            elif key_input == "i":
 | 
			
		||||
                merge_target_blocklist.append(merge_target_instance)
 | 
			
		||||
            else:
 | 
			
		||||
                merge_target_blocklist.append(input_instance)
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def cli():
 | 
			
		||||
    parser = argparse.ArgumentParser(description='Deploy blocklist updates to a fediverse server')
 | 
			
		||||
    parser.add_argument('action', choices=['diff', 'deploy', 'export', 'merge'],
 | 
			
		||||
                        help="Either use 'diff' to check the difference between local blockĺist and the blocklist on "
 | 
			
		||||
                             "the server, 'deploy' to apply the current local blocklist or 'export' to export the remote "
 | 
			
		||||
                             "blocklist into a local file. merge can be used to merge a blocklist (given by -i) into "
 | 
			
		||||
                             "another (-o)")
 | 
			
		||||
    parser.add_argument('-s', '--server', help="The address of the server where you want to deploy (e.g. "
 | 
			
		||||
                                               "mastodon.social)")
 | 
			
		||||
    parser.add_argument('-t', '--token', help="Authorization token")
 | 
			
		||||
    parser.add_argument('-i', '--input-file', help="The blocklist to use")
 | 
			
		||||
    parser.add_argument('-r', '--remote-blocklist', help="The remote blocklist as json for debugging reasons")
 | 
			
		||||
    parser.add_argument('-o', '--output', help="Filename where to export the blocklist")
 | 
			
		||||
    parser.add_argument('-v', '--verbose', action='store_true')
 | 
			
		||||
    parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks")
 | 
			
		||||
    parser.add_argument('--format', default="toml", type=str, help="Export format: toml|markdown|csv|json")
 | 
			
		||||
    parser.add_argument('--private', action='store_true', help="When the flag is set, private comment will also be "
 | 
			
		||||
                                                               "exported.")
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
    if args.verbose:
 | 
			
		||||
        logging.basicConfig(level=logging.DEBUG)
 | 
			
		||||
    else:
 | 
			
		||||
        logging.basicConfig(level=logging.WARN)
 | 
			
		||||
 | 
			
		||||
    if args.token:
 | 
			
		||||
        token = args.token
 | 
			
		||||
    else:
 | 
			
		||||
        token = os.getenv('MBD_TOKEN')
 | 
			
		||||
 | 
			
		||||
    """if there is a remote blocklist provided load this instead of fetching it from a server (for debugging reasons)"""
 | 
			
		||||
    if args.remote_blocklist:
 | 
			
		||||
        with open(args.remote_blocklist) as f:
 | 
			
		||||
            remote_blocklist = blocklist_json_to_instances(json.load(f))
 | 
			
		||||
    else:
 | 
			
		||||
        remote_blocklist = load_blocklist_from_instance(server=args.server, token=token)
 | 
			
		||||
 | 
			
		||||
    """Load local blocklist only when needed"""
 | 
			
		||||
    if args.action in ["diff", "deploy", "merge"]:
 | 
			
		||||
        if args.input_file:
 | 
			
		||||
            blocklist_filename = args.input_file
 | 
			
		||||
        else:
 | 
			
		||||
            blocklist_filename = "../blocklist.toml"
 | 
			
		||||
        try:
 | 
			
		||||
            local_blocklist = load_blocklist_file(blocklist_filename)
 | 
			
		||||
        except FileNotFoundError:
 | 
			
		||||
            print("Local blocklist file was not found. Make sure to specify it's location via -i")
 | 
			
		||||
            exit()
 | 
			
		||||
 | 
			
		||||
    if args.action == "diff":
 | 
			
		||||
        Instance.show_diffs(local_blocklist, remote_blocklist)
 | 
			
		||||
    elif args.action == "deploy":
 | 
			
		||||
        diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
 | 
			
		||||
        Instance.apply_blocks_from_diff(diffs, args.server, token, args.no_delete)
 | 
			
		||||
    elif args.action == "export":
 | 
			
		||||
        exporter(remote_blocklist, args.output, args.format, args.private)
 | 
			
		||||
    elif args.action == "merge":
 | 
			
		||||
        merge(args.input_file, args.output, args.format, args.private)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    cli()
 | 
			
		||||
							
								
								
									
										36
									
								
								fediverse_blocklist_deploy/helpers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								fediverse_blocklist_deploy/helpers.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,36 @@
 | 
			
		||||
from fediverse_blocklist_deploy.models import Instance
 | 
			
		||||
import toml
 | 
			
		||||
import io
 | 
			
		||||
import csv
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
def blocklist_to_markdown(blocklist: [Instance], private: bool = False):
 | 
			
		||||
    if private:
 | 
			
		||||
        markdown_string = "| Instance | Status | Reason | Private Comment |\n | --- | --- | --- |\n"
 | 
			
		||||
    else:
 | 
			
		||||
        markdown_string = "| Instance | Status | Reason |\n | --- | --- | --- |\n"
 | 
			
		||||
    for instance in blocklist:
 | 
			
		||||
 | 
			
		||||
        if private:
 | 
			
		||||
            markdown_string += f"| {instance.domain} | {instance.severity} | {instance.public_comment} | {instance.private_comment} |\n"
 | 
			
		||||
        else:
 | 
			
		||||
            markdown_string += f"| {instance.domain} | {instance.severity} | {instance.public_comment} |\n"
 | 
			
		||||
 | 
			
		||||
    return markdown_string
 | 
			
		||||
 | 
			
		||||
def blocklist_to_toml(blocklist: [Instance], private: bool = False):
 | 
			
		||||
    toml_string = toml.dumps({"instances": [b.as_dict(private) for b in blocklist]})
 | 
			
		||||
    return toml_string
 | 
			
		||||
 | 
			
		||||
def blocklist_to_csv(blocklist: [Instance], private: bool = False):
 | 
			
		||||
    csv_string = io.StringIO()
 | 
			
		||||
    blocklist_as_dict = [b.as_dict(private) for b in blocklist]
 | 
			
		||||
    keys = blocklist_as_dict[0].keys()
 | 
			
		||||
    w = csv.DictWriter(csv_string, keys)
 | 
			
		||||
    w.writeheader()
 | 
			
		||||
    w.writerows(blocklist_as_dict)
 | 
			
		||||
    return csv_string.getvalue()
 | 
			
		||||
 | 
			
		||||
def blocklist_to_json(blocklist: [Instance], private: bool = False):
 | 
			
		||||
    json_string = json.dumps([b.as_dict(private) for b in blocklist])
 | 
			
		||||
    return json_string
 | 
			
		||||
							
								
								
									
										140
									
								
								fediverse_blocklist_deploy/models.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										140
									
								
								fediverse_blocklist_deploy/models.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,140 @@
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Instance:
 | 
			
		||||
    def __init__(self, instance_dict):
 | 
			
		||||
        """If obfuscate, reject_media or reject_reports are not specified default to False"""
 | 
			
		||||
        self.obfuscate = False
 | 
			
		||||
        self.reject_media = False
 | 
			
		||||
        self.reject_reports = False
 | 
			
		||||
        self.id = None
 | 
			
		||||
 | 
			
		||||
        self.parse_block(instance_dict)
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return f"{self.domain}: {self.severity}"
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, other):
 | 
			
		||||
        return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
 | 
			
		||||
 | 
			
		||||
    def status_str(self):
 | 
			
		||||
        return f"{self.severity}\nReject reports: {self.reject_reports}\nReject media: {self.reject_media}\nObfuscate: {self.obfuscate}"
 | 
			
		||||
 | 
			
		||||
    def as_dict(self, private=False):
 | 
			
		||||
        keys = ["domain", "severity", "public_comment", "obfuscate", "reject_media", "reject_reports"]
 | 
			
		||||
        if private:
 | 
			
		||||
            keys.append("private_comment")
 | 
			
		||||
        exportable = {}
 | 
			
		||||
        for key in keys:
 | 
			
		||||
            exportable[key] = getattr(self, key)
 | 
			
		||||
        return exportable
 | 
			
		||||
 | 
			
		||||
    def parse_block(self, instance_dict):
 | 
			
		||||
        # this specifies possible properties and default values if not found on the remote source. If a default is None
 | 
			
		||||
        # the value is required and the parse will fail
 | 
			
		||||
        properties_and_defaults = [("domain", None), ("severity", "suspend"), ("public_comment", ""),
 | 
			
		||||
                                   ("private_comment", ""), ("obfuscate", False), ("reject_media", True),
 | 
			
		||||
                                   ("reject_reports", True)]
 | 
			
		||||
        for key, default in properties_and_defaults:
 | 
			
		||||
            try:
 | 
			
		||||
                setattr(self, key, instance_dict[key])
 | 
			
		||||
            except KeyError:
 | 
			
		||||
                if default is not None:
 | 
			
		||||
                    setattr(self, key, default)
 | 
			
		||||
                else:
 | 
			
		||||
                    raise KeyError(f"The key {key} was not in the instance_dict response.")
 | 
			
		||||
 | 
			
		||||
    def apply(self, server, token, block_id=None):
 | 
			
		||||
        """Applies instance block on the remote server"""
 | 
			
		||||
        headers = {
 | 
			
		||||
            f'Authorization': f'Bearer {token}',
 | 
			
		||||
        }
 | 
			
		||||
        data = {"domain": self.domain,
 | 
			
		||||
                "severity": self.severity,
 | 
			
		||||
                "reject_media": str(self.reject_media).lower(),
 | 
			
		||||
                "reject_reports": str(self.reject_reports).lower(),
 | 
			
		||||
                "private_comment": str(self.private_comment).lower(),
 | 
			
		||||
                "public_comment": self.public_comment,
 | 
			
		||||
                "obfuscate": str(self.obfuscate).lower()}
 | 
			
		||||
        """If no id is given add a new block, else update the existing block"""
 | 
			
		||||
        if block_id is None:
 | 
			
		||||
            response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers)
 | 
			
		||||
        else:
 | 
			
		||||
            response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data,
 | 
			
		||||
                                    headers=headers)
 | 
			
		||||
        if response.status_code != 200:
 | 
			
		||||
            raise ConnectionError(f"Could not apply block for {self.domain} ({response.status_code}: {response.reason})")
 | 
			
		||||
 | 
			
		||||
    def delete(self, server: str, token: str):
 | 
			
		||||
        """Deletes the instance from the blocklist on the remote server"""
 | 
			
		||||
        headers = {
 | 
			
		||||
            f'Authorization': f'Bearer {token}',
 | 
			
		||||
        }
 | 
			
		||||
        response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers)
 | 
			
		||||
        if response.status_code != 200:
 | 
			
		||||
            raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def list_diffs(local_blocklist, remote_blocklist):
 | 
			
		||||
        """Compares the local and remote blocklist and returns a list of differences"""
 | 
			
		||||
        diffs = []
 | 
			
		||||
        for local_instance in local_blocklist:
 | 
			
		||||
            instance_found = False
 | 
			
		||||
            for idx, remote_instance in enumerate(remote_blocklist):
 | 
			
		||||
                if local_instance.domain == remote_instance.domain:
 | 
			
		||||
                    instance_found = True
 | 
			
		||||
                    if local_instance == remote_instance:
 | 
			
		||||
                        pass
 | 
			
		||||
                    else:
 | 
			
		||||
                        """If the local block is different from the remote block, add it to the diff"""
 | 
			
		||||
                        diffs.append({"local": local_instance, "remote": remote_instance})
 | 
			
		||||
                    """Remove the remote instance from the list so we later have a list of remote instances we don't
 | 
			
		||||
                    have locally"""
 | 
			
		||||
                    del remote_blocklist[idx]
 | 
			
		||||
            """If the local instance is not in the remote blocklist, add it to the diff"""
 | 
			
		||||
            if not instance_found:
 | 
			
		||||
                diffs.append({"local": local_instance, "remote": None})
 | 
			
		||||
        for remote_instance in remote_blocklist:
 | 
			
		||||
            diffs.append({"local": None, "remote": remote_instance})
 | 
			
		||||
        return diffs
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def apply_blocks_from_diff(diffs, server, token, no_delete: bool):
 | 
			
		||||
        """Uses a diff (list of difference in local an remote instance) to apply instance blocks"""
 | 
			
		||||
        for diff in diffs:
 | 
			
		||||
            if diff["local"] is None:
 | 
			
		||||
                if not no_delete:
 | 
			
		||||
                    """Delete the block on the remote server"""
 | 
			
		||||
                    diff['remote'].delete(server, token)
 | 
			
		||||
                    logging.info(f"Deleted {diff['remote'].domain} from blocklist")
 | 
			
		||||
            elif diff["remote"] is None:
 | 
			
		||||
                """Add the block on the remote server"""
 | 
			
		||||
                diff["local"].apply(server, token)
 | 
			
		||||
                logging.info(f"Added {diff['local'].domain} to blocklist")
 | 
			
		||||
            else:
 | 
			
		||||
                """Update the block on the remote server"""
 | 
			
		||||
                diff["local"].apply(server, token, block_id=diff["remote"].id)
 | 
			
		||||
                logging.info(f"Updated {diff['local'].domain} in blocklist")
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def show_diffs(local_blocklist, remote_blocklist):
 | 
			
		||||
        """Shows a table in the CLI comparing the local and remote blocklist"""
 | 
			
		||||
        from rich.table import Table
 | 
			
		||||
        from rich.console import Console
 | 
			
		||||
        table = Table(title="Differences", expand=True, show_lines=True)
 | 
			
		||||
 | 
			
		||||
        table.add_column("Domain", style="cyan")
 | 
			
		||||
        table.add_column("Local status", style="green")
 | 
			
		||||
        table.add_column("Current remote status", style="magenta")
 | 
			
		||||
        diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
 | 
			
		||||
        for diff in diffs:
 | 
			
		||||
            if diff["local"] is None:
 | 
			
		||||
                table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
 | 
			
		||||
            elif diff["remote"] is None:
 | 
			
		||||
                table.add_row(diff["local"].domain, diff["local"].status_str(), None)
 | 
			
		||||
            else:
 | 
			
		||||
                table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
 | 
			
		||||
        console = Console()
 | 
			
		||||
        console.print(table)
 | 
			
		||||
		Reference in New Issue
	
	Block a user