@gpproton
Last active December 7, 2024 16:26
Revisions

  1. gpproton renamed this gist Dec 7, 2024. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. gpproton revised this gist Dec 7, 2024. 2 changed files with 10 additions and 2 deletions.
    File renamed without changes.
    12 changes: 10 additions & 2 deletions resolver.py → trip_resolver.py
    @@ -32,8 +32,8 @@
     from dateutil.relativedelta import relativedelta

     ## Default Values
    -global_input_file: str = "input.csv"
    -global_output_file: str = "output.csv"
    +global_input_file: str = "files/trip_input.csv"
    +global_output_file: str = "files/trip_output.csv"
     global_nominatim_url: str = "nominatim.openstreetmap.org"
     global_routing_host: str = "valhalla1.openstreetmap.de"
     global_http_chunks = 2
    @@ -163,6 +163,8 @@ def get_coords(self, address: str) -> Location | None:
             return None

         def get_route_attributes(self, trip_code: str, source: Coord, destination: Coord) -> RouteInfo | None:
    +        global global_fixed_speed
    +        global global_top_speed
             try:
                 conn = http.client.HTTPSConnection(self.routing_host)
                 payload = json.dumps({
    @@ -379,6 +381,7 @@ async def bootstrap(cls, data: DataHandler, service: CoordService):
             return self

         async def __geocode_coords(self) -> list[Location]:
    +        global global_http_chunks
             # Local coord resolver functions
             async def resolve_location(name: str, delay: float = 0.15) -> Location:
                 await asyncio.sleep(delay)
    @@ -389,6 +392,7 @@ async def resolve_location(name: str, delay: float = 0.15) -> Location:
                 return response

             async def exec_chunked(location_chunk: list[Location] | tuple[dict[str, str | float], ...]) -> list[Location]:
    +            global global_http_delay
                 chunk_tasks = [resolve_location(location_item["name"], global_http_delay) for location_item in
                                location_chunk]
                 result = await asyncio.gather(*chunk_tasks)
    @@ -457,6 +461,10 @@ async def exec_delayed(input_trip_code: str, input_source: Coord,


     async def main():
    +    global global_input_file
    +    global global_output_file
    +    global global_routing_host
    +    global global_nominatim_url
         proc = await DataProcessing.bootstrap(DataHandler(global_input_file, global_output_file),
                                               CoordService(global_nominatim_url, global_routing_host))

  3. gpproton revised this gist Dec 7, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion resolver.py
    @@ -1,7 +1,7 @@
     #!/usr/bin/env python3

     """
    -Copyright (c) 2024 drolx Labs
    +Copyright (c) 2024 Godwin Peter .O
     Licensed under the MIT License
     you may not use this file except in compliance with the License.
  4. gpproton created this gist Dec 7, 2024.
    6 changes: 6 additions & 0 deletions input.csv
    @@ -0,0 +1,6 @@
    customer,trip_code,trip_id,source,tonnage,destination,contract_type,trailer_type,remarks,fuel_type,sub_ftn,customer_name,billing_fuel_quantity,billing_mileage,backup_3,backup_2,backup_1,created_by,created_on
    2000015,LUC_IKORO_ABAKALIKI_30_CO_FL_D_L_XXX,T1000002,IKORODU,30,ABAKALIKI,FLOATING,CONTAINER,,DIESEL,LOADING,LUCKY FIBRES (NIGERIA) LTD.,690.000,2.100,1,1,1,FF_TM_04,04-23-24
    2000015,LUC_IKORO_ABUJA_30_CO_FL_D_L_XXX,T1000003,IKORODU,30,ABUJA,FLOATING,CONTAINER,,DIESEL,LOADING,LUCKY FIBRES (NIGERIA) LTD.,800.000,2.100,1,1,1,FF_TM_04,04-23-24
    2000015,LUC_IKORO_ADO_EKITI_30_CO_FL_D_L_XXX,T1000004,IKORODU,30,ADO_EKITI,FLOATING,CONTAINER,,DIESEL,LOADING,LUCKY FIBRES (NIGERIA) LTD.,331.000,2.100,1,1,1,FF_TM_04,04-23-24
    2000015,LUC_IKORO_AKURE_30_CO_FL_D_L_XXX,T1000005,IKORODU,30,AKURE,FLOATING,CONTAINER,,DIESEL,LOADING,LUCKY FIBRES (NIGERIA) LTD.,280.000,2.100,1,1,1,FF_TM_04,04-23-24
    2000015,LUC_IKORO_ASABA_30_CO_FL_D_L_XXX,T1000006,IKORODU,30,ASABA,FLOATING,CONTAINER,,DIESEL,LOADING,LUCKY FIBRES (NIGERIA) LTD.,480.000,2.100,1,1,1,FF_TM_04,04-23-24
    467 changes: 467 additions & 0 deletions resolver.py
    @@ -0,0 +1,467 @@
    #!/usr/bin/env python3

    """
    Copyright (c) 2024 drolx Labs
    Licensed under the MIT License
    you may not use this file except in compliance with the License.
    https://opensource.org/license/mit
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    Author: Godwin peter .O ([email protected])
    Created At: Thursday, 7th Nov 2024
    Modified By: Godwin peter .O
    Modified At: Sat Dec 07 2024
    """

    import asyncio
    import csv
    import http.client
    import json
    import re
    import sys
    import urllib.parse
    from datetime import datetime
    from itertools import batched
    from typing import TypedDict

    from dateutil.relativedelta import relativedelta

    ## Default Values
    global_input_file: str = "input.csv"
    global_output_file: str = "output.csv"
    global_nominatim_url: str = "nominatim.openstreetmap.org"
    global_routing_host: str = "valhalla1.openstreetmap.de"
    global_http_chunks = 2
    global_http_delay = 0.15
    global_fixed_speed = 41
    global_top_speed = 59
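    # Illustrative note (not from the original gist): these globals can be edited to point the
    # script at self-hosted services instead of the public OSM instances, e.g.
    #   global_nominatim_url = "nominatim.example.internal"
    #   global_routing_host = "valhalla.example.internal"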

    ## Shared Types
    Coord = TypedDict("Coord", {"lat": float, "lon": float})
    Location = TypedDict("Location", {"name": str, "address": str, "lat": float, "lon": float})
    RouteInfo = TypedDict("RouteInfo", {
        "trip_code": str,
        "length": float,
        "time": str,
        "source": Coord,
        "destination": Coord,
    })
    TripInfo = TypedDict("TripInfo", {
        "trip_code": str,
        "length": str,
        "time": str,
        "source": str,
        "source_coords": str,
        "destination": str,
        "destination_coords": str,
    })


    class Utils:
    """
    Utility class containing helper methods for logging and time formatting.
    The Utils class offers simple utilities for managing log outputs and converting
    time values for better readability. This is useful for debugging and timing solutions.
    - current_time(): Returns the current system time formatted as HH:MM:SS.
    - log_info(message: str): Logs the provided message with the current timestamp.
    - format_time(value: float): Takes a number of seconds and formats it into HH:MM:SS.
    """

        @classmethod
        def current_time(cls) -> str:
            """
            Get the current time formatted as HH:MM:SS.

            Returns:
                str: Current time in HH:MM:SS format.
            """
            now = datetime.now()
            return now.strftime("%H:%M:%S")

        @classmethod
        def log_info(cls, message: str) -> None:
            print("{0} | {1}".format(cls.current_time(), message))

        @classmethod
        def format_time(cls, value: float) -> str:
            rt = relativedelta(seconds=int(value))

            return "{:02d}:{:02d}:{:02d}".format(int(rt.hours), int(rt.minutes), int(rt.seconds))


    class CoordService:
    """
    Service class for geocoding and route resolution using HTTP APIs.
    The CoordService class is designed to interact with geocoding and
    routing services through HTTP requests, providing end-users the ability to resolve
    addresses to coordinates and compute route attributes such as length and estimated travel time.
    Class Attributes:
    headers (dict): A dictionary containing default headers used for HTTP connections.
    Attributes:
    geocoding_host (str): Hostname for geocoding service.
    routing_host (str): Hostname for routing service.
    Methods:
    - get_coords(address: str): Sends a request to the geocoding service to resolve
    an address into geographic coordinates (latitude and longitude).
    - get_route_attributes(trip_code: str, source: Coord, destination: Coord):
    Queries the routing service to fetch route details including length and estimated duration
    between specified coordinates.
    """

        geocoding_host: str
        routing_host: str
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "Chrome"
        }

        def __init__(self, geocoding_host: str, routing_host: str) -> None:
            """
            Initializes CoordService with specified geocoding and routing hostnames.

            Args:
                geocoding_host (str): Hostname for geocoding service.
                routing_host (str): Hostname for routing service.
            """
            self.geocoding_host = geocoding_host
            self.routing_host = routing_host

        def get_coords(self, address: str) -> Location | None:
            try:
                conn = http.client.HTTPSConnection(self.geocoding_host)
                url_path = "/search?format=json&limit=1&addressdetails=0&[email protected]&q=" + urllib.parse.quote(
                    address)
                payload = ""

                conn.request("GET", url_path, payload, self.headers)
                res = conn.getresponse()
                data = res.read()
                result_items = data.decode("utf-8")
                item = json.loads(result_items)[0]

                return {"name": address, "lat": item["lat"], "lon": item["lon"], "address": item["display_name"]}
            except http.client.HTTPException:
                Utils.log_info("Error: There was an error with the HTTP request")

            return None

        def get_route_attributes(self, trip_code: str, source: Coord, destination: Coord) -> RouteInfo | None:
            try:
                conn = http.client.HTTPSConnection(self.routing_host)
                payload = json.dumps({
                    "format": "json",
                    "shape_format": "polyline6",
                    "units": "kilometers",
                    "alternates": 0,
                    "search_filter": {
                        "exclude_closures": True
                    },
                    "costing": "auto",
                    "costing_options": {
                        "auto": {"fixed_speed": global_fixed_speed, "top_speed": global_top_speed}
                    },
                    "locations": [
                        {
                            "lat": source["lat"],
                            "lon": source["lon"]
                        },
                        {
                            "lat": destination["lat"],
                            "lon": destination["lon"]
                        }
                    ]
                })

                conn.request("POST", "/route", payload, self.headers)
                res = conn.getresponse()
                data = res.read()
                result = data.decode("utf-8")

                item = json.loads(result)
                return {
                    "trip_code": trip_code,
                    "length": item["trip"]["summary"]["length"],
                    "time": item["trip"]["summary"]["time"],
                    "source": {"lat": source["lat"], "lon": source["lon"]},
                    "destination": {"lat": destination["lat"], "lon": destination["lon"]},
                }
            except http.client.HTTPException:
                Utils.log_info("Error: There was an error with the HTTP request")

            return None


    class DataHandler:
    """
    Handles data loading, transformation, and output file generation.
    The DataHandler class processes CSV input files and extracts relevant location and trip
    data. It also transforms raw data into structured format and writes out results.
    Attributes:
    input_path (str): Path to the input CSV file.
    output_path (str): Path to the output CSV file.
    source_data (list): Raw data loaded from the input file.
    locations (list): List of unique locations extracted from source data.
    routes (list): List of resolved routes.
    trips (list): Final list of trip data for output.
    Methods:
    - get_location_object(name: str): Finds a Location instance in the locations list
    by matching the given location name.
    - get_route_object(trip_code: str): Searches for a resolved RouteInfo instance using
    the trip code.
    - generate_output(): Compiles trip data and writes to the output CSV file.
    - save_output_file(data: list[TripInfo]): Takes trip data and outputs it to the
    designated output CSV file.
    """

        input_path: str
        output_path: str
        source_data = []
        locations: list[Location] = []
        routes: list[RouteInfo] = []
        trips: list[TripInfo] = []

        def __init__(self, input_path: str, output_path: str) -> None:
            """
            Initializes the DataHandler with specified paths for input and output files.

            Args:
                input_path (str): Path to the input CSV file.
                output_path (str): Path to the output CSV file.
            """

            self.input_path = input_path
            self.output_path = output_path
            self.__load()
            self.__load_locations()

        def get_location_object(self, name: str) -> Location | None:
            return next((item for item in self.locations if item['name'] == name), None)

        def get_route_object(self, trip_code: str) -> RouteInfo | None:
            return next((item for item in self.routes if item['trip_code'] == trip_code), None)

        def __load(self) -> None:
            try:
                with open(self.input_path, "r") as source_file:
                    local_source = list(csv.DictReader(source_file, delimiter=","))
                    self.source_data.extend(local_source)

                # Correct location names
                for line_item in self.source_data:
                    source_value = re.sub(r'[^a-zA-Z0-9\s]', ' ', line_item["source"]).capitalize()
                    destination_value = re.sub(r'[^a-zA-Z0-9\s]', ' ', line_item["destination"]).capitalize()
                    trip_code: str = line_item["trip_code"]

                    line_item["source"] = source_value
                    line_item["destination"] = destination_value
                    line_item["trip_code"] = trip_code.lower()

            except IOError:
                Utils.log_info("There was an error reading file {0}".format(self.input_path))
                sys.exit()

        def __load_locations(self) -> None:
            for line_item in self.source_data:
                trip_source: str = line_item["source"]
                trip_destination: str = line_item["destination"]

                check_source = self.get_location_object(trip_source)
                check_destination = self.get_location_object(trip_destination)

                if check_source is None or len(check_source) < 1:
                    self.locations.append({'name': trip_source})
                if check_destination is None or len(check_destination) < 1:
                    self.locations.append({'name': trip_destination})

            Utils.log_info("Loaded {0} locations for processing...".format(len(self.locations)))

        def generate_output(self) -> None:
            if len(self.routes) > 0:
                for loc in self.source_data:
                    route: RouteInfo = self.get_route_object(loc["trip_code"])
                    # noinspection PyTypeChecker
                    self.trips.append({
                        "trip_code": loc["trip_code"],
                        "length": route["length"],
                        "time": Utils.format_time(route["time"]),
                        "source": loc["source"],
                        "source_coords": "{0}, {1}".format(route["source"]["lat"], route["source"]["lon"]),
                        "destination": loc["destination"],
                        "destination_coords": "{0}, {1}".format(route["destination"]["lat"], route["destination"]["lon"]),
                    })

                self.save_output_file(self.trips)
            else:
                Utils.log_info("Error: No route information resolved...")

        # noinspection PyTypeChecker
        def save_output_file(self, data: list[TripInfo]) -> None:
            # Get the keys from the first dictionary as the header
            keys = data[0].keys()

            try:
                # Open the file in write mode
                with open(self.output_path, 'w', newline='') as csv_file:
                    writer = csv.DictWriter(csv_file, fieldnames=keys)
                    writer.writeheader()
                    writer.writerows(data)

                Utils.log_info("Successfully outputted resolved trips...")
            except IOError:
                Utils.log_info("There was an error writing to {0}".format(self.output_path))
                sys.exit()

    class DataProcessing:
    """
    Coordinates the asynchronous processing of locations and routes.
    The DataProcessing class leverages asynchronous functions to handle large datasets
    efficiently, minimizing blocking I/O during geocode and route resolution.
    Coordinates the processing of locations and route information.
    Attributes:
    data (DataHandler): An instance of DataHandler to manage data transformation.
    service (CoordService): An instance of CoordService to provide coordinates and
    routing information services.
    location_pool (list[Location]): A pool of resolved locations used for processing routes.
    service (CoordService): Service for resolving coordinates and routes.
    Methods:
    - bootstrap(data: DataHandler, service: CoordService): Initializes data processing by
    resolving coordinates and retrieving route data.
    - __geocode_coords(): Asynchronously processes and resolves geographic coordinates
    for each location.
    - __process_routes(): Asynchronously fetches route details based on resolved
    locations and source input data.
    """
        data: DataHandler
        service: CoordService
        location_pool: list[Location]

        def __init__(self, data: DataHandler, service: CoordService) -> None:
            self.data = data
            self.service = service

        @classmethod
        async def bootstrap(cls, data: DataHandler, service: CoordService):
            """
            Bootstrap the data processing with geocoding and routing services.

            Args:
                data (DataHandler): Data handler for source and processed data.
                service (CoordService): Service for resolving coordinates and routes.
            Returns:
                DataProcessing: An initialized DataProcessing instance.
            """
            self = cls(data, service)
            self.location_pool = await self.__geocode_coords()
            self.data.routes = await self.__process_routes()

            return self

        async def __geocode_coords(self) -> list[Location]:
            # Local coord resolver functions
            async def resolve_location(name: str, delay: float = 0.15) -> Location:
                await asyncio.sleep(delay)

                response = self.service.get_coords(name)
                Utils.log_info("Name: {0}, Lat: {1}, Lon: {2}".format(response["name"], response["lat"], response["lon"]))

                return response

            async def exec_chunked(location_chunk: list[Location] | tuple[dict[str, str | float], ...]) -> list[Location]:
                chunk_tasks = [resolve_location(location_item["name"], global_http_delay) for location_item in
                               location_chunk]
                result = await asyncio.gather(*chunk_tasks)

                return result

            try:
                chunked_locations = list(batched(self.data.locations, global_http_chunks))
                tasks = [exec_chunked(loc_chunks) for loc_chunks in chunked_locations]

                Utils.log_info("Started resolving {0} locations for coordinates...".format(len(self.data.locations)))
                grouped_location: list[list[Location]] = await asyncio.gather(*tasks)

                Utils.log_info("Completed resolving locations...")

                resolved_locations: list[Location] = [item for sublist in grouped_location for item in sublist]

                # update tagged data
                for local_item in self.data.locations:
                    loc: Location = next((item for item in resolved_locations if item['name'] == local_item["name"]), None)
                    local_item.update(loc)

                return resolved_locations

            except Exception as error:
                Utils.log_info('Error processing coordinates... {0}'.format(error))
                return []

        async def __process_routes(self) -> list[TripInfo]:
            try:
                if len(self.location_pool) > 0:
                    query_data: list = self.data.source_data

                    async def exec_delayed(input_trip_code: str, input_source: Coord,
                                           input_destination: Coord) -> RouteInfo:
                        await asyncio.sleep(global_http_delay)
                        response: RouteInfo = self.service.get_route_attributes(input_trip_code, input_source,
                                                                                input_destination)
                        distance = "{0}KM".format(response["length"])

                        Utils.log_info("Route:{0}, Time: {1}, Distance: {2}".format(
                            input_trip_code,
                            Utils.format_time(float(response["time"])),
                            distance
                        ))

                        return response

                    tasks = []
                    for item in query_data:
                        data_source: Location = self.data.get_location_object(item["source"])
                        data_destination: Location = self.data.get_location_object(item["destination"])

                        trip_code = item["trip_code"]
                        source: Coord = {"lat": data_source["lat"], "lon": data_source["lon"]}
                        destination: Coord = {"lat": data_destination["lat"], "lon": data_destination["lon"]}

                        tasks.append(exec_delayed(trip_code, source, destination))

                    return await asyncio.gather(*tasks)

            except Exception as error:
                Utils.log_info('Error processing route information... {0}'.format(error))

            return []


    async def main():
        proc = await DataProcessing.bootstrap(DataHandler(global_input_file, global_output_file),
                                              CoordService(global_nominatim_url, global_routing_host))

        proc.data.generate_output()


    if __name__ == "__main__":
        asyncio.run(main())
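A minimal way to run this first revision locally (assumptions: Python 3.12+ for itertools.batched, the python-dateutil package installed, and an input.csv like the sample above in the working directory):

    pip install python-dateutil
    python3 resolver.py

Resolved trips are written to output.csv, as set by the global_* defaults at the top of the file; the later revision renames the script to trip_resolver.py and moves these paths under files/.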