Created
September 14, 2025 20:35
-
-
Save MaxWolf-01/3a9ba05e7d5d97f1310abcab06b44372 to your computer and use it in GitHub Desktop.
Live solar imagery viewer using Helioviewer API (uv run sun_viewer.py; press h for options)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # dependencies = [ | |
| # "requests", | |
| # "pillow", | |
| # "pygame", | |
| # "numpy", | |
| # ] | |
| # requires-python = ">=3.10" | |
| # /// | |
| """Live solar imagery viewer using Helioviewer API.""" | |
| from __future__ import annotations | |
| import argparse | |
| import bisect | |
| import json | |
| import sys | |
| import threading | |
| import time | |
| from collections import deque | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta, timezone | |
| from io import BytesIO | |
| from pathlib import Path | |
| from typing import Final, TypeAlias | |
| import numpy as np | |
| import pygame | |
| import requests | |
| from PIL import Image | |
| # Type aliases | |
| ImageBuffer: TypeAlias = list[tuple[pygame.Surface, str, str]] # (surface, timestamp, image_id) | |
| SourceId: TypeAlias = int | |
| # Constants | |
| CACHE_DIR: Final = Path("/tmp/sun_viewer_cache") | |
| API_BASE: Final = "https://api.helioviewer.org/v2/" | |
| DEFAULT_FPS: Final = 2 | |
| DEFAULT_BUFFER_SIZE: Final = 60 | |
| POLL_INTERVAL: Final = 60 | |
| WINDOW_SIZE: Final = (1024, 1024) | |
| # Solar observation sources | |
| SOURCES: Final[dict[str, SourceId]] = { | |
| # SDO/AIA wavelengths | |
| '304': 13, '171': 10, '193': 11, '211': 12, '335': 14, | |
| '094': 8, '131': 9, '1600': 15, '1700': 16, '4500': 17, | |
| # SDO/HMI | |
| 'hmi': 18, 'magnetogram': 19, | |
| # SOHO/LASCO | |
| 'c2': 4, 'c3': 5, | |
| # PROBA-2/SWAP | |
| 'swap': 32, | |
| } | |
| SOURCE_NAMES: Final = {v: k for k, v in SOURCES.items()} | |
| WAVELENGTH_INFO: Final = { | |
| '304': '304Å - Chromosphere (50,000K)', | |
| '171': '171Å - Quiet corona (600,000K)', | |
| '193': '193Å - Corona & flares (1.2M K)', | |
| '211': '211Å - Active regions (2M K)', | |
| '335': '335Å - Active regions (2.5M K)', | |
| '094': '094Å - Flaring regions (6M K)', | |
| '131': '131Å - Flares (10M K)', | |
| '1600': '1600Å - Transition region', | |
| '1700': '1700Å - Photosphere', | |
| '4500': '4500Å - Visible light', | |
| 'hmi': 'HMI - Magnetic intensity', | |
| 'magnetogram': 'Magnetogram', | |
| 'c2': 'LASCO C2 - Coronagraph', | |
| 'c3': 'LASCO C3 - Coronagraph', | |
| 'swap': 'SWAP 174Å' | |
| } | |
| @dataclass | |
| class ImageMetadata: | |
| id: str | |
| timestamp: str | |
| source_id: SourceId | |
| class CacheManager: | |
| def __init__(self, cache_dir: Path = CACHE_DIR): | |
| self.cache_dir = cache_dir | |
| self.cache_dir.mkdir(exist_ok=True) | |
| self.metadata_lock = threading.Lock() | |
| self.metadata = self._load_metadata() | |
| def _load_metadata(self) -> dict: | |
| metadata_file = self.cache_dir / "metadata.json" | |
| if metadata_file.exists(): | |
| try: | |
| return json.loads(metadata_file.read_text()) | |
| except: | |
| return {} | |
| return {} | |
| def _save_metadata(self): | |
| with self.metadata_lock: | |
| metadata_file = self.cache_dir / "metadata.json" | |
| metadata_file.write_text(json.dumps(self.metadata)) | |
| def get_path(self, source_id: SourceId, image_id: str) -> Path: | |
| return self.cache_dir / f"{source_id}_{image_id}.jpg" | |
| def save(self, source_id: SourceId, image_id: str, image: Image.Image, timestamp: str): | |
| try: | |
| path = self.get_path(source_id, image_id) | |
| image.save(path, 'JPEG', quality=90) | |
| with self.metadata_lock: | |
| key = str(source_id) | |
| if key not in self.metadata: | |
| self.metadata[key] = {} | |
| self.metadata[key][image_id] = { | |
| 'timestamp': timestamp, | |
| 'cached_at': time.time() | |
| } | |
| self._save_metadata() | |
| except Exception: | |
| pass | |
| def load(self, source_id: SourceId, image_id: str) -> Image.Image | None: | |
| path = self.get_path(source_id, image_id) | |
| if path.exists(): | |
| try: | |
| return Image.open(path) | |
| except: | |
| pass | |
| return None | |
| def is_cached(self, source_id: SourceId, image_id: str) -> bool: | |
| with self.metadata_lock: | |
| return str(source_id) in self.metadata and image_id in self.metadata[str(source_id)] | |
| class HelioviewerClient: | |
| def __init__(self, source_id: SourceId): | |
| self.source_id = source_id | |
| self.session = requests.Session() | |
| def get_closest_image(self, target_time: datetime) -> dict | None: | |
| params = { | |
| 'date': target_time.strftime('%Y-%m-%dT%H:%M:%SZ'), | |
| 'sourceId': self.source_id | |
| } | |
| try: | |
| response = self.session.get(f"{API_BASE}getClosestImage/", params=params, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except: | |
| return None | |
| def download_image(self, image_id: str, width: int = 1024) -> Image.Image | None: | |
| params = {'id': image_id, 'width': width, 'height': width, 'type': 'jpg'} | |
| try: | |
| response = self.session.get(f"{API_BASE}downloadImage/", params=params, timeout=15) | |
| response.raise_for_status() | |
| return Image.open(BytesIO(response.content)) | |
| except: | |
| return None | |
| class SunViewer: | |
| def __init__(self, source_id: SourceId = 13, initial_mode: str = 'video', | |
| poll_interval: int = POLL_INTERVAL): | |
| pygame.init() | |
| self.source_id = source_id | |
| self.mode = initial_mode | |
| self.poll_interval = poll_interval | |
| self.video_fps = DEFAULT_FPS | |
| self.prefetch_frames = DEFAULT_BUFFER_SIZE | |
| self.fullscreen = False | |
| self.show_info = True | |
| self.show_help = False | |
| self.window_size = WINDOW_SIZE | |
| self._setup_display() | |
| self.font_large = pygame.font.Font(None, 24) | |
| self.font_small = pygame.font.Font(None, 18) | |
| self.clock = pygame.time.Clock() | |
| self.client = HelioviewerClient(source_id) | |
| self.cache = CacheManager() | |
| self.buffers: dict[SourceId, ImageBuffer] = {} | |
| self.image_ids: dict[SourceId, set[str]] = {} | |
| self.playback_indices: dict[SourceId, int] = {} | |
| self.current_surface: pygame.Surface | None = None | |
| self.last_image_time = "Loading..." | |
| self.running = True | |
| self.fetch_lock = threading.Lock() | |
| self.prefetch_stop = threading.Event() | |
| self.prefetch_thread = None | |
| self.fps = 0.0 | |
| self.frame_count = 0 | |
| self.fps_update_time = time.time() | |
| self.mode_message = "" | |
| self.mode_message_alpha = 0 | |
| self._init_buffer(source_id) | |
| def _init_buffer(self, source_id: SourceId): | |
| if source_id not in self.buffers: | |
| self.buffers[source_id] = [] | |
| self.image_ids[source_id] = set() | |
| self.playback_indices[source_id] = 0 | |
| def _setup_display(self): | |
| flags = pygame.FULLSCREEN if self.fullscreen else pygame.RESIZABLE | |
| self.screen = pygame.display.set_mode( | |
| (0, 0) if self.fullscreen else self.window_size, flags | |
| ) | |
| if self.fullscreen: | |
| self.window_size = (self.screen.get_width(), self.screen.get_height()) | |
| pygame.display.set_caption("Live Sun Viewer") | |
| def _pil_to_surface(self, image: Image.Image) -> pygame.Surface: | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| array = np.array(image).transpose(1, 0, 2) | |
| return pygame.surfarray.make_surface(array) | |
| def _scale_to_fit(self, surface: pygame.Surface) -> pygame.Surface: | |
| sw, sh = surface.get_size() | |
| ww, wh = self.window_size | |
| scale = min(ww / sw, wh / sh) | |
| new_size = (int(sw * scale), int(sh * scale)) | |
| return pygame.transform.smoothscale(surface, new_size) | |
| def _show_message(self, text: str, duration: int = 2000): | |
| self.mode_message = text | |
| self.mode_message_alpha = 255 | |
| pygame.time.set_timer(pygame.USEREVENT + 1, duration) | |
| def _toggle_fullscreen(self): | |
| self.fullscreen = not self.fullscreen | |
| self._setup_display() | |
| def _cycle_source(self, direction: int): | |
| sources = list(SOURCES.values()) | |
| idx = sources.index(self.source_id) | |
| self.source_id = sources[(idx + direction) % len(sources)] | |
| self.client.source_id = self.source_id | |
| self._init_buffer(self.source_id) | |
| self._start_prefetch(self.source_id) | |
| self._show_message(f"Source: {SOURCE_NAMES.get(self.source_id, 'Unknown')}") | |
| def _start_prefetch(self, source_id: SourceId): | |
| if self.prefetch_thread and self.prefetch_thread.is_alive(): | |
| self.prefetch_stop.set() | |
| self.prefetch_thread.join(timeout=0.5) | |
| self.prefetch_stop.clear() | |
| self.prefetch_thread = threading.Thread( | |
| target=self._prefetch_historical, | |
| args=(source_id,), | |
| daemon=True | |
| ) | |
| self.prefetch_thread.start() | |
| def _toggle_mode(self): | |
| self.mode = 'live' if self.mode == 'video' else 'video' | |
| self._show_message(f"Mode: {self.mode.upper()}") | |
| if self.mode == 'live': | |
| threading.Thread(target=self._fetch_latest, daemon=True).start() | |
| def _fetch_latest(self): | |
| now = datetime.now(timezone.utc) | |
| data = self.client.get_closest_image(now - timedelta(seconds=30)) | |
| if data and 'id' in data: | |
| if (image := self.client.download_image(data['id'])): | |
| surface = self._pil_to_surface(image) | |
| self.current_surface = surface | |
| self.last_image_time = data.get('date', 'Unknown') | |
| self.cache.save(self.source_id, data['id'], image, self.last_image_time) | |
| def _prefetch_historical(self, source_id: SourceId): | |
| with self.fetch_lock: | |
| current_count = len(self.buffers.get(source_id, [])) | |
| if current_count >= self.prefetch_frames: | |
| print(f"Already have {current_count} frames, skipping prefetch") | |
| return | |
| frames_needed = self.prefetch_frames - current_count | |
| if frames_needed <= 0: | |
| return | |
| print(f"Pre-fetching {frames_needed} more frames for source {source_id} (have {current_count}, want {self.prefetch_frames})...") | |
| client = HelioviewerClient(source_id) | |
| now = datetime.now(timezone.utc) | |
| cached_count = downloaded_count = skipped_count = 0 | |
| # Round to nearest 12-minute boundary for consistent timestamps | |
| base_minute = (now.minute // 12) * 12 | |
| base_time = now.replace(minute=base_minute, second=0, microsecond=0) | |
| for i in range(self.prefetch_frames): | |
| if self.prefetch_stop.is_set(): | |
| return | |
| with self.fetch_lock: | |
| if len(self.buffers.get(source_id, [])) >= self.prefetch_frames: | |
| break | |
| target_time = base_time - timedelta(minutes=i * 12) | |
| try: | |
| if not (data := client.get_closest_image(target_time)): | |
| continue | |
| image_id = data['id'] | |
| # Check if already in our buffer | |
| if image_id in self.image_ids.get(source_id, set()): | |
| skipped_count += 1 | |
| continue | |
| timestamp = data.get('date', 'Unknown') | |
| if self.cache.is_cached(source_id, image_id): | |
| if image := self.cache.load(source_id, image_id): | |
| cached_count += 1 | |
| else: | |
| if not (image := client.download_image(image_id)): | |
| continue | |
| self.cache.save(source_id, image_id, image, timestamp) | |
| downloaded_count += 1 | |
| try: | |
| surface = self._pil_to_surface(image) | |
| with self.fetch_lock: | |
| self._init_buffer(source_id) | |
| bisect.insort(self.buffers[source_id], (surface, timestamp, image_id), key=lambda x: x[1]) | |
| self.image_ids[source_id].add(image_id) | |
| except: | |
| continue | |
| except: | |
| continue | |
| total_frames = len(self.buffers.get(source_id, [])) | |
| print(f"Pre-fetch complete: {total_frames} frames (skipped: {skipped_count}, cached: {cached_count}, downloaded: {downloaded_count})") | |
| def _fetch_worker(self): | |
| while self.running: | |
| try: | |
| now = datetime.now(timezone.utc) | |
| target_time = now - timedelta(minutes=2) # Always fetch recent images (2 min accounts for API delay) | |
| if not (data := self.client.get_closest_image(target_time)): | |
| continue | |
| image_id = data['id'] | |
| if image_id not in self.image_ids.get(self.source_id, set()): | |
| timestamp = data.get('date', 'Unknown') | |
| if self.cache.is_cached(self.source_id, image_id): | |
| image = self.cache.load(self.source_id, image_id) | |
| else: | |
| image = self.client.download_image(image_id) | |
| if image: | |
| self.cache.save(self.source_id, image_id, image, timestamp) | |
| if image: | |
| try: | |
| surface = self._pil_to_surface(image) | |
| with self.fetch_lock: | |
| self._init_buffer(self.source_id) | |
| bisect.insort(self.buffers[self.source_id], (surface, timestamp, image_id), key=lambda x: x[1]) | |
| self.image_ids[self.source_id].add(image_id) | |
| if self.mode == 'live': | |
| self.current_surface = surface | |
| self.last_image_time = timestamp | |
| except: | |
| pass | |
| except: | |
| pass | |
| time.sleep(self.poll_interval) | |
| def _handle_keydown(self, event: pygame.event.Event): | |
| key = event.key | |
| mods = event.mod | |
| if key in (pygame.K_ESCAPE, pygame.K_q): | |
| self.running = False | |
| elif key == pygame.K_c and (mods & pygame.KMOD_CTRL): | |
| self.running = False | |
| elif key in (pygame.K_f, pygame.K_F11): | |
| self._toggle_fullscreen() | |
| elif key in (pygame.K_m, pygame.K_SPACE): | |
| self._toggle_mode() | |
| elif key == pygame.K_LEFT: | |
| self._cycle_source(-1) | |
| elif key == pygame.K_RIGHT: | |
| self._cycle_source(1) | |
| elif key == pygame.K_i: | |
| self.show_info = not self.show_info | |
| self._show_message(f"Info: {'ON' if self.show_info else 'OFF'}") | |
| elif key == pygame.K_h: | |
| self.show_help = not self.show_help | |
| elif key == pygame.K_UP: | |
| self.video_fps = min(30, self.video_fps + 1) | |
| self._show_message(f"Video FPS: {self.video_fps}") | |
| elif key == pygame.K_DOWN: | |
| self.video_fps = max(0.5, self.video_fps - 0.5) | |
| self._show_message(f"Video FPS: {self.video_fps}") | |
| elif key == pygame.K_b: | |
| if mods & pygame.KMOD_SHIFT: | |
| self.prefetch_frames = min(500, self.prefetch_frames + 10) | |
| else: | |
| self.prefetch_frames = max(10, self.prefetch_frames - 10) | |
| self._show_message(f"Buffer: {self.prefetch_frames} frames (~{self.prefetch_frames * 0.2:.1f} hours)") | |
| self._start_prefetch(self.source_id) | |
| def _draw_info(self): | |
| if not self.show_info: | |
| return | |
| source_name = SOURCE_NAMES.get(self.source_id, 'Unknown') | |
| wavelength_desc = WAVELENGTH_INFO.get(source_name, source_name) | |
| buffer = self.buffers.get(self.source_id, []) | |
| buffer_info = f"Buffer: {len(buffer)} frames" if self.mode == 'video' else "Live" | |
| fps_info = f" | Video FPS: {self.video_fps}" if self.mode == 'video' else "" | |
| delay_info = "" | |
| if self.last_image_time and self.last_image_time != "Loading...": | |
| try: | |
| img_dt = datetime.fromisoformat(self.last_image_time.replace('Z', '+00:00')) | |
| delay_minutes = (datetime.now(timezone.utc) - img_dt).total_seconds() / 60 | |
| delay_info = f" | Delay: {delay_minutes:.1f} min" | |
| except: | |
| pass | |
| lines = [ | |
| wavelength_desc, | |
| f"Mode: {self.mode.upper()} | {buffer_info}{fps_info}", | |
| f"Image: {self.last_image_time}{delay_info}", | |
| ] | |
| y = 10 | |
| for line in lines: | |
| text = self.font_small.render(line, True, (200, 200, 200)) | |
| rect = text.get_rect() | |
| bg = pygame.Surface((rect.width + 10, rect.height + 4)) | |
| bg.set_alpha(180) | |
| bg.fill((0, 0, 0)) | |
| self.screen.blit(bg, (10, y)) | |
| self.screen.blit(text, (15, y + 2)) | |
| y += rect.height + 5 | |
| if self.mode_message_alpha > 0: | |
| msg = self.font_large.render(self.mode_message, True, (255, 255, 255)) | |
| msg.set_alpha(self.mode_message_alpha) | |
| msg_rect = msg.get_rect(center=(self.window_size[0] // 2, self.window_size[1] // 2)) | |
| bg = pygame.Surface((msg_rect.width + 20, msg_rect.height + 10)) | |
| bg.set_alpha(int(self.mode_message_alpha * 0.7)) | |
| bg.fill((0, 0, 0)) | |
| bg_rect = bg.get_rect(center=(self.window_size[0] // 2, self.window_size[1] // 2)) | |
| self.screen.blit(bg, bg_rect) | |
| self.screen.blit(msg, msg_rect) | |
| self.mode_message_alpha = max(0, self.mode_message_alpha - 5) | |
| def _draw_help(self): | |
| overlay = pygame.Surface(self.window_size) | |
| overlay.set_alpha(240) | |
| overlay.fill((0, 0, 0)) | |
| self.screen.blit(overlay, (0, 0)) | |
| lines = [ | |
| "KEYBOARD SHORTCUTS", | |
| "", | |
| "F / F11 - Toggle fullscreen", | |
| "M / Space - Switch mode (Live/Video)", | |
| "Left/Right - Cycle through wavelengths", | |
| "Up/Down - Adjust video FPS (0.5-30)", | |
| "b / B - Decrease/increase buffer size", | |
| "I - Toggle info display", | |
| "H - Show/hide this help", | |
| "ESC / Q / Ctrl+C - Quit", | |
| ] | |
| y = self.window_size[1] // 2 - len(lines) * 15 | |
| for line in lines: | |
| font = self.font_large if line == "KEYBOARD SHORTCUTS" else self.font_small | |
| color = (255, 200, 0) if line == "KEYBOARD SHORTCUTS" else (255, 255, 255) | |
| text = font.render(line, True, color) | |
| rect = text.get_rect(center=(self.window_size[0] // 2, y)) | |
| self.screen.blit(text, rect) | |
| y += 30 | |
| def run(self): | |
| self._start_prefetch(self.source_id) | |
| threading.Thread(target=self._fetch_worker, daemon=True).start() | |
| self._show_message("Loading solar imagery...", 3000) | |
| playback_counter = 0 | |
| while self.running: | |
| for event in pygame.event.get(): | |
| if event.type == pygame.QUIT: | |
| self.running = False | |
| elif event.type == pygame.KEYDOWN: | |
| self._handle_keydown(event) | |
| elif event.type == pygame.VIDEORESIZE and not self.fullscreen: | |
| self.window_size = (event.w, event.h) | |
| self.screen = pygame.display.set_mode(self.window_size, pygame.RESIZABLE) | |
| elif event.type == pygame.USEREVENT + 1: | |
| pygame.time.set_timer(pygame.USEREVENT + 1, 0) | |
| self.screen.fill((0, 0, 0)) | |
| buffer = self.buffers.get(self.source_id, []) | |
| if self.mode == 'video' and buffer: | |
| playback_counter += 1 | |
| playback_interval = max(1, int(60 / self.video_fps)) | |
| if playback_counter >= playback_interval: | |
| playback_counter = 0 | |
| with self.fetch_lock: | |
| if buffer: | |
| idx = self.playback_indices.get(self.source_id, 0) | |
| if idx >= len(buffer): | |
| idx = 0 | |
| self.current_surface, self.last_image_time, _ = buffer[idx] | |
| self.playback_indices[self.source_id] = (idx + 1) % len(buffer) | |
| if self.current_surface: | |
| scaled = self._scale_to_fit(self.current_surface) | |
| x = (self.window_size[0] - scaled.get_width()) // 2 | |
| y = (self.window_size[1] - scaled.get_height()) // 2 | |
| self.screen.blit(scaled, (x, y)) | |
| if self.show_help: | |
| self._draw_help() | |
| else: | |
| self._draw_info() | |
| pygame.display.flip() | |
| self.frame_count += 1 | |
| if (current_time := time.time()) - self.fps_update_time >= 1.0: | |
| self.fps = self.frame_count / (current_time - self.fps_update_time) | |
| self.frame_count = 0 | |
| self.fps_update_time = current_time | |
| self.clock.tick(60) | |
| pygame.quit() | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description='Live Sun Viewer - Display near-real-time solar imagery', | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Keyboard shortcuts: | |
| F / F11 - Toggle fullscreen | |
| M / Space - Switch between Live and Video modes | |
| ← / → - Cycle through wavelengths | |
| ↑ / ↓ - Adjust video FPS (0.5-30) | |
| b / B - Decrease/increase buffer size | |
| I - Toggle info display | |
| H - Show help | |
| ESC / Q / Ctrl+C - Quit | |
| Available sources (--source): | |
| 304, 171, 193, 211 - SDO/AIA wavelengths | |
| hmi, magnetogram - SDO/HMI instruments | |
| c2, c3 - SOHO/LASCO coronagraphs | |
| """ | |
| ) | |
| parser.add_argument('--source', default='304', choices=list(SOURCES.keys()), | |
| help='Initial image source/wavelength (default: 304)') | |
| parser.add_argument('--mode', default='video', choices=['live', 'video'], | |
| help='Initial display mode (default: video)') | |
| parser.add_argument('--poll-interval', type=int, default=60, | |
| help='Seconds between API polls (default: 60)') | |
| parser.add_argument('--fullscreen', action='store_true', | |
| help='Start in fullscreen mode') | |
| args = parser.parse_args() | |
| print(f"Starting Sun Viewer...") | |
| print(f"Mode: {args.mode}, Source: {args.source}") | |
| print(f"Press H for help, F for fullscreen") | |
| viewer = SunViewer( | |
| source_id=SOURCES[args.source], | |
| initial_mode=args.mode, | |
| poll_interval=args.poll_interval | |
| ) | |
| if args.fullscreen: | |
| viewer._toggle_fullscreen() | |
| try: | |
| viewer.run() | |
| except KeyboardInterrupt: | |
| print("\nShutting down...") | |
| sys.exit(0) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment