Skip to content

Instantly share code, notes, and snippets.

@hansthen
Created May 25, 2025 17:14
Show Gist options
  • Select an option

  • Save hansthen/d9e9957c88a850dc11aa7b0894a163c8 to your computer and use it in GitHub Desktop.

Select an option

Save hansthen/d9e9957c88a850dc11aa7b0894a163c8 to your computer and use it in GitHub Desktop.

Revisions

  1. hansthen created this gist May 25, 2025.
    54 changes: 54 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,54 @@
    import aiohttp
    import asyncio
    from pipe import Pipe
    import json
    import jq


    api_key="<your key>"

    async def schedule(interval):
    while True:
    yield
    await asyncio.sleep(interval)

    @Pipe
    async def iss(stream):
    async with aiohttp.ClientSession() as session:
    async for _ in stream:
    async with session.get("https://api.wheretheiss.at/v1/satellites/25544") as response:
    data = await response.json()
    yield data

    @Pipe
    async def reverse_geocode(stream, api_key):
    async with aiohttp.ClientSession() as session:
    async for item in stream:
    url = (
    "https://maps.googleapis.com/maps/api/geocode/json?"
    "latlng={latitude},{longitude}&key=" + api_key
    )
    async with session.get(url.format(**item)) as response:
    data = await response.json()
    item["reverse"] = data
    yield item

    @Pipe
    async def jq_transform(stream, program):
    p = jq.compile(program)
    async for item in stream:
    for s in p.input(item).all():
    yield s

    @Pipe
    async def display(stream):
    async for item in stream:
    print(json.dumps(item))


    select_address = """
    [.reverse.results[] | select(.types != ["plus_code"])] | first

    """
    asyncio.run(schedule(10) | iss | reverse_geocode(api_key) |
    jq_transform(select_address) | display)