Created
July 24, 2025 12:18
-
-
Save rishubil/864ad10908df5dc83d89d852d593b0ca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.8" | |
| # dependencies = [] | |
| # /// | |
| """ | |
| ZFS Health Monitoring Script with Email Alerts | |
| Monitors ZFS pools and SMART disk health, sends Gmail notifications | |
| """ | |
| import smtplib | |
| import subprocess | |
| import sys | |
| import socket | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from datetime import datetime | |
| from pathlib import Path | |
| # 설정 - 여기를 수정하세요 | |
| GMAIL_USER = "[email protected]" | |
| GMAIL_PASSWORD = "your-16-digit-app-password" # Gmail 앱 비밀번호 | |
| HOSTNAME = socket.gethostname() | |
| def send_email(subject: str, body: str) -> bool: | |
| """Gmail SMTP를 통해 이메일 전송""" | |
| try: | |
| msg = MIMEMultipart() | |
| msg['From'] = f"ZFS Monitor <{GMAIL_USER}>" | |
| msg['To'] = GMAIL_USER | |
| msg['Subject'] = f"{subject} - {HOSTNAME}" | |
| msg.attach(MIMEText(body, 'plain')) | |
| with smtplib.SMTP('smtp.gmail.com', 587) as server: | |
| server.starttls() | |
| server.login(GMAIL_USER, GMAIL_PASSWORD) | |
| server.send_message(msg) | |
| print(f"✅ Email sent: {subject}") | |
| return True | |
| except Exception as e: | |
| print(f"❌ Failed to send email: {e}") | |
| return False | |
| def run_command(cmd: list) -> tuple[bool, str]: | |
| """명령어 실행 및 결과 반환""" | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| return True, result.stdout.strip() | |
| except subprocess.CalledProcessError as e: | |
| return False, e.stderr.strip() if e.stderr else str(e) | |
| except FileNotFoundError: | |
| return False, f"Command not found: {' '.join(cmd)}" | |
| def check_zfs_pools() -> bool: | |
| """ZFS 풀 상태 확인""" | |
| print("🔍 Checking ZFS pools...") | |
| success, output = run_command(['zpool', 'status', '-x']) | |
| if not success: | |
| send_email( | |
| "⚠️ ZFS Check Failed", | |
| f"Could not check ZFS status at {datetime.now()}:\n\n" | |
| f"Error: {output}\n\n" | |
| f"Server: {HOSTNAME}" | |
| ) | |
| return False | |
| if output != "all pools are healthy": | |
| # 상세 상태 가져오기 | |
| success, detailed = run_command(['zpool', 'status', '-v']) | |
| detailed_output = detailed if success else "Could not get detailed status" | |
| send_email( | |
| "🚨 ZFS POOL ALERT", | |
| f"ZFS Pool Issue Detected at {datetime.now()}:\n\n" | |
| f"{detailed_output}\n\n" | |
| f"Please check your system immediately!\n" | |
| f"Server: {HOSTNAME}" | |
| ) | |
| print("⚠️ ZFS pool issues detected - email sent") | |
| return False | |
| print("✅ All ZFS pools are healthy") | |
| return True | |
| def check_smart_health() -> bool: | |
| """SMART 디스크 상태 확인""" | |
| print("🔍 Checking SMART disk health...") | |
| # 디스크 목록 가져오기 | |
| success, output = run_command(['lsblk', '-d', '-o', 'NAME', '--noheadings']) | |
| if not success: | |
| print("❌ Could not get disk list") | |
| return False | |
| disks = [line.strip() for line in output.split('\n') | |
| if line.strip() and any(line.startswith(prefix) for prefix in ['sd', 'nvme'])] | |
| smart_issues = [] | |
| for disk in disks: | |
| disk_path = f"/dev/{disk}" | |
| success, smart_output = run_command(['smartctl', '-H', disk_path]) | |
| if not success: | |
| continue # smartctl 없거나 디스크 접근 불가 | |
| if "PASSED" not in smart_output and "FAILED" in smart_output: | |
| smart_issues.append(f"- {disk_path}: SMART Health Check FAILED") | |
| # 상세 정보 가져오기 | |
| _, detailed = run_command(['smartctl', '-a', disk_path]) | |
| smart_issues.append(f" Details: {detailed[:500]}...") | |
| if smart_issues: | |
| send_email( | |
| "💿 SMART FAILURE ALERT", | |
| f"SMART Health Check Failed at {datetime.now()}:\n\n" | |
| f"{''.join(smart_issues)}\n\n" | |
| f"Server: {HOSTNAME}" | |
| ) | |
| print("⚠️ SMART issues detected - email sent") | |
| return False | |
| print("✅ All disks pass SMART health check") | |
| return True | |
| def check_disk_usage() -> bool: | |
| """디스크 사용량 확인 (보너스)""" | |
| print("🔍 Checking disk usage...") | |
| success, output = run_command(['df', '-h', '/']) | |
| if not success: | |
| return True | |
| lines = output.split('\n') | |
| if len(lines) > 1: | |
| parts = lines[1].split() | |
| if len(parts) >= 5: | |
| usage_percent = int(parts[4].rstrip('%')) | |
| if usage_percent > 90: | |
| send_email( | |
| "💾 DISK SPACE WARNING", | |
| f"Disk usage is {usage_percent}% at {datetime.now()}:\n\n" | |
| f"{output}\n\n" | |
| f"Server: {HOSTNAME}" | |
| ) | |
| print(f"⚠️ High disk usage: {usage_percent}%") | |
| return False | |
| print("✅ Disk usage is normal") | |
| return True | |
| def main(): | |
| """메인 함수""" | |
| print(f"🚀 Starting ZFS health check on {HOSTNAME} at {datetime.now()}") | |
| # 모든 검사 실행 | |
| checks = [ | |
| check_zfs_pools(), | |
| check_smart_health(), | |
| check_disk_usage() | |
| ] | |
| if all(checks): | |
| print("✅ All health checks passed!") | |
| return 0 | |
| else: | |
| print("⚠️ Some health checks failed - notifications sent") | |
| return 1 | |
| if __name__ == "__main__": | |
| sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment