-
-
Save aminyuddin/cec4da93245c3660e87c16ab53bfc977 to your computer and use it in GitHub Desktop.
Minecraft Discord bot for my kids.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| TOKEN= | |
| GUILD_ID= | |
| ALLOWED_USERS= | |
| XBL_API_KEY= |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ===================== Imports ===================== | |
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import zipfile
from datetime import datetime, timedelta
from pathlib import Path

import dateutil.parser
import discord
import requests
from discord import app_commands
from discord.ext import commands
from dotenv import load_dotenv
# ===================== Configuration =====================
load_dotenv()
TOKEN = os.getenv("TOKEN")

# Fail fast with a readable message: int(None) would otherwise raise an
# opaque TypeError when GUILD_ID is missing from the environment.
_raw_guild_id = (os.getenv("GUILD_ID") or "").strip()
if not _raw_guild_id:
    raise RuntimeError("GUILD_ID is not set; define it in the environment or the .env file.")
GUILD_ID = int(_raw_guild_id)

# Comma-separated user IDs; blank entries are skipped.
ALLOWED_USERS = [int(uid) for uid in os.getenv("ALLOWED_USERS", "").split(",") if uid.strip()]
XBL_API_KEY = os.getenv("XBL_API_KEY")  # API key for xbl.io

# ===================== Path Configuration =====================
MINECRAFT_BASE = "/opt/minecraft"  # Base directory for all Minecraft files
BEDROCK_DIR = f"{MINECRAFT_BASE}/minecraft-bedrock"  # Bedrock server directory
WORLD_NAME = "NetherNest"  # Name of the Minecraft world
WORLD_DIR = f"{BEDROCK_DIR}/worlds/{WORLD_NAME}"  # Path to the world directory
BACKUP_DIR = f"{MINECRAFT_BASE}/minecraft-backups"  # Where backups are stored
RCLONE_REMOTE = "GDrive:/MinecraftBackups"  # Rclone remote for Google Drive
NBT_PATH = f"{WORLD_DIR}/level.dat"  # Path to the NBT data file
SERVER_PROPERTIES = f"{BEDROCK_DIR}/server.properties"  # Server properties file
ALLOWLIST_PATH = f"{BEDROCK_DIR}/allowlist.json"

# === MCXboxBroadcastStandalone control constants ===
MCBROADCAST_JAR = f"{MINECRAFT_BASE}/MCXboxBroadcastStandalone.jar"
MCBROADCAST_LOG = f"{MINECRAFT_BASE}/logs/latest.log"
MCBROADCAST_SESSION = "mcbroadcast"

intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
| # ===================== NBT Helpers ===================== | |
def read_nbt_tags(path=None):
    """Read a fixed set of tags from level.dat by scanning for their headers.

    The file is not parsed as a full NBT tree. Each tag is located by
    searching the raw bytes for ``<type byte><little-endian name length><name>``
    and decoding the payload that follows, so a matching byte sequence inside
    another payload would be picked up as well.

    Args:
        path: File to read. Defaults to the module-level ``NBT_PATH``.

    Returns:
        dict: ``LevelName`` (str), ``GameType`` / ``Difficulty`` (int) and
        ``ForceGameType`` / ``cheatsEnabled`` / ``commandsEnabled`` /
        ``hasBeenLoadedInCreative`` (bool). A tag that is not found maps to
        ``None``. On a read or decode failure the result is
        ``{"error": "<message>"}`` instead.
    """

    def payload_offset(data, tag_type, tag_name):
        # Offset of the first payload byte, or None if the header is absent.
        name = tag_name.encode("utf-8")
        header = tag_type + len(name).to_bytes(2, "little") + name
        idx = data.find(header)
        return None if idx == -1 else idx + len(header)

    def string_tag(data, tag_name):
        offset = payload_offset(data, b"\x08", tag_name)
        if offset is None:
            return None
        length = int.from_bytes(data[offset:offset + 2], "little")
        start = offset + 2
        return data[start:start + length].decode("utf-8", errors="replace")

    def int_tag(data, tag_name):
        offset = payload_offset(data, b"\x03", tag_name)
        if offset is None:
            return None
        # TAG_Int is a signed 32-bit value.
        return int.from_bytes(data[offset:offset + 4], "little", signed=True)

    def bool_tag(data, tag_name):
        offset = payload_offset(data, b"\x01", tag_name)
        if offset is None:
            return None
        # Raises IndexError on a truncated file; reported via the error dict.
        return bool(data[offset])

    try:
        # Read the file once and decode every tag from the same snapshot.
        with open(NBT_PATH if path is None else path, "rb") as f:
            data = f.read()
        return {
            "LevelName": string_tag(data, "LevelName"),
            "GameType": int_tag(data, "GameType"),
            "ForceGameType": bool_tag(data, "ForceGameType"),
            "cheatsEnabled": bool_tag(data, "cheatsEnabled"),
            "commandsEnabled": bool_tag(data, "commandsEnabled"),
            "Difficulty": int_tag(data, "Difficulty"),
            "hasBeenLoadedInCreative": bool_tag(data, "hasBeenLoadedInCreative"),
        }
    except (OSError, IndexError) as e:
        return {"error": str(e)}
def update_nbt_tags(updates: dict, path=None):
    """Patch selected tags in level.dat in place.

    Tags are located by scanning for ``<type byte><name length><name>`` and
    overwriting the payload that follows. All updates are applied to an
    in-memory copy and written back once, so a failing key leaves the file
    on disk untouched.

    Args:
        updates: Mapping of tag name to new value. ``LevelName`` must encode
            to exactly the current byte length (the string is overwritten,
            not resized). ``GameType`` and ``Difficulty`` are converted with
            ``int()``. ``ForceGameType``, ``cheatsEnabled``,
            ``commandsEnabled`` and ``hasBeenLoadedInCreative`` are treated
            as booleans. Other keys are ignored.
        path: File to patch. Defaults to the module-level ``NBT_PATH``.

    Returns:
        ``True`` on success, otherwise an error string (``"Failed to update
        <key>"`` or the text of the exception that occurred).
    """
    int_keys = ("GameType", "Difficulty")
    bool_keys = ("ForceGameType", "cheatsEnabled", "commandsEnabled", "hasBeenLoadedInCreative")
    target = NBT_PATH if path is None else path

    def payload_offset(data, tag_type, tag_name):
        # Offset of the first payload byte, or None if the header is absent.
        name = tag_name.encode("utf-8")
        header = tag_type + len(name).to_bytes(2, "little") + name
        idx = data.find(header)
        return None if idx == -1 else idx + len(header)

    def as_bool(value):
        # bool("false") is True, so textual values are interpreted explicitly.
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(value)

    try:
        with open(target, "rb") as f:
            data = bytearray(f.read())

        dirty = False
        for key, value in updates.items():
            if key == "LevelName":
                offset = payload_offset(data, b"\x08", key)
                if offset is None:
                    return f"Failed to update {key}"
                encoded = value.encode("utf-8")
                current_len = int.from_bytes(data[offset:offset + 2], "little")
                if len(encoded) != current_len:
                    return f"Failed to update {key}"
                start = offset + 2
                data[start:start + current_len] = encoded
            elif key in int_keys:
                offset = payload_offset(data, b"\x03", key)
                if offset is None:
                    return f"Failed to update {key}"
                # TAG_Int is signed; unsigned encoding rejects negatives.
                data[offset:offset + 4] = int(value).to_bytes(4, "little", signed=True)
            elif key in bool_keys:
                offset = payload_offset(data, b"\x01", key)
                if offset is None:
                    return f"Failed to update {key}"
                data[offset] = 1 if as_bool(value) else 0
            else:
                continue
            dirty = True

        if dirty:
            with open(target, "wb") as f:
                f.write(data)
        return True
    except Exception as e:  # contract: report failures as a string, never raise
        return str(e)
| # ===================== Server Properties Helpers ===================== | |
def update_server_properties(properties: dict, path=None):
    """Rewrite server.properties with the given key/value pairs.

    Existing ``key=value`` lines whose key is in ``properties`` are replaced
    (every occurrence, if a key is duplicated). Keys that do not appear in
    the file are appended at the end. Blank lines, comment lines and lines
    without ``=`` are kept unchanged.

    Args:
        properties: Mapping of property name to the value to write.
        path: File to rewrite. Defaults to the module-level
            ``SERVER_PROPERTIES``.
    """
    target = SERVER_PROPERTIES if path is None else path
    with open(target, "r") as f:
        lines = f.readlines()

    # Keys still to be appended because the file never mentioned them.
    pending = dict(properties)
    new_lines = []
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in line:
            new_lines.append(line)
            continue
        # Tolerate whitespace around the key ("gamemode = survival").
        key = line.split("=", 1)[0].strip()
        if key in properties:
            new_lines.append(f"{key}={properties[key]}\n")
            # pop() rather than remove(): a duplicated key must not raise.
            pending.pop(key, None)
        else:
            new_lines.append(line)

    # Without this, an appended key would be glued onto an unterminated last line.
    if pending and new_lines and not new_lines[-1].endswith("\n"):
        new_lines[-1] += "\n"
    new_lines.extend(f"{key}={value}\n" for key, value in pending.items())

    with open(target, "w") as f:
        f.writelines(new_lines)
| # ===================== Backup and Restore Helpers ===================== | |
def create_minecraft_backup(suffix=""):
    """Zip the world directory, copy the archive to the rclone remote, and prune old backups.

    The archive is named ``minecraft-backup-<timestamp>[-<suffix>].zip`` and
    contains the world folder relative to its parent directory. After the
    upload attempt, every ``*.zip`` in ``BACKUP_DIR`` older than seven days
    is deleted.

    Args:
        suffix: Optional text appended (after a dash) before ``.zip``.

    Returns:
        str: Path of the archive that was created.

    Raises:
        RuntimeError: If ``zip`` fails or produces no archive. Callers delete
            the world after calling this, so a failed backup must not pass
            silently.
        OSError: If ``zip`` cannot be started (for example, not installed).
    """
    backup_root = Path(BACKUP_DIR)
    backup_root.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    suffix_str = f"-{suffix}" if suffix else ""
    backup_file = f"{BACKUP_DIR}/minecraft-backup-{timestamp}{suffix_str}.zip"

    # Run from the parent directory so the archive holds "<world>/..." and not
    # the absolute path. Argument lists avoid shell quoting problems.
    world = Path(WORLD_DIR)
    zipped = subprocess.run(
        ["zip", "-r", backup_file, world.name],
        cwd=str(world.parent),
        capture_output=True,
        text=True,
    )
    if zipped.returncode != 0 or not Path(backup_file).is_file():
        detail = (zipped.stderr or zipped.stdout or "").strip()
        raise RuntimeError(f"zip failed with exit code {zipped.returncode}: {detail}")

    # The off-site copy stays best-effort: the local archive is already valid.
    try:
        subprocess.run(["rclone", "copy", backup_file, RCLONE_REMOTE], capture_output=True, text=True)
    except OSError:
        pass

    cutoff = datetime.now() - timedelta(days=7)
    for old_backup in backup_root.glob("*.zip"):
        if datetime.fromtimestamp(old_backup.stat().st_mtime) < cutoff:
            old_backup.unlink()
    return backup_file
def restore_simple_backup(backup_filename):
    """Restore a backup zip whose only top-level entry is the world folder.

    Stops the server, removes the current world directory, unzips the archive
    into the worlds directory, and starts the server again (also when the
    extraction fails).

    Args:
        backup_filename: Name of the archive inside ``BACKUP_DIR``.

    Raises:
        RuntimeError: If ``unzip`` reports an error (exit code above 1).
    """
    archive = f"{BACKUP_DIR}/{backup_filename}"
    worlds_dir = Path(WORLD_DIR).parent

    stop_minecraft_server()
    try:
        subprocess.call(["rm", "-rf", WORLD_DIR])
        worlds_dir.mkdir(parents=True, exist_ok=True)
        # Argument list, not a shell string: the filename can come from user input.
        unzipped = subprocess.run(
            ["unzip", "-o", archive, "-d", str(worlds_dir)],
            capture_output=True,
            text=True,
        )
    finally:
        # Never leave the server stopped, whatever happened above.
        start_minecraft_server()

    # unzip exits with 1 for warnings only; anything higher is a real failure.
    if unzipped.returncode > 1:
        detail = (unzipped.stderr or unzipped.stdout or "").strip()
        raise RuntimeError(f"unzip failed with exit code {unzipped.returncode}: {detail}")
def restore_problematic_backup(backup_filename):
    """Restore a backup zip whose world folder may be nested at any depth.

    The archive is extracted to a temporary directory and searched for the
    first directory containing ``level.dat``. Only after that directory is
    found is the server stopped and the current world replaced, so an unusable
    archive leaves the running world untouched.

    Args:
        backup_filename: Name of the archive inside ``BACKUP_DIR``.

    Raises:
        RuntimeError: If no directory with ``level.dat`` exists in the archive.
    """
    archive = f"{BACKUP_DIR}/{backup_filename}"
    with tempfile.TemporaryDirectory() as tmpdir:
        # Argument list, not a shell string: the filename can come from user input.
        subprocess.run(["unzip", "-o", archive, "-d", tmpdir], capture_output=True, text=True)

        world_root = next(
            (root for root, _dirs, files in os.walk(tmpdir) if "level.dat" in files),
            None,
        )
        if world_root is None:
            raise RuntimeError("No world directory with level.dat found in backup.")

        stop_minecraft_server()
        try:
            subprocess.call(["rm", "-rf", WORLD_DIR])
            Path(WORLD_DIR).mkdir(parents=True, exist_ok=True)
            # Copies every file and sub-directory of the located world root.
            shutil.copytree(world_root, WORLD_DIR, dirs_exist_ok=True)
        finally:
            # Never leave the server stopped, even if the copy fails.
            start_minecraft_server()
def restore_local_backup(backup_filename):
    """Restore a local backup zip, choosing the strategy from the archive layout.

    An archive is "simple" when every entry lives under ``<world name>/`` and
    ``<world name>/level.dat`` is present; it is then unzipped straight into
    the worlds directory. Anything else (a different folder name, extra
    nesting, stray root files) goes through ``restore_problematic_backup``,
    which locates ``level.dat`` wherever it is.

    Args:
        backup_filename: Name of the archive inside ``BACKUP_DIR``.

    Returns:
        Whatever the chosen restore helper returns.
    """
    with zipfile.ZipFile(f"{BACKUP_DIR}/{backup_filename}", "r") as zipf:
        names = zipf.namelist()

    # Require the configured world folder, not just "one top-level folder":
    # unzipping "OtherWorld/" into worlds/ would leave WORLD_DIR missing.
    prefix = f"{Path(WORLD_DIR).name}/"
    is_simple = (
        bool(names)
        and all(name.startswith(prefix) for name in names)
        and f"{prefix}level.dat" in names
    )
    if is_simple:
        return restore_simple_backup(backup_filename)
    return restore_problematic_backup(backup_filename)
def restore_gdrive_backup(backup_filename):
    """Download a backup from the rclone remote into ``BACKUP_DIR`` and restore it.

    Args:
        backup_filename: Name of the archive on ``RCLONE_REMOTE``.

    Raises:
        RuntimeError: If ``rclone copy`` exits with a non-zero status, so a
            stale local file of the same name is never restored by mistake.
    """
    # Argument list, not a shell string: the filename can come from user input.
    copied = subprocess.run(
        ["rclone", "copy", f"{RCLONE_REMOTE}/{backup_filename}", BACKUP_DIR],
        capture_output=True,
        text=True,
    )
    if copied.returncode != 0:
        detail = (copied.stderr or copied.stdout or "").strip()
        raise RuntimeError(f"rclone copy failed with exit code {copied.returncode}: {detail}")
    restore_local_backup(backup_filename)
| # ===================== Helper Functions ===================== | |
def get_minecraft_server_status():
    """Return the output of ``systemctl is-active`` for the minecraft-bedrock unit."""
    status = subprocess.getoutput("systemctl is-active minecraft-bedrock")
    return status
def get_minecraft_server_uptime():
    """Return how long the minecraft-bedrock unit has been running.

    The start time is read from systemd's ``ExecMainStartTimestamp`` property
    and parsed with ``dateutil``.

    Returns:
        str: ``"<d>d <h>h <m>m"`` when at least one day has passed,
        ``"<h>h <m>m"`` otherwise, or ``"Unknown"`` when the timestamp is
        empty or cannot be parsed.
    """
    raw_timestamp = subprocess.getoutput(
        "systemctl show minecraft-bedrock --property=ExecMainStartTimestamp --value"
    )
    if not raw_timestamp:
        return "Unknown"

    try:
        start_time = dateutil.parser.parse(raw_timestamp)
        # Use the parsed timezone so aware and naive datetimes are never mixed.
        uptime_delta = datetime.now(start_time.tzinfo) - start_time
    except (ValueError, OverflowError, TypeError):
        # Unparseable output; a bare except here would also swallow
        # KeyboardInterrupt and SystemExit.
        return "Unknown"

    days = uptime_delta.days
    hours, remainder = divmod(uptime_delta.seconds, 3600)
    minutes = remainder // 60
    if days > 0:
        return f"{days}d {hours}h {minutes}m"
    return f"{hours}h {minutes}m"
def get_recent_player_activity(limit=20):
    """Return the last ``limit`` journal lines that mention player joins or leaves.

    Returns:
        list[str]: Stripped, non-empty log lines; empty when nothing matched.
    """
    pipeline = (
        "journalctl -u minecraft-bedrock"
        " | grep -i -E '(player connected|player disconnected|player.*joined|player.*left)'"
        f" | tail -n {limit}"
    )
    raw = subprocess.getoutput(pipeline)
    if not raw:
        return []
    return [entry for entry in (piece.strip() for piece in raw.split('\n')) if entry]
def get_recent_connected_players(limit=5):
    """Return player names taken from the last ``limit`` "player connected" journal lines.

    Returns:
        list[str]: Stripped, non-empty names; empty when nothing matched.
    """
    pipeline = (
        "journalctl -u minecraft-bedrock"
        " | grep -i 'player connected'"
        f" | tail -n {limit}"
        " | sed 's/.*Player connected: //'"
        " | sed 's/, xuid.*//'"
    )
    raw = subprocess.getoutput(pipeline)
    if not raw:
        return []
    names = []
    for candidate in raw.strip().split('\n'):
        cleaned = candidate.strip()
        if cleaned:
            names.append(cleaned)
    return names
def get_server_properties(keys=None, path=None):
    """Read ``key=value`` pairs from server.properties.

    Blank lines, comment lines and lines without ``=`` are skipped. Keys and
    values are stripped of surrounding whitespace.

    Args:
        keys: Optional container of property names; when given, only those
            properties are returned.
        path: File to read. Defaults to the module-level ``SERVER_PROPERTIES``.

    Returns:
        dict[str, str]: The properties found; empty when the file cannot be read.
    """
    props = {}
    try:
        with open(SERVER_PROPERTIES if path is None else path, "r") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                if keys is None or key in keys:
                    props[key] = value.strip()
    except (OSError, UnicodeDecodeError):
        # Best-effort read: an unreadable file yields whatever was parsed so far.
        pass
    return props
def get_allowlist_count(path=None):
    """Return the number of entries in the allowlist JSON file.

    Args:
        path: File to read. Defaults to the module-level ``ALLOWLIST_PATH``.

    Returns:
        int: ``len()`` of the decoded JSON document, or 0 when the file is
        missing, unreadable, not valid JSON, or decodes to a value without a
        length.
    """
    try:
        with open(ALLOWLIST_PATH if path is None else path, "r") as f:
            allowlist = json.load(f)
        return len(allowlist)
    except (OSError, ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers e.g. `null`.
        return 0
def get_latest_backup_info(backup_dir=None):
    """Describe how long ago the newest ``*.zip`` backup was modified.

    Args:
        backup_dir: Directory to scan. Defaults to the module-level
            ``BACKUP_DIR``.

    Returns:
        str: ``"<n>d ago"``, ``"<n>h ago"`` or ``"<n>m ago"``, or the string
        ``"None"`` when the directory holds no zip files.
    """
    directory = Path(BACKUP_DIR if backup_dir is None else backup_dir)
    newest_mtime = max((f.stat().st_mtime for f in directory.glob("*.zip")), default=None)
    if newest_mtime is None:
        return "None"

    # Work on epoch seconds and clamp at zero so a file stamped slightly in
    # the future does not yield a negative day count.
    elapsed = max(0, int(datetime.now().timestamp() - newest_mtime))
    days, remainder = divmod(elapsed, 86400)
    if days > 0:
        return f"{days}d ago"
    # ">=" so that exactly one hour reads "1h ago" rather than "60m ago".
    if remainder >= 3600:
        return f"{remainder // 3600}h ago"
    return f"{remainder // 60}m ago"
def restart_minecraft_server():
    """Run ``sudo systemctl restart`` for the minecraft-bedrock unit; the exit code is ignored."""
    command = ["sudo", "systemctl", "restart", "minecraft-bedrock"]
    subprocess.call(command)
def start_minecraft_server():
    """Run ``sudo systemctl start`` for the minecraft-bedrock unit; the exit code is ignored."""
    command = ["sudo", "systemctl", "start", "minecraft-bedrock"]
    subprocess.call(command)
def stop_minecraft_server():
    """Run ``sudo systemctl stop`` for the minecraft-bedrock unit; the exit code is ignored."""
    command = ["sudo", "systemctl", "stop", "minecraft-bedrock"]
    subprocess.call(command)
def create_allowlist_view_and_content():
    """Build the message text and view used to display the allowlist.

    Returns:
        tuple: ``(content, view)``. With an empty or unreadable allowlist the
        view only holds an ``AddAllowlistButton``; otherwise it is an
        ``AllowlistView`` built from the loaded entries.
    """
    try:
        with open(ALLOWLIST_PATH, "r") as handle:
            entries = json.load(handle)
    except Exception:
        # Any load problem is treated the same as an empty allowlist.
        entries = []

    if entries:
        rows = []
        for entry in entries:
            rows.append(f"- `{entry.get('name', '?')}` (XUID: `{entry.get('xuid', '?')}`)")
        return "📜 **Allowlist:**\n" + "\n".join(rows), AllowlistView(entries)

    empty_view = discord.ui.View(timeout=60)
    empty_view.add_item(AddAllowlistButton())
    return "ℹ️ No players in allowlist.", empty_view
| # ===================== Discord UI Classes ===================== | |
class BackupBeforeRestoreView(discord.ui.View):
    """Confirmation view asking whether to back up the world before restoring.

    Args:
        backup_name: Name of the backup being restored (used in messages).
        restore_function: Coroutine function awaited with the interaction to
            perform the actual restore.
    """

    def __init__(self, backup_name, restore_function):
        super().__init__(timeout=30)
        self.backup_name = backup_name
        self.restore_function = restore_function

    async def _run_restore(self, interaction: discord.Interaction):
        """Await the restore and report a failure to the user instead of dropping it."""
        try:
            await self.restore_function(interaction)
        except Exception as exc:  # UI boundary: surface any restore failure
            await interaction.followup.send(
                f"❌ Restore of `{self.backup_name}` failed: {exc}", ephemeral=True
            )

    @discord.ui.button(label="Yes, create backup", style=discord.ButtonStyle.primary)
    async def create_backup(self, interaction: discord.Interaction, button: discord.ui.Button):
        await interaction.response.edit_message(content=f"💾 Creating backup before restoring `{self.backup_name}`...", view=None)
        # The backup shells out to zip/rclone; run it in a worker thread so
        # the bot's event loop keeps running meanwhile.
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, lambda: create_minecraft_backup(suffix="pre-restore"))
        except Exception as exc:  # UI boundary: do not restore without the safety backup
            await interaction.followup.send(f"❌ Backup failed, restore aborted: {exc}", ephemeral=True)
            return
        await interaction.followup.send(f"✅ Backup created! Now restoring `{self.backup_name}`...", ephemeral=True)
        await self._run_restore(interaction)

    @discord.ui.button(label="No, just restore", style=discord.ButtonStyle.danger)
    async def skip_backup(self, interaction: discord.Interaction, button: discord.ui.Button):
        await interaction.response.edit_message(content=f"⏳ Restoring `{self.backup_name}` without backup...", view=None)
        await self._run_restore(interaction)

    @discord.ui.button(label="Cancel", style=discord.ButtonStyle.secondary)
    async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
        await interaction.response.edit_message(content="Restore cancelled.", view=None)
class RestoreDropdown(discord.ui.Select):
    """Dropdown for selecting a local backup to restore.

    Args:
        backups: Iterable of objects with a ``name`` attribute (the backup
            files). Only the first 25 are offered, because Discord rejects
            select menus with more options.
    """

    def __init__(self, backups):
        options = [discord.SelectOption(label=b.name, value=b.name) for b in list(backups)[:25]]
        super().__init__(placeholder="Select local backup", options=options)

    async def callback(self, interaction):
        selected_backup = self.values[0]

        async def perform_restore(interaction):
            # The restore stops the server and unzips files; keep that
            # blocking work off the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, restore_local_backup, selected_backup)
            await interaction.followup.send(f"✅ Restored `{selected_backup}`.", ephemeral=True)

        await interaction.response.send_message(
            f"Do you want to create a backup before restoring `{selected_backup}`?",
            view=BackupBeforeRestoreView(selected_backup, perform_restore),
            ephemeral=True
        )
class RestoreByFilenameModal(discord.ui.Modal, title="Restore Local Backup by Filename"):
    """Modal asking for the filename of a local backup to restore."""

    filename = discord.ui.TextInput(
        label="Backup filename",
        placeholder="e.g. minecraft-backup-YYYYMMDDHHMMSS.zip",
        required=True
    )

    async def on_submit(self, interaction: discord.Interaction):
        await interaction.response.defer(ephemeral=True)
        backup_file = self.filename.value.strip()

        # Accept a bare filename only: "../x.zip" or an absolute path would
        # otherwise reach files outside BACKUP_DIR.
        if Path(backup_file).name != backup_file:
            await interaction.followup.send("❌ Invalid backup filename.", ephemeral=True)
            return
        if not Path(f"{BACKUP_DIR}/{backup_file}").is_file():
            await interaction.followup.send(f"❌ Backup `{backup_file}` not found.", ephemeral=True)
            return

        async def perform_restore(interaction):
            # Blocking file and systemctl work goes to a worker thread.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, restore_local_backup, backup_file)
            await interaction.followup.send(f"✅ Restored `{backup_file}`.", ephemeral=True)

        await interaction.followup.send(
            f"Do you want to create a backup before restoring `{backup_file}`?",
            view=BackupBeforeRestoreView(backup_file, perform_restore)
        )
class RestoreDropdownView(discord.ui.View):
    """View combining the local-backup dropdown with "by filename" and cancel buttons."""

    class RestoreByFilenameButton(discord.ui.Button):
        """Opens the modal that asks for a backup filename."""

        def __init__(self):
            super().__init__(label="Restore by Filename", style=discord.ButtonStyle.primary)

        async def callback(self, interaction: discord.Interaction):
            modal = RestoreByFilenameModal()
            await interaction.response.send_modal(modal)

    class CancelButton(discord.ui.Button):
        """Replaces the message with a cancellation notice and removes the view."""

        def __init__(self):
            super().__init__(label="Cancel", style=discord.ButtonStyle.danger)

        async def callback(self, interaction: discord.Interaction):
            await interaction.response.edit_message(content="Restore cancelled.", view=None)

    def __init__(self, backups):
        super().__init__(timeout=30)
        components = (
            RestoreDropdown(backups),
            self.RestoreByFilenameButton(),
            self.CancelButton(),
        )
        for component in components:
            self.add_item(component)
class GDriveRestoreDropdown(discord.ui.Select):
    """Dropdown for selecting a GDrive backup to restore.

    Args:
        files: Iterable of backup filenames. Only the first 25 are offered,
            because Discord rejects select menus with more options.
    """

    def __init__(self, files):
        options = [discord.SelectOption(label=f, value=f) for f in list(files)[:25]]
        super().__init__(placeholder="Select GDrive backup", options=options)

    async def callback(self, interaction):
        file = self.values[0]

        async def perform_restore(interaction):
            # Download and restore are blocking; run them in a worker thread
            # so the event loop stays responsive.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, restore_gdrive_backup, file)
            await interaction.followup.send(f"✅ Restored `{file}`.", ephemeral=True)

        await interaction.response.send_message(
            f"Do you want to create a backup before restoring `{file}`?",
            view=BackupBeforeRestoreView(file, perform_restore),
            ephemeral=True
        )
class GDriveRestoreByFilenameModal(discord.ui.Modal, title="Restore GDrive Backup by Filename"):
    """Modal asking for the filename of a GDrive backup to restore."""

    filename = discord.ui.TextInput(
        label="Backup filename",
        placeholder="e.g. minecraft-backup-YYYYMMDDHHMMSS.zip",
        required=True
    )

    async def on_submit(self, interaction: discord.Interaction):
        await interaction.response.defer(ephemeral=True)
        backup_file = self.filename.value.strip()

        # The name ends up in an rclone source path and a local path; accept
        # a bare filename only, never something containing separators.
        if Path(backup_file).name != backup_file:
            await interaction.followup.send("❌ Invalid backup filename.", ephemeral=True)
            return

        async def perform_restore(interaction):
            # Blocking download and restore go to a worker thread.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, restore_gdrive_backup, backup_file)
            await interaction.followup.send(f"✅ Restored `{backup_file}` from GDrive.", ephemeral=True)

        await interaction.followup.send(
            f"Do you want to create a backup before restoring `{backup_file}`?",
            view=BackupBeforeRestoreView(backup_file, perform_restore)
        )
class GDriveRestoreView(discord.ui.View):
    """View combining the GDrive-backup dropdown with "by filename" and cancel buttons."""

    class RestoreByFilenameButton(discord.ui.Button):
        """Opens the modal that asks for a GDrive backup filename."""

        def __init__(self):
            super().__init__(label="Restore by Filename", style=discord.ButtonStyle.primary)

        async def callback(self, interaction: discord.Interaction):
            modal = GDriveRestoreByFilenameModal()
            await interaction.response.send_modal(modal)

    class CancelButton(discord.ui.Button):
        """Replaces the message with a cancellation notice and removes the view."""

        def __init__(self):
            super().__init__(label="Cancel", style=discord.ButtonStyle.danger)

        async def callback(self, interaction: discord.Interaction):
            await interaction.response.edit_message(content="Restore cancelled.", view=None)

    def __init__(self, files):
        super().__init__(timeout=30)
        components = (
            GDriveRestoreDropdown(files),
            self.RestoreByFilenameButton(),
            self.CancelButton(),
        )
        for component in components:
            self.add_item(component)
class GDriveDeleteConfirm(discord.ui.View):
    """Confirmation view for deleting a GDrive backup.

    Args:
        file: Name of the backup on ``RCLONE_REMOTE`` to delete.
    """

    def __init__(self, file):
        super().__init__(timeout=30)
        self.file = file

    def _delete_remote(self):
        """Run ``rclone delete`` for the file; return ``(ok, detail)``."""
        try:
            # Argument list instead of a shell string, so the filename is
            # never interpreted by a shell.
            result = subprocess.run(
                ["rclone", "delete", f"{RCLONE_REMOTE}/{self.file}"],
                capture_output=True,
                text=True,
            )
        except OSError as exc:
            return False, str(exc)
        return result.returncode == 0, (result.stderr or result.stdout or "").strip()

    @discord.ui.button(label="✅ Confirm Delete", style=discord.ButtonStyle.danger)
    async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
        if interaction.user.id not in ALLOWED_USERS:
            await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            return
        await interaction.response.edit_message(content=f"⏳ Deleting `{self.file}` from GDrive...", view=None)

        # rclone is a blocking network call; keep it off the event loop.
        loop = asyncio.get_running_loop()
        ok, detail = await loop.run_in_executor(None, self._delete_remote)
        if ok:
            message = f"🗑️ Deleted `{self.file}` from GDrive."
        else:
            # Report the real outcome instead of always claiming success.
            message = f"❌ Failed to delete `{self.file}` from GDrive: {detail}"
        try:
            await interaction.followup.send(message, ephemeral=True)
        except discord.HTTPException:
            pass

    @discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
    async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
        if interaction.user.id not in ALLOWED_USERS:
            await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            return
        try:
            await interaction.response.edit_message(content="❌ Cancelled delete.", view=None)
        except discord.NotFound:
            # The original message is gone; fall back to a follow-up.
            try:
                await interaction.followup.send("❌ Cancelled (original message not found).", ephemeral=True)
            except discord.HTTPException:
                pass
class GDriveDeleteDropdown(discord.ui.Select):
    """Dropdown for selecting a GDrive backup to delete.

    Args:
        files: Iterable of backup filenames. Only the first 25 are offered,
            because Discord rejects select menus with more options.
    """

    def __init__(self, files):
        options = [discord.SelectOption(label=f, value=f) for f in list(files)[:25]]
        super().__init__(placeholder="Select GDrive backup to delete", options=options)

    async def callback(self, interaction):
        file = self.values[0]
        # Deletion itself only happens after the confirmation view is accepted.
        await interaction.response.send_message(
            f"⚠️ Confirm delete `{file}`?",
            view=GDriveDeleteConfirm(file),
            ephemeral=True
        )
class GDriveDeleteView(discord.ui.View):
    """View that wraps a single GDriveDeleteDropdown (30 second timeout)."""

    def __init__(self, files):
        super().__init__(timeout=30)
        dropdown = GDriveDeleteDropdown(files)
        self.add_item(dropdown)
class UpdatePropertyButton(discord.ui.Button):
    """Button that writes one server.properties value and restarts the server.

    Args:
        setting: Property name to update.
        value: Value to write; also used as the button label.
    """

    def __init__(self, setting: str, value: str):
        super().__init__(label=value, style=discord.ButtonStyle.primary)
        self.setting = setting
        self.value = value

    def _apply(self):
        """Blocking part of the update: rewrite the file, then restart the server."""
        update_server_properties({self.setting: self.value})
        restart_minecraft_server()

    async def callback(self, interaction: discord.Interaction):
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

        # Acknowledge first: the restart can take longer than Discord waits
        # for an initial response to the interaction.
        await interaction.response.defer()
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._apply)

        # Replace the choice buttons with a single disabled "done" button.
        success_view = discord.ui.View(timeout=30)
        success_button = discord.ui.Button(label=f"✅ {self.value}", style=discord.ButtonStyle.success, disabled=True)
        success_view.add_item(success_button)
        await interaction.edit_original_response(content=f"✅ `{self.setting}` updated to `{self.value}` and server restarted.", view=success_view)
class MaxPlayersModal(discord.ui.Modal, title="Set Max Players"):
    """Modal asking for a new max-players value (1-100) and applying it.

    Args:
        setting: Property name to write the value to.
    """

    new_value = discord.ui.TextInput(label="Enter new max-players value", placeholder="e.g. 20", required=True)

    def __init__(self, setting: str):
        super().__init__()
        self.setting = setting

    def _apply(self, value: int):
        """Blocking part of the update: rewrite the file, then restart the server."""
        update_server_properties({self.setting: str(value)})
        restart_minecraft_server()

    async def on_submit(self, interaction: discord.Interaction):
        if interaction.user.id not in ALLOWED_USERS:
            await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            return

        try:
            value = int(self.new_value.value)
        except ValueError:
            await interaction.response.send_message("⚠️ Please enter a valid integer.", ephemeral=True)
            return
        if value < 1 or value > 100:
            await interaction.response.send_message("⚠️ Please enter a value between 1 and 100.", ephemeral=True)
            return

        # Acknowledge before the slow restart so the interaction does not
        # expire, then do the blocking work in a worker thread.
        await interaction.response.defer(ephemeral=True, thinking=True)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._apply, value)
        await interaction.followup.send(f"✅ `{self.setting}` updated to `{value}` and server restarted.", ephemeral=True)
class CancelButton(discord.ui.Button):
    """Red "Cancel" button that replaces the message text and removes its view."""

    def __init__(self):
        super().__init__(label="Cancel", style=discord.ButtonStyle.danger)

    async def callback(self, interaction: discord.Interaction):
        response = interaction.response
        await response.edit_message(content="Cancelled. No changes made.", view=None)
class ServerPropsView(discord.ui.View):
    """View for picking a server.properties setting and then a new value for it."""

    class DismissButton(discord.ui.Button):
        """Closes the settings message without changing anything."""

        def __init__(self):
            super().__init__(label="Dismiss", style=discord.ButtonStyle.secondary)

        async def callback(self, interaction: discord.Interaction):
            await interaction.response.edit_message(content="Dismissed. No changes made.", view=None)

    def __init__(self):
        super().__init__(timeout=60)
        self.add_item(self.DismissButton())

    @discord.ui.select(
        placeholder="Choose setting to modify",
        options=[
            discord.SelectOption(label="Difficulty", value="difficulty"),
            discord.SelectOption(label="Gamemode", value="gamemode"),
            discord.SelectOption(label="Allow Cheats", value="allow-cheats"),
            discord.SelectOption(label="Force Gamemode", value="force-gamemode"),
            discord.SelectOption(label="Max Players", value="max-players"),
            discord.SelectOption(label="Allow List", value="allow-list"),
        ]
    )
    async def select_setting(self, interaction: discord.Interaction, select: discord.ui.Select):
        setting = select.values[0]

        # max-players is free-form, so it is collected through a modal.
        if setting == "max-players":
            await interaction.response.send_modal(MaxPlayersModal(setting))
            return

        # Every other setting has a fixed list of values, one button each.
        on_off = ["true", "false"]
        choices_by_setting = {
            "difficulty": ["peaceful", "easy", "normal", "hard"],
            "gamemode": ["survival", "creative", "adventure"],
            "allow-cheats": on_off,
            "force-gamemode": on_off,
            "allow-list": on_off,
        }
        choices = choices_by_setting.get(setting)
        if choices is None:
            await interaction.response.send_message("⚠️ Unknown setting.", ephemeral=True)
            return

        buttons = discord.ui.View(timeout=30)
        for choice in choices:
            buttons.add_item(UpdatePropertyButton(setting, choice))
        buttons.add_item(CancelButton())
        await interaction.response.send_message(f"🔧 Choose new value for `{setting}`:", view=buttons, ephemeral=True)
class ResetWorldConfirmView(discord.ui.View):
    """Confirmation view for deleting the world after taking a backup.

    Args:
        user_id: ID of the user who requested the reset; only that user may
            confirm or cancel.
    """

    def __init__(self, user_id):
        super().__init__(timeout=30)
        self.user_id = user_id

    @staticmethod
    def _backup_and_reset():
        """Back up the world, then delete it. Returns the backup path.

        The world is removed only once the backup archive exists on disk.
        """
        backup_file = create_minecraft_backup(suffix="pre-reset")
        if not Path(backup_file).is_file():
            raise RuntimeError("backup archive was not created; world left untouched")
        stop_minecraft_server()
        try:
            subprocess.call(["rm", "-rf", WORLD_DIR])
        finally:
            # Never leave the server stopped.
            start_minecraft_server()
        return backup_file

    @discord.ui.button(label="✅ Confirm Reset", style=discord.ButtonStyle.danger)
    async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
        if interaction.user.id != self.user_id:
            await interaction.response.send_message("⛔ Only the requester can confirm.", ephemeral=True)
            return
        await interaction.response.edit_message(content="⏳ Resetting world...", view=None)

        # Backup and reset are blocking; run them in a worker thread so the
        # event loop stays responsive.
        loop = asyncio.get_running_loop()
        try:
            backup_file = await loop.run_in_executor(None, self._backup_and_reset)
        except Exception as exc:  # UI boundary: tell the user the reset did not happen
            await interaction.followup.send(f"❌ World reset failed: {exc}", ephemeral=True)
            return
        await interaction.followup.send(
            f"🌍 World reset complete.\n💾 Backup created: `{Path(backup_file).name}` and synced to GDrive.",
            ephemeral=True
        )

    @discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
    async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
        if interaction.user.id != self.user_id:
            await interaction.response.send_message("⛔ Only the requester can cancel.", ephemeral=True)
            return
        await interaction.response.edit_message(content="World reset cancelled.", view=None)
class LocalDeleteConfirm(discord.ui.View):
    """Confirmation view for deleting a local backup.

    Args:
        file: Name of the backup inside ``BACKUP_DIR`` to delete.
    """

    def __init__(self, file):
        super().__init__(timeout=30)
        self.file = file

    @discord.ui.button(label="✅ Confirm Delete", style=discord.ButtonStyle.danger)
    async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
        # Same permission gate as the GDrive delete confirmation.
        if interaction.user.id not in ALLOWED_USERS:
            await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            return
        file_path = Path(f"{BACKUP_DIR}/{self.file}")
        try:
            file_path.unlink()
        except FileNotFoundError:
            # Also covers a file removed between listing and confirmation.
            await interaction.response.edit_message(content=f"❌ Backup `{self.file}` not found.", view=None)
        except OSError as e:
            await interaction.response.edit_message(content=f"❌ Error deleting `{self.file}`: {e}", view=None)
        else:
            await interaction.response.edit_message(content=f"🗑️ Deleted `{self.file}` from local backups.", view=None)

    @discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary)
    async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
        if interaction.user.id not in ALLOWED_USERS:
            await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            return
        await interaction.response.edit_message(content="❌ Cancelled delete.", view=None)
class DeleteLocalBackupDropdown(discord.ui.Select):
    """Dropdown for selecting a local backup to delete.

    Args:
        files: Iterable of backup filenames. The full list is kept on
            ``self.files``; only the first 25 become options, because Discord
            rejects select menus with more.
    """

    def __init__(self, files):
        self.files = files  # Store files for use in callback
        options = [discord.SelectOption(label=f, value=f) for f in list(files)[:25]]
        super().__init__(placeholder="Select local backup to delete", options=options)

    async def callback(self, interaction):
        file = self.values[0]
        # Deletion itself only happens after the confirmation view is accepted.
        await interaction.response.send_message(
            f"⚠️ Confirm delete `{file}`?",
            view=LocalDeleteConfirm(file),
            ephemeral=True
        )
class ListLocalBackupsView(discord.ui.View):
    """Paginated listing of local backup files with Previous/Next buttons.

    Args:
        backups: Sequence of path objects; ``stat()`` and ``name`` are used.
        page: Zero-based index of the page to show.
        per_page: Number of entries rendered per page.
    """

    def __init__(self, backups, page=0, per_page=10):
        super().__init__(timeout=60)
        self.backups = backups
        self.page = page
        self.per_page = per_page
        # Clamp at 0 so an empty list renders "Page 1/1" instead of "Page 1/0".
        self.max_page = max(0, (len(backups) - 1) // per_page)
        if self.page > 0:
            self.add_item(self.PrevButton(self))
        if self.page < self.max_page:
            self.add_item(self.NextButton(self))

    def format_page(self):
        """Return the message text for the current page."""
        start = self.page * self.per_page
        page_backups = self.backups[start:start + self.per_page]
        lines = []
        for i, f in enumerate(page_backups, start=1 + start):
            # stat() once per file; a backup removed after the list was built
            # is shown as unavailable rather than failing the whole page.
            try:
                info = f.stat()
            except OSError:
                lines.append(f"{i}. {f.name} | unavailable")
                continue
            size_mb = info.st_size / 1024 / 1024
            mtime = datetime.fromtimestamp(info.st_mtime).strftime('%Y-%m-%d %H:%M')
            lines.append(f"{i}. {f.name} | {mtime} | {size_mb:.2f} MB")
        return f"🗂️ Local backups (Page {self.page+1}/{self.max_page+1}):\n\n" + "\n".join(lines)

    class PrevButton(discord.ui.Button):
        """Button that re-renders the listing one page back."""

        def __init__(self, view):
            super().__init__(label="⬅️ Previous", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = ListLocalBackupsView(self.view_ref.backups, self.view_ref.page - 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)

    class NextButton(discord.ui.Button):
        """Button that re-renders the listing one page forward."""

        def __init__(self, view):
            super().__init__(label="Next ➡️", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = ListLocalBackupsView(self.view_ref.backups, self.view_ref.page + 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)
class ListGDriveBackupsView(discord.ui.View):
    """Paginated listing of Google Drive backups with Previous/Next buttons.

    Args:
        backups: Sequence of dicts with ``Name``, ``Size`` (bytes) and
            ``ModTime`` keys; ``ModTime`` is passed to
            ``datetime.fromtimestamp`` so it must already be a numeric timestamp.
        page: Zero-based index of the page to show.
        per_page: Number of entries rendered per page.
    """

    def __init__(self, backups, page=0, per_page=10):
        super().__init__(timeout=60)
        self.backups = backups
        self.page = page
        self.per_page = per_page
        # Clamp at 0 so an empty list renders "Page 1/1" instead of "Page 1/0".
        self.max_page = max(0, (len(backups) - 1) // per_page)
        if self.page > 0:
            self.add_item(self.PrevButton(self))
        if self.page < self.max_page:
            self.add_item(self.NextButton(self))

    def format_page(self):
        """Return the message text for the current page."""
        start = self.page * self.per_page
        page_backups = self.backups[start:start + self.per_page]
        lines = []
        for i, f in enumerate(page_backups, start=1 + start):
            size_mb = f['Size'] / 1024 / 1024
            mtime = datetime.fromtimestamp(f['ModTime']).strftime('%Y-%m-%d %H:%M')
            lines.append(f"{i}. {f['Name']} | {mtime} | {size_mb:.2f} MB")
        return f"☁️ GDrive backups (Page {self.page+1}/{self.max_page+1}):\n\n" + "\n".join(lines)

    class PrevButton(discord.ui.Button):
        """Button that re-renders the listing one page back."""

        def __init__(self, view):
            super().__init__(label="⬅️ Previous", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = ListGDriveBackupsView(self.view_ref.backups, self.view_ref.page - 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)

    class NextButton(discord.ui.Button):
        """Button that re-renders the listing one page forward."""

        def __init__(self, view):
            super().__init__(label="Next ➡️", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = ListGDriveBackupsView(self.view_ref.backups, self.view_ref.page + 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)
class RestartPromptView(discord.ui.View):
    """Prompt shown after an allowlist change: restart the server now or later."""

    def __init__(self):
        super().__init__(timeout=60)

    @discord.ui.button(label="Restart Now", style=discord.ButtonStyle.danger)
    async def restart_now(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Restart the Minecraft server (authorised users only)."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        # Acknowledge the click before restarting. restart_minecraft_server() is
        # called synchronously, so any time it takes would otherwise count
        # against Discord's interaction response window.
        await interaction.response.edit_message(content="🔄 Restarting Minecraft server...", view=None)
        restart_minecraft_server()
        await interaction.edit_original_response(content="🔄 Minecraft server restarted.")

    @discord.ui.button(label="Restart Later", style=discord.ButtonStyle.secondary)
    async def restart_later(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Dismiss the prompt and remind the user that a restart is still needed."""
        await interaction.response.edit_message(content="⏳ Restart cancelled. Please remember to restart the server later for allowlist changes to take effect.", view=None)
class XuidProfileView(discord.ui.View):
    """Buttons shown under a player profile lookup.

    Offers "Remove from Allowlist" when the player is already listed and
    "Add to Allowlist" otherwise, plus a Cancel button.

    Args:
        player_name: Name written to / shown for the allowlist entry.
        xuid: XUID used to match allowlist entries.
        is_in_allowlist: Whether the player is currently in the allowlist.
    """

    def __init__(self, player_name, xuid, is_in_allowlist):
        super().__init__(timeout=60)
        self.player_name = player_name
        self.xuid = xuid
        self.is_in_allowlist = is_in_allowlist
        if is_in_allowlist:
            self.add_item(self.RemoveFromAllowlistButton(player_name, xuid))
        else:
            self.add_item(self.AddToAllowlistButton(player_name, xuid))
        # Always add cancel button
        self.add_item(self.CancelButton())

    class AddToAllowlistButton(discord.ui.Button):
        """Append the player to the allowlist file."""

        def __init__(self, player_name, xuid):
            super().__init__(label="➕ Add to Allowlist", style=discord.ButtonStyle.success)
            self.player_name = player_name
            self.xuid = xuid

        async def callback(self, interaction: discord.Interaction):
            if interaction.user.id not in ALLOWED_USERS:
                return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            try:
                with open(ALLOWLIST_PATH, "r") as f:
                    allowlist = json.load(f)
            except Exception as e:
                return await interaction.response.send_message(f"❌ Error reading allowlist: {e}", ephemeral=True)
            # Check for duplicate
            if any(entry.get("xuid") == self.xuid for entry in allowlist):
                return await interaction.response.send_message(f"⚠️ `{self.player_name}` is already in the allowlist.", ephemeral=True)
            # Add new entry
            allowlist.append({"name": self.player_name, "xuid": self.xuid})
            # Only the write (and building the refreshed view) is guarded. The
            # Discord calls used to live in this try, so a failure after
            # edit_message() reached an except that tried to respond a second
            # time, which raises InteractionResponded and hides the real error.
            try:
                with open(ALLOWLIST_PATH, "w") as f:
                    json.dump(allowlist, f, indent=2)
                # Create a new view with the updated state (now in allowlist)
                updated_view = XuidProfileView(self.player_name, self.xuid, True)
            except Exception as e:
                return await interaction.response.send_message(f"❌ Error updating allowlist: {e}", ephemeral=True)
            await interaction.response.edit_message(view=updated_view)
            # Send the restart prompt as a follow-up
            await interaction.followup.send(
                f"✅ Added `{self.player_name}` to allowlist!\n\n⚠️ You must restart the Minecraft server for changes to take effect.",
                view=RestartPromptView(), ephemeral=True)

    class RemoveFromAllowlistButton(discord.ui.Button):
        """Remove every allowlist entry matching the player's XUID."""

        def __init__(self, player_name, xuid):
            super().__init__(label="🗑️ Remove from Allowlist", style=discord.ButtonStyle.danger)
            self.player_name = player_name
            self.xuid = xuid

        async def callback(self, interaction: discord.Interaction):
            if interaction.user.id not in ALLOWED_USERS:
                return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
            try:
                with open(ALLOWLIST_PATH, "r") as f:
                    allowlist = json.load(f)
            except Exception as e:
                return await interaction.response.send_message(f"❌ Error reading allowlist: {e}", ephemeral=True)
            # Remove player
            new_allowlist = [entry for entry in allowlist if entry.get("xuid") != self.xuid]
            if len(new_allowlist) == len(allowlist):
                return await interaction.response.send_message(f"❌ `{self.player_name}` not found in allowlist.", ephemeral=True)
            # See AddToAllowlistButton: respond only after the guarded write so
            # the error path never answers an already-answered interaction.
            try:
                with open(ALLOWLIST_PATH, "w") as f:
                    json.dump(new_allowlist, f, indent=2)
                # Create a new view with the updated state (no longer in allowlist)
                updated_view = XuidProfileView(self.player_name, self.xuid, False)
            except Exception as e:
                return await interaction.response.send_message(f"❌ Error updating allowlist: {e}", ephemeral=True)
            await interaction.response.edit_message(view=updated_view)
            # Send the restart prompt as a follow-up
            await interaction.followup.send(
                f"🗑️ Removed `{self.player_name}` from allowlist!\n\n⚠️ You must restart the Minecraft server for changes to take effect.",
                view=RestartPromptView(), ephemeral=True)

    class CancelButton(discord.ui.Button):
        """Dismiss the profile message, clearing its embed and buttons."""

        def __init__(self):
            super().__init__(label="❌ Cancel", style=discord.ButtonStyle.secondary)

        async def callback(self, interaction: discord.Interaction):
            await interaction.response.edit_message(content="Profile lookup cancelled.", view=None, embed=None)
class AllowlistRemoveButton(discord.ui.Button):
    """Button that removes one allowlist entry, matched on both name and XUID.

    Args:
        player_name: Name of the entry to remove (also used in the label).
        xuid: XUID of the entry to remove.
    """

    def __init__(self, player_name, xuid):
        super().__init__(label=f"Remove {player_name}", style=discord.ButtonStyle.danger)
        self.player_name = player_name
        self.xuid = xuid

    async def callback(self, interaction: discord.Interaction):
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        # Load allowlist
        try:
            with open(ALLOWLIST_PATH, "r") as f:
                allowlist = json.load(f)
        except Exception as e:
            return await interaction.response.send_message(f"❌ Error reading allowlist: {e}", ephemeral=True)
        # Remove player
        new_allowlist = [
            entry for entry in allowlist
            if not (entry.get("name") == self.player_name and entry.get("xuid") == self.xuid)
        ]
        if len(new_allowlist) == len(allowlist):
            return await interaction.response.send_message(f"❌ Player `{self.player_name}` not found in allowlist.", ephemeral=True)
        # Guard only the write and the view rebuild. The Discord calls used to
        # be inside this try, so a failure after edit_message() reached an
        # except that tried to respond again and raised InteractionResponded.
        try:
            with open(ALLOWLIST_PATH, "w") as f:
                json.dump(new_allowlist, f, indent=2)
            # Refresh the allowlist view using the helper function
            content, new_view = create_allowlist_view_and_content()
        except Exception as e:
            return await interaction.response.send_message(f"❌ Error updating allowlist: {e}", ephemeral=True)
        await interaction.response.edit_message(content=content, view=new_view)
        await interaction.followup.send(
            f"🗑️ Removed `{self.player_name}` from allowlist.\n\n⚠️ You must restart the Minecraft server for changes to take effect.",
            view=RestartPromptView(), ephemeral=True)
class AddAllowlistModal(discord.ui.Modal, title="Add to Allowlist"):
    """Modal form that collects a player name and XUID and appends them to the allowlist file."""

    player_name = discord.ui.TextInput(label="Player Name", placeholder="e.g. Steve", required=True)
    xuid = discord.ui.TextInput(label="XUID", placeholder="e.g. 1234567890123456", required=True)

    async def on_submit(self, interaction: discord.Interaction):
        """Validate the submitter, reject exact duplicates, then persist the new entry."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

        try:
            with open(ALLOWLIST_PATH, "r") as fh:
                entries = json.load(fh)
        except Exception as err:
            return await interaction.response.send_message(f"❌ Error reading allowlist: {err}", ephemeral=True)

        new_name = self.player_name.value
        new_xuid = self.xuid.value

        # An entry counts as a duplicate only when both name and XUID match.
        already_listed = any(
            item.get("name") == new_name and item.get("xuid") == new_xuid
            for item in entries
        )
        if already_listed:
            return await interaction.response.send_message(f"⚠️ Player `{new_name}` with XUID `{new_xuid}` is already in the allowlist.", ephemeral=True)

        entries.append({"name": new_name, "xuid": new_xuid})
        try:
            with open(ALLOWLIST_PATH, "w") as fh:
                json.dump(entries, fh, indent=2)
            await interaction.response.send_message(
                f"✅ Added `{new_name}` (XUID: `{new_xuid}`) to allowlist.\n\n⚠️ You must restart the Minecraft server for changes to take effect.",
                view=RestartPromptView(), ephemeral=True)
        except Exception as err:
            await interaction.response.send_message(f"❌ Error updating allowlist: {err}", ephemeral=True)
class AddAllowlistButton(discord.ui.Button):
    """Green "Add" button that opens the add-to-allowlist modal."""

    def __init__(self):
        super().__init__(label="Add", style=discord.ButtonStyle.success)

    async def callback(self, interaction: discord.Interaction):
        """Open the modal for authorised users; refuse everyone else."""
        requester_id = interaction.user.id
        if requester_id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        form = AddAllowlistModal()
        await interaction.response.send_modal(form)
class AllowlistView(discord.ui.View):
    """View with one "Remove" button per allowlist entry plus an "Add" button.

    Args:
        allowlist: List of entry dicts; ``name`` and ``xuid`` are read, with
            ``"?"`` substituted for a missing key.
    """

    def __init__(self, allowlist):
        super().__init__(timeout=60)
        # A View holds at most 25 items and add_item() raises beyond that.
        # Cap the remove buttons at 24 so the "Add" button always fits; entries
        # past the cap simply get no button in this view.
        max_remove_buttons = 24
        for entry in list(allowlist)[:max_remove_buttons]:
            name = entry.get("name", "?")
            xuid = entry.get("xuid", "?")
            self.add_item(AllowlistRemoveButton(name, xuid))
        self.add_item(AddAllowlistButton())
class PlayerLogsView(discord.ui.View):
    """Paginated view over player-activity log lines (at most five pages).

    Args:
        logs: List of log-line strings, already in display order.
        page: Zero-based index of the page to show.
        per_page: Number of lines rendered per page.
    """

    def __init__(self, logs, page=0, per_page=10):
        super().__init__(timeout=60)
        self.logs = logs
        self.page = page
        self.per_page = per_page
        # Limit to 5 pages (0-4); clamp at 0 so an empty list renders
        # "Page 1/1" instead of "Page 1/0".
        self.max_page = max(0, min(4, (len(logs) - 1) // per_page))
        if self.page > 0:
            self.add_item(self.PrevButton(self))
        if self.page < self.max_page:
            self.add_item(self.NextButton(self))

    def format_page(self):
        """Return the message text for the current page."""
        start = self.page * self.per_page
        page_logs = self.logs[start:start + self.per_page]
        content = "\n".join(page_logs)
        # Discord rejects messages over 2000 characters; keep room for the
        # header and code fence by trimming unusually long pages.
        if len(content) > 1900:
            content = content[:1900]
        return f"👥 **Recent Players (Page {self.page+1}/{self.max_page+1}):**\n```text\n{content}\n```"

    class PrevButton(discord.ui.Button):
        """Button that re-renders the log one page back."""

        def __init__(self, view):
            super().__init__(label="⬅️ Previous", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = PlayerLogsView(self.view_ref.logs, self.view_ref.page - 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)

    class NextButton(discord.ui.Button):
        """Button that re-renders the log one page forward."""

        def __init__(self, view):
            super().__init__(label="Next ➡️", style=discord.ButtonStyle.secondary)
            self.view_ref = view

        async def callback(self, interaction: discord.Interaction):
            view = PlayerLogsView(self.view_ref.logs, self.view_ref.page + 1, self.view_ref.per_page)
            await interaction.response.edit_message(content=view.format_page(), view=view)
class ServerHealthActionsView(discord.ui.View):
    """Quick action buttons for server health dashboard.

    Every button checks ``ALLOWED_USERS`` first and replies ephemerally.
    """

    def __init__(self):
        super().__init__(timeout=300)  # 5 minutes timeout

    @discord.ui.button(label="🔄 Restart Server", style=discord.ButtonStyle.danger, row=0)
    async def restart_server(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Restart the Minecraft server, acknowledging the click before the restart runs."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.send_message("🔄 Restarting Minecraft server...", ephemeral=True)
        restart_minecraft_server()
        await interaction.followup.send("✅ Server restart initiated!", ephemeral=True)

    @discord.ui.button(label="💾 Backup Now", style=discord.ButtonStyle.primary, row=0)
    async def backup_now(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Create a backup with the ``quick-action`` suffix and report the file name."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.send_message("💾 Creating backup...", ephemeral=True)
        try:
            backup_file = create_minecraft_backup(suffix="quick-action")
            await interaction.followup.send(f"✅ Backup completed: `{Path(backup_file).name}` and synced to GDrive!", ephemeral=True)
        except Exception as e:
            await interaction.followup.send(f"❌ Backup failed: {str(e)}", ephemeral=True)

    @discord.ui.button(label="👥 Player List", style=discord.ButtonStyle.secondary, row=0)
    async def player_list(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Show up to ten of the most recent player-activity log lines, newest first."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.defer(ephemeral=True)
        # Get recent player activity using helper function
        log_lines = get_recent_player_activity(20)
        if not log_lines:
            await interaction.followup.send("👥 No recent player activity found.", ephemeral=True)
            return
        log_lines.reverse()  # Show newest first
        content = "\n".join(log_lines[:10])  # Show last 10
        await interaction.followup.send(f"👥 **Recent Player Activity:**\n```text\n{content}\n```", ephemeral=True)

    @discord.ui.button(label="📊 Server Dashboard", style=discord.ButtonStyle.success, row=0)
    async def server_dashboard(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a condensed dashboard embed: status, allowlist size, world, recent players."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.defer(ephemeral=True)
        # Get comprehensive server info (similar to /serverdashboard but condensed)
        try:
            mc_status = get_minecraft_server_status()
            player_list = get_recent_connected_players(3)
            allowlist_count = get_allowlist_count()
            # Create condensed dashboard; green when the service is active, red otherwise
            embed = discord.Embed(
                title="📊 Quick Server Dashboard",
                color=0x00ff00 if mc_status == "active" else 0xff0000,
                timestamp=datetime.now()
            )
            embed.add_field(name="🖥️ Status", value=f"```{mc_status.title()}```", inline=True)
            embed.add_field(name="📋 Allowlist", value=f"```{allowlist_count} players```", inline=True)
            embed.add_field(name="🌍 World", value=f"```{WORLD_NAME}```", inline=True)
            if player_list:
                players_text = "\n".join([f"• {player}" for player in player_list[-3:]])
                embed.add_field(name="👥 Recent Players", value=f"```{players_text}```", inline=False)
            embed.set_footer(text="Use /serverdashboard for full details")
            await interaction.followup.send(embed=embed, ephemeral=True)
        except Exception as e:
            await interaction.followup.send(f"❌ Error getting dashboard: {str(e)}", ephemeral=True)

    @discord.ui.button(label="🔍 Detailed Health", style=discord.ButtonStyle.secondary, row=1)
    async def detailed_health(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send an embed with load averages, memory usage and the top CPU processes."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.defer(ephemeral=True)
        try:
            # Load averages
            load_avg_full = subprocess.getoutput("cat /proc/loadavg")
            load_1min, load_5min, load_15min = load_avg_full.split()[:3]
            # Memory details
            memory_details = subprocess.getoutput("free -h | head -2")
            # Top processes by CPU
            top_cpu = subprocess.getoutput("ps aux --sort=-%cpu | head -6 | tail -5")
            # The former `iostat -d 1 1` call was dropped: its output was never
            # shown, yet it blocked for the one-second sample on every click.
            embed = discord.Embed(
                title="🔍 Detailed System Health",
                color=0x3498db,
                timestamp=datetime.now()
            )
            embed.add_field(
                name="📈 Load Averages",
                value=f"```1min: {load_1min}\n5min: {load_5min}\n15min: {load_15min}```",
                inline=True
            )
            embed.add_field(
                name="🧠 Memory Details",
                value=f"```{memory_details}```",
                inline=False
            )
            if top_cpu.strip():
                embed.add_field(
                    name="🔥 Top CPU Processes",
                    value=f"```{top_cpu[:1000]}```",  # Limit length
                    inline=False
                )
            embed.set_footer(text="Detailed system analysis")
            await interaction.followup.send(embed=embed, ephemeral=True)
        except Exception as e:
            await interaction.followup.send(f"❌ Error getting detailed health: {str(e)}", ephemeral=True)

    @discord.ui.button(label="📝 Server Logs", style=discord.ButtonStyle.secondary, row=1)
    async def server_logs(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send the last 15 journal lines of the ``minecraft-bedrock`` unit."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.defer(ephemeral=True)
        # Get latest server logs
        output = subprocess.getoutput("journalctl -u minecraft-bedrock -n 15 --no-pager")
        if len(output) > 1900:  # Discord limit; keep the newest (tail) part
            output = output[-1900:]
        await interaction.followup.send(f"📜 **Latest Server Logs:**\n```text\n{output}\n```", ephemeral=True)

    @discord.ui.button(label="❌ Cancel", style=discord.ButtonStyle.secondary, row=1)
    async def cancel_actions(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Remove the buttons from the message and confirm the dismissal."""
        if interaction.user.id not in ALLOWED_USERS:
            return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
        await interaction.response.edit_message(view=None)
        await interaction.followup.send("✅ Quick action panel dismissed.", ephemeral=True)
@bot.tree.command(name="allowlist", description="View and manage the Minecraft Bedrock allowlist", guild=discord.Object(id=GUILD_ID))
async def allowlist(interaction: discord.Interaction):
    """Show the allowlist message and its management view to an authorised user."""
    caller_id = interaction.user.id
    if caller_id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    # The helper (defined elsewhere in this file) returns a (content, view) pair.
    message_text, panel = create_allowlist_view_and_content()
    await interaction.response.send_message(message_text, view=panel, ephemeral=True)
@bot.tree.command(name="addallowlist", description="Add a player to the Minecraft Bedrock allowlist", guild=discord.Object(id=GUILD_ID))
@discord.app_commands.describe(name="Player name", xuid="Player XUID")
async def addallowlist(interaction: discord.Interaction, name: str, xuid: str):
    """Append a name/XUID pair to the allowlist file and offer a server restart."""
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    try:
        with open(ALLOWLIST_PATH, "r") as fh:
            entries = json.load(fh)
    except Exception as err:
        return await interaction.response.send_message(f"❌ Error reading allowlist: {err}", ephemeral=True)

    # An entry counts as a duplicate only when both name and XUID match.
    already_listed = any(
        item.get("name") == name and item.get("xuid") == xuid
        for item in entries
    )
    if already_listed:
        return await interaction.response.send_message(f"⚠️ Player `{name}` with XUID `{xuid}` is already in the allowlist.", ephemeral=True)

    entries.append({"name": name, "xuid": xuid})
    try:
        with open(ALLOWLIST_PATH, "w") as fh:
            json.dump(entries, fh, indent=2)
        await interaction.response.send_message(
            f"✅ Added `{name}` (XUID: `{xuid}`) to allowlist.\n\n⚠️ You must restart the Minecraft server for changes to take effect.",
            view=RestartPromptView(), ephemeral=True)
    except Exception as err:
        await interaction.response.send_message(f"❌ Error updating allowlist: {err}", ephemeral=True)
@bot.tree.command(name="viewserverprops", description="View relevant settings from server.properties", guild=discord.Object(id=GUILD_ID))
async def viewserverprops(interaction: discord.Interaction):
    """Display a fixed subset of server.properties keys along with an update view."""
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    # Only these keys are surfaced; everything else in the file is ignored.
    wanted = {
        "difficulty", "allow-cheats", "gamemode", "force-gamemode",
        "pvp", "max-players", "spawn-protection", "allow-nether",
        "online-mode", "white-list", "allow-list"
    }
    try:
        found = {}
        with open(SERVER_PROPERTIES, "r") as fh:
            for raw in fh:
                entry = raw.strip()
                # Skip blanks, comments and lines that are not key=value pairs.
                if not entry or entry.startswith("#") or "=" not in entry:
                    continue
                prop_name, _, prop_value = entry.partition("=")
                if prop_name in wanted:
                    found[prop_name] = prop_value
        if not found:
            return await interaction.followup.send("ℹ️ No relevant properties found.")
        formatted = "\n".join([f"{prop_name}: {prop_value}" for prop_name, prop_value in found.items()])
        await interaction.followup.send(
            f"⚙️ Relevant `server.properties`:\n```{formatted}```\nUse the dropdown to update a setting.",
            view=ServerPropsView()
        )
    except Exception as err:
        await interaction.followup.send(f"❌ Error reading file: {err}")
@bot.tree.command(name="start", description="Start Minecraft server", guild=discord.Object(id=GUILD_ID))
async def start(interaction):
    """Start the Minecraft server.

    NOTE(review): unlike /resetworld and /allowlist, this command has no
    ALLOWED_USERS check in the visible code; confirm that is intentional.
    """
    # Acknowledge first: start_minecraft_server() is called synchronously, so
    # any time it takes would otherwise count against Discord's response window.
    await interaction.response.defer(ephemeral=True)
    start_minecraft_server()
    await interaction.followup.send("🟢 Server starting...", ephemeral=True)
@bot.tree.command(name="stop", description="Stop Minecraft server", guild=discord.Object(id=GUILD_ID))
async def stop(interaction):
    """Stop the Minecraft server.

    NOTE(review): unlike /resetworld and /allowlist, this command has no
    ALLOWED_USERS check in the visible code; confirm that is intentional.
    """
    # Acknowledge first: stop_minecraft_server() is called synchronously, so
    # any time it takes would otherwise count against Discord's response window.
    await interaction.response.defer(ephemeral=True)
    stop_minecraft_server()
    await interaction.followup.send("🔴 Server stopped.", ephemeral=True)
@bot.tree.command(name="restart", description="Restart Minecraft server", guild=discord.Object(id=GUILD_ID))
async def restart(interaction):
    """Restart the Minecraft server.

    NOTE(review): unlike /resetworld and /allowlist, this command has no
    ALLOWED_USERS check in the visible code; confirm that is intentional.
    """
    # Acknowledge first: restart_minecraft_server() is called synchronously, so
    # any time it takes would otherwise count against Discord's response window.
    await interaction.response.defer(ephemeral=True)
    restart_minecraft_server()
    await interaction.followup.send("🔁 Server restarting...", ephemeral=True)
@bot.tree.command(name="status", description="Server status", guild=discord.Object(id=GUILD_ID))
async def status(interaction):
    """Reply with the value reported by get_minecraft_server_status()."""
    current_state = get_minecraft_server_status()
    reply = f"📡 Server status: `{current_state}`"
    await interaction.response.send_message(reply, ephemeral=True)
@bot.tree.command(name="logs", description="Latest logs", guild=discord.Object(id=GUILD_ID))
async def logs(interaction):
    """Show the last 10 journal lines of the ``minecraft-bedrock`` unit."""
    await interaction.response.defer(ephemeral=True)
    output = subprocess.getoutput("journalctl -u minecraft-bedrock -n 10 --no-pager")
    # Discord rejects messages over 2000 characters. Keep the tail (newest
    # lines), using the same 1900 budget as the "Server Logs" quick-action button.
    if len(output) > 1900:
        output = output[-1900:]
    await interaction.followup.send(f"📜 Logs:\n```text\n{output}\n```", ephemeral=True)
@bot.tree.command(name="players", description="Show recent player connections and activity with pagination", guild=discord.Object(id=GUILD_ID))
async def players(interaction):
    """Page through up to 50 recent player-activity lines, newest first."""
    await interaction.response.defer(ephemeral=True)
    activity = get_recent_player_activity(50)
    if activity:
        # Flip in place so the latest entries land on the first page.
        activity.reverse()
        pager = PlayerLogsView(activity)
        await interaction.followup.send(pager.format_page(), view=pager, ephemeral=True)
        return
    await interaction.followup.send("👥 No recent player activity found.", ephemeral=True)
@bot.tree.command(name="backup", description="Trigger backup", guild=discord.Object(id=GUILD_ID))
@app_commands.describe(suffix="Optional suffix to append to the backup filename")
async def backup(interaction, suffix: str = ""):
    """Trigger a backup with an optional suffix and report the resulting file name."""
    await interaction.response.defer(ephemeral=True)
    # Without this guard a failing backup left the deferred interaction
    # unanswered; report the error like the "Backup Now" quick action does.
    try:
        backup_file = create_minecraft_backup(suffix)
    except Exception as e:
        await interaction.followup.send(f"❌ Backup failed: {str(e)}", ephemeral=True)
        return
    await interaction.followup.send(f"💾 Backup completed: `{Path(backup_file).name}` and synced to GDrive!", ephemeral=True)
@bot.tree.command(name="listlocalbackups", description="Show a list of all local Minecraft world backups", guild=discord.Object(id=GUILD_ID))
async def listlocalbackups(interaction):
    """Send a paginated listing of the ``*.zip`` files in BACKUP_DIR, sorted in reverse path order."""
    archive_paths = sorted(Path(BACKUP_DIR).glob("*.zip"), reverse=True)
    if archive_paths:
        pager = ListLocalBackupsView(archive_paths)
        await interaction.response.send_message(pager.format_page(), view=pager, ephemeral=True)
        return
    await interaction.response.send_message("❌ No local backups found.", ephemeral=True)
@bot.tree.command(name="listgdrivebackups", description="Show a list of all Minecraft world backups on Google Drive", guild=discord.Object(id=GUILD_ID))
async def listgdrivebackups(interaction):
    """Send a paginated listing of the ``.zip`` files reported by ``rclone lsjson``, newest first."""
    await interaction.response.defer(ephemeral=True)
    raw_listing = subprocess.getoutput(f"rclone lsjson {RCLONE_REMOTE}")
    try:
        archives = []
        for item in json.loads(raw_listing):
            if item['Name'].endswith('.zip'):
                archives.append(item)
        # Replace each ISO-8601 ModTime string with an integer Unix timestamp.
        for item in archives:
            iso_text = item['ModTime'].replace('Z', '+00:00')
            item['ModTime'] = int(datetime.fromisoformat(iso_text).timestamp())
        archives.sort(key=lambda item: item['ModTime'], reverse=True)
        if not archives:
            await interaction.followup.send("❌ No GDrive backups found.")
            return
        pager = ListGDriveBackupsView(archives)
        await interaction.followup.send(pager.format_page(), view=pager, ephemeral=True)
    except Exception as err:
        await interaction.followup.send(f"❌ Error reading GDrive backups: {err}")
@bot.tree.command(name="deletegdrivebackup", description="Delete a Minecraft world backup from Google Drive", guild=discord.Object(id=GUILD_ID))
async def deletegdrivebackup(interaction):
    """Offer a picker of up to 25 ``.zip`` names from ``rclone ls`` for deletion."""
    await interaction.response.defer(ephemeral=True)
    listing = subprocess.getoutput(f"rclone ls {RCLONE_REMOTE}")
    names = []
    for row in listing.splitlines():
        if row.endswith(".zip"):
            # Drop the first whitespace-separated column; the rest is the file name.
            names.append(row.split(maxsplit=1)[1].strip())
    names.sort(reverse=True)
    names = names[:25]
    if names:
        await interaction.followup.send("🗑️ Select a GDrive backup to delete:", view=GDriveDeleteView(names))
    else:
        await interaction.followup.send("❌ No GDrive backups found.")
@bot.tree.command(name="restorelocalbackup", description="Restore a local Minecraft world backup", guild=discord.Object(id=GUILD_ID))
async def restorelocalbackup(interaction):
    """Offer a picker of up to 25 local ``*.zip`` backups to restore."""
    all_archives = sorted(Path(BACKUP_DIR).glob("*.zip"), reverse=True)
    offered = all_archives[:25]
    if offered:
        picker = RestoreDropdownView(offered)
        await interaction.response.send_message("📂 Select a local backup to restore:", view=picker, ephemeral=True)
        return
    await interaction.response.send_message("❌ No local backups found.", ephemeral=True)
@bot.tree.command(name="restoregdrivebackup", description="Restore a Minecraft world backup from Google Drive", guild=discord.Object(id=GUILD_ID))
async def restoregdrivebackup(interaction):
    """Offer a picker of up to 25 ``.zip`` names from ``rclone ls`` to restore."""
    await interaction.response.defer(ephemeral=True)
    listing = subprocess.getoutput(f"rclone ls {RCLONE_REMOTE}")
    names = []
    for row in listing.splitlines():
        if row.endswith(".zip"):
            # Drop the first whitespace-separated column; the rest is the file name.
            names.append(row.split(maxsplit=1)[1].strip())
    names.sort(reverse=True)
    names = names[:25]
    if names:
        await interaction.followup.send("☁️ Select a GDrive backup to restore:", view=GDriveRestoreView(names))
        return
    await interaction.followup.send("❌ No GDrive backups found.")
@bot.tree.command(name="resetworld", description="Reset the Minecraft world (auto-backup)", guild=discord.Object(id=GUILD_ID))
async def resetworld(interaction: discord.Interaction):
    """Ask an authorised user to confirm a world reset; the confirm view does the work."""
    requester_id = interaction.user.id
    if requester_id in ALLOWED_USERS:
        # The view receives the requester's id when it is created.
        confirm_view = ResetWorldConfirmView(requester_id)
        await interaction.response.send_message(
            "⚠️ Are you sure you want to reset the world? This will delete the current world after backing it up.",
            view=confirm_view,
            ephemeral=True
        )
        return
    await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
@bot.tree.command(name="nbtinfo", description="Show current NBT settings", guild=discord.Object(id=GUILD_ID))
async def nbtinfo(interaction: discord.Interaction):
    """Print the key/value pairs returned by read_nbt_tags(), or its error entry."""
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    nbt = read_nbt_tags()
    # An "error" key in the result signals that reading failed.
    if "error" not in nbt:
        rendered = "\n".join([f"{tag}: {val}" for tag, val in nbt.items()])
        await interaction.followup.send(f"📘 level.dat:\n```text\n{rendered}\n```", ephemeral=True)
        return
    await interaction.followup.send(f"❌ Error: {nbt['error']}", ephemeral=True)
@bot.tree.command(
    name="enablecheats",
    description="Enable or disable cheats (NBT & server.properties)",
    guild=discord.Object(id=GUILD_ID),
)
@app_commands.describe(state="True to enable, False to disable")
async def enablecheats(interaction: discord.Interaction, state: bool):
    """Write ``allow-cheats`` through ``update_server_properties`` and restart.

    NOTE(review): the command description mentions NBT, but this handler
    itself only calls ``update_server_properties`` and
    ``restart_minecraft_server``.
    """
    caller_allowed = interaction.user.id in ALLOWED_USERS
    if not caller_allowed:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    property_value = "true" if state else "false"
    update_server_properties({"allow-cheats": property_value})
    restart_minecraft_server()

    outcome = "enabled" if state else "disabled"
    await interaction.followup.send(f"✅ Cheats {outcome} and server restarted.", ephemeral=True)
@bot.tree.command(
    name="setgamemode",
    description="Set game mode (NBT & server.properties)",
    guild=discord.Object(id=GUILD_ID),
)
@app_commands.describe(mode="survival, creative, adventure, spectator")
async def setgamemode(interaction: discord.Interaction, mode: str):
    """Write ``gamemode`` through ``update_server_properties`` and restart.

    ``mode`` is matched case-insensitively and ignoring surrounding
    whitespace, so input such as ``"Creative"`` is accepted; the normalised
    lower-case value is what gets written and echoed back.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    # Free-text slash-command input: normalise before validating.
    mode = mode.strip().lower()
    valid_modes = ("survival", "creative", "adventure", "spectator")
    if mode not in valid_modes:
        return await interaction.response.send_message("❌ Invalid mode.", ephemeral=True)

    await interaction.response.defer(ephemeral=True)
    update_server_properties({"gamemode": mode})
    restart_minecraft_server()
    await interaction.followup.send(f"✅ Game mode set to `{mode}` and server restarted.", ephemeral=True)
@bot.tree.command(
    name="setdifficulty",
    description="Set difficulty (NBT & server.properties)",
    guild=discord.Object(id=GUILD_ID),
)
@app_commands.describe(level="peaceful, easy, normal, hard")
async def setdifficulty(interaction: discord.Interaction, level: str):
    """Write ``difficulty`` through ``update_server_properties`` and restart.

    ``level`` is matched case-insensitively and ignoring surrounding
    whitespace, so input such as ``"Hard"`` is accepted; the normalised
    lower-case value is what gets written and echoed back.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    # Free-text slash-command input: normalise before validating.
    level = level.strip().lower()
    valid_levels = ("peaceful", "easy", "normal", "hard")
    if level not in valid_levels:
        return await interaction.response.send_message("❌ Invalid difficulty.", ephemeral=True)

    await interaction.response.defer(ephemeral=True)
    update_server_properties({"difficulty": level})
    restart_minecraft_server()
    await interaction.followup.send(f"✅ Difficulty set to `{level}` and server restarted.", ephemeral=True)
@bot.tree.command(
    name="reenable_achievements",
    description="Reset world to enable achievements",
    guild=discord.Object(id=GUILD_ID),
)
async def reenable_achievements(interaction: discord.Interaction):
    """Back up the world, clear cheat-related level.dat tags, then restart.

    Steps, in order: ``create_minecraft_backup``, ``update_nbt_tags`` with
    cheats/commands disabled and ``GameType`` 0, then stop and start the
    server. If ``update_nbt_tags`` returns anything other than ``True`` the
    value is reported as an error and the server is left untouched.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    # Backup before making changes
    backup_file = create_minecraft_backup(suffix="reenable-achievements")
    await interaction.followup.send(
        f"💾 Backup created: `{Path(backup_file).name}` and synced to GDrive! Proceeding to enable achievements...",
        ephemeral=True,
    )

    result = update_nbt_tags({
        "cheatsEnabled": False,
        "commandsEnabled": False,
        "ForceGameType": False,
        "hasBeenLoadedInCreative": False,
        "GameType": 0,  # Survival
    })
    if result is not True:
        # This is the second follow-up, so it must be marked ephemeral
        # explicitly or the error would be posted publicly.
        return await interaction.followup.send(f"❌ Error: {result}", ephemeral=True)

    stop_minecraft_server()
    start_minecraft_server()
    await interaction.followup.send("✅ level.dat reset for achievements. Server restarted.", ephemeral=True)
@bot.tree.command(
    name="deletelocalbackup",
    description="Delete a local Minecraft world backup",
    guild=discord.Object(id=GUILD_ID),
)
async def deletelocalbackup(interaction: discord.Interaction):
    """Offer a dropdown of local ``*.zip`` backups to delete.

    File names are read from ``BACKUP_DIR``, sorted in descending order and
    capped at 25 entries. The view times out after 30 seconds. Only users
    listed in ``ALLOWED_USERS`` may open the delete menu, matching the guard
    used by the other administrative slash commands.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    files = sorted([f.name for f in Path(BACKUP_DIR).glob("*.zip")], reverse=True)[:25]
    if not files:
        await interaction.response.send_message("❌ No local backups found.", ephemeral=True)
    else:
        view = discord.ui.View(timeout=30)
        view.add_item(DeleteLocalBackupDropdown(files))
        await interaction.response.send_message("🗑️ Select a local backup to delete:", view=view, ephemeral=True)
@bot.tree.command(
    name="downloadleveldat",
    description="Download the current level.dat file",
    guild=discord.Object(id=GUILD_ID),
)
async def downloadleveldat(interaction: discord.Interaction):
    """Send the file at ``NBT_PATH`` as an ephemeral attachment (no backup).

    Errors are reported to the caller. If the initial response was already
    used when the error occurs, the report goes out as a follow-up instead
    of a second (invalid) initial response.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    try:
        if not Path(NBT_PATH).exists():
            return await interaction.response.send_message("❌ level.dat not found.", ephemeral=True)
        attachment = discord.File(NBT_PATH, filename="level.dat")
        await interaction.response.send_message(
            content="📥 Here is the current level.dat file.",
            file=attachment,
            ephemeral=True,
        )
    except Exception as e:
        message = f"❌ Error sending file: {e}"
        # An interaction accepts only one initial response.
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)
@bot.tree.command(
    name="restoreleveldat",
    description="Restore level.dat from a file upload",
    guild=discord.Object(id=GUILD_ID),
)
@app_commands.describe(file="Upload a new level.dat file")
async def restoreleveldat(interaction: discord.Interaction, file: discord.Attachment):
    """Back up the world, then overwrite ``NBT_PATH`` with the uploaded file.

    The interaction is deferred before any work starts: creating the backup
    and downloading the attachment can take longer than the few seconds
    Discord allows for an initial response. Results are delivered as
    ephemeral follow-ups.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)
    try:
        # Backup first
        backup_file = create_minecraft_backup(suffix="leveldat-restore")
        # Download the uploaded file
        data = await file.read()
        with open(NBT_PATH, "wb") as f:
            f.write(data)
    except Exception as e:
        await interaction.followup.send(f"❌ Error restoring level.dat: {e}", ephemeral=True)
        return
    await interaction.followup.send(
        f"✅ Restored level.dat from upload.\n💾 Backup created: `{Path(backup_file).name}` and synced to GDrive!",
        ephemeral=True,
    )
@bot.tree.command(
    name="startmcbroadcast",
    description="Start MCXboxBroadcastStandalone.jar in tmux",
    guild=discord.Object(id=GUILD_ID),
)
async def startmcbroadcast(interaction: discord.Interaction):
    """Launch the broadcast jar in a detached tmux session unless one exists.

    The session is named ``MCBROADCAST_SESSION``; the java process is
    started from ``MINECRAFT_BASE``. The tmux exit status of the launch is
    not inspected.
    """
    caller_allowed = interaction.user.id in ALLOWED_USERS
    if not caller_allowed:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    # `tmux has-session` exits 0 when the named session is present.
    probe = ["tmux", "has-session", "-t", MCBROADCAST_SESSION]
    session_exists = subprocess.call(probe, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0
    if session_exists:
        return await interaction.response.send_message("🟢 MCXboxBroadcast is already running.", ephemeral=True)

    launch = f"tmux new-session -d -s {MCBROADCAST_SESSION} 'cd {MINECRAFT_BASE} && java -jar {MCBROADCAST_JAR}'"
    subprocess.getoutput(launch)
    await interaction.response.send_message("▶️ MCXboxBroadcast started in tmux.", ephemeral=True)
@bot.tree.command(
    name="stopmcbroadcast",
    description="Stop MCXboxBroadcastStandalone.jar (kill tmux session)",
    guild=discord.Object(id=GUILD_ID),
)
async def stopmcbroadcast(interaction: discord.Interaction):
    """Kill the ``MCBROADCAST_SESSION`` tmux session if it exists."""
    caller_allowed = interaction.user.id in ALLOWED_USERS
    if not caller_allowed:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    # `tmux has-session` exits non-zero when the named session is absent.
    probe = ["tmux", "has-session", "-t", MCBROADCAST_SESSION]
    probe_exit = subprocess.call(probe, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if probe_exit != 0:
        return await interaction.response.send_message("🛑 MCXboxBroadcast is not running.", ephemeral=True)

    kill = ["tmux", "kill-session", "-t", MCBROADCAST_SESSION]
    subprocess.call(kill)
    await interaction.response.send_message("⏹️ MCXboxBroadcast stopped.", ephemeral=True)
@bot.tree.command(
    name="statusmcbroadcast",
    description="Status of MCXboxBroadcastStandalone.jar (tmux)",
    guild=discord.Object(id=GUILD_ID),
)
async def statusmcbroadcast(interaction: discord.Interaction):
    """Report whether the ``MCBROADCAST_SESSION`` tmux session exists."""
    caller_allowed = interaction.user.id in ALLOWED_USERS
    if not caller_allowed:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    probe = ["tmux", "has-session", "-t", MCBROADCAST_SESSION]
    session_exists = subprocess.call(probe, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0
    status_line = "🟢 MCXboxBroadcast is running." if session_exists else "🔴 MCXboxBroadcast is not running."
    await interaction.response.send_message(status_line, ephemeral=True)
@bot.tree.command(
    name="logsmcbroadcast",
    description="Show last 20 lines of MCXboxBroadcast log",
    guild=discord.Object(id=GUILD_ID),
)
async def logsmcbroadcast(interaction: discord.Interaction):
    """Reply with the output of ``tail -n 20`` on ``MCBROADCAST_LOG``.

    Discord rejects messages longer than 2000 characters, so when the tail
    is too long only its final part is shown (prefixed with an ellipsis).
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)

    # Leaves room for the heading and code fence within the 2000-char limit.
    max_log_chars = 1900
    try:
        output = subprocess.getoutput(f"tail -n 20 {MCBROADCAST_LOG}")
        if not output:
            output = "(No log output)"
        elif len(output) > max_log_chars:
            # Keep the newest content; the end of the log matters most.
            output = "…" + output[-max_log_chars:]
        await interaction.response.send_message(f"📜 MCXboxBroadcast log:\n```text\n{output}\n```", ephemeral=True)
    except Exception as e:
        message = f"❌ Error reading log: {e}"
        # An interaction accepts only one initial response.
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)
def _xbl_gamertag_names(player, fallback):
    """Return ``(display_name, full_gamertag)`` for one xbl.io search result."""
    display_name = player.get('modernGamertag', player.get('gamertag', fallback))
    unique_gamertag = player.get('uniqueModernGamertag', '')
    gamertag_suffix = player.get('modernGamertagSuffix', '')
    # Use unique gamertag if available, otherwise construct it
    if unique_gamertag:
        return display_name, unique_gamertag
    if gamertag_suffix:
        return display_name, f"{display_name}#{gamertag_suffix}"
    return display_name, display_name


def _xuid_in_allowlist(xuid):
    """Return True if ``ALLOWLIST_PATH`` holds an entry with this xuid.

    Best effort: a missing, unreadable or malformed allowlist counts as
    "not in allowlist".
    """
    try:
        with open(ALLOWLIST_PATH, "r") as f:
            allowlist = json.load(f)
        return any(entry.get("xuid") == xuid for entry in allowlist)
    except (OSError, ValueError, TypeError, AttributeError):
        return False


def _build_xbox_profile_embed(player, xuid, full_gamertag, is_in_allowlist):
    """Build the profile embed shown by ``/getxuid``."""
    detail = player.get('detail') or {}
    preferred_color = player.get('preferredColor') or {}
    primary_color = preferred_color.get('primaryColor', '0099ff')
    try:
        color = int(primary_color, 16) if primary_color else 0x0099ff
    except (TypeError, ValueError):
        # A malformed colour should not abort the whole lookup.
        color = 0x0099ff

    embed = discord.Embed(title=f"🎮 Xbox Profile: {full_gamertag}", color=color)

    # Add profile picture as thumbnail
    profile_pic = player.get('displayPicRaw', '')
    if profile_pic:
        embed.set_thumbnail(url=profile_pic)

    # Basic info
    embed.add_field(name="🆔 XUID", value=f"`{xuid}`", inline=True)
    embed.add_field(name="🏆 Gamer Score", value=player.get('gamerScore', 'Unknown'), inline=True)
    embed.add_field(name="⭐ Reputation", value=player.get('xboxOneRep', 'Unknown'), inline=True)
    # Account details
    embed.add_field(name="💳 Account Tier", value=detail.get('accountTier', 'Unknown'), inline=True)
    embed.add_field(name="👥 Followers", value=detail.get('followerCount', 0), inline=True)
    embed.add_field(name="➕ Following", value=detail.get('followingCount', 0), inline=True)
    # Game Pass and allowlist status
    gamepass_status = "✅ Yes" if detail.get('hasGamePass', False) else "❌ No"
    embed.add_field(name="🎮 Game Pass", value=gamepass_status, inline=True)
    allowlist_status = "✅ In Allowlist" if is_in_allowlist else "❌ Not in Allowlist"
    embed.add_field(name="📋 Allowlist Status", value=allowlist_status, inline=True)
    return embed


@bot.tree.command(
    name="getxuid",
    description="Get XUID from Xbox gamertag using xbl.io API",
    guild=discord.Object(id=GUILD_ID),
)
@app_commands.describe(gamertag="Xbox gamertag (e.g. PlayerName123)")
async def getxuid(interaction: discord.Interaction, gamertag: str):
    """Look up a gamertag on xbl.io and show the first match as an embed.

    The gamertag is percent-encoded before being placed in the URL path,
    and the HTTP request runs in a worker thread so it cannot stall the
    Discord event loop for the duration of the 10 second timeout. On
    success the reply carries a ``XuidProfileView`` built from the display
    name, the XUID and the allowlist status.
    """
    # Local imports: only this handler needs them.
    import asyncio
    from urllib.parse import quote

    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    if not XBL_API_KEY:
        return await interaction.response.send_message("❌ XBL API key not configured. Please add XBL_API_KEY to .env file.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    try:
        # Use xbl.io API to get XUID
        api_url = f"https://xbl.io/api/v2/search/{quote(gamertag.strip(), safe='')}"
        headers = {
            "accept": "*/*",
            "x-authorization": XBL_API_KEY,
        }
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.get(api_url, headers=headers, timeout=10)
        )

        if response.status_code == 401:
            await interaction.followup.send("❌ Invalid API key. Please check XBL_API_KEY in .env file.")
            return
        if response.status_code == 404:
            await interaction.followup.send(f"❌ Player `{gamertag}` not found.")
            return
        if response.status_code != 200:
            await interaction.followup.send(f"❌ API error (status {response.status_code}) while looking up `{gamertag}`.")
            return

        try:
            data = response.json()
        except ValueError:
            # Every JSON decode error requests can raise is a ValueError.
            await interaction.followup.send(f"❌ Invalid response from API for `{gamertag}`.")
            return

        people = data.get('people') if isinstance(data, dict) else None
        player = people[0] if people else None  # Get first result
        xuid = str(player.get('xuid', '')) if player else ''
        if not (xuid and xuid.isdigit()):
            await interaction.followup.send(f"❌ No player found for gamertag `{gamertag}`. Make sure the gamertag is correct.")
            return

        display_name, full_gamertag = _xbl_gamertag_names(player, gamertag)
        is_in_allowlist = _xuid_in_allowlist(xuid)
        embed = _build_xbox_profile_embed(player, xuid, full_gamertag, is_in_allowlist)
        # Create view with appropriate button
        view = XuidProfileView(display_name, xuid, is_in_allowlist)
        await interaction.followup.send(embed=embed, view=view)
    except requests.RequestException as e:
        await interaction.followup.send(
            f"❌ Network error while looking up `{gamertag}`: {str(e)}\n\n"
            f"**Manual alternatives:**\n"
            f"• Ask the player for their XUID directly\n"
            f"• Use online Xbox XUID lookup tools\n"
            f"• Check their Xbox profile URL (contains XUID)"
        )
    except Exception as e:
        await interaction.followup.send(f"❌ Error looking up XUID: {str(e)}")
def _health_format_uptime(delta):
    """Render a timedelta as ``"Xd Yh Zm"``, omitting days when zero."""
    hours, remainder = divmod(delta.seconds, 3600)
    minutes = remainder // 60
    if delta.days > 0:
        return f"{delta.days}d {hours}h {minutes}m"
    return f"{hours}h {minutes}m"


def _health_first_pid(pattern):
    """Return the first PID printed by ``pgrep -f <pattern>``, or None."""
    output = subprocess.getoutput(f"pgrep -f {pattern}").strip()
    if not output:
        return None
    first = output.split('\n')[0]
    return first if first.isdigit() else None


def _health_rss_mb(pid):
    """Return the ``ps`` RSS column for *pid*, formatted by awk as "N.N MB"."""
    return subprocess.getoutput(f"ps -p {pid} -o rss= | awk '{{printf \"%.1f MB\", $1/1024}}'")


def _health_usage_emoji(percent, warn, critical):
    """Map a usage percentage to green / yellow / red by the two thresholds."""
    if percent < warn:
        return "🟢"
    if percent < critical:
        return "🟡"
    return "🔴"


@bot.tree.command(
    name="serverhealth",
    description="Show server health: CPU, RAM, disk space, uptime",
    guild=discord.Object(id=GUILD_ID),
)
async def serverhealth(interaction: discord.Interaction):
    """Show comprehensive server health information.

    Collects host uptime, load, RAM and root-disk usage from shell tools,
    plus status, uptime and memory for the Minecraft server, this bot and
    the MCXboxBroadcast tmux session, and replies with an embed and a
    ``ServerHealthActionsView``. Any failure is reported as an error
    follow-up.
    """
    if interaction.user.id not in ALLOWED_USERS:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)
    try:
        # Get system uptime
        uptime_output = subprocess.getoutput("uptime -p").replace("up ", "")

        # Get CPU usage (1-minute load average relative to core count)
        load_avg = subprocess.getoutput("cat /proc/loadavg").split()[0]
        cpu_cores = subprocess.getoutput("nproc")
        cpu_percent = 0  # used for the colour when the load cannot be parsed
        try:
            cpu_percent = round((float(load_avg) / int(cpu_cores)) * 100, 1)
            cpu_status = f"{cpu_percent}% ({load_avg} load)"
        except (ValueError, ZeroDivisionError):
            cpu_status = f"{load_avg} load"

        # Get RAM usage
        memory_info = subprocess.getoutput("free -h | grep '^Mem:'").split()
        ram_total = memory_info[1]
        ram_used = memory_info[2]
        ram_available = memory_info[6]

        # Get disk usage for root filesystem
        disk_info = subprocess.getoutput("df -h / | tail -1").split()
        disk_total = disk_info[1]
        disk_used = disk_info[2]
        disk_available = disk_info[3]
        disk_percent = disk_info[4]
        try:
            disk_emoji = _health_usage_emoji(float(disk_percent.replace('%', '')), 70, 85)
        except ValueError:
            disk_emoji = "⚪"

        # Get Minecraft server status using helper functions
        mc_status = get_minecraft_server_status()
        mc_running = mc_status == "active"
        mc_uptime = get_minecraft_server_uptime() if mc_running else "Not running"
        mc_memory = "Unknown"
        if mc_running:
            mc_pid = _health_first_pid("bedrock_server")
            if mc_pid:
                mc_memory = _health_rss_mb(mc_pid)

        # Get Discord Bot status
        bot_uptime = "Unknown"
        if hasattr(bot, '_start_time'):
            bot_uptime = _health_format_uptime(datetime.now() - bot._start_time)
        # The bot is this very process, so ask for its PID directly; pgrep on
        # the script name may match unrelated processes whose command line
        # happens to contain that name.
        try:
            bot_memory = _health_rss_mb(os.getpid())
        except Exception:
            bot_memory = "Unknown"

        # Get MCXboxBroadcast status
        mcb_status = "Stopped"
        mcb_uptime = "Not running"
        mcb_memory = "N/A"
        check_mcb = subprocess.call(
            ["tmux", "has-session", "-t", MCBROADCAST_SESSION],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if check_mcb == 0:
            mcb_status = "Running"
            try:
                # Find Java process running MCXboxBroadcast
                mcb_pid = _health_first_pid("MCXboxBroadcastStandalone.jar")
                if mcb_pid:
                    mcb_memory = _health_rss_mb(mcb_pid)
                    # Get uptime from process start time
                    ps_output = subprocess.getoutput(f"ps -p {mcb_pid} -o lstart=")
                    if ps_output:
                        try:
                            start_time = dateutil.parser.parse(ps_output)
                            mcb_uptime = _health_format_uptime(
                                datetime.now(start_time.tzinfo) - start_time
                            )
                        except (ValueError, OverflowError):
                            mcb_uptime = "Unknown"
            except Exception:
                mcb_uptime = "Unknown"
                mcb_memory = "Unknown"

        # Create embed
        embed = discord.Embed(
            title="🖥️ Server Health Dashboard",
            color=0x00ff00 if mc_running else 0xff0000,
            timestamp=datetime.now(),
        )

        # System Information
        embed.add_field(
            name="⏱️ System Uptime",
            value=f"```{uptime_output}```",
            inline=False,
        )
        embed.add_field(
            name=f"{_health_usage_emoji(cpu_percent, 50, 80)} CPU Usage",
            value=f"```{cpu_status}```",
            inline=True,
        )
        embed.add_field(
            name="🧠 Memory (RAM)",
            value=f"```Used: {ram_used}/{ram_total}\nFree: {ram_available}```",
            inline=True,
        )
        embed.add_field(
            name=f"{disk_emoji} Disk Space",
            value=f"```Used: {disk_used}/{disk_total} ({disk_percent})\nFree: {disk_available}```",
            inline=True,
        )

        # Minecraft Server Information
        mc_status_emoji = "🟢" if mc_running else "🔴"
        embed.add_field(
            name=f"{mc_status_emoji} Minecraft Server",
            value=f"```Status: {mc_status.title()}\nUptime: {mc_uptime}\nMemory: {mc_memory}```",
            inline=False,
        )

        # Discord Bot Information
        bot_online = bot.is_ready()
        bot_status_emoji = "🟢" if bot_online else "🔴"
        embed.add_field(
            name=f"{bot_status_emoji} Discord Bot",
            value=f"```Status: {'Online' if bot_online else 'Offline'}\nUptime: {bot_uptime}\nMemory: {bot_memory}```",
            inline=True,
        )

        # MCXboxBroadcast Information
        mcb_status_emoji = "🟢" if mcb_status == "Running" else "🔴"
        embed.add_field(
            name=f"{mcb_status_emoji} MCXboxBroadcast",
            value=f"```Status: {mcb_status}\nUptime: {mcb_uptime}\nMemory: {mcb_memory}```",
            inline=True,
        )

        # Add footer
        embed.set_footer(text="Live system information")

        # Create view with quick action buttons
        view = ServerHealthActionsView()
        await interaction.followup.send(embed=embed, view=view)
    except Exception as e:
        await interaction.followup.send(f"❌ Error getting server health: {str(e)}")
@bot.tree.command(
    name="serverdashboard",
    description="Complete server overview: status, players, uptime, settings",
    guild=discord.Object(id=GUILD_ID),
)
async def serverdashboard(interaction: discord.Interaction):
    """Reply with an embed summarising the Minecraft server.

    The embed combines the server status and uptime, the last three of the
    five most recent connected players, the latest backup info, four
    server.properties values and the allowlist count, all obtained from
    the module's helper functions. Any failure is reported as an error
    follow-up.
    """
    caller_allowed = interaction.user.id in ALLOWED_USERS
    if not caller_allowed:
        return await interaction.response.send_message("⛔ Not allowed.", ephemeral=True)
    await interaction.response.defer(ephemeral=True)

    try:
        # Gather everything first, then render.
        status = get_minecraft_server_status()
        is_active = status == "active"
        uptime = get_minecraft_server_uptime() if is_active else "Not running"
        recent_players = get_recent_connected_players(5)[-3:]
        backup_info = get_latest_backup_info()
        properties = get_server_properties(["gamemode", "difficulty", "max-players", "allow-cheats"])
        allowlist_size = get_allowlist_count()

        headline_emoji = "🟢" if is_active else "🔴"
        accent = 0x00ff00 if is_active else 0xff0000
        dashboard = discord.Embed(
            title=f"{headline_emoji} Minecraft Server Dashboard",
            color=accent,
            timestamp=datetime.now(),
        )

        # Status block
        status_block = f"```Status: {status.title()}\nUptime: {uptime}\nLast Backup: {backup_info}```"
        dashboard.add_field(name="📊 Server Status", value=status_block, inline=False)

        # Recent players block
        players_block = (
            "\n".join(f"• {name}" for name in recent_players)
            if recent_players
            else "No recent connections"
        )
        dashboard.add_field(name="👥 Recent Players", value=f"```{players_block}```", inline=True)

        # Settings block: "max-players" is shown as "Max Players", etc.
        setting_lines = [
            f"{prop.replace('-', ' ').title()}: {setting}"
            for prop, setting in properties.items()
        ]
        settings_block = "\n".join(setting_lines) if setting_lines else "Settings unavailable"
        dashboard.add_field(name="⚙️ Settings", value=f"```{settings_block}```", inline=True)

        # Quick stats block
        stats_block = f"```Allowlist: {allowlist_size} players\nWorld: {WORLD_NAME}\nServer: Bedrock```"
        dashboard.add_field(name="📋 Quick Stats", value=stats_block, inline=True)

        dashboard.set_footer(text="💡 Use /start, /stop, /restart, /backup for quick actions")
        await interaction.followup.send(embed=dashboard)
    except Exception as e:
        await interaction.followup.send(f"❌ Error getting server dashboard: {str(e)}")
| # ===================== Legacy ! Commands ===================== | |
@bot.command(name="ping", help="Check latency")
@commands.has_permissions(administrator=True)
async def ping(ctx):
    """Reply with the bot's latency in whole milliseconds."""
    latency_ms = round(bot.latency * 1000)
    await ctx.send(f"🏓 Pong! {latency_ms}ms")
@bot.command(name="status", help="Check Minecraft server status")
@commands.has_permissions(administrator=True)
async def status_cmd(ctx):
    """Reply with the value returned by ``get_minecraft_server_status()``."""
    server_state = get_minecraft_server_status()
    reply = f"📡 Server status: `{server_state}`"
    await ctx.send(reply)
@bot.command(name="helpme", help="List all ! commands")
@commands.has_permissions(administrator=True)
async def helpme(ctx):
    """Reply with one line per registered ``!`` command: its name and help text."""
    entries = [f"`!{command.name}` – {command.help}" for command in bot.commands]
    listing = "\n".join(entries)
    await ctx.send(f"📘 Available `!` Commands:\n{listing}")
| # ===================== Bot Events ===================== | |
@bot.event
async def on_ready():
    """Event: Bot is ready.

    Records the start time used for the uptime display and syncs the
    guild's slash commands. discord.py may fire ``on_ready`` more than once
    (for example after a reconnect), so the start time is stored only the
    first time; otherwise every reconnect would reset the reported uptime.
    """
    if not hasattr(bot, "_start_time"):
        bot._start_time = datetime.now()  # Track bot start time for uptime calculation
    print(f"✅ Logged in as {bot.user}")
    try:
        synced = await bot.tree.sync(guild=discord.Object(id=GUILD_ID))
        print(f"🔁 Synced {len(synced)} slash commands")
    except Exception as e:
        print(f"❌ Sync failed: {e}")
@bot.event
async def on_command_error(ctx, error):
    """Answer unknown ``!`` commands with a hint; re-raise every other error."""
    unknown_command = isinstance(error, commands.CommandNotFound)
    if not unknown_command:
        raise error
    await ctx.send("❓ Command not found. Type !helpme for a list of commands.")
| # ===================== Run Bot ===================== | |
# Fail fast with an actionable message when the token is unset or blank
# (os.getenv returns None, or "" for an empty `TOKEN=` line in .env).
if not TOKEN:
    raise SystemExit("TOKEN is not set. Add your Discord bot token to the .env file.")
bot.run(TOKEN)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment