Created
February 11, 2023 05:27
-
-
Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.
Revisions
-
csm10495 created this gist
Feb 11, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,79 @@ ''' Simple script to do a simple query against a cosmosdb in azure Can use it to dump a db to json.. if ya need that for some reason. MIT License - Charles Machalow # pip install azure-cosmos # pip install azure-identity ''' import argparse import io import json import sys from azure.cosmos import CosmosClient class OutputJSONStreamer: """ A hacky way of streaming the output of a query in a json-y format """ def __init__(self, output: io.StringIO): self._initial_items = [] self.output = output def __enter__(self): return self def __exit__(self, *args, **kargs): if len(self._initial_items) > 2: self.output.write("\n]") elif len(self._initial_items) == 1: self.output.write(json.dumps(self._initial_items[0], indent=4)) elif len(self._initial_items) == 0: self.output.write('{}') else: self.output.write(json.dumps(self._initial_items, indent=4)) def process(self, item): if len(self._initial_items) < 2: self._initial_items.append(item) if len(self._initial_items) == 2: self.output.write("[\n") self.output.write(json.dumps(self._initial_items[0], indent=4)) self.output.write(",\n") self.output.write(json.dumps(self._initial_items[1], indent=4)) self._initial_items.append(None) return if len(self._initial_items) > 2: self.output.write(",\n") self.output.write(json.dumps(item, indent=4)) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("--endpoint", required=True, help="Cosmos DB endpoint") parser.add_argument("--key",required=True, help="Cosmos DB key") parser.add_argument("--database", required=True,help="Cosmos database") parser.add_argument("--container",required=True, help="Cosmos container") parser.add_argument("--query",required=True, help="query to execute.. for example: 'select * from c'") parser.add_argument("-o", "--output",required=False, default=None, help="If given a path to send this output to (otherwise goes to stdout)") args = parser.parse_args() with CosmosClient(url=args.endpoint, credential=args.key) as cosmos: db = cosmos.get_database_client(args.database) container = db.get_container_client(args.container) info = container.query_items(args.query, enable_cross_partition_query=True) output = open(args.output, 'w') if args.output else sys.stdout try: with OutputJSONStreamer(output) as streamer: for q in info: streamer.process(q) finally: if args.output: output.close()