Created September 24, 2025 21:41
Save koaning/1f6b3a2c8ae3c8edefa3c850f82abb62 to your computer and use it in GitHub Desktop.
marimo notebook with early R2 SQL support over REST
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "marimo", | |
| # "polars[pyarrow]==1.33.1", | |
| # "python-dotenv==1.1.1", | |
| # "requests==2.32.5", | |
| # "sqlglot==27.17.0", | |
| # ] | |
| # /// | |
| import marimo | |
# marimo notebook header: the generator version string recorded by marimo, and
# the App object that the `@app.cell` decorators below attach each cell to.
__generated_with = "0.16.2"
app = marimo.App()
| @app.cell | |
def _():
    # Shared imports for the notebook. Credentials (the R2 SQL token) are kept
    # in a local .env file and loaded into os.environ here.
    import marimo as mo
    import os

    from dotenv import load_dotenv

    # Kept as the cell's final expression: marimo shows its value as output.
    load_dotenv(".env")
    return (mo, os)
| @app.cell | |
def _(connect, os):
    # Open the notebook-wide R2 SQL connection.
    # - The account id and bucket default to the tutorial values, but can be
    #   overridden with R2_SQL_ACCOUNT_ID / R2_SQL_BUCKET_NAME so the notebook
    #   can target another account without editing code. An empty variable
    #   falls back to the default.
    # - The bearer token is read from WRANGLER_R2_SQL_AUTH_TOKEN (populated from
    #   .env by the import cell). When it is unset, api_token is None.
    api_token = os.environ.get("WRANGLER_R2_SQL_AUTH_TOKEN")
    conn = connect(
        account_id=os.environ.get("R2_SQL_ACCOUNT_ID")
        or "6867ca40ac1d6a1f1592b6db290d1e8b",
        bucket_name=os.environ.get("R2_SQL_BUCKET_NAME") or "pipelines-tutorial",
        api_token=api_token,
    )
    return (conn,)
| @app.cell | |
def _(conn, mo):
    # Run the SQL below through marimo's `mo.sql`, routed to the R2 SQL
    # connection `conn` (created by the cell that calls `connect(...)`) via the
    # `engine=` argument.
    # NOTE(review): `df` is not returned, so no other cell can use it, and the
    # f-string currently has no `{...}` interpolations.
    df = mo.sql(
        f"""
SELECT
user_id,
event_type,
product_id,
amount
FROM default.ecommerce
""",
        engine=conn
    )
    return
| @app.cell | |
| def _(): | |
| import requests | |
| import json | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| class R2SQLError(Exception): | |
| """Base exception for R2 SQL API errors""" | |
| pass | |
| class R2SQLOperationalError(R2SQLError): | |
| """Exception for operational errors (network, HTTP errors)""" | |
| pass | |
| class R2SQLProgrammingError(R2SQLError): | |
| """Exception for programming errors (SQL syntax, etc.)""" | |
| pass | |
| class R2SQLCursor: | |
| """DB API 2.0 compatible cursor for Cloudflare R2 SQL""" | |
| def __init__(self, connection: 'R2SQLConnection'): | |
| self.connection = connection | |
| self._results: List[Dict[str, Any]] = [] | |
| self._description: Optional[List[Tuple]] = None | |
| self.rowcount = -1 | |
| self.arraysize = 1 | |
| self._closed = False | |
| @property | |
| def description(self) -> Optional[List[Tuple]]: | |
| """Returns description of columns from last executed query""" | |
| return self._description | |
| def execute(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> None: | |
| """Execute a SQL query""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Cursor is closed") | |
| # Handle parameterized queries (simple string substitution) | |
| # Note: This is basic - for production, use proper parameter binding | |
| if parameters: | |
| for key, value in parameters.items(): | |
| # Simple parameter substitution - escape single quotes | |
| if isinstance(value, str): | |
| escaped_value = value.replace("'", "''") | |
| query = query.replace(f":{key}", f"'{escaped_value}'") | |
| elif value is None: | |
| query = query.replace(f":{key}", "NULL") | |
| else: | |
| query = query.replace(f":{key}", str(value)) | |
| try: | |
| url = f"https://api.sql.cloudflarestorage.com/api/v1/accounts/{self.connection.account_id}/r2-sql/query/{self.connection.bucket_name}" | |
| payload = {"query": query} | |
| headers = { | |
| "Content-Type": "application/json", | |
| } | |
| # Add authorization header if provided | |
| if self.connection.api_token: | |
| headers["Authorization"] = f"Bearer {self.connection.api_token}" | |
| response = requests.post( | |
| url, | |
| json=payload, | |
| headers=headers, | |
| timeout=self.connection.timeout | |
| ) | |
| response.raise_for_status() | |
| result_data = response.json() | |
| # Handle different response formats | |
| if isinstance(result_data, dict): | |
| if 'result' in result_data: | |
| self._results = result_data['result']['rows'] | |
| else: | |
| self._results = [result_data] | |
| else: | |
| self._results = [] | |
| self.rowcount = len(self._results) | |
| # Set description based on first row | |
| if self._results: | |
| first_row = self._results[0] | |
| self._description = [ | |
| (col_name, type(col_value).__name__, None, None, None, None, None) | |
| for col_name, col_value in first_row.items() | |
| ] | |
| else: | |
| self._description = None | |
| except requests.exceptions.RequestException as e: | |
| raise R2SQLOperationalError(f"HTTP request failed: {e}") | |
| except json.JSONDecodeError as e: | |
| raise R2SQLOperationalError(f"Invalid JSON response: {e}") | |
| except Exception as e: | |
| raise R2SQLProgrammingError(f"Query execution failed: {e}") | |
| def fetchone(self) -> Optional[Tuple]: | |
| """Fetch next row from query results""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Cursor is closed") | |
| if not self._results: | |
| return None | |
| row_dict = self._results.pop(0) | |
| return tuple(row_dict.values()) | |
| def fetchmany(self, size: Optional[int] = None) -> List[Tuple]: | |
| """Fetch multiple rows from query results""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Cursor is closed") | |
| if size is None: | |
| size = self.arraysize | |
| results = [] | |
| for _ in range(size): | |
| row = self.fetchone() | |
| if row is None: | |
| break | |
| results.append(row) | |
| return results | |
| def fetchall(self) -> List[Tuple]: | |
| """Fetch all remaining rows from query results""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Cursor is closed") | |
| results = [] | |
| while self._results: | |
| row_dict = self._results.pop(0) | |
| results.append(tuple(row_dict.values())) | |
| return results | |
| def fetchall_dict(self) -> List[Dict[str, Any]]: | |
| """Fetch all remaining rows as dictionaries (non-standard but useful)""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Cursor is closed") | |
| results = self._results.copy() | |
| self._results.clear() | |
| return results | |
| def close(self) -> None: | |
| """Close the cursor""" | |
| self._closed = True | |
| self._results.clear() | |
| self._description = None | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |
| class R2SQLConnection: | |
| """DB API 2.0 compatible connection for Cloudflare R2 SQL""" | |
| def __init__( | |
| self, | |
| account_id: str, | |
| bucket_name: str, | |
| api_token: Optional[str] = None, | |
| timeout: int = 30 | |
| ): | |
| self.account_id = account_id | |
| self.bucket_name = bucket_name | |
| self.api_token = api_token | |
| self.timeout = timeout | |
| self._closed = False | |
| def cursor(self) -> R2SQLCursor: | |
| """Create a new cursor""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Connection is closed") | |
| return R2SQLCursor(self) | |
| def commit(self) -> None: | |
| """Commit transaction (no-op for R2 SQL API)""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Connection is closed") | |
| # R2 SQL API doesn't support transactions, so this is a no-op | |
| pass | |
| def rollback(self) -> None: | |
| """Rollback transaction (no-op for R2 SQL API)""" | |
| if self._closed: | |
| raise R2SQLProgrammingError("Connection is closed") | |
| # R2 SQL API doesn't support transactions, so this is a no-op | |
| pass | |
| def close(self) -> None: | |
| """Close the connection""" | |
| self._closed = True | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |
| def connect( | |
| account_id: str, | |
| bucket_name: str, | |
| api_token: Optional[str] = None, | |
| timeout: int = 30 | |
| ) -> R2SQLConnection: | |
| """Create a connection to Cloudflare R2 SQL API""" | |
| return R2SQLConnection(account_id, bucket_name, api_token, timeout) | |
| return (connect,) | |
| @app.cell | |
def _():
    # Empty cell: defines nothing and returns nothing.
    return
# When this file is executed directly (not imported), hand control to
# marimo's `app.run()`.
if __name__ == "__main__":
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.