145 lines
6.4 KiB
Python
145 lines
6.4 KiB
Python
import logging
|
|
|
|
import requests
|
|
|
|
|
|
class Instance:
|
|
def __init__(self, instance_dict):
|
|
"""If obfuscate, reject_media or reject_reports are not specified default to False"""
|
|
self.obfuscate = False
|
|
self.reject_media = False
|
|
self.reject_reports = False
|
|
self.id = None
|
|
|
|
"""Remote blocks and local blocks are parsed differently"""
|
|
try:
|
|
instance_dict["id"]
|
|
self.parse_remote_block(instance_dict)
|
|
except KeyError:
|
|
self.parse_local_block(instance_dict)
|
|
|
|
def __str__(self):
|
|
return f"{self.domain}: {self.severity}"
|
|
|
|
def __eq__(self, other):
|
|
return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate
|
|
|
|
def status_str(self):
|
|
return f"{self.severity}, Reject reports: {self.reject_reports}, Reject media: {self.reject_media}, Obfuscate: {self.obfuscate}"
|
|
|
|
def parse_remote_block(self, instance_dict):
|
|
self.domain = instance_dict["domain"]
|
|
self.id = instance_dict["id"]
|
|
self.severity = instance_dict["severity"]
|
|
self.public_comment = instance_dict["public_comment"]
|
|
self.private_comment = instance_dict["private_comment"]
|
|
self.obfuscate = instance_dict["obfuscate"]
|
|
self.reject_media = instance_dict["reject_media"]
|
|
self.reject_reports = instance_dict["reject_reports"]
|
|
|
|
def parse_local_block(self, instance_dict):
|
|
self.name = instance_dict["name"]
|
|
self.domain = instance_dict["domain"]
|
|
self.severity = instance_dict["severity"]
|
|
self.public_comment = instance_dict["public_comment"]
|
|
self.private_comment = instance_dict["private_comment"]
|
|
try:
|
|
self.obfuscate = instance_dict["obfuscate"]
|
|
except KeyError:
|
|
pass
|
|
try:
|
|
self.reject_media = instance_dict["reject_media"]
|
|
except KeyError:
|
|
pass
|
|
try:
|
|
self.reject_reports = instance_dict["reject_reports"]
|
|
except KeyError:
|
|
pass
|
|
|
|
def apply(self, server, token, block_id=None):
|
|
headers = {
|
|
f'Authorization': f'Bearer {token}',
|
|
}
|
|
data = {"domain": self.domain,
|
|
"severity": self.severity,
|
|
"reject_media": self.reject_media,
|
|
"reject_reports": self.reject_reports,
|
|
"private_comment": self.private_comment,
|
|
"public_comment": self.public_comment,
|
|
"obfuscate": self.obfuscate}
|
|
"""If no id is given add a new block, else update the existing block"""
|
|
if block_id is None:
|
|
response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers)
|
|
else:
|
|
response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data, headers=headers)
|
|
if response.status_code != 200:
|
|
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
|
|
def delete(self, server: str, token: str):
|
|
headers = {
|
|
f'Authorization': f'Bearer {token}',
|
|
}
|
|
response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers)
|
|
if response.status_code != 200:
|
|
raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})")
|
|
|
|
|
|
@staticmethod
|
|
def list_diffs(local_blocklist, remote_blocklist):
|
|
diffs = []
|
|
for local_instance in local_blocklist:
|
|
instance_found = False
|
|
for idx, remote_instance in enumerate(remote_blocklist):
|
|
if local_instance.domain == remote_instance.domain:
|
|
instance_found = True
|
|
if local_instance == remote_instance:
|
|
pass
|
|
else:
|
|
"""If the local block is different from the remote block, add it to the diff"""
|
|
diffs.append({"local": local_instance, "remote": remote_instance})
|
|
"""Remove the remote instance from the list so we later have a list of remote instances we don't
|
|
have locally"""
|
|
del remote_blocklist[idx]
|
|
"""If the local instance is not in the remote blocklist, add it to the diff"""
|
|
if not instance_found:
|
|
diffs.append({"local": local_instance, "remote": None})
|
|
for remote_instance in remote_blocklist:
|
|
diffs.append({"local": None, "remote": remote_instance})
|
|
return diffs
|
|
|
|
@staticmethod
|
|
def apply_blocks_from_diff(diffs, server, token, no_delete: bool):
|
|
for diff in diffs:
|
|
if diff["local"] is None:
|
|
if not no_delete:
|
|
"""Delete the block on the remote server"""
|
|
diff['remote'].delete(server, token)
|
|
logging.info(f"Deleted {diff['remote'].domain} from blocklist")
|
|
elif diff["remote"] is None:
|
|
"""Add the block on the remote server"""
|
|
diff["local"].apply(server, token)
|
|
logging.info(f"Added {diff['remote'].domain} to blocklist")
|
|
else:
|
|
"""Update the block on the remote server"""
|
|
diff["local"].apply(server, token, block_id=diff["remote"].id)
|
|
logging.info(f"Updated {diff['remote'].domain} in blocklist")
|
|
|
|
@staticmethod
|
|
def show_diffs(local_blocklist, remote_blocklist):
|
|
from rich.table import Table
|
|
from rich.console import Console
|
|
table = Table(title="Differences", expand=True, show_lines=True)
|
|
|
|
table.add_column("Domain", style="cyan")
|
|
table.add_column("Current remote status", style="magenta")
|
|
table.add_column("Local status", style="green")
|
|
diffs = Instance.list_diffs(local_blocklist, remote_blocklist)
|
|
for diff in diffs:
|
|
if diff["local"] is None:
|
|
table.add_row(diff["remote"].domain, None, diff["remote"].status_str())
|
|
elif diff["remote"] is None:
|
|
table.add_row(diff["local"].domain, diff["local"].status_str(), None)
|
|
else:
|
|
table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str())
|
|
console = Console()
|
|
console.print(table)
|