276 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import argparse
 | |
| import json
 | |
| import logging
 | |
| import os
 | |
| from types import SimpleNamespace
 | |
| 
 | |
| import requests
 | |
| # TODO: consider using OSMPythonTools instead of requests or overpass library
 | |
| from osmtogeojson import osmtogeojson
 | |
| from tqdm import tqdm
 | |
| 
 | |
| DEFAULT_OSM_DATA_FILE = "export.geojson"
 | |
| # Search area must be the official name, e.g. "Germany" is not a valid area name in Overpass API
 | |
| # Consider instead finding & using the code within the query itself, e.g. "ISO3166-1"="DE"
 | |
| DEFAULT_OVERPASS_SEARCH_AREA = "Deutschland"
 | |
| 
 | |
| 
 | |
| def parse_args():
 | |
|     """Parse command-line arguments."""
 | |
|     parser = argparse.ArgumentParser(
 | |
|         description="Download animal shelter data from the Overpass API to the Notfellchen API.")
 | |
|     parser.add_argument("--api-token", type=str, help="API token for authentication.")
 | |
|     parser.add_argument("--area", type=str, help="Area to search for animal shelters (default: Deutschland).")
 | |
|     parser.add_argument("--instance", type=str, help="API instance URL.")
 | |
|     parser.add_argument("--data-file", type=str, help="Path to the GeoJSON file containing (only) animal shelters.")
 | |
|     parser.add_argument("--use-cached", action='store_true', help="Use the stored GeoJSON file")
 | |
|     return parser.parse_args()
 | |
| 
 | |
| 
 | |
| def get_config():
 | |
|     """Get configuration from environment variables or command-line arguments."""
 | |
|     args = parse_args()
 | |
| 
 | |
|     api_token = args.api_token or os.getenv("NOTFELLCHEN_API_TOKEN")
 | |
|     # TODO: document new environment variable NOTFELLCHEN_AREA
 | |
|     area = args.area or os.getenv("NOTFELLCHEN_AREA", DEFAULT_OVERPASS_SEARCH_AREA)
 | |
|     instance = args.instance or os.getenv("NOTFELLCHEN_INSTANCE")
 | |
|     data_file = args.data_file or os.getenv("NOTFELLCHEN_DATA_FILE", DEFAULT_OSM_DATA_FILE)
 | |
|     use_cached = args.use_cached or os.getenv("NOTFELLCHEN_USE_CACHED", False)
 | |
| 
 | |
|     if not api_token or not instance:
 | |
|         raise ValueError("API token and instance URL must be provided via environment variables or CLI arguments.")
 | |
| 
 | |
|     return api_token, area, instance, data_file, use_cached
 | |
| 
 | |
| 
 | |
| def get_or_none(data, key):
 | |
|     if key in data["properties"].keys():
 | |
|         return data["properties"][key]
 | |
|     else:
 | |
|         return None
 | |
| 
 | |
| 
 | |
| def get_or_empty(data, key):
 | |
|     if key in data["properties"].keys():
 | |
|         return data["properties"][key]
 | |
|     else:
 | |
|         return ""
 | |
| 
 | |
| 
 | |
| def choose(keys, data, replace=False):
 | |
|     for key in keys:
 | |
|         if key in data.keys():
 | |
|             if replace:
 | |
|                 return data[key].replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
 | |
|             else:
 | |
|                 return data[key]
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def add(value, platform):
 | |
|     if value != "":
 | |
|         if value.find(platform) == -1:
 | |
|             return f"https://www.{platform}.com/{value}"
 | |
|         else:
 | |
|             return value
 | |
|     else:
 | |
|         return None
 | |
| 
 | |
| 
 | |
| def https(value):
 | |
|     if value is not None and value != "":
 | |
|         value = value.replace("http://", "")
 | |
|         if value.find("https") == -1:
 | |
|             return f"https://{value}"
 | |
|         else:
 | |
|             return value
 | |
|     else:
 | |
|         return None
 | |
| 
 | |
| 
 | |
| def calc_coordinate_center(coordinates):
 | |
|     """
 | |
|     Calculates the center as the arithmetic mean of the list of coordinates.
 | |
| 
 | |
|     Not perfect because earth is a sphere (citation needed) but good enough.
 | |
|     """
 | |
|     if not coordinates:
 | |
|         return None, None
 | |
| 
 | |
|     lon_sum = 0.0
 | |
|     lat_sum = 0.0
 | |
|     count = 0
 | |
| 
 | |
|     for lon, lat in coordinates:
 | |
|         lon_sum += lon
 | |
|         lat_sum += lat
 | |
|         count += 1
 | |
| 
 | |
|     return lon_sum / count, lat_sum / count
 | |
| 
 | |
| 
 | |
| def get_center_coordinates(geometry):
 | |
|     """
 | |
|     Given a GeoJSON geometry dict, return (longitude, latitude)
 | |
| 
 | |
|     If a shape, calculate the center, else reurn the point
 | |
|     """
 | |
|     geom_type = geometry["type"]
 | |
|     coordinates = geometry["coordinates"]
 | |
| 
 | |
|     if geom_type == "Point":
 | |
|         return coordinates[0], coordinates[1]
 | |
| 
 | |
|     elif geom_type == "LineString":
 | |
|         return calc_coordinate_center(coordinates)
 | |
| 
 | |
|     elif geom_type == "Polygon":
 | |
|         outer_ring = coordinates[0]
 | |
|         return calc_coordinate_center(outer_ring)
 | |
| 
 | |
|     else:
 | |
|         raise ValueError(f"Unsupported geometry type: {geom_type}")
 | |
| 
 | |
| 
 | |
| # TODO: take note of new get_overpass_result function which does the bulk of the new overpass query work
 | |
| def get_overpass_result(area, data_file):
 | |
|     """Build the Overpass query for fetching animal shelters in the specified area."""
 | |
|     overpass_endpoint = "https://overpass-api.de/api/interpreter"
 | |
|     overpass_query = f"""
 | |
|         [out:json][timeout:25];
 | |
|         area[name="{area}"]->.searchArea;
 | |
|         nwr["amenity"="animal_shelter"](area.searchArea);
 | |
|         out body;
 | |
|         >;
 | |
|         out skel qt;
 | |
|         """
 | |
|     r = requests.get(overpass_endpoint, params={'data': overpass_query})
 | |
|     if r.status_code == 200:
 | |
|         rjson = r.json()
 | |
|         result = osmtogeojson.process_osm_json(rjson)
 | |
|         with open(data_file, 'w', encoding='utf-8') as f:
 | |
|             json.dump(result, f, ensure_ascii=False)
 | |
|         return result
 | |
| 
 | |
| 
 | |
| def add_if_available(base_data, keys, result):
 | |
|     # Loads the data into the org if available
 | |
|     for key in keys:
 | |
|         if getattr(base_data, key) is not None:
 | |
|             result[key] = getattr(base_data, key)
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def create_location(tierheim, instance, headers):
 | |
|     location_data = {
 | |
|         "place_id": tierheim["id"],
 | |
|         "longitude": get_center_coordinates(tierheim["geometry"])[0],
 | |
|         "latitude": get_center_coordinates(tierheim["geometry"])[1],
 | |
|         "name": tierheim["properties"]["name"],
 | |
|         "city": tierheim["properties"]["addr:city"],
 | |
|         "housenumber": get_or_empty(tierheim, "addr:housenumber"),
 | |
|         "postcode": get_or_empty(tierheim, "addr:postcode"),
 | |
|         "street": get_or_empty(tierheim, "addr:street"),
 | |
|         "countrycode": get_or_empty(tierheim, "addr:country"),
 | |
|     }
 | |
| 
 | |
|     location_result = requests.post(f"{instance}/api/locations/", json=location_data, headers=headers)
 | |
| 
 | |
|     if location_result.status_code != 201:
 | |
|         try:
 | |
|             print(
 | |
|                 f"Location for {tierheim["properties"]["name"]}:{location_result.status_code} {location_result.json()} not created")
 | |
|         except requests.exceptions.JSONDecodeError:
 | |
|             print(f"Location for {tierheim["properties"]["name"]} could not be created")
 | |
|             exit()
 | |
| 
 | |
|     return location_result.json()
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     api_token, area, instance, data_file, use_cached = get_config()
 | |
|     if not use_cached:
 | |
|         # Query shelters
 | |
|         overpass_result = get_overpass_result(area, data_file)
 | |
|         if overpass_result is None:
 | |
|             print("Error: get_overpass_result returned None")
 | |
|             return
 | |
|     else:
 | |
|         with open(data_file, 'r', encoding='utf-8') as f:
 | |
|             overpass_result = json.load(f)
 | |
| 
 | |
|     # Set headers and endpoint
 | |
|     endpoint = f"{instance}/api/organizations/"
 | |
|     h = {'Authorization': f'Token {api_token}', "content-type": "application/json"}
 | |
| 
 | |
|     tierheime = overpass_result["features"]
 | |
|     stats = {"num_updated_orgs": 0,
 | |
|              "num_inserted_orgs": 0}
 | |
| 
 | |
|     for idx, tierheim in enumerate(tqdm(tierheime)):
 | |
|         # Check if data is low quality
 | |
|         if "name" not in tierheim["properties"].keys() or "addr:city" not in tierheim["properties"].keys():
 | |
|             continue
 | |
| 
 | |
|         # Load TH data in for easier accessing
 | |
|         th_data = SimpleNamespace(
 | |
|             name=tierheim["properties"]["name"],
 | |
|             email=choose(("contact:email", "email"), tierheim["properties"]),
 | |
|             phone_number=choose(("contact:phone", "phone"), tierheim["properties"], replace=True),
 | |
|             fediverse_profile=get_or_none(tierheim, "contact:mastodon"),
 | |
|             facebook=https(add(get_or_empty(tierheim, "contact:facebook"), "facebook")),
 | |
|             instagram=https(add(get_or_empty(tierheim, "contact:instagram"), "instagram")),
 | |
|             website=https(choose(("contact:website", "website"), tierheim["properties"])),
 | |
|             description=get_or_none(tierheim, "opening_hours"),
 | |
|             external_object_identifier=tierheim["id"],
 | |
|             EXTERNAL_SOURCE_IDENTIFIER="OSM",
 | |
|         )
 | |
| 
 | |
|         # Define here for later
 | |
|         optional_data = ["email", "phone_number", "website", "description", "fediverse_profile", "facebook",
 | |
|                          "instagram"]
 | |
| 
 | |
|         # Check if rescue organization exists
 | |
|         search_data = {"external_source_identifier": "OSM",
 | |
|                        "external_object_identifier": f"{tierheim["id"]}"}
 | |
|         search_result = requests.get(f"{instance}/api/organizations", params=search_data, headers=h)
 | |
|         # Rescue organization exits
 | |
|         if search_result.status_code == 200:
 | |
|             stats["num_updated_orgs"] += 1
 | |
|             org_id = search_result.json()[0]["id"]
 | |
|             logging.debug(f"{th_data.name} already exists as ID {org_id}.")
 | |
|             org_patch_data = {"id": org_id,
 | |
|                               "name": th_data.name}
 | |
|             if search_result.json()[0]["location"] is None:
 | |
|                 location = create_location(tierheim, instance, h)
 | |
|                 org_patch_data["location"] = location["id"]
 | |
| 
 | |
|             org_patch_data = add_if_available(th_data, optional_data, org_patch_data)
 | |
| 
 | |
|             result = requests.patch(endpoint, json=org_patch_data, headers=h)
 | |
|             if result.status_code != 200:
 | |
|                 logging.warning(f"Updating {tierheim['properties']['name']} failed:{result.status_code} {result.json()}")
 | |
|             continue
 | |
|         # Rescue organization does not exist
 | |
|         else:
 | |
|             stats["num_inserted_orgs"] += 1
 | |
|             location = create_location(tierheim, instance, h)
 | |
|             org_data = {"name": tierheim["properties"]["name"],
 | |
|                         "external_object_identifier": f"{tierheim["id"]}",
 | |
|                         "external_source_identifier": "OSM",
 | |
|                         "location": location["id"]
 | |
|                         }
 | |
| 
 | |
|             org_data = add_if_available(th_data, optional_data, org_data)
 | |
| 
 | |
|             result = requests.post(endpoint, json=org_data, headers=h)
 | |
| 
 | |
|             if result.status_code != 201:
 | |
|                 print(f"{idx} {tierheim["properties"]["name"]}:{result.status_code} {result.json()}")
 | |
|     print(f"Upload finished. Inserted {stats['num_inserted_orgs']} new orgs and updated {stats['num_updated_orgs']} orgs.")
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 |