Skip to content

Instantly share code, notes, and snippets.

@csm10495
Created February 11, 2023 05:27
Show Gist options
  • Select an option

  • Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.

Select an option

Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.

Revisions

  1. csm10495 created this gist Feb 11, 2023.
    79 changes: 79 additions & 0 deletions cosmos2json.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,79 @@
    '''
    Simple script to do a simple query against a cosmosdb in azure
    Can use it to dump a db to json.. if ya need that for some reason.
    MIT License - Charles Machalow
    # pip install azure-cosmos
    # pip install azure-identity
    '''
    import argparse
    import io
    import json
    import sys

    from azure.cosmos import CosmosClient

    class OutputJSONStreamer:
    """
    A hacky way of streaming the output of a query in a json-y format
    """
    def __init__(self, output: io.StringIO):
    self._initial_items = []
    self.output = output

    def __enter__(self):
    return self

    def __exit__(self, *args, **kargs):
    if len(self._initial_items) > 2:
    self.output.write("\n]")
    elif len(self._initial_items) == 1:
    self.output.write(json.dumps(self._initial_items[0], indent=4))
    elif len(self._initial_items) == 0:
    self.output.write('{}')
    else:
    self.output.write(json.dumps(self._initial_items, indent=4))

    def process(self, item):
    if len(self._initial_items) < 2:
    self._initial_items.append(item)

    if len(self._initial_items) == 2:
    self.output.write("[\n")
    self.output.write(json.dumps(self._initial_items[0], indent=4))
    self.output.write(",\n")
    self.output.write(json.dumps(self._initial_items[1], indent=4))
    self._initial_items.append(None)
    return

    if len(self._initial_items) > 2:
    self.output.write(",\n")
    self.output.write(json.dumps(item, indent=4))


    if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--endpoint", required=True, help="Cosmos DB endpoint")
    parser.add_argument("--key",required=True, help="Cosmos DB key")
    parser.add_argument("--database", required=True,help="Cosmos database")
    parser.add_argument("--container",required=True, help="Cosmos container")
    parser.add_argument("--query",required=True, help="query to execute.. for example: 'select * from c'")
    parser.add_argument("-o", "--output",required=False, default=None, help="If given a path to send this output to (otherwise goes to stdout)")
    args = parser.parse_args()

    with CosmosClient(url=args.endpoint, credential=args.key) as cosmos:
    db = cosmos.get_database_client(args.database)
    container = db.get_container_client(args.container)

    info = container.query_items(args.query, enable_cross_partition_query=True)

    output = open(args.output, 'w') if args.output else sys.stdout
    try:
    with OutputJSONStreamer(output) as streamer:
    for q in info:
    streamer.process(q)
    finally:
    if args.output:
    output.close()