Skip to content

Instantly share code, notes, and snippets.

@koaning
Created September 24, 2025 21:41
Show Gist options
  • Save koaning/1f6b3a2c8ae3c8edefa3c850f82abb62 to your computer and use it in GitHub Desktop.
Save koaning/1f6b3a2c8ae3c8edefa3c850f82abb62 to your computer and use it in GitHub Desktop.
marimo notebook with early R2 SQL support over REST
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "marimo",
# "polars[pyarrow]==1.33.1",
# "python-dotenv==1.1.1",
# "requests==2.32.5",
# "sqlglot==27.17.0",
# ]
# ///
import marimo
# marimo version recorded for this notebook file.
__generated_with = "0.16.2"
# The notebook application object; every function decorated with
# @app.cell below is registered as one notebook cell.
app = marimo.App()
@app.cell
def _():
    # Setup cell: pull values from a local .env file into the process
    # environment, and expose marimo (`mo`) and `os` to the other cells.
    from dotenv import load_dotenv
    import marimo as mo
    import os

    load_dotenv(dotenv_path='.env')
    return (mo, os)
@app.cell
def _(connect, os):
    # Build the R2 SQL DB-API connection used by the query cell.
    # The API token is read from the environment. The account id and bucket
    # can now also be overridden via environment variables instead of
    # editing the notebook; without them, the original values are used.
    api_token = os.environ.get("WRANGLER_R2_SQL_AUTH_TOKEN")
    conn = connect(
        account_id=os.environ.get(
            "R2_SQL_ACCOUNT_ID", "6867ca40ac1d6a1f1592b6db290d1e8b"
        ),
        bucket_name=os.environ.get("R2_SQL_BUCKET_NAME", "pipelines-tutorial"),
        api_token=api_token,
    )
    return (conn,)
@app.cell
def _(conn, mo):
    # Run the query through marimo's mo.sql, passing the R2 SQL DB-API
    # connection `conn` as the engine so the SQL is sent to R2 SQL.
    df = mo.sql(
        f"""
        SELECT
            user_id,
            event_type,
            product_id,
            amount
        FROM default.ecommerce
        """,
        engine=conn
    )
    return
@app.cell
def _():
    import numbers
    import re
    from collections import deque
    from collections.abc import Mapping
    from typing import Any, Dict, List, Optional, Tuple

    import requests

    class R2SQLError(Exception):
        """Base exception for R2 SQL API errors."""

    class R2SQLOperationalError(R2SQLError):
        """Exception for operational errors (network, HTTP errors, non-JSON bodies)."""

    class R2SQLProgrammingError(R2SQLError):
        """Exception for programming errors (closed objects, bad parameters, failed queries)."""

    class R2SQLCursor:
        """DB API 2.0 style cursor for the Cloudflare R2 SQL REST endpoint.

        Each call to ``execute`` sends one HTTP POST and buffers the returned
        rows in memory; the ``fetch*`` methods then consume that buffer.
        """

        def __init__(self, connection: "R2SQLConnection"):
            self.connection = connection
            # Row objects (dicts) not yet fetched; a deque gives O(1) popleft.
            self._results: "deque[Dict[str, Any]]" = deque()
            self._description: Optional[List[Tuple]] = None
            # Column order used to turn row dicts into tuples (matches description).
            self._columns: List[str] = []
            self.rowcount = -1
            self.arraysize = 1
            self._closed = False

        @property
        def description(self) -> Optional[List[Tuple]]:
            """Column descriptions of the last query, or None if it returned no rows.

            Each entry is ``(name, type_code, None, None, None, None, None)``,
            where ``type_code`` is the Python type name of the first non-null
            value seen in that column (``"NoneType"`` if every value is null).
            """
            return self._description

        def execute(
            self, query: str, parameters: Optional[Mapping[str, Any]] = None
        ) -> None:
            """Execute a SQL query and buffer its rows.

            Args:
                query: SQL text. ``:name`` placeholders are replaced with SQL
                    literals taken from ``parameters``.
                parameters: Optional mapping of placeholder name to value.

            Raises:
                R2SQLProgrammingError: The cursor or connection is closed,
                    ``parameters`` is not a mapping, or the response reports a
                    failed query or has an unexpected shape.
                R2SQLOperationalError: The HTTP request fails or the body is
                    not valid JSON.
            """
            self._check_open()
            self.connection._check_open()
            # Reset state first so a failed query cannot leave the previous
            # query's rows or description behind.
            self._results = deque()
            self._description = None
            self._columns = []
            self.rowcount = -1

            query = self._bind_parameters(query, parameters)
            url = f"https://api.sql.cloudflarestorage.com/api/v1/accounts/{self.connection.account_id}/r2-sql/query/{self.connection.bucket_name}"
            headers = {
                "Content-Type": "application/json",
            }
            # Add authorization header if provided
            if self.connection.api_token:
                headers["Authorization"] = f"Bearer {self.connection.api_token}"

            try:
                response = requests.post(
                    url,
                    json={"query": query},
                    headers=headers,
                    timeout=self.connection.timeout,
                )
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # Include (a prefix of) the response body so any server-side
                # error detail is not lost.
                body = e.response.text[:1000] if e.response is not None else ""
                raise R2SQLOperationalError(
                    f"HTTP request failed: {e}; response body: {body}"
                ) from e
            except requests.exceptions.RequestException as e:
                raise R2SQLOperationalError(f"HTTP request failed: {e}") from e

            try:
                result_data = response.json()
            except ValueError as e:
                # requests' JSONDecodeError (stdlib json or simplejson) is a ValueError.
                raise R2SQLOperationalError(f"Invalid JSON response: {e}") from e

            rows = self._extract_rows(result_data)
            self._description = self._describe(rows)
            self._columns = [column[0] for column in self._description or []]
            self._results = deque(rows)
            self.rowcount = len(rows)

        def fetchone(self) -> Optional[Tuple]:
            """Fetch the next row as a tuple, or None when no rows remain."""
            self._check_open()
            if not self._results:
                return None
            return self._to_tuple(self._results.popleft())

        def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
            """Fetch up to ``size`` rows (default ``arraysize``) as tuples."""
            self._check_open()
            limit = self.arraysize if size is None else size
            batch: List[Tuple] = []
            while self._results and len(batch) < limit:
                batch.append(self._to_tuple(self._results.popleft()))
            return batch

        def fetchall(self) -> List[Tuple]:
            """Fetch all remaining rows as tuples."""
            self._check_open()
            rows = [self._to_tuple(row) for row in self._results]
            self._results.clear()
            return rows

        def fetchall_dict(self) -> List[Dict[str, Any]]:
            """Fetch all remaining rows as dictionaries (non-standard but useful)."""
            self._check_open()
            rows = list(self._results)
            self._results.clear()
            return rows

        def close(self) -> None:
            """Close the cursor and drop any buffered rows."""
            self._closed = True
            self._results.clear()
            self._description = None
            self._columns = []

        def __iter__(self):
            """Iterate over the remaining rows (optional DB API extension)."""
            return iter(self.fetchone, None)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

        def _check_open(self) -> None:
            """Raise if the cursor has been closed."""
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

        def _to_tuple(self, row: Dict[str, Any]) -> Tuple:
            """Order a row's values by the described columns (missing keys -> None)."""
            return tuple(row.get(name) for name in self._columns)

        @classmethod
        def _bind_parameters(
            cls, query: str, parameters: Optional[Mapping[str, Any]]
        ) -> str:
            """Inline ``:name`` placeholders as SQL literals in a single pass.

            This is client-side text substitution, not server-side binding:
            placeholders inside quoted SQL strings are replaced as well.
            Placeholders with no matching key are left untouched.
            """
            if not parameters:
                return query
            if not isinstance(parameters, Mapping):
                raise R2SQLProgrammingError(
                    "parameters must be a mapping of placeholder name to value"
                )

            def substitute(match: "re.Match[str]") -> str:
                name = match.group(1)
                if name not in parameters:
                    return match.group(0)
                return cls._sql_literal(parameters[name])

            # Whole-name match: ":id" never rewrites part of ":id2". The
            # lookbehind skips "::type" casts and "word:word" text, and the
            # single pass never re-substitutes inside already-inlined values.
            return re.sub(r"(?<![:\w]):([A-Za-z_]\w*)", substitute, query)

        @staticmethod
        def _sql_literal(value: Any) -> str:
            """Render a Python value as a SQL literal for inlining into a query."""
            if value is None:
                return "NULL"
            if isinstance(value, bool):
                return "TRUE" if value else "FALSE"
            if isinstance(value, numbers.Number):
                return str(value)
            # Strings and anything else (dates, UUIDs, ...) become quoted
            # text, with embedded single quotes doubled.
            return "'" + str(value).replace("'", "''") + "'"

        @staticmethod
        def _extract_rows(result_data: Any) -> List[Dict[str, Any]]:
            """Pull the list of row objects out of a decoded JSON response."""
            if isinstance(result_data, dict):
                # An envelope that explicitly reports failure is surfaced as an
                # error rather than being returned as a row.
                if result_data.get("success") is False:
                    detail = result_data.get("errors") or result_data
                    raise R2SQLProgrammingError(f"Query execution failed: {detail}")
                if "result" in result_data:
                    result = result_data["result"]
                    if not isinstance(result, dict) or "rows" not in result:
                        raise R2SQLProgrammingError(
                            f"Query execution failed: unexpected result format: {result!r}"
                        )
                    rows = result["rows"] or []
                else:
                    rows = [result_data]
            elif isinstance(result_data, list):
                rows = result_data
            else:
                rows = []
            if not isinstance(rows, list) or not all(
                isinstance(row, dict) for row in rows
            ):
                raise R2SQLProgrammingError(
                    "Query execution failed: expected rows to be a list of JSON objects"
                )
            return rows

        @staticmethod
        def _describe(rows: List[Dict[str, Any]]) -> Optional[List[Tuple]]:
            """Build a DB API description from the union of the rows' keys."""
            if not rows:
                return None
            type_codes: Dict[str, str] = {}
            for row in rows:
                for name, value in row.items():
                    known = type_codes.get(name)
                    # Record each column once, in first-seen order, and
                    # replace a "NoneType" guess once a non-null value appears.
                    if known is None or (known == "NoneType" and value is not None):
                        type_codes[name] = type(value).__name__
            return [
                (name, code, None, None, None, None, None)
                for name, code in type_codes.items()
            ]

    class R2SQLConnection:
        """DB API 2.0 style connection holding the settings for R2 SQL requests.

        No network connection is held open; each ``cursor().execute()`` makes
        its own HTTP request using these settings.
        """

        def __init__(
            self,
            account_id: str,
            bucket_name: str,
            api_token: Optional[str] = None,
            timeout: int = 30
        ):
            self.account_id = account_id
            self.bucket_name = bucket_name
            self.api_token = api_token
            self.timeout = timeout
            self._closed = False

        def _check_open(self) -> None:
            """Raise if the connection has been closed."""
            if self._closed:
                raise R2SQLProgrammingError("Connection is closed")

        def cursor(self) -> R2SQLCursor:
            """Create a new cursor."""
            self._check_open()
            return R2SQLCursor(self)

        def commit(self) -> None:
            """Commit the transaction (no-op: the R2 SQL API has no transactions)."""
            self._check_open()

        def rollback(self) -> None:
            """Roll back the transaction (no-op: the R2 SQL API has no transactions)."""
            self._check_open()

        def close(self) -> None:
            """Close the connection; cursors then refuse to execute."""
            self._closed = True

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

    def connect(
        account_id: str,
        bucket_name: str,
        api_token: Optional[str] = None,
        timeout: int = 30
    ) -> R2SQLConnection:
        """Create a connection to the Cloudflare R2 SQL API.

        Args:
            account_id: Cloudflare account id used in the request URL.
            bucket_name: R2 bucket name used in the request URL.
            api_token: Optional bearer token sent as the Authorization header.
            timeout: Per-request timeout in seconds, passed to ``requests``.
        """
        return R2SQLConnection(account_id, bucket_name, api_token, timeout)
    return (connect,)
@app.cell
def _():
    # Empty cell: defines nothing and returns nothing.
    return
# When this file is executed directly rather than imported, run the
# notebook via app.run().
if __name__ == "__main__":
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment