Initial code commit, adds diff functionality
This commit is contained in:
		
							
								
								
									
										70
									
								
								cli.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										70
									
								
								cli.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,70 @@
 | 
			
		||||
# import tomllib
 | 
			
		||||
import argparse
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import requests
 | 
			
		||||
 | 
			
		||||
import tomli
 | 
			
		||||
 | 
			
		||||
from models import Instance
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_local_blocklist(filename: str) -> [Instance]:
 | 
			
		||||
    with open(filename, "rb") as f:
 | 
			
		||||
        data = tomli.load(f)
 | 
			
		||||
    instances = []
 | 
			
		||||
    for instance_dict in data["instances"]:
 | 
			
		||||
        instance = Instance(instance_dict)
 | 
			
		||||
        instances.append(instance)
 | 
			
		||||
    for instance in instances:
 | 
			
		||||
        print(instance)
 | 
			
		||||
    return instances
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def blocklist_json_to_instances(blocklist_json):
 | 
			
		||||
    instances = []
 | 
			
		||||
    for i in blocklist_json:
 | 
			
		||||
        instances.append(Instance(i))
 | 
			
		||||
    return instances
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_remote_blocklist(server, token):
 | 
			
		||||
    headers = {
 | 
			
		||||
        f'Authorization': f'Bearer {token}',
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    response = requests.get(f'https://{server}/api/v1/admin/domain_blocks', headers=headers)
 | 
			
		||||
    if response.status_code == 200:
 | 
			
		||||
        blocklist_json = json.loads(response.content)
 | 
			
		||||
        return blocklist_json_to_instances(blocklist_json)
 | 
			
		||||
    else:
 | 
			
		||||
        raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def cli():
 | 
			
		||||
    parser = argparse.ArgumentParser(description='Deploy blocklist updates to a mastodon server')
 | 
			
		||||
    parser.add_argument('action', choices=['diff', 'deploy'],
 | 
			
		||||
                        help="Either use 'diff' to check the difference between current blocks and future blocks or 'deploy'.")
 | 
			
		||||
    parser.add_argument('-s', '--server', help="The address of the server where you want to deploy (e.g. "
 | 
			
		||||
                                               "mastodon.social)")
 | 
			
		||||
    parser.add_argument('-t', '--token', help="Authorization token")
 | 
			
		||||
    parser.add_argument('-i', '--input-file', help="The blocklist to use")
 | 
			
		||||
    parser.add_argument('-r', '--remote-blocklist', help="The remote blocklist as json for debugging reasons")
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
    logging.basicConfig(level=logging.WARN)
 | 
			
		||||
    if args.input_file:
 | 
			
		||||
        blocklist_filename = args.input_file
 | 
			
		||||
    else:
 | 
			
		||||
        blocklist_filename = "blocklist.toml"
 | 
			
		||||
    local_blocklist = load_local_blocklist(blocklist_filename)
 | 
			
		||||
    if args.remote_blocklist:
 | 
			
		||||
        with open(args.remote_blocklist) as f:
 | 
			
		||||
            remote_blocklist = blocklist_json_to_instances(json.load(f))
 | 
			
		||||
    else:
 | 
			
		||||
        remote_blocklist = load_remote_blocklist(server=args.server, token=args.token)
 | 
			
		||||
 | 
			
		||||
    Instance.show_diff(local_blocklist, remote_blocklist)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    cli()
 | 
			
		||||
							
								
								
									
										85
									
								
								models.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										85
									
								
								models.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,85 @@
 | 
			
		||||
class Instance:
 | 
			
		||||
    def __init__(self, instance_dict):
 | 
			
		||||
        """If obfuscate, reject_media or reject_reports are not specified default to False"""
 | 
			
		||||
        self.obfuscate = False
 | 
			
		||||
        self.reject_media = False
 | 
			
		||||
        self.reject_reports = False
 | 
			
		||||
        self.id = None
 | 
			
		||||
 | 
			
		||||
        """Remote blocks and local blocks are parsed differently"""
 | 
			
		||||
        try:
 | 
			
		||||
            instance_dict["id"]
 | 
			
		||||
            self.parse_remote_block(instance_dict)
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            self.parse_local_block(instance_dict)
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return f"{self.name}: {self.severity}"
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, other):
 | 
			
		||||
        return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
 | 
			
		||||
 | 
			
		||||
    def status_str(self):
 | 
			
		||||
        return f"{self.severity}, Reject reports: {self.reject_reports}, Reject media: {self.reject_media}, Obfuscate: {self.obfuscate}"
 | 
			
		||||
 | 
			
		||||
    def parse_remote_block(self, instance_dict):
 | 
			
		||||
        self.domain = instance_dict["domain"]
 | 
			
		||||
        self.id = instance_dict["id"]
 | 
			
		||||
        self.severity = instance_dict["severity"]
 | 
			
		||||
        self.public_comment = instance_dict["public_comment"]
 | 
			
		||||
        self.private_comment = instance_dict["private_comment"]
 | 
			
		||||
        self.remote = True
 | 
			
		||||
 | 
			
		||||
    def parse_local_block(self, instance_dict):
 | 
			
		||||
        self.name = instance_dict["name"]
 | 
			
		||||
        self.domain = instance_dict["domain"]
 | 
			
		||||
        self.severity = instance_dict["severity"]
 | 
			
		||||
        self.public_comment = instance_dict["public_comment"]
 | 
			
		||||
        self.private_comment = instance_dict["private_comment"]
 | 
			
		||||
        self.remote = False
 | 
			
		||||
 | 
			
		||||
    def apply(self, instance, token):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def list_diff(local_blocklist, remote_blocklist):
 | 
			
		||||
        diffs = []
 | 
			
		||||
        for local_instance in local_blocklist:
 | 
			
		||||
            instance_found = False
 | 
			
		||||
            for idx, remote_instance in enumerate(remote_blocklist):
 | 
			
		||||
                if local_instance.domain == remote_instance.domain:
 | 
			
		||||
                    instance_found = True
 | 
			
		||||
                    if local_instance == remote_instance:
 | 
			
		||||
                        pass
 | 
			
		||||
                    else:
 | 
			
		||||
                        """If the local block is different from the remote block, add it to the diff"""
 | 
			
		||||
                        diffs.append({"local": local_instance, "remote": remote_instance})
 | 
			
		||||
                    """Remove the remote instance from the list so we later have a list of remote instances we don't
 | 
			
		||||
                    have locally"""
 | 
			
		||||
                    del remote_blocklist[idx]
 | 
			
		||||
            """If the local instance is not in the remote blocklist, add it to the diff"""
 | 
			
		||||
            if not instance_found:
 | 
			
		||||
                diffs.append({"local": local_instance, "remote": None})
 | 
			
		||||
        for remote_instance in remote_blocklist:
 | 
			
		||||
            diffs.append({"local": None, "remote": remote_instance})
 | 
			
		||||
        return diffs
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def show_diff(local_blocklist, remote_blocklist):
 | 
			
		||||
        from rich.table import Table
 | 
			
		||||
        from rich.console import Console
 | 
			
		||||
        table = Table(title="Differences", expand=True, show_lines=True)
 | 
			
		||||
 | 
			
		||||
        table.add_column("Domain", style="cyan")
 | 
			
		||||
        table.add_column("Current remote status", style="magenta")
 | 
			
		||||
        table.add_column("Local status", style="green")
 | 
			
		||||
        diffs = Instance.list_diff(local_blocklist, remote_blocklist)
 | 
			
		||||
        for diff in diffs:
 | 
			
		||||
            if diff["local"] is None:
 | 
			
		||||
                table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
 | 
			
		||||
            elif diff["remote"] is None:
 | 
			
		||||
                table.add_row(diff["local"].domain, diff["local"].status_str(), None)
 | 
			
		||||
            else:
 | 
			
		||||
                table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
 | 
			
		||||
        console = Console()
 | 
			
		||||
        console.print(table)
 | 
			
		||||
		Reference in New Issue
	
	Block a user