Restructure project to make a valid poetry project, add script

This commit is contained in:
2023-01-12 08:00:36 +01:00
parent 45f52b940e
commit c1e4770b0e
4 changed files with 153 additions and 2 deletions

View File

@@ -0,0 +1,104 @@
# import tomllib
import argparse
import json
import logging
import requests
import toml
from mastodon_blocklist_deploy.models import Instance
def load_local_blocklist(filename: str) -> [Instance]:
with open(filename, "r") as f:
data = toml.load(f)
instances = []
for instance_dict in data["instances"]:
instance = Instance(instance_dict)
instances.append(instance)
return instances
def export_blocklist_toml(blocklist: [Instance], filname: str):
toml_str = ""
for instance in blocklist:
toml_str += f'''
[[instances]]
name = "{instance.domain}"
domain = "{instance.domain}"
severity = "{instance.severity}"
reject_media = {str(instance.reject_media).lower()}
reject_reports = {str(instance.reject_reports).lower()}
public_comment = "{instance.public_comment}"
private_comment = "{instance.private_comment}"
'''
with open(filname, "w") as f:
f.write(toml_str)
def blocklist_json_to_instances(blocklist_json: str):
instances = []
for i in blocklist_json:
instances.append(Instance(i))
return instances
def load_remote_blocklist(server: str, token: str):
headers = {
f'Authorization': f'Bearer {token}',
}
response = requests.get(f'https://{server}/api/v1/admin/domain_blocks', headers=headers)
if response.status_code == 200:
blocklist_json = json.loads(response.content)
return blocklist_json_to_instances(blocklist_json)
else:
raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
def cli():
parser = argparse.ArgumentParser(description='Deploy blocklist updates to a mastodon server')
parser.add_argument('action', choices=['diff', 'deploy', 'export'],
help="Either use 'diff' to check the difference between current blocks and future blocks, "
"'deploy' to apply the current local blocklist or 'export' to export the remote "
"blocklist into a local file.")
parser.add_argument('-s', '--server', help="The address of the server where you want to deploy (e.g. "
"mastodon.social)")
parser.add_argument('-t', '--token', help="Authorization token")
parser.add_argument('-i', '--input-file', help="The blocklist to use")
parser.add_argument('-r', '--remote-blocklist', help="The remote blocklist as json for debugging reasons")
parser.add_argument('-o', '--output', help="Filename where to export the blocklist")
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARN)
"""if there is a remote blocklist provided load this instead of fetching it from a server (for debugging reasons)"""
if args.remote_blocklist:
with open(args.remote_blocklist) as f:
remote_blocklist = blocklist_json_to_instances(json.load(f))
else:
remote_blocklist = load_remote_blocklist(server=args.server, token=args.token)
"""Load local blocklist only when needed"""
if args.action in ["diff", "deploy"]:
if args.input_file:
blocklist_filename = args.input_file
else:
blocklist_filename = "../blocklist.toml"
local_blocklist = load_local_blocklist(blocklist_filename)
if args.action == "diff":
Instance.show_diffs(local_blocklist, remote_blocklist)
elif args.action == "deploy":
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
Instance.apply_blocks_from_diff(diffs, args.server, args.token, args.no_delete)
elif args.action == "export":
export_blocklist_toml(remote_blocklist, args.output)
if __name__ == "__main__":
cli()

View File

@@ -0,0 +1,150 @@
import logging
import requests
class Instance:
def __init__(self, instance_dict):
"""If obfuscate, reject_media or reject_reports are not specified default to False"""
self.obfuscate = False
self.reject_media = False
self.reject_reports = False
self.id = None
"""Remote blocks and local blocks are parsed differently"""
try:
instance_dict["id"]
self.parse_remote_block(instance_dict)
except KeyError:
self.parse_local_block(instance_dict)
def __str__(self):
return f"{self.domain}: {self.severity}"
def __eq__(self, other):
return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
def status_str(self):
return f"{self.severity}\nReject reports: {self.reject_reports}\nReject media: {self.reject_media}\nObfuscate: {self.obfuscate}"
def parse_remote_block(self, instance_dict):
self.domain = instance_dict["domain"]
self.id = instance_dict["id"]
self.severity = instance_dict["severity"]
self.public_comment = instance_dict["public_comment"]
self.private_comment = instance_dict["private_comment"]
self.obfuscate = instance_dict["obfuscate"]
self.reject_media = instance_dict["reject_media"]
self.reject_reports = instance_dict["reject_reports"]
def parse_local_block(self, instance_dict):
self.name = instance_dict["name"]
self.domain = instance_dict["domain"]
self.severity = instance_dict["severity"]
self.public_comment = instance_dict["public_comment"]
self.private_comment = instance_dict["private_comment"]
try:
self.obfuscate = instance_dict["obfuscate"]
except KeyError:
pass
try:
self.reject_media = instance_dict["reject_media"]
except KeyError:
pass
try:
self.reject_reports = instance_dict["reject_reports"]
except KeyError:
pass
def apply(self, server, token, block_id=None):
"""Applies instance block on the remote server"""
headers = {
f'Authorization': f'Bearer {token}',
}
data = {"domain": self.domain,
"severity": self.severity,
"reject_media": self.reject_media,
"reject_reports": self.reject_reports,
"private_comment": self.private_comment,
"public_comment": self.public_comment,
"obfuscate": self.obfuscate}
"""If no id is given add a new block, else update the existing block"""
if block_id is None:
response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers)
else:
response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data,
headers=headers)
if response.status_code != 200:
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
def delete(self, server: str, token: str):
"""Deletes the instance from the blocklist on the remote server"""
headers = {
f'Authorization': f'Bearer {token}',
}
response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers)
if response.status_code != 200:
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
@staticmethod
def list_diffs(local_blocklist, remote_blocklist):
"""Compares the local and remote blocklist and returns a list of differences"""
diffs = []
for local_instance in local_blocklist:
instance_found = False
for idx, remote_instance in enumerate(remote_blocklist):
if local_instance.domain == remote_instance.domain:
instance_found = True
if local_instance == remote_instance:
pass
else:
"""If the local block is different from the remote block, add it to the diff"""
diffs.append({"local": local_instance, "remote": remote_instance})
"""Remove the remote instance from the list so we later have a list of remote instances we don't
have locally"""
del remote_blocklist[idx]
"""If the local instance is not in the remote blocklist, add it to the diff"""
if not instance_found:
diffs.append({"local": local_instance, "remote": None})
for remote_instance in remote_blocklist:
diffs.append({"local": None, "remote": remote_instance})
return diffs
@staticmethod
def apply_blocks_from_diff(diffs, server, token, no_delete: bool):
"""Uses a diff (list of difference in local an remote instance) to apply instance blocks"""
for diff in diffs:
if diff["local"] is None:
if not no_delete:
"""Delete the block on the remote server"""
diff['remote'].delete(server, token)
logging.info(f"Deleted {diff['remote'].domain} from blocklist")
elif diff["remote"] is None:
"""Add the block on the remote server"""
diff["local"].apply(server, token)
logging.info(f"Added {diff['local'].domain} to blocklist")
else:
"""Update the block on the remote server"""
diff["local"].apply(server, token, block_id=diff["remote"].id)
logging.info(f"Updated {diff['local'].domain} in blocklist")
@staticmethod
def show_diffs(local_blocklist, remote_blocklist):
"""Shows a table in the CLI comparing the local and remote blocklist"""
from rich.table import Table
from rich.console import Console
table = Table(title="Differences", expand=True, show_lines=True)
table.add_column("Domain", style="cyan")
table.add_column("Local status", style="green")
table.add_column("Current remote status", style="magenta")
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
for diff in diffs:
if diff["local"] is None:
table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
elif diff["remote"] is None:
table.add_row(diff["local"].domain, diff["local"].status_str(), None)
else:
table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
console = Console()
console.print(table)