148 lines
7.0 KiB
Python
148 lines
7.0 KiB
Python
import logging
|
|
|
|
import requests
|
|
|
|
from typing import Dict, Union
|
|
|
|
|
|
class Instance:
|
|
def __init__(self, instance_dict : Dict):
|
|
"""If obfuscate, reject_media or reject_reports are not specified default to False"""
|
|
self.severity: str = "suspend"
|
|
self.obfuscate: bool = False
|
|
self.reject_media: bool = False
|
|
self.reject_reports: bool = False
|
|
self.id: Union[int, None] = None
|
|
self.domain: str = ""
|
|
self.private_comment: str = ""
|
|
self.public_comment: str = ""
|
|
|
|
self.parse_block(instance_dict)
|
|
|
|
def __str__(self):
|
|
return f"{self.domain}: {self.severity}"
|
|
|
|
def __eq__(self, other):
|
|
return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
|
|
|
|
def status_str(self):
|
|
return f"{self.severity}\nReject reports: {self.reject_reports}\nReject media: {self.reject_media}\nObfuscate: {self.obfuscate}"
|
|
|
|
def as_dict(self, private=False):
|
|
keys = ["domain", "severity", "public_comment", "obfuscate", "reject_media", "reject_reports"]
|
|
if private:
|
|
keys.append("private_comment")
|
|
exportable = {}
|
|
for key in keys:
|
|
exportable[key] = getattr(self, key)
|
|
return exportable
|
|
|
|
def parse_block(self, instance_dict):
|
|
# this specifies possible properties and default values if not found on the remote source. If a default is None
|
|
# the value is required and the parse will fail
|
|
properties_and_defaults = [("domain", None), ("severity", "suspend"), ("public_comment", ""),
|
|
("private_comment", ""), ("obfuscate", False), ("reject_media", False),
|
|
("reject_reports", False)]
|
|
for key, default in properties_and_defaults:
|
|
try:
|
|
setattr(self, key, instance_dict[key])
|
|
except KeyError:
|
|
if default is not None:
|
|
setattr(self, key, default)
|
|
else:
|
|
raise KeyError(f"The key {key} was not in the instance_dict response.")
|
|
|
|
def apply(self, server, token, block_id=None):
|
|
"""Applies instance block on the remote server"""
|
|
headers = {
|
|
f'Authorization': f'Bearer {token}',
|
|
}
|
|
# As long as we generate this enside of apply we cannot properly test for the correct format
|
|
data = {"domain": self.domain,
|
|
"severity": self.severity,
|
|
"reject_media": str(self.reject_media).lower(),
|
|
"reject_reports": str(self.reject_reports).lower(),
|
|
"private_comment": str(self.private_comment).lower(),
|
|
"public_comment": self.public_comment,
|
|
"obfuscate": str(self.obfuscate).lower()}
|
|
"""If no id is given add a new block, else update the existing block"""
|
|
if block_id is None:
|
|
response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers)
|
|
else:
|
|
response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data,
|
|
headers=headers)
|
|
if response.status_code != 200:
|
|
raise ConnectionError(f"Could not apply block for {self.domain} ({response.status_code}: {response.reason})")
|
|
|
|
def delete(self, server: str, token: str):
|
|
"""Deletes the instance from the blocklist on the remote server"""
|
|
headers = {
|
|
f'Authorization': f'Bearer {token}',
|
|
}
|
|
response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers)
|
|
if response.status_code != 200:
|
|
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
|
|
|
|
@staticmethod
|
|
def list_diffs(local_blocklist, remote_blocklist):
|
|
"""Compares the local and remote blocklist and returns a list of differences"""
|
|
diffs = []
|
|
for local_instance in local_blocklist:
|
|
instance_found = False
|
|
for idx, remote_instance in enumerate(remote_blocklist):
|
|
if local_instance.domain == remote_instance.domain:
|
|
instance_found = True
|
|
if local_instance == remote_instance:
|
|
pass
|
|
else:
|
|
"""If the local block is different from the remote block, add it to the diff"""
|
|
diffs.append({"local": local_instance, "remote": remote_instance})
|
|
"""Remove the remote instance from the list so we later have a list of remote instances we don't
|
|
have locally"""
|
|
del remote_blocklist[idx]
|
|
"""If the local instance is not in the remote blocklist, add it to the diff"""
|
|
if not instance_found:
|
|
diffs.append({"local": local_instance, "remote": None})
|
|
for remote_instance in remote_blocklist:
|
|
diffs.append({"local": None, "remote": remote_instance})
|
|
return diffs
|
|
|
|
@staticmethod
|
|
def apply_blocks_from_diff(diffs, server, token, no_delete: bool):
|
|
"""Uses a diff (list of difference in local an remote instance) to apply instance blocks"""
|
|
for diff in diffs:
|
|
if diff["local"] is None:
|
|
if not no_delete:
|
|
"""Delete the block on the remote server"""
|
|
diff['remote'].delete(server, token)
|
|
logging.info(f"Deleted {diff['remote'].domain} from blocklist")
|
|
elif diff["remote"] is None:
|
|
"""Add the block on the remote server"""
|
|
diff["local"].apply(server, token)
|
|
logging.info(f"Added {diff['local'].domain} to blocklist")
|
|
else:
|
|
"""Update the block on the remote server"""
|
|
diff["local"].apply(server, token, block_id=diff["remote"].id)
|
|
logging.info(f"Updated {diff['local'].domain} in blocklist")
|
|
|
|
@staticmethod
|
|
def show_diffs(local_blocklist, remote_blocklist):
|
|
"""Shows a table in the CLI comparing the local and remote blocklist"""
|
|
from rich.table import Table
|
|
from rich.console import Console
|
|
table = Table(title="Differences", expand=True, show_lines=True)
|
|
|
|
table.add_column("Domain", style="cyan")
|
|
table.add_column("Local status", style="green")
|
|
table.add_column("Current remote status", style="magenta")
|
|
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
|
|
for diff in diffs:
|
|
if diff["local"] is None:
|
|
table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
|
|
elif diff["remote"] is None:
|
|
table.add_row(diff["local"].domain, diff["local"].status_str(), None)
|
|
else:
|
|
table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
|
|
console = Console()
|
|
console.print(table)
|