Skip to content

Instantly share code, notes, and snippets.

@MaxWolf-01
Created September 14, 2025 20:35
Show Gist options
  • Select an option

  • Save MaxWolf-01/3a9ba05e7d5d97f1310abcab06b44372 to your computer and use it in GitHub Desktop.

Select an option

Save MaxWolf-01/3a9ba05e7d5d97f1310abcab06b44372 to your computer and use it in GitHub Desktop.

Revisions

  1. MaxWolf-01 created this gist Sep 14, 2025.
    632 changes: 632 additions & 0 deletions sun_viewer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,632 @@
    #!/usr/bin/env -S uv run --script
    # /// script
    # dependencies = [
    # "requests",
    # "pillow",
    # "pygame",
    # "numpy",
    # ]
    # requires-python = ">=3.10"
    # ///

    """Live solar imagery viewer using Helioviewer API."""

    from __future__ import annotations

    import argparse
    import bisect
    import json
    import sys
    import threading
    import time
    from collections import deque
    from dataclasses import dataclass
    from datetime import datetime, timedelta, timezone
    from io import BytesIO
    from pathlib import Path
    from typing import Final, TypeAlias

    import numpy as np
    import pygame
    import requests
    from PIL import Image

    # Type aliases
    ImageBuffer: TypeAlias = list[tuple[pygame.Surface, str, str]] # (surface, timestamp, image_id)
    SourceId: TypeAlias = int

    # Constants
    CACHE_DIR: Final = Path("/tmp/sun_viewer_cache")
    API_BASE: Final = "https://api.helioviewer.org/v2/"
    DEFAULT_FPS: Final = 2
    DEFAULT_BUFFER_SIZE: Final = 60
    POLL_INTERVAL: Final = 60
    WINDOW_SIZE: Final = (1024, 1024)

    # Solar observation sources
    SOURCES: Final[dict[str, SourceId]] = {
    # SDO/AIA wavelengths
    '304': 13, '171': 10, '193': 11, '211': 12, '335': 14,
    '094': 8, '131': 9, '1600': 15, '1700': 16, '4500': 17,
    # SDO/HMI
    'hmi': 18, 'magnetogram': 19,
    # SOHO/LASCO
    'c2': 4, 'c3': 5,
    # PROBA-2/SWAP
    'swap': 32,
    }

    SOURCE_NAMES: Final = {v: k for k, v in SOURCES.items()}

    WAVELENGTH_INFO: Final = {
    '304': '304Å - Chromosphere (50,000K)',
    '171': '171Å - Quiet corona (600,000K)',
    '193': '193Å - Corona & flares (1.2M K)',
    '211': '211Å - Active regions (2M K)',
    '335': '335Å - Active regions (2.5M K)',
    '094': '094Å - Flaring regions (6M K)',
    '131': '131Å - Flares (10M K)',
    '1600': '1600Å - Transition region',
    '1700': '1700Å - Photosphere',
    '4500': '4500Å - Visible light',
    'hmi': 'HMI - Magnetic intensity',
    'magnetogram': 'Magnetogram',
    'c2': 'LASCO C2 - Coronagraph',
    'c3': 'LASCO C3 - Coronagraph',
    'swap': 'SWAP 174Å'
    }


    @dataclass
    class ImageMetadata:
    id: str
    timestamp: str
    source_id: SourceId


    class CacheManager:
    def __init__(self, cache_dir: Path = CACHE_DIR):
    self.cache_dir = cache_dir
    self.cache_dir.mkdir(exist_ok=True)
    self.metadata_lock = threading.Lock()
    self.metadata = self._load_metadata()

    def _load_metadata(self) -> dict:
    metadata_file = self.cache_dir / "metadata.json"
    if metadata_file.exists():
    try:
    return json.loads(metadata_file.read_text())
    except:
    return {}
    return {}

    def _save_metadata(self):
    with self.metadata_lock:
    metadata_file = self.cache_dir / "metadata.json"
    metadata_file.write_text(json.dumps(self.metadata))

    def get_path(self, source_id: SourceId, image_id: str) -> Path:
    return self.cache_dir / f"{source_id}_{image_id}.jpg"

    def save(self, source_id: SourceId, image_id: str, image: Image.Image, timestamp: str):
    try:
    path = self.get_path(source_id, image_id)
    image.save(path, 'JPEG', quality=90)

    with self.metadata_lock:
    key = str(source_id)
    if key not in self.metadata:
    self.metadata[key] = {}
    self.metadata[key][image_id] = {
    'timestamp': timestamp,
    'cached_at': time.time()
    }
    self._save_metadata()
    except Exception:
    pass

    def load(self, source_id: SourceId, image_id: str) -> Image.Image | None:
    path = self.get_path(source_id, image_id)
    if path.exists():
    try:
    return Image.open(path)
    except:
    pass
    return None

    def is_cached(self, source_id: SourceId, image_id: str) -> bool:
    with self.metadata_lock:
    return str(source_id) in self.metadata and image_id in self.metadata[str(source_id)]


    class HelioviewerClient:
    def __init__(self, source_id: SourceId):
    self.source_id = source_id
    self.session = requests.Session()

    def get_closest_image(self, target_time: datetime) -> dict | None:
    params = {
    'date': target_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
    'sourceId': self.source_id
    }
    try:
    response = self.session.get(f"{API_BASE}getClosestImage/", params=params, timeout=10)
    response.raise_for_status()
    return response.json()
    except:
    return None

    def download_image(self, image_id: str, width: int = 1024) -> Image.Image | None:
    params = {'id': image_id, 'width': width, 'height': width, 'type': 'jpg'}
    try:
    response = self.session.get(f"{API_BASE}downloadImage/", params=params, timeout=15)
    response.raise_for_status()
    return Image.open(BytesIO(response.content))
    except:
    return None


    class SunViewer:
    def __init__(self, source_id: SourceId = 13, initial_mode: str = 'video',
    poll_interval: int = POLL_INTERVAL):
    pygame.init()

    self.source_id = source_id
    self.mode = initial_mode
    self.poll_interval = poll_interval
    self.video_fps = DEFAULT_FPS
    self.prefetch_frames = DEFAULT_BUFFER_SIZE

    self.fullscreen = False
    self.show_info = True
    self.show_help = False
    self.window_size = WINDOW_SIZE
    self._setup_display()

    self.font_large = pygame.font.Font(None, 24)
    self.font_small = pygame.font.Font(None, 18)
    self.clock = pygame.time.Clock()

    self.client = HelioviewerClient(source_id)
    self.cache = CacheManager()

    self.buffers: dict[SourceId, ImageBuffer] = {}
    self.image_ids: dict[SourceId, set[str]] = {}
    self.playback_indices: dict[SourceId, int] = {}
    self.current_surface: pygame.Surface | None = None
    self.last_image_time = "Loading..."

    self.running = True
    self.fetch_lock = threading.Lock()
    self.prefetch_stop = threading.Event()
    self.prefetch_thread = None

    self.fps = 0.0
    self.frame_count = 0
    self.fps_update_time = time.time()

    self.mode_message = ""
    self.mode_message_alpha = 0

    self._init_buffer(source_id)

    def _init_buffer(self, source_id: SourceId):
    if source_id not in self.buffers:
    self.buffers[source_id] = []
    self.image_ids[source_id] = set()
    self.playback_indices[source_id] = 0

    def _setup_display(self):
    flags = pygame.FULLSCREEN if self.fullscreen else pygame.RESIZABLE
    self.screen = pygame.display.set_mode(
    (0, 0) if self.fullscreen else self.window_size, flags
    )
    if self.fullscreen:
    self.window_size = (self.screen.get_width(), self.screen.get_height())
    pygame.display.set_caption("Live Sun Viewer")

    def _pil_to_surface(self, image: Image.Image) -> pygame.Surface:
    if image.mode != 'RGB':
    image = image.convert('RGB')
    array = np.array(image).transpose(1, 0, 2)
    return pygame.surfarray.make_surface(array)

    def _scale_to_fit(self, surface: pygame.Surface) -> pygame.Surface:
    sw, sh = surface.get_size()
    ww, wh = self.window_size
    scale = min(ww / sw, wh / sh)
    new_size = (int(sw * scale), int(sh * scale))
    return pygame.transform.smoothscale(surface, new_size)

    def _show_message(self, text: str, duration: int = 2000):
    self.mode_message = text
    self.mode_message_alpha = 255
    pygame.time.set_timer(pygame.USEREVENT + 1, duration)

    def _toggle_fullscreen(self):
    self.fullscreen = not self.fullscreen
    self._setup_display()

    def _cycle_source(self, direction: int):
    sources = list(SOURCES.values())
    idx = sources.index(self.source_id)
    self.source_id = sources[(idx + direction) % len(sources)]
    self.client.source_id = self.source_id

    self._init_buffer(self.source_id)
    self._start_prefetch(self.source_id)

    self._show_message(f"Source: {SOURCE_NAMES.get(self.source_id, 'Unknown')}")

    def _start_prefetch(self, source_id: SourceId):
    if self.prefetch_thread and self.prefetch_thread.is_alive():
    self.prefetch_stop.set()
    self.prefetch_thread.join(timeout=0.5)
    self.prefetch_stop.clear()

    self.prefetch_thread = threading.Thread(
    target=self._prefetch_historical,
    args=(source_id,),
    daemon=True
    )
    self.prefetch_thread.start()

    def _toggle_mode(self):
    self.mode = 'live' if self.mode == 'video' else 'video'
    self._show_message(f"Mode: {self.mode.upper()}")

    if self.mode == 'live':
    threading.Thread(target=self._fetch_latest, daemon=True).start()

    def _fetch_latest(self):
    now = datetime.now(timezone.utc)
    data = self.client.get_closest_image(now - timedelta(seconds=30))

    if data and 'id' in data:
    if (image := self.client.download_image(data['id'])):
    surface = self._pil_to_surface(image)
    self.current_surface = surface
    self.last_image_time = data.get('date', 'Unknown')
    self.cache.save(self.source_id, data['id'], image, self.last_image_time)

    def _prefetch_historical(self, source_id: SourceId):
    with self.fetch_lock:
    current_count = len(self.buffers.get(source_id, []))
    if current_count >= self.prefetch_frames:
    print(f"Already have {current_count} frames, skipping prefetch")
    return

    frames_needed = self.prefetch_frames - current_count
    if frames_needed <= 0:
    return

    print(f"Pre-fetching {frames_needed} more frames for source {source_id} (have {current_count}, want {self.prefetch_frames})...")

    client = HelioviewerClient(source_id)
    now = datetime.now(timezone.utc)
    cached_count = downloaded_count = skipped_count = 0

    # Round to nearest 12-minute boundary for consistent timestamps
    base_minute = (now.minute // 12) * 12
    base_time = now.replace(minute=base_minute, second=0, microsecond=0)

    for i in range(self.prefetch_frames):
    if self.prefetch_stop.is_set():
    return

    with self.fetch_lock:
    if len(self.buffers.get(source_id, [])) >= self.prefetch_frames:
    break

    target_time = base_time - timedelta(minutes=i * 12)

    try:
    if not (data := client.get_closest_image(target_time)):
    continue

    image_id = data['id']

    # Check if already in our buffer
    if image_id in self.image_ids.get(source_id, set()):
    skipped_count += 1
    continue

    timestamp = data.get('date', 'Unknown')

    if self.cache.is_cached(source_id, image_id):
    if image := self.cache.load(source_id, image_id):
    cached_count += 1
    else:
    if not (image := client.download_image(image_id)):
    continue
    self.cache.save(source_id, image_id, image, timestamp)
    downloaded_count += 1

    try:
    surface = self._pil_to_surface(image)
    with self.fetch_lock:
    self._init_buffer(source_id)
    bisect.insort(self.buffers[source_id], (surface, timestamp, image_id), key=lambda x: x[1])
    self.image_ids[source_id].add(image_id)
    except:
    continue

    except:
    continue

    total_frames = len(self.buffers.get(source_id, []))
    print(f"Pre-fetch complete: {total_frames} frames (skipped: {skipped_count}, cached: {cached_count}, downloaded: {downloaded_count})")

    def _fetch_worker(self):
    while self.running:
    try:
    now = datetime.now(timezone.utc)
    target_time = now - timedelta(minutes=2) # Always fetch recent images (2 min accounts for API delay)

    if not (data := self.client.get_closest_image(target_time)):
    continue

    image_id = data['id']
    if image_id not in self.image_ids.get(self.source_id, set()):
    timestamp = data.get('date', 'Unknown')

    if self.cache.is_cached(self.source_id, image_id):
    image = self.cache.load(self.source_id, image_id)
    else:
    image = self.client.download_image(image_id)
    if image:
    self.cache.save(self.source_id, image_id, image, timestamp)

    if image:
    try:
    surface = self._pil_to_surface(image)
    with self.fetch_lock:
    self._init_buffer(self.source_id)
    bisect.insort(self.buffers[self.source_id], (surface, timestamp, image_id), key=lambda x: x[1])
    self.image_ids[self.source_id].add(image_id)

    if self.mode == 'live':
    self.current_surface = surface
    self.last_image_time = timestamp
    except:
    pass
    except:
    pass

    time.sleep(self.poll_interval)

    def _handle_keydown(self, event: pygame.event.Event):
    key = event.key
    mods = event.mod

    if key in (pygame.K_ESCAPE, pygame.K_q):
    self.running = False
    elif key == pygame.K_c and (mods & pygame.KMOD_CTRL):
    self.running = False
    elif key in (pygame.K_f, pygame.K_F11):
    self._toggle_fullscreen()
    elif key in (pygame.K_m, pygame.K_SPACE):
    self._toggle_mode()
    elif key == pygame.K_LEFT:
    self._cycle_source(-1)
    elif key == pygame.K_RIGHT:
    self._cycle_source(1)
    elif key == pygame.K_i:
    self.show_info = not self.show_info
    self._show_message(f"Info: {'ON' if self.show_info else 'OFF'}")
    elif key == pygame.K_h:
    self.show_help = not self.show_help
    elif key == pygame.K_UP:
    self.video_fps = min(30, self.video_fps + 1)
    self._show_message(f"Video FPS: {self.video_fps}")
    elif key == pygame.K_DOWN:
    self.video_fps = max(0.5, self.video_fps - 0.5)
    self._show_message(f"Video FPS: {self.video_fps}")
    elif key == pygame.K_b:
    if mods & pygame.KMOD_SHIFT:
    self.prefetch_frames = min(500, self.prefetch_frames + 10)
    else:
    self.prefetch_frames = max(10, self.prefetch_frames - 10)
    self._show_message(f"Buffer: {self.prefetch_frames} frames (~{self.prefetch_frames * 0.2:.1f} hours)")
    self._start_prefetch(self.source_id)

    def _draw_info(self):
    if not self.show_info:
    return

    source_name = SOURCE_NAMES.get(self.source_id, 'Unknown')
    wavelength_desc = WAVELENGTH_INFO.get(source_name, source_name)

    buffer = self.buffers.get(self.source_id, [])
    buffer_info = f"Buffer: {len(buffer)} frames" if self.mode == 'video' else "Live"
    fps_info = f" | Video FPS: {self.video_fps}" if self.mode == 'video' else ""

    delay_info = ""
    if self.last_image_time and self.last_image_time != "Loading...":
    try:
    img_dt = datetime.fromisoformat(self.last_image_time.replace('Z', '+00:00'))
    delay_minutes = (datetime.now(timezone.utc) - img_dt).total_seconds() / 60
    delay_info = f" | Delay: {delay_minutes:.1f} min"
    except:
    pass

    lines = [
    wavelength_desc,
    f"Mode: {self.mode.upper()} | {buffer_info}{fps_info}",
    f"Image: {self.last_image_time}{delay_info}",
    ]

    y = 10
    for line in lines:
    text = self.font_small.render(line, True, (200, 200, 200))
    rect = text.get_rect()

    bg = pygame.Surface((rect.width + 10, rect.height + 4))
    bg.set_alpha(180)
    bg.fill((0, 0, 0))

    self.screen.blit(bg, (10, y))
    self.screen.blit(text, (15, y + 2))
    y += rect.height + 5

    if self.mode_message_alpha > 0:
    msg = self.font_large.render(self.mode_message, True, (255, 255, 255))
    msg.set_alpha(self.mode_message_alpha)
    msg_rect = msg.get_rect(center=(self.window_size[0] // 2, self.window_size[1] // 2))

    bg = pygame.Surface((msg_rect.width + 20, msg_rect.height + 10))
    bg.set_alpha(int(self.mode_message_alpha * 0.7))
    bg.fill((0, 0, 0))
    bg_rect = bg.get_rect(center=(self.window_size[0] // 2, self.window_size[1] // 2))

    self.screen.blit(bg, bg_rect)
    self.screen.blit(msg, msg_rect)

    self.mode_message_alpha = max(0, self.mode_message_alpha - 5)

    def _draw_help(self):
    overlay = pygame.Surface(self.window_size)
    overlay.set_alpha(240)
    overlay.fill((0, 0, 0))
    self.screen.blit(overlay, (0, 0))

    lines = [
    "KEYBOARD SHORTCUTS",
    "",
    "F / F11 - Toggle fullscreen",
    "M / Space - Switch mode (Live/Video)",
    "Left/Right - Cycle through wavelengths",
    "Up/Down - Adjust video FPS (0.5-30)",
    "b / B - Decrease/increase buffer size",
    "I - Toggle info display",
    "H - Show/hide this help",
    "ESC / Q / Ctrl+C - Quit",
    ]

    y = self.window_size[1] // 2 - len(lines) * 15
    for line in lines:
    font = self.font_large if line == "KEYBOARD SHORTCUTS" else self.font_small
    color = (255, 200, 0) if line == "KEYBOARD SHORTCUTS" else (255, 255, 255)
    text = font.render(line, True, color)
    rect = text.get_rect(center=(self.window_size[0] // 2, y))
    self.screen.blit(text, rect)
    y += 30

    def run(self):
    self._start_prefetch(self.source_id)
    threading.Thread(target=self._fetch_worker, daemon=True).start()

    self._show_message("Loading solar imagery...", 3000)

    playback_counter = 0

    while self.running:
    for event in pygame.event.get():
    if event.type == pygame.QUIT:
    self.running = False
    elif event.type == pygame.KEYDOWN:
    self._handle_keydown(event)
    elif event.type == pygame.VIDEORESIZE and not self.fullscreen:
    self.window_size = (event.w, event.h)
    self.screen = pygame.display.set_mode(self.window_size, pygame.RESIZABLE)
    elif event.type == pygame.USEREVENT + 1:
    pygame.time.set_timer(pygame.USEREVENT + 1, 0)

    self.screen.fill((0, 0, 0))

    buffer = self.buffers.get(self.source_id, [])

    if self.mode == 'video' and buffer:
    playback_counter += 1
    playback_interval = max(1, int(60 / self.video_fps))

    if playback_counter >= playback_interval:
    playback_counter = 0
    with self.fetch_lock:
    if buffer:
    idx = self.playback_indices.get(self.source_id, 0)
    if idx >= len(buffer):
    idx = 0

    self.current_surface, self.last_image_time, _ = buffer[idx]
    self.playback_indices[self.source_id] = (idx + 1) % len(buffer)

    if self.current_surface:
    scaled = self._scale_to_fit(self.current_surface)
    x = (self.window_size[0] - scaled.get_width()) // 2
    y = (self.window_size[1] - scaled.get_height()) // 2
    self.screen.blit(scaled, (x, y))

    if self.show_help:
    self._draw_help()
    else:
    self._draw_info()

    pygame.display.flip()

    self.frame_count += 1
    if (current_time := time.time()) - self.fps_update_time >= 1.0:
    self.fps = self.frame_count / (current_time - self.fps_update_time)
    self.frame_count = 0
    self.fps_update_time = current_time

    self.clock.tick(60)

    pygame.quit()


    def main():
    parser = argparse.ArgumentParser(
    description='Live Sun Viewer - Display near-real-time solar imagery',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    epilog="""
    Keyboard shortcuts:
    F / F11 - Toggle fullscreen
    M / Space - Switch between Live and Video modes
    ← / → - Cycle through wavelengths
    ↑ / ↓ - Adjust video FPS (0.5-30)
    b / B - Decrease/increase buffer size
    I - Toggle info display
    H - Show help
    ESC / Q / Ctrl+C - Quit
    Available sources (--source):
    304, 171, 193, 211 - SDO/AIA wavelengths
    hmi, magnetogram - SDO/HMI instruments
    c2, c3 - SOHO/LASCO coronagraphs
    """
    )

    parser.add_argument('--source', default='304', choices=list(SOURCES.keys()),
    help='Initial image source/wavelength (default: 304)')
    parser.add_argument('--mode', default='video', choices=['live', 'video'],
    help='Initial display mode (default: video)')
    parser.add_argument('--poll-interval', type=int, default=60,
    help='Seconds between API polls (default: 60)')
    parser.add_argument('--fullscreen', action='store_true',
    help='Start in fullscreen mode')

    args = parser.parse_args()

    print(f"Starting Sun Viewer...")
    print(f"Mode: {args.mode}, Source: {args.source}")
    print(f"Press H for help, F for fullscreen")

    viewer = SunViewer(
    source_id=SOURCES[args.source],
    initial_mode=args.mode,
    poll_interval=args.poll_interval
    )

    if args.fullscreen:
    viewer._toggle_fullscreen()

    try:
    viewer.run()
    except KeyboardInterrupt:
    print("\nShutting down...")
    sys.exit(0)


    if __name__ == "__main__":
    main()