refactor: Rename to fediverse blocklist tool/fbt
This commit is contained in:
1
fediverse_blocklist_tool/__init__.py
Normal file
1
fediverse_blocklist_tool/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
version = "0.1.0"
|
159
fediverse_blocklist_tool/cli.py
Normal file
159
fediverse_blocklist_tool/cli.py
Normal file
@@ -0,0 +1,159 @@
|
||||
# import tomllib
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
import os
|
||||
import toml
|
||||
|
||||
from fediverse_blocklist_tool.models import Instance
|
||||
from fediverse_blocklist_tool.helpers import blocklist_to_markdown, blocklist_to_toml, blocklist_to_csv, \
|
||||
blocklist_to_json
|
||||
|
||||
|
||||
def load_blocklist_file(filename: str) -> [Instance]:
|
||||
if filename.endswith("json"):
|
||||
with open(filename) as f:
|
||||
instances = blocklist_json_to_instances(json.load(f))
|
||||
else:
|
||||
with open(filename, "r") as f:
|
||||
data = toml.load(f)
|
||||
instances = []
|
||||
for instance_dict in data["instances"]:
|
||||
instance = Instance(instance_dict)
|
||||
instances.append(instance)
|
||||
return instances
|
||||
|
||||
|
||||
def blocklist_json_to_instances(blocklist_json: str) -> [Instance]:
|
||||
instances = []
|
||||
for i in blocklist_json:
|
||||
instances.append(Instance(i))
|
||||
return instances
|
||||
|
||||
|
||||
def load_blocklist_from_instance(server: str, token: str) -> [Instance]:
|
||||
headers = {
|
||||
f'Authorization': f'Bearer {token}',
|
||||
}
|
||||
|
||||
response = requests.get(f'https://{server}/api/v1/admin/domain_blocks', headers=headers)
|
||||
if response.status_code == 200:
|
||||
blocklist_json = json.loads(response.content)
|
||||
return blocklist_json_to_instances(blocklist_json)
|
||||
else:
|
||||
raise ConnectionError(f"Could not connect to the server ({response.status_code}: {response.reason})")
|
||||
|
||||
|
||||
def remove_key_from_dict(dict, key):
|
||||
del dict[key]
|
||||
return dict
|
||||
|
||||
|
||||
def exporter(blocklist, output=None, format: str = "toml", private: bool = False):
|
||||
if format == "toml":
|
||||
exported_text = blocklist_to_toml(blocklist, private)
|
||||
if format == "csv":
|
||||
exported_text = blocklist_to_csv(blocklist, private)
|
||||
if format == "markdown":
|
||||
exported_text = blocklist_to_markdown(blocklist, private)
|
||||
if format == "json":
|
||||
exported_text = blocklist_to_json(blocklist, private)
|
||||
|
||||
# Output the text
|
||||
if output is not None:
|
||||
with open(output, "w") as f:
|
||||
f.write(exported_text)
|
||||
else:
|
||||
print(exported_text)
|
||||
|
||||
|
||||
def merge(input_file, merge_target, format: str = "toml", private: bool = False, overwrite=False):
|
||||
"""Shows a table in the CLI comparing the local and remote blocklist"""
|
||||
from rich.table import Table
|
||||
from rich.console import Console
|
||||
input_blocklist = load_blocklist_file(input_file)
|
||||
merge_target_blocklist = load_blocklist_file(merge_target)
|
||||
for input_instance in input_blocklist:
|
||||
# If the block is already there with the same parameters we do nothing
|
||||
if input_instance in merge_target_blocklist:
|
||||
continue
|
||||
# Check if there is a domain in the merge target where the input domain is similar
|
||||
try:
|
||||
merge_target_instance = [merge_target_instance for merge_target_instance in merge_target_blocklist if input_instance.domain == merge_target_instance.domain][0]
|
||||
if not overwrite:
|
||||
key_input = ""
|
||||
while key_input not in ("i", "O"):
|
||||
print(f"Different settings for {input_instance.domain} detected.")
|
||||
Instance.show_diff(input_instance, merge_target_instance)
|
||||
key_input = input("Keep input (i) or original (o) [i/O]")
|
||||
elif key_input == "i":
|
||||
merge_target_blocklist.append(merge_target_instance)
|
||||
else:
|
||||
merge_target_blocklist.append(input_instance)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
def cli():
|
||||
parser = argparse.ArgumentParser(description='Compare, merge, export and deploy blocklist of a fediverse server')
|
||||
parser.add_argument('action', choices=['diff', 'deploy', 'export', 'merge'],
|
||||
help="Either use 'diff' to check the difference between local blockĺist and the blocklist on "
|
||||
"the server, 'deploy' to apply the current local blocklist or 'export' to export the remote "
|
||||
"blocklist into a local file. merge can be used to merge a blocklist (given by -i) into "
|
||||
"another (-o)")
|
||||
parser.add_argument('-s', '--server', help="The address of the server where you want to deploy (e.g. "
|
||||
"mastodon.social)")
|
||||
parser.add_argument('-t', '--token', help="Authorization token")
|
||||
parser.add_argument('-i', '--input-file', help="The blocklist to use")
|
||||
parser.add_argument('-r', '--remote-blocklist', help="The remote blocklist as json for debugging reasons")
|
||||
parser.add_argument('-o', '--output', help="Filename where to export the blocklist")
|
||||
parser.add_argument('-v', '--verbose', action='store_true')
|
||||
parser.add_argument('-n', '--no-delete', action='store_true', help="Do not delete existing blocks")
|
||||
parser.add_argument('--format', default="toml", type=str, help="Export format: toml|markdown|csv|json")
|
||||
parser.add_argument('--private', action='store_true', help="When the flag is set, private comment will also be "
|
||||
"exported.")
|
||||
args = parser.parse_args()
|
||||
if args.verbose:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
else:
|
||||
logging.basicConfig(level=logging.WARN)
|
||||
|
||||
if args.token:
|
||||
token = args.token
|
||||
else:
|
||||
token = os.getenv('MBD_TOKEN')
|
||||
|
||||
"""Get a remote blocklist only when necessary"""
|
||||
if args.action in ["diff", "deploy", "export"]:
|
||||
"""if there is a remote blocklist provided load this instead of fetching it from a server (for debugging reasons)"""
|
||||
if args.remote_blocklist:
|
||||
remote_blocklist = load_blocklist_file(args.remote_blocklist)
|
||||
else:
|
||||
remote_blocklist = load_blocklist_from_instance(server=args.server, token=token)
|
||||
|
||||
"""Load local blocklist only when needed"""
|
||||
if args.action in ["diff", "deploy", "merge"]:
|
||||
if args.input_file:
|
||||
blocklist_filename = args.input_file
|
||||
else:
|
||||
blocklist_filename = "../blocklist.toml"
|
||||
try:
|
||||
local_blocklist = load_blocklist_file(blocklist_filename)
|
||||
except FileNotFoundError:
|
||||
print("Local blocklist file was not found. Make sure to specify it's location via -i")
|
||||
exit()
|
||||
|
||||
if args.action == "diff":
|
||||
Instance.show_diffs(local_blocklist, remote_blocklist)
|
||||
elif args.action == "deploy":
|
||||
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
|
||||
Instance.apply_blocks_from_diff(diffs, args.server, token, args.no_delete)
|
||||
elif args.action == "export":
|
||||
exporter(remote_blocklist, args.output, args.format, args.private)
|
||||
elif args.action == "merge":
|
||||
merge(args.input_file, args.output, args.format, args.private)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
36
fediverse_blocklist_tool/helpers.py
Normal file
36
fediverse_blocklist_tool/helpers.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from fediverse_blocklist_tool.models import Instance
|
||||
import toml
|
||||
import io
|
||||
import csv
|
||||
import json
|
||||
|
||||
def blocklist_to_markdown(blocklist: [Instance], private: bool = False):
|
||||
if private:
|
||||
markdown_string = "| Instance | Status | Reason | Private Comment |\n | --- | --- | --- |\n"
|
||||
else:
|
||||
markdown_string = "| Instance | Status | Reason |\n | --- | --- | --- |\n"
|
||||
for instance in blocklist:
|
||||
|
||||
if private:
|
||||
markdown_string += f"| {instance.domain} | {instance.severity} | {instance.public_comment} | {instance.private_comment} |\n"
|
||||
else:
|
||||
markdown_string += f"| {instance.domain} | {instance.severity} | {instance.public_comment} |\n"
|
||||
|
||||
return markdown_string
|
||||
|
||||
def blocklist_to_toml(blocklist: [Instance], private: bool = False):
|
||||
toml_string = toml.dumps({"instances": [b.as_dict(private) for b in blocklist]})
|
||||
return toml_string
|
||||
|
||||
def blocklist_to_csv(blocklist: [Instance], private: bool = False):
|
||||
csv_string = io.StringIO()
|
||||
blocklist_as_dict = [b.as_dict(private) for b in blocklist]
|
||||
keys = blocklist_as_dict[0].keys()
|
||||
w = csv.DictWriter(csv_string, keys)
|
||||
w.writeheader()
|
||||
w.writerows(blocklist_as_dict)
|
||||
return csv_string.getvalue()
|
||||
|
||||
def blocklist_to_json(blocklist: [Instance], private: bool = False):
|
||||
json_string = json.dumps([b.as_dict(private) for b in blocklist])
|
||||
return json_string
|
162
fediverse_blocklist_tool/models.py
Normal file
162
fediverse_blocklist_tool/models.py
Normal file
@@ -0,0 +1,162 @@
|
||||
import logging
|
||||
|
||||
import requests
|
||||
|
||||
from typing import Dict, Union
|
||||
|
||||
|
||||
class Instance:
|
||||
def __init__(self, instance_dict : Dict):
|
||||
"""If obfuscate, reject_media or reject_reports are not specified default to False"""
|
||||
self.severity: str = "suspend"
|
||||
self.obfuscate: bool = False
|
||||
self.reject_media: bool = False
|
||||
self.reject_reports: bool = False
|
||||
self.id: Union[int, None] = None
|
||||
self.domain: str = ""
|
||||
self.private_comment: str = ""
|
||||
self.public_comment: str = ""
|
||||
|
||||
self.parse_block(instance_dict)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.domain}: {self.severity}"
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
|
||||
|
||||
def status_str(self):
|
||||
return f"{self.severity}\nReject reports: {self.reject_reports}\nReject media: {self.reject_media}\nObfuscate: {self.obfuscate}"
|
||||
|
||||
def as_dict(self, private=False):
|
||||
keys = ["domain", "severity", "public_comment", "obfuscate", "reject_media", "reject_reports"]
|
||||
if private:
|
||||
keys.append("private_comment")
|
||||
exportable = {}
|
||||
for key in keys:
|
||||
exportable[key] = getattr(self, key)
|
||||
return exportable
|
||||
|
||||
def parse_block(self, instance_dict):
|
||||
# this specifies possible properties and default values if not found on the remote source. If a default is None
|
||||
# the value is required and the parse will fail
|
||||
properties_and_defaults = [("domain", None), ("severity", "suspend"), ("public_comment", ""),
|
||||
("private_comment", ""), ("obfuscate", False), ("reject_media", False),
|
||||
("reject_reports", False)]
|
||||
for key, default in properties_and_defaults:
|
||||
try:
|
||||
setattr(self, key, instance_dict[key])
|
||||
except KeyError:
|
||||
if default is not None:
|
||||
setattr(self, key, default)
|
||||
else:
|
||||
raise KeyError(f"The key {key} was not in the instance_dict response.")
|
||||
|
||||
def apply(self, server, token, block_id=None):
|
||||
"""Applies instance block on the remote server"""
|
||||
headers = {
|
||||
f'Authorization': f'Bearer {token}',
|
||||
}
|
||||
# As long as we generate this enside of apply we cannot properly test for the correct format
|
||||
data = {"domain": self.domain,
|
||||
"severity": self.severity,
|
||||
"reject_media": str(self.reject_media).lower(),
|
||||
"reject_reports": str(self.reject_reports).lower(),
|
||||
"private_comment": str(self.private_comment).lower(),
|
||||
"public_comment": self.public_comment,
|
||||
"obfuscate": str(self.obfuscate).lower()}
|
||||
"""If no id is given add a new block, else update the existing block"""
|
||||
if block_id is None:
|
||||
response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers)
|
||||
else:
|
||||
response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data,
|
||||
headers=headers)
|
||||
if response.status_code != 200:
|
||||
raise ConnectionError(f"Could not apply block for {self.domain} ({response.status_code}: {response.reason})")
|
||||
|
||||
def delete(self, server: str, token: str):
|
||||
"""Deletes the instance from the blocklist on the remote server"""
|
||||
headers = {
|
||||
f'Authorization': f'Bearer {token}',
|
||||
}
|
||||
response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers)
|
||||
if response.status_code != 200:
|
||||
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
|
||||
|
||||
@staticmethod
|
||||
def list_diffs(local_blocklist, remote_blocklist):
|
||||
"""Compares the local and remote blocklist and returns a list of differences"""
|
||||
diffs = []
|
||||
for local_instance in local_blocklist:
|
||||
instance_found = False
|
||||
for idx, remote_instance in enumerate(remote_blocklist):
|
||||
if local_instance.domain == remote_instance.domain:
|
||||
instance_found = True
|
||||
if local_instance == remote_instance:
|
||||
pass
|
||||
else:
|
||||
"""If the local block is different from the remote block, add it to the diff"""
|
||||
diffs.append({"local": local_instance, "remote": remote_instance})
|
||||
"""Remove the remote instance from the list so we later have a list of remote instances we don't
|
||||
have locally"""
|
||||
del remote_blocklist[idx]
|
||||
"""If the local instance is not in the remote blocklist, add it to the diff"""
|
||||
if not instance_found:
|
||||
diffs.append({"local": local_instance, "remote": None})
|
||||
for remote_instance in remote_blocklist:
|
||||
diffs.append({"local": None, "remote": remote_instance})
|
||||
return diffs
|
||||
|
||||
@staticmethod
|
||||
def apply_blocks_from_diff(diffs, server, token, no_delete: bool):
|
||||
"""Uses a diff (list of difference in local an remote instance) to apply instance blocks"""
|
||||
for diff in diffs:
|
||||
if diff["local"] is None:
|
||||
if not no_delete:
|
||||
"""Delete the block on the remote server"""
|
||||
diff['remote'].delete(server, token)
|
||||
logging.info(f"Deleted {diff['remote'].domain} from blocklist")
|
||||
elif diff["remote"] is None:
|
||||
"""Add the block on the remote server"""
|
||||
diff["local"].apply(server, token)
|
||||
logging.info(f"Added {diff['local'].domain} to blocklist")
|
||||
else:
|
||||
"""Update the block on the remote server"""
|
||||
diff["local"].apply(server, token, block_id=diff["remote"].id)
|
||||
logging.info(f"Updated {diff['local'].domain} in blocklist")
|
||||
|
||||
@staticmethod
|
||||
def show_diffs(local_blocklist, remote_blocklist):
|
||||
"""Shows a table in the CLI comparing the local and remote blocklist"""
|
||||
from rich.table import Table
|
||||
from rich.console import Console
|
||||
table = Table(title="Differences", expand=True, show_lines=True)
|
||||
|
||||
table.add_column("Domain", style="cyan")
|
||||
table.add_column("Local status", style="green")
|
||||
table.add_column("Current remote status", style="magenta")
|
||||
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
|
||||
for diff in diffs:
|
||||
if diff["local"] is None:
|
||||
table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
|
||||
elif diff["remote"] is None:
|
||||
table.add_row(diff["local"].domain, diff["local"].status_str(), None)
|
||||
else:
|
||||
table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
|
||||
console = Console()
|
||||
console.print(table)
|
||||
|
||||
@staticmethod
|
||||
def show_diff(instanceA, instanceB, column_names=('Input', 'Original')):
|
||||
from rich.table import Table
|
||||
from rich.console import Console
|
||||
table = Table(title="Differences", expand=True, show_lines=True)
|
||||
|
||||
table.add_column("Attribute", style="cyan")
|
||||
table.add_column(column_names[0], style="green")
|
||||
table.add_column(column_names[1], style="magenta")
|
||||
compare_attributes = ["domain", "severity", "obfuscate", "private_comment", "public_comment", "reject_media", "reject_reports"]
|
||||
for attr in compare_attributes:
|
||||
table.add_row(attr, str(getattr(instanceA, attr)), str(getattr(instanceB, attr)))
|
||||
console = Console()
|
||||
console.print(table)
|
Reference in New Issue
Block a user