# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "marimo",
#     "polars[pyarrow]==1.33.1",
#     "python-dotenv==1.1.1",
#     "requests==2.32.5",
#     "sqlglot==27.17.0",
# ]
# ///

import marimo

__generated_with = "0.16.2"
app = marimo.App()


@app.cell
def _():
    # Setup cell: load credentials from a local .env file into os.environ.
    import os

    import marimo as mo
    from dotenv import load_dotenv

    load_dotenv('.env')
    return mo, os


@app.cell
def _(connect, os):
    # Open a DB-API style connection to the R2 SQL HTTP endpoint.
    # The token is read from the environment (see load_dotenv above); when
    # it is missing, requests are sent without an Authorization header.
    api_token = os.environ.get("WRANGLER_R2_SQL_AUTH_TOKEN")
    conn = connect(
        account_id="6867ca40ac1d6a1f1592b6db290d1e8b",
        bucket_name="pipelines-tutorial",
        api_token=api_token
    )
    return (conn,)


@app.cell
def _(conn, mo):
    # Run a query through marimo's SQL cell support using the custom engine.
    df = mo.sql(
        f"""
        SELECT user_id, event_type, product_id, amount FROM default.ecommerce
        """,
        engine=conn
    )
    return


@app.cell
def _():
    import json
    import re
    from collections import deque
    from typing import Any, Dict, List, Optional, Tuple, Union

    import requests

    class R2SQLError(Exception):
        """Base exception for R2 SQL API errors"""
        pass

    class R2SQLOperationalError(R2SQLError):
        """Exception for operational errors (network, HTTP errors)"""
        pass

    class R2SQLProgrammingError(R2SQLError):
        """Exception for programming errors (SQL syntax, etc.)"""
        pass

    class R2SQLCursor:
        """DB API 2.0 compatible cursor for Cloudflare R2 SQL.

        Each ``execute`` call sends one HTTP POST to the R2 SQL query
        endpoint and buffers the returned rows in memory; the ``fetch*``
        methods then drain that buffer.
        """

        def __init__(self, connection: 'R2SQLConnection'):
            self.connection = connection
            # Buffered result rows (dicts). A deque gives O(1) removal from
            # the front, which is what every fetch* method does.
            self._results: deque[Dict[str, Any]] = deque()
            self._description: Optional[List[Tuple]] = None
            self.rowcount = -1
            self.arraysize = 1
            self._closed = False

        @property
        def description(self) -> Optional[List[Tuple]]:
            """Returns description of columns from last executed query"""
            return self._description

        @staticmethod
        def _render_literal(value: Any) -> str:
            """Render a Python value as SQL literal text.

            Strings are single-quoted with embedded quotes doubled, ``None``
            becomes ``NULL``, and anything else is rendered with ``str()``.
            """
            if value is None:
                return "NULL"
            if isinstance(value, str):
                escaped_value = value.replace("'", "''")
                return f"'{escaped_value}'"
            return str(value)

        def _bind_parameters(self, query: str, parameters: Dict[str, Any]) -> str:
            """Substitute ``:name`` placeholders in *query* with literals.

            The substitution is done in a single pass so that text inserted
            for one parameter is never rescanned for other placeholders, and
            only whole names match (``:id`` does not touch ``:id_2``).

            Note: this is textual substitution, not server-side binding;
            a ``:name`` occurring inside a SQL string literal is replaced too.
            """
            # Longest names first so the alternation prefers `:user_id`
            # over `:user` when both are supplied.
            names = sorted(parameters, key=len, reverse=True)
            pattern = re.compile(
                ":(" + "|".join(re.escape(name) for name in names) + r")(?!\w)"
            )
            # A function replacement is used so the rendered literal is
            # inserted verbatim (no backslash/group-reference processing).
            return pattern.sub(
                lambda match: self._render_literal(parameters[match.group(1)]),
                query,
            )

        @staticmethod
        def _extract_rows(result_data: Any) -> List[Dict[str, Any]]:
            """Pull the row list out of a decoded JSON response.

            A dict with a ``result`` key yields ``result['rows']``; any other
            dict is treated as a single row; non-dict payloads yield no rows.
            """
            if not isinstance(result_data, dict):
                return []
            if 'result' in result_data:
                return result_data['result']['rows']
            return [result_data]

        def execute(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> None:
            """Execute a SQL query and buffer its result rows.

            Args:
                query: SQL text, optionally containing ``:name`` placeholders.
                parameters: Mapping of placeholder name to value. Values are
                    substituted into the query text client-side.

            Raises:
                R2SQLProgrammingError: If the cursor is closed, or the
                    response could not be interpreted.
                R2SQLOperationalError: If the HTTP request fails or the
                    response body is not valid JSON.
            """
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

            # Handle parameterized queries (simple string substitution)
            # Note: This is basic - for production, use proper parameter binding
            if parameters:
                query = self._bind_parameters(query, parameters)

            try:
                url = f"https://api.sql.cloudflarestorage.com/api/v1/accounts/{self.connection.account_id}/r2-sql/query/{self.connection.bucket_name}"

                headers = {
                    "Content-Type": "application/json",
                }

                # Add authorization header if provided
                if self.connection.api_token:
                    headers["Authorization"] = f"Bearer {self.connection.api_token}"

                response = requests.post(
                    url,
                    json={"query": query},
                    headers=headers,
                    timeout=self.connection.timeout
                )
                response.raise_for_status()

                rows = deque(self._extract_rows(response.json()))

                # Set description based on first row
                description = None
                if rows:
                    description = [
                        (col_name, type(col_value).__name__, None, None, None, None, None)
                        for col_name, col_value in rows[0].items()
                    ]
            # The JSON clause must come first: the decode error raised by
            # requests is also a RequestException, so the reverse order would
            # report malformed bodies as generic HTTP failures.
            except json.JSONDecodeError as e:
                raise R2SQLOperationalError(f"Invalid JSON response: {e}") from e
            except requests.exceptions.RequestException as e:
                raise R2SQLOperationalError(f"HTTP request failed: {e}") from e
            except Exception as e:
                raise R2SQLProgrammingError(f"Query execution failed: {e}") from e

            # Only replace cursor state once the whole response was handled,
            # so a failed execute never leaves half-updated results behind.
            self._results = rows
            self.rowcount = len(rows)
            self._description = description

        def fetchone(self) -> Optional[Tuple]:
            """Fetch next row from query results, or None when exhausted"""
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

            if not self._results:
                return None

            return tuple(self._results.popleft().values())

        def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
            """Fetch up to *size* rows (default: ``arraysize``) from query results"""
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

            if size is None:
                size = self.arraysize

            # min() caps at what is buffered; a non-positive size yields [].
            count = min(size, len(self._results))
            return [tuple(self._results.popleft().values()) for _ in range(count)]

        def fetchall(self) -> List[Tuple]:
            """Fetch all remaining rows from query results"""
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

            results = [tuple(row_dict.values()) for row_dict in self._results]
            self._results.clear()
            return results

        def fetchall_dict(self) -> List[Dict[str, Any]]:
            """Fetch all remaining rows as dictionaries (non-standard but useful)"""
            if self._closed:
                raise R2SQLProgrammingError("Cursor is closed")

            results = list(self._results)
            self._results.clear()
            return results

        def close(self) -> None:
            """Close the cursor and drop any buffered rows"""
            self._closed = True
            self._results.clear()
            self._description = None

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

    class R2SQLConnection:
        """DB API 2.0 compatible connection for Cloudflare R2 SQL.

        Holds the account, bucket, token and timeout used by its cursors;
        no network connection is kept open between queries.
        """

        def __init__(
            self,
            account_id: str,
            bucket_name: str,
            api_token: Optional[str] = None,
            timeout: int = 30
        ):
            self.account_id = account_id
            self.bucket_name = bucket_name
            self.api_token = api_token
            # Passed as the `timeout` argument of each requests.post call.
            self.timeout = timeout
            self._closed = False

        def cursor(self) -> R2SQLCursor:
            """Create a new cursor"""
            if self._closed:
                raise R2SQLProgrammingError("Connection is closed")
            return R2SQLCursor(self)

        def commit(self) -> None:
            """Commit transaction (no-op for R2 SQL API)"""
            if self._closed:
                raise R2SQLProgrammingError("Connection is closed")
            # R2 SQL API doesn't support transactions, so this is a no-op
            pass

        def rollback(self) -> None:
            """Rollback transaction (no-op for R2 SQL API)"""
            if self._closed:
                raise R2SQLProgrammingError("Connection is closed")
            # R2 SQL API doesn't support transactions, so this is a no-op
            pass

        def close(self) -> None:
            """Close the connection"""
            self._closed = True

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

    def connect(
        account_id: str,
        bucket_name: str,
        api_token: Optional[str] = None,
        timeout: int = 30
    ) -> R2SQLConnection:
        """Create a connection to Cloudflare R2 SQL API"""
        return R2SQLConnection(account_id, bucket_name, api_token, timeout)

    return (connect,)


@app.cell
def _():
    return


if __name__ == "__main__":
    app.run()