276 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import argparse
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
from types import SimpleNamespace
 | 
						|
 | 
						|
import requests
 | 
						|
# TODO: consider using OSMPythonTools instead of requests or overpass library
 | 
						|
from osmtogeojson import osmtogeojson
 | 
						|
from tqdm import tqdm
 | 
						|
 | 
						|
DEFAULT_OSM_DATA_FILE = "export.geojson"
 | 
						|
# Search area must be the official name, e.g. "Germany" is not a valid area name in Overpass API
 | 
						|
# Consider instead finding & using the code within the query itself, e.g. "ISO3166-1"="DE"
 | 
						|
DEFAULT_OVERPASS_SEARCH_AREA = "Deutschland"
 | 
						|
 | 
						|
 | 
						|
def parse_args():
 | 
						|
    """Parse command-line arguments."""
 | 
						|
    parser = argparse.ArgumentParser(
 | 
						|
        description="Download animal shelter data from the Overpass API to the Notfellchen API.")
 | 
						|
    parser.add_argument("--api-token", type=str, help="API token for authentication.")
 | 
						|
    parser.add_argument("--area", type=str, help="Area to search for animal shelters (default: Deutschland).")
 | 
						|
    parser.add_argument("--instance", type=str, help="API instance URL.")
 | 
						|
    parser.add_argument("--data-file", type=str, help="Path to the GeoJSON file containing (only) animal shelters.")
 | 
						|
    parser.add_argument("--use-cached", action='store_true', help="Use the stored GeoJSON file")
 | 
						|
    return parser.parse_args()
 | 
						|
 | 
						|
 | 
						|
def get_config():
 | 
						|
    """Get configuration from environment variables or command-line arguments."""
 | 
						|
    args = parse_args()
 | 
						|
 | 
						|
    api_token = args.api_token or os.getenv("NOTFELLCHEN_API_TOKEN")
 | 
						|
    # TODO: document new environment variable NOTFELLCHEN_AREA
 | 
						|
    area = args.area or os.getenv("NOTFELLCHEN_AREA", DEFAULT_OVERPASS_SEARCH_AREA)
 | 
						|
    instance = args.instance or os.getenv("NOTFELLCHEN_INSTANCE")
 | 
						|
    data_file = args.data_file or os.getenv("NOTFELLCHEN_DATA_FILE", DEFAULT_OSM_DATA_FILE)
 | 
						|
    use_cached = args.use_cached or os.getenv("NOTFELLCHEN_USE_CACHED", False)
 | 
						|
 | 
						|
    if not api_token or not instance:
 | 
						|
        raise ValueError("API token and instance URL must be provided via environment variables or CLI arguments.")
 | 
						|
 | 
						|
    return api_token, area, instance, data_file, use_cached
 | 
						|
 | 
						|
 | 
						|
def get_or_none(data, key):
 | 
						|
    if key in data["properties"].keys():
 | 
						|
        return data["properties"][key]
 | 
						|
    else:
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
def get_or_empty(data, key):
 | 
						|
    if key in data["properties"].keys():
 | 
						|
        return data["properties"][key]
 | 
						|
    else:
 | 
						|
        return ""
 | 
						|
 | 
						|
 | 
						|
def choose(keys, data, replace=False):
 | 
						|
    for key in keys:
 | 
						|
        if key in data.keys():
 | 
						|
            if replace:
 | 
						|
                return data[key].replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
 | 
						|
            else:
 | 
						|
                return data[key]
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
def add(value, platform):
 | 
						|
    if value != "":
 | 
						|
        if value.find(platform) == -1:
 | 
						|
            return f"https://www.{platform}.com/{value}"
 | 
						|
        else:
 | 
						|
            return value
 | 
						|
    else:
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
def https(value):
 | 
						|
    if value is not None and value != "":
 | 
						|
        value = value.replace("http://", "")
 | 
						|
        if value.find("https") == -1:
 | 
						|
            return f"https://{value}"
 | 
						|
        else:
 | 
						|
            return value
 | 
						|
    else:
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
def calc_coordinate_center(coordinates):
 | 
						|
    """
 | 
						|
    Calculates the center as the arithmetic mean of the list of coordinates.
 | 
						|
 | 
						|
    Not perfect because earth is a sphere (citation needed) but good enough.
 | 
						|
    """
 | 
						|
    if not coordinates:
 | 
						|
        return None, None
 | 
						|
 | 
						|
    lon_sum = 0.0
 | 
						|
    lat_sum = 0.0
 | 
						|
    count = 0
 | 
						|
 | 
						|
    for lon, lat in coordinates:
 | 
						|
        lon_sum += lon
 | 
						|
        lat_sum += lat
 | 
						|
        count += 1
 | 
						|
 | 
						|
    return lon_sum / count, lat_sum / count
 | 
						|
 | 
						|
 | 
						|
def get_center_coordinates(geometry):
 | 
						|
    """
 | 
						|
    Given a GeoJSON geometry dict, return (longitude, latitude)
 | 
						|
 | 
						|
    If a shape, calculate the center, else reurn the point
 | 
						|
    """
 | 
						|
    geom_type = geometry["type"]
 | 
						|
    coordinates = geometry["coordinates"]
 | 
						|
 | 
						|
    if geom_type == "Point":
 | 
						|
        return coordinates[0], coordinates[1]
 | 
						|
 | 
						|
    elif geom_type == "LineString":
 | 
						|
        return calc_coordinate_center(coordinates)
 | 
						|
 | 
						|
    elif geom_type == "Polygon":
 | 
						|
        outer_ring = coordinates[0]
 | 
						|
        return calc_coordinate_center(outer_ring)
 | 
						|
 | 
						|
    else:
 | 
						|
        raise ValueError(f"Unsupported geometry type: {geom_type}")
 | 
						|
 | 
						|
 | 
						|
# TODO: take note of new get_overpass_result function which does the bulk of the new overpass query work
 | 
						|
def get_overpass_result(area, data_file):
 | 
						|
    """Build the Overpass query for fetching animal shelters in the specified area."""
 | 
						|
    overpass_endpoint = "https://overpass-api.de/api/interpreter"
 | 
						|
    overpass_query = f"""
 | 
						|
        [out:json][timeout:25];
 | 
						|
        area[name="{area}"]->.searchArea;
 | 
						|
        nwr["amenity"="animal_shelter"](area.searchArea);
 | 
						|
        out body;
 | 
						|
        >;
 | 
						|
        out skel qt;
 | 
						|
        """
 | 
						|
    r = requests.get(overpass_endpoint, params={'data': overpass_query})
 | 
						|
    if r.status_code == 200:
 | 
						|
        rjson = r.json()
 | 
						|
        result = osmtogeojson.process_osm_json(rjson)
 | 
						|
        with open(data_file, 'w', encoding='utf-8') as f:
 | 
						|
            json.dump(result, f, ensure_ascii=False)
 | 
						|
        return result
 | 
						|
 | 
						|
 | 
						|
def add_if_available(base_data, keys, result):
 | 
						|
    # Loads the data into the org if available
 | 
						|
    for key in keys:
 | 
						|
        if getattr(base_data, key) is not None:
 | 
						|
            result[key] = getattr(base_data, key)
 | 
						|
    return result
 | 
						|
 | 
						|
 | 
						|
def create_location(tierheim, instance, headers):
 | 
						|
    location_data = {
 | 
						|
        "place_id": tierheim["id"],
 | 
						|
        "longitude": get_center_coordinates(tierheim["geometry"])[0],
 | 
						|
        "latitude": get_center_coordinates(tierheim["geometry"])[1],
 | 
						|
        "name": tierheim["properties"]["name"],
 | 
						|
        "city": tierheim["properties"]["addr:city"],
 | 
						|
        "housenumber": get_or_empty(tierheim, "addr:housenumber"),
 | 
						|
        "postcode": get_or_empty(tierheim, "addr:postcode"),
 | 
						|
        "street": get_or_empty(tierheim, "addr:street"),
 | 
						|
        "countrycode": get_or_empty(tierheim, "addr:country"),
 | 
						|
    }
 | 
						|
 | 
						|
    location_result = requests.post(f"{instance}/api/locations/", json=location_data, headers=headers)
 | 
						|
 | 
						|
    if location_result.status_code != 201:
 | 
						|
        try:
 | 
						|
            print(
 | 
						|
                f"Location for {tierheim["properties"]["name"]}:{location_result.status_code} {location_result.json()} not created")
 | 
						|
        except requests.exceptions.JSONDecodeError:
 | 
						|
            print(f"Location for {tierheim["properties"]["name"]} could not be created")
 | 
						|
            exit()
 | 
						|
 | 
						|
    return location_result.json()
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
    api_token, area, instance, data_file, use_cached = get_config()
 | 
						|
    if not use_cached:
 | 
						|
        # Query shelters
 | 
						|
        overpass_result = get_overpass_result(area, data_file)
 | 
						|
        if overpass_result is None:
 | 
						|
            print("Error: get_overpass_result returned None")
 | 
						|
            return
 | 
						|
    else:
 | 
						|
        with open(data_file, 'r', encoding='utf-8') as f:
 | 
						|
            overpass_result = json.load(f)
 | 
						|
 | 
						|
    # Set headers and endpoint
 | 
						|
    endpoint = f"{instance}/api/organizations/"
 | 
						|
    h = {'Authorization': f'Token {api_token}', "content-type": "application/json"}
 | 
						|
 | 
						|
    tierheime = overpass_result["features"]
 | 
						|
    stats = {"num_updated_orgs": 0,
 | 
						|
             "num_inserted_orgs": 0}
 | 
						|
 | 
						|
    for idx, tierheim in enumerate(tqdm(tierheime)):
 | 
						|
        # Check if data is low quality
 | 
						|
        if "name" not in tierheim["properties"].keys() or "addr:city" not in tierheim["properties"].keys():
 | 
						|
            continue
 | 
						|
 | 
						|
        # Load TH data in for easier accessing
 | 
						|
        th_data = SimpleNamespace(
 | 
						|
            name=tierheim["properties"]["name"],
 | 
						|
            email=choose(("contact:email", "email"), tierheim["properties"]),
 | 
						|
            phone_number=choose(("contact:phone", "phone"), tierheim["properties"], replace=True),
 | 
						|
            fediverse_profile=get_or_none(tierheim, "contact:mastodon"),
 | 
						|
            facebook=https(add(get_or_empty(tierheim, "contact:facebook"), "facebook")),
 | 
						|
            instagram=https(add(get_or_empty(tierheim, "contact:instagram"), "instagram")),
 | 
						|
            website=https(choose(("contact:website", "website"), tierheim["properties"])),
 | 
						|
            description=get_or_none(tierheim, "opening_hours"),
 | 
						|
            external_object_identifier=tierheim["id"],
 | 
						|
            EXTERNAL_SOURCE_IDENTIFIER="OSM",
 | 
						|
        )
 | 
						|
 | 
						|
        # Define here for later
 | 
						|
        optional_data = ["email", "phone_number", "website", "description", "fediverse_profile", "facebook",
 | 
						|
                         "instagram"]
 | 
						|
 | 
						|
        # Check if rescue organization exists
 | 
						|
        search_data = {"external_source_identifier": "OSM",
 | 
						|
                       "external_object_identifier": f"{tierheim["id"]}"}
 | 
						|
        search_result = requests.get(f"{instance}/api/organizations", params=search_data, headers=h)
 | 
						|
        # Rescue organization exits
 | 
						|
        if search_result.status_code == 200:
 | 
						|
            stats["num_updated_orgs"] += 1
 | 
						|
            org_id = search_result.json()[0]["id"]
 | 
						|
            logging.debug(f"{th_data.name} already exists as ID {org_id}.")
 | 
						|
            org_patch_data = {"id": org_id,
 | 
						|
                              "name": th_data.name}
 | 
						|
            if search_result.json()[0]["location"] is None:
 | 
						|
                location = create_location(tierheim, instance, h)
 | 
						|
                org_patch_data["location"] = location["id"]
 | 
						|
 | 
						|
            org_patch_data = add_if_available(th_data, optional_data, org_patch_data)
 | 
						|
 | 
						|
            result = requests.patch(endpoint, json=org_patch_data, headers=h)
 | 
						|
            if result.status_code != 200:
 | 
						|
                logging.warning(f"Updating {tierheim['properties']['name']} failed:{result.status_code} {result.json()}")
 | 
						|
            continue
 | 
						|
        # Rescue organization does not exist
 | 
						|
        else:
 | 
						|
            stats["num_inserted_orgs"] += 1
 | 
						|
            location = create_location(tierheim, instance, h)
 | 
						|
            org_data = {"name": tierheim["properties"]["name"],
 | 
						|
                        "external_object_identifier": f"{tierheim["id"]}",
 | 
						|
                        "external_source_identifier": "OSM",
 | 
						|
                        "location": location["id"]
 | 
						|
                        }
 | 
						|
 | 
						|
            org_data = add_if_available(th_data, optional_data, org_data)
 | 
						|
 | 
						|
            result = requests.post(endpoint, json=org_data, headers=h)
 | 
						|
 | 
						|
            if result.status_code != 201:
 | 
						|
                print(f"{idx} {tierheim["properties"]["name"]}:{result.status_code} {result.json()}")
 | 
						|
    print(f"Upload finished. Inserted {stats['num_inserted_orgs']} new orgs and updated {stats['num_updated_orgs']} orgs.")
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |