Skip to content

Instantly share code, notes, and snippets.

@gpproton
Created January 29, 2025 15:48
Show Gist options
  • Save gpproton/2b533b1cee734e3f70678317b6f2e7a4 to your computer and use it in GitHub Desktop.
Save gpproton/2b533b1cee734e3f70678317b6f2e7a4 to your computer and use it in GitHub Desktop.

Revisions

  1. gpproton created this gist Jan 29, 2025.
    258 changes: 258 additions & 0 deletions osm-data-merge.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,258 @@
    # Copyright (c) 2025 <Godwin peter. O>
    #
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included in all
    # copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    # SOFTWARE.
    #
    # Project: python-experiments
    # Author: Godwin peter. O ([email protected])
    # Created At: Wed 29 Jan 2025 12:46:36
    # Modified By: Godwin peter. O ([email protected])
    # Modified At: Wed 29 Jan 2025 12:46:36

    import logging
    import os
    import sys
    from datetime import datetime
    from pathlib import Path
    from typing import List, Optional

    import boto3
    import osmium
    import requests

    # NOTE: Install these first
    # Resolve any virtual env issue
    # pip install --upgrade boto3 osmium

    ## Valid ENV Variables - .osm.env OR .env
    # OSM_LOCATIONS=africa/mali,africa/togo
    # OSM_SOURCE=https://<source-url>
    # S3_ENABLED=False
    # S3_ACCOUNT_ID=xx
    # S3_ENDPOINT_URL=xx
    # S3_BUCKET_NAME=xx
    # S3_ACCESS_KEY_ID=xx
    # S3_ACCESS_KEY_SECRET=xx


    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    def load_env_file(input_env_file, override=False):
    dotenv_path: str = ".env"
    file_path = Path(input_env_file)
    if file_path.is_file():
    dotenv_path = input_env_file
    elif Path(dotenv_path).is_file():
    pass
    else:
    logger.critical("Unable to load env vars")
    sys.exit()

    with open(dotenv_path) as file_obj:
    lines = file_obj.read().splitlines() # Removes \n from lines

    dotenv_vars = {}
    for line in lines:
    line = line.strip()
    if not line or line.startswith("#") or "=" not in line:
    continue

    key, value = line.split("=", maxsplit=1)
    dotenv_vars.setdefault(key, value)

    if override:
    os.environ.update(dotenv_vars)
    else:
    for key, value in dotenv_vars.items():
    os.environ.setdefault(key, value)


    load_env_file(".osm.env")

    output_name = "all"
    locations_conf = os.getenv("OSM_LOCATIONS", "africa/togo,africa/benin")
    logger.info(locations_conf)
    all_locations: List[str] = locations_conf.split(",")

    OSM_SOURCE = os.getenv("OSM_SOURCE")
    S3_ENABLED: bool = eval(os.getenv("S3_ENABLED", "False"))
    ## account_id - For cloudflare OR replace with S3 url
    S3_ACCOUNT_ID = os.getenv("S3_ACCOUNT_ID")
    S3_ENDPOINT_URL = os.getenv(
    "S3_ENDPOINT_URL", f"https://{S3_ACCOUNT_ID}.r2.cloudflarestorage.com"
    )
    S3_CONFIG = {
    "bucket_name": os.getenv("S3_BUCKET_NAME", ""),
    "endpoint_url": S3_ENDPOINT_URL,
    "access_key_id": os.getenv("S3_ACCESS_KEY_ID", ""),
    "access_key_secret": os.getenv("S3_ACCESS_KEY_SECRET", ""),
    }


    class OSMDownloaderMerger:
    def __init__(self, base_url: str = "", locations: List[str] = []):
    self.base_url = base_url
    self.locations = locations

    def needs_download(self, filepath: str) -> bool:
    """Check if file needs to be downloaded based on age or existence."""
    if not os.path.exists(filepath):
    return True

    file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
    age = datetime.now() - file_time
    return age.days >= 7

    def clean_old_merge(self, file_input: str):
    logger.info("Cleaning up old merged file..")
    file_path = Path(file_input)
    try:
    file_path.unlink()
    logger.info(f"{file_path} has been deleted successfully.")
    except FileNotFoundError:
    logger.error(f"{file_path} does not exist.")
    except PermissionError:
    logger.error(f"Permission denied: {file_path}.")
    except Exception as e:
    logger.error(f"Error occurred: {e}")

    def download_files(self) -> List[str]:
    """Download OSM files if needed."""
    downloaded_files = []

    for loc in self.locations:
    paths = loc.split("/")
    country = paths[1]
    filename = f"{country}-latest.osm.pbf"
    filepath = f"{loc}-latest.osm.pbf"

    if self.needs_download(filename):
    url = f"{self.base_url}/{filepath}"
    logger.info(f"Downloading OSM file for {loc}")

    try:
    response = requests.get(url, stream=True)
    response.raise_for_status()

    with open(filename, "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
    f.write(chunk)

    logger.info(f"Successfully downloaded {loc} OSM data")
    except requests.exceptions.RequestException as e:
    logger.error(f"Failed to download {loc} OSM data: {str(e)}")
    continue

    downloaded_files.append(filename)

    return downloaded_files

    def merge_files(
    self, input_files: List[str], output_file: str = "all.osm.pbf"
    ) -> Optional[str]:
    """Merge downloaded OSM files using osmium."""
    try:
    logger.info("Starting merge process")

    self.clean_old_merge(output_file)
    handler = osmium.SimpleWriter(output_file)
    reader = osmium.MergeInputReader()

    for input_file in input_files:
    logger.info(f"Processing {input_file}")

    reader.add_file(input_file)

    reader.apply(handler)

    logger.info(f"Successfully merged files into {output_file}")

    return output_file

    except Exception as e:
    logger.error(f"Failed to merge files: {str(e)}")
    return None

    def upload_to_r2(
    self,
    file_path: str,
    bucket_name: str,
    endpoint_url: str,
    access_key_id: str,
    access_key_secret: str,
    ) -> bool:
    """Upload merged file to Cloudflare R2."""
    try:
    logger.info("Initiating upload to Cloudflare R2")

    # Configure R2 client
    s3_client = boto3.client(
    service_name="s3",
    endpoint_url=endpoint_url,
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret,
    )

    # Upload file
    with open(file_path, "rb") as file_data:
    s3_client.upload_fileobj(
    file_data, bucket_name, os.path.basename(file_path)
    )

    logger.info(f"Successfully uploaded {file_path} to R2")
    return True

    except Exception as e:
    logger.error(f"Failed to upload to R2: {str(e)}")
    return False


    def main():
    if OSM_SOURCE is None:
    logger.critical("Incorrect download URL")
    sys.exit()
    else:
    osm_handler = OSMDownloaderMerger(OSM_SOURCE, all_locations)
    downloaded_files: List[str] = osm_handler.download_files()

    if not downloaded_files:
    logger.error("No files were downloaded or found locally")
    return

    merged_file: str | None = osm_handler.merge_files(
    downloaded_files, f"{output_name}.osm.pbf"
    )
    if not merged_file:
    logger.error("Failed to merge files")
    return

    # Upload to R2
    if S3_ENABLED:
    upload_success = osm_handler.upload_to_r2(merged_file, **S3_CONFIG)
    if upload_success:
    logger.info("Process completed successfully")
    else:
    logger.error("Process completed with errors")
    else:
    logger.info(f"Upload to {S3_CONFIG['endpoint_url']} is disabled...")


    if __name__ == "__main__":
    main()