import os from dotenv import load_dotenv from couchbase.cluster import Cluster, ClusterOptions from couchbase.auth import PasswordAuthenticator from couchbase.exceptions import CouchbaseException from couchbase.collection import UpsertOptions load_dotenv() conn_str = os.getenv("COUCHBASE_CONN_STR") username = os.getenv("COUCHBASE_USERNAME") password = os.getenv("COUCHBASE_PASSWORD") bucket_name = os.getenv("COUCHBASE_BUCKET") try: cluster = Cluster(conn_str, ClusterOptions(PasswordAuthenticator(username, password))) bucket = cluster.bucket(bucket_name) collection = bucket.default_collection() # Test write key = "test-doc" value = {"message": "Hello, Couchbase!"} collection.upsert(key, value) # Test read result = collection.get(key) print("✅ Couchbase connection successful!") print("Document content:", result.content_as[dict]) except CouchbaseException as e: print("❌ Couchbase connection failed:") print(e)