@mikemccabe
Created September 25, 2012 22:28

Revisions

  1. mikemccabe revised this gist Oct 3, 2012. 1 changed file with 121 additions and 79 deletions.
    200 changes: 121 additions & 79 deletions parallel_md_get.py
    @@ -4,37 +4,26 @@

    # It requires gevent; see http://www.gevent.org/intro.html#installation

    # This is callable from the command line. To use it as a library, call
    # walk_mds with an iterator that returns archive ids, and a callback
    # that takes id_index, id, id_json.
    # This is callable from the command line; call with --help for a summary.

    # If you use it as a library, the main entry point is
    # metadata_record_iterator(); see main() for an example.

    import gevent
    import httplib
    import urllib
    import json
    import sys
    import random
    import os

    from itertools import islice

    from gevent import queue as g_queue
    from gevent import monkey
    monkey.patch_all()

    from gevent import socket
    from gevent.pool import Pool
    from gevent import queue as g_queue

    input_queue = g_queue.JoinableQueue(1000)
    json_queue = g_queue.JoinableQueue(1000)


    def get_url(url):
        f = urllib.urlopen(url)
        c = f.read()
        f.close()
        return c

    # Globals!
    done_queueing_input = False
    queued_count = 0

    hosts = (
    "www16",
    @@ -55,11 +44,9 @@ def get_url(url):
    "www31"
    )
    skips = []
    def md_getter(recache):
    def md_getter(input_queue, json_queue, recache):
    while True:
    i, id = input_queue.get()
    # if i % 100 == 0:
    # print >> sys.stderr, str(i) + ' got'
    host = hosts[random.randrange(len(hosts))]
    while host in skips:
    host = hosts[random.randrange(len(hosts))]
    @@ -73,8 +60,7 @@ def md_getter(recache):
    j = get_url("http://%s.us.archive.org/metadata/%s%s"
    % (host, id, recache_str))
    if len(j) < 100:
    print >> sys.stderr, "got " + str(j) + " for " + id + " - error?"
    # print str(i) + " got"
    print >> sys.stderr, "got short string " + str(j) + " for " + id + " - error?"
    json_queue.put((i, id, j))
    except IOError:
    print >> sys.stderr, host + " failed"
    @@ -84,65 +70,93 @@ def md_getter(recache):
    input_queue.task_done()


    def json_processor(md_callback):
        while True:
            i, id, md_json = json_queue.get()
            try:
                md_callback(i, id, md_json)
            finally:
                pass
            json_queue.task_done()
    def queue_input(ids, input_queue):
        global queued_count
        global done_queueing_input
        for i, id in enumerate(ids):
            id = id.strip()
            input_queue.put((i, id))
            queued_count += 1
        done_queueing_input = True


    def get_url(url):
        f = urllib.urlopen(url)
        if f.getcode() != 200:
            print >> sys.stderr, "get failed for " + url
            c = '{}'
        else:
            c = f.read()
        f.close()
        return c


    def metadata_record_iterator(ids, workers=20, sorted=False, recache=False):
        input_queue = g_queue.JoinableQueue(1000)
        json_queue = g_queue.Queue(1000)

        gevent.spawn(queue_input, ids, input_queue)
        for i in range(workers):
            gevent.spawn(md_getter, input_queue, json_queue, recache)

        def metadata_iterator_helper():
            got_count = 0
            while True:
                if done_queueing_input and got_count == queued_count:
                    break
                yield json_queue.get()
                got_count += 1

        def sorted_iterator(results):
            current_i = 0
            pq = g_queue.PriorityQueue()
            results_remain = True
            while True:
                if done_queueing_input and current_i == queued_count:
                    break
                while True:
                    if results_remain:
                        try:
                            tup = results.next()
                            pq.put(tup)
                        except StopIteration:
                            results_remain = False
                    tup = pq.get()
                    i, _, _ = tup
                    if i == current_i:
                        yield tup
                        current_i += 1
                        break
                    else:
                        pq.put(tup)

        if sorted:
            return sorted_iterator(metadata_iterator_helper())
        else:
            return metadata_iterator_helper()



    def info_callback(i, id, md_json):
        o = json.loads(md_json)
        print "%s %s %s %s %s" % (i, id, o.get('server', ''),
                                  o.get('dir', ''), o.get('item_size', ''))


    def printing_callback(i, id, md_json):
        print md_json

    def idonly_callback(i, id, md_json):
        print "%s %s %s" % (i, id, len(md_json))

    def qr(ids, start, count):
        stop = start + count if count is not None else None
        for i, id in islice(enumerate(ids), start, stop):
            id = id.strip()
            input_queue.put((i, id))

    def yieldr(ids, start=0, count=0, getters=20, recache=False):
        for i in range(getters):
            gevent.spawn(md_getter, recache)
        gevent.spawn(qr, ids, start, count)
        gevent.sleep(1)
        while True and not json_queue.empty():
            gevent.sleep(0)
            yield json_queue.get()

    def walk_mds_yieldwise(ids, md_callback, start=0, count=0, getters=20, recache=False):
        for i, id, md_json in yieldr(ids, start, count, getters, recache):
            md_callback(i, id, md_json)


    def walk_mds(ids, md_callback, start=0, count=0, getters=20, recache=False):
        for i in range(getters):
            gevent.spawn(md_getter, recache)
        gevent.spawn(json_processor, md_callback)

        stop = start + count if count is not None else None
        for i, id in islice(enumerate(ids), start, stop):
            id = id.strip()
            input_queue.put((i, id))

        input_queue.join()
        json_queue.join()

    def printing_with_ids_callback(i, id, md_json):
        print str(i), id, md_json


    def main(argv):
        import optparse
        parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                       version='%prog 0.1',
                                       description='get archive metadata')
                                       description='get archive metadata for archive ids. Prints JSON to stdout by default.')
        parser.add_option('--start',
                          action='store',
                          type='int',
    @@ -162,10 +176,29 @@ def main(argv):
                          action='store_true',
                          default=False,
                          help='Recache when fetching')

        parser.add_option('--idonly',
                          action='store_true',
                          default=False,
                          help='Print "index id len(json) - for testing"')
        parser.add_option('--withids',
                          action='store_true',
                          default=False,
                          help='Print "index id json"')
        parser.add_option('--altformat',
                          action='store_true',
                          default=False,
                          help='Print results in a different format')
                          help='Print "index id server dir item_size": parses json')

        parser.add_option('--ujson',
                          action='store_true',
                          default=False,
                          help='use ujson instead of json')

        parser.add_option('--sorted',
                          action='store_true',
                          default=False,
                          help='Produce results in sorted order')

        opts, args = parser.parse_args(argv)

    @@ -183,18 +216,27 @@ def main(argv):

        parser.destroy()

        callback = info_callback if opts.altformat else printing_callback
        walk_mds(ids, callback,
                 opts.start, opts.count, opts.workers, opts.recache)
        if opts.ujson:
            import ujson
            global json
            json = ujson

        callback = printing_callback
        if opts.altformat:
            callback = info_callback
        if opts.idonly:
            callback = idonly_callback
        if opts.withids:
            callback = printing_with_ids_callback

    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
        stop = opts.start + opts.count if opts.count is not 0 else None
        ids = islice(ids, opts.start, stop)

        results = metadata_record_iterator(ids, opts.workers,
                                           opts.sorted, opts.recache)
        for i, id, md_json in results:
            callback(i, id, md_json)

    # Todo:
    # see these for maybe having it return sorted results
    # http://stackoverflow.com/questions/4098179/anyone-know-this-python-data-structure
    # http://pypi.python.org/pypi/blist/
    # also priorityqueue, heap
    # json_queue = g_queue.PriorityQueue(1000)

    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
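
The Oct 3 revision's header comment names metadata_record_iterator() as the library entry point. As a rough usage sketch only (not part of the gist), assuming the revised file is importable as parallel_md_get and substituting real archive identifiers for the placeholders, a caller could iterate over results like this; passing sorted=True returns tuples in input order:

import json

import parallel_md_get

ids = ['placeholder_item_1', 'placeholder_item_2']  # hypothetical archive ids

# metadata_record_iterator yields (index, id, raw_json) tuples as fetches complete
for i, id, md_json in parallel_md_get.metadata_record_iterator(ids, workers=4, sorted=True):
    md = json.loads(md_json)
    print i, id, md.get('item_size', '')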
  2. mikemccabe revised this gist Sep 27, 2012. 1 changed file with 159 additions and 23 deletions.
    182 changes: 159 additions & 23 deletions parallel_md_get.py
    @@ -4,13 +4,19 @@

    # It requires gevent; see http://www.gevent.org/intro.html#installation

    # To make this do something useful, modify do_work().
    # This is callable from the command line. To use it as a library, call
    # walk_mds with an iterator that returns archive ids, and a callback
    # that takes id_index, id, id_json.

    import gevent

    import httplib
    import urllib
    import json
    import sys
    import random
    import os

    from itertools import islice

    from gevent import monkey
    monkey.patch_all()
    @@ -20,16 +26,7 @@
    from gevent import queue as g_queue

    input_queue = g_queue.JoinableQueue(1000)

    def queue_ids(ids, start_index=0, count=0):
        limit_index = start_index + count
        for i, id in enumerate(ids):
            if i < start_index:
                continue
            if limit_index != 0 and i >= limit_index:
                break
            id = id.strip()
            input_queue.put((id, i))
    json_queue = g_queue.JoinableQueue(1000)


    def get_url(url):
    @@ -39,26 +36,165 @@ def get_url(url):
        return c


    def worker():
    hosts = (
        "www16",
        "www17",
        "www18",
        "www19",
        "www20",
        "www21",
        "www22",
        "www23",
        "www24",
        "www25",
        "www26",
        "www27",
        "www28",
        "www29",
        "www30",
        "www31"
    )
    skips = []
    def md_getter(recache):
        while True:
            id, i = input_queue.get()
            i, id = input_queue.get()
            # if i % 100 == 0:
            # print >> sys.stderr, str(i) + ' got'
            host = hosts[random.randrange(len(hosts))]
            while host in skips:
                host = hosts[random.randrange(len(hosts))]

            if recache:
                recache_str = '?reCache=1'
            else:
                recache_str = ''

            try:
                do_work(id, i)
                j = get_url("http://%s.us.archive.org/metadata/%s%s"
                            % (host, id, recache_str))
                if len(j) < 100:
                    print >> sys.stderr, "got " + str(j) + " for " + id + " - error?"
                # print str(i) + " got"
                json_queue.put((i, id, j))
            except IOError:
                print >> sys.stderr, host + " failed"
                skips.append(host)
                input_queue.put((i, id))
            finally:
                input_queue.task_done()


    def do_work(id, i):
        j = get_url("http://archive.org/metadata/" + id)
        o = json.loads(j)
        # n.b.: o might be empty if the ID couldn't be fetched for whatever reason
    def json_processor(md_callback):
        while True:
            i, id, md_json = json_queue.get()
            try:
                md_callback(i, id, md_json)
            finally:
                pass
            json_queue.task_done()


    def info_callback(i, id, md_json):
        o = json.loads(md_json)
        print "%s %s %s %s %s" % (i, id, o.get('server', ''),
                                  o.get('dir', ''), o.get('item_size', ''))


    for i in range(20): # 20 seems like a reasonable number - don't go nuts!
        gevent.spawn(worker)
    def printing_callback(i, id, md_json):
        print md_json


    def qr(ids, start, count):
        stop = start + count if count is not None else None
        for i, id in islice(enumerate(ids), start, stop):
            id = id.strip()
            input_queue.put((i, id))

    def yieldr(ids, start=0, count=0, getters=20, recache=False):
        for i in range(getters):
            gevent.spawn(md_getter, recache)
        gevent.spawn(qr, ids, start, count)
        gevent.sleep(1)
        while True and not json_queue.empty():
            gevent.sleep(0)
            yield json_queue.get()

    def walk_mds_yieldwise(ids, md_callback, start=0, count=0, getters=20, recache=False):
        for i, id, md_json in yieldr(ids, start, count, getters, recache):
            md_callback(i, id, md_json)

    queue_ids(open('ids.txt'), 0, 1000); # just do 1000 lines, starting with 1rst

    input_queue.join()
    def walk_mds(ids, md_callback, start=0, count=0, getters=20, recache=False):
        for i in range(getters):
            gevent.spawn(md_getter, recache)
        gevent.spawn(json_processor, md_callback)

        stop = start + count if count is not None else None
        for i, id in islice(enumerate(ids), start, stop):
            id = id.strip()
            input_queue.put((i, id))

        input_queue.join()
        json_queue.join()


    def main(argv):
        import optparse
        parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                       version='%prog 0.1',
                                       description='get archive metadata')
        parser.add_option('--start',
                          action='store',
                          type='int',
                          default=0,
                          help='Index of first id to fetch')
        parser.add_option('--count',
                          action='store',
                          type='int',
                          default=0,
                          help='Count of ids to fetch')
        parser.add_option('--workers',
                          action='store',
                          type='int',
                          default=20,
                          help='How many metadata fetch workers to spawn')
        parser.add_option('--recache',
                          action='store_true',
                          default=False,
                          help='Recache when fetching')
        parser.add_option('--altformat',
                          action='store_true',
                          default=False,
                          help='Print results in a different format')

        opts, args = parser.parse_args(argv)

        if len(args) != 1:
            parser.print_usage()
            sys.exit(1)

        if os.path.exists(args[0]):
            ids = open(args[0])
        else:
            if args[0] == '-':
                ids = sys.stdin
            else:
                ids = [args[0]]

        parser.destroy()

        callback = info_callback if opts.altformat else printing_callback
        walk_mds(ids, callback,
                 opts.start, opts.count, opts.workers, opts.recache)


    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))


    # Todo:
    # see these for maybe maybe having it return sorted results
    # http://stackoverflow.com/questions/4098179/anyone-know-this-python-data-structure
    # http://pypi.python.org/pypi/blist/
    # also priorityqueue, heap
    # json_queue = g_queue.PriorityQueue(1000)
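
In the Sep 27 revision, walk_mds() drives the fetching and hands each result to a callback taking (index, id, raw_json), the same signature as info_callback and printing_callback above. A minimal caller sketch (not part of the gist), assuming this version is saved as parallel_md_get.py; count is passed explicitly here because count=0 makes the islice empty in this revision:

import json

import parallel_md_get

def size_callback(i, id, md_json):
    # reads the same fields the gist's info_callback uses
    md = json.loads(md_json)
    print i, id, md.get('item_size', '')

# fetch metadata for the first 100 ids listed one per line in ids.txt
parallel_md_get.walk_mds(open('ids.txt'), size_callback, start=0, count=100, getters=10)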
  3. mikemccabe created this gist Sep 25, 2012.
    64 changes: 64 additions & 0 deletions parallel_md_get.py
    @@ -0,0 +1,64 @@
    # This demonstrates doing multiple metadata fetches in parallel.
    # It seems to be fast enough that the json decoding cost becomes
    # a significant proportion of the execution time.

    # It requires gevent; see http://www.gevent.org/intro.html#installation

    # To make this do something useful, modify do_work().

    import gevent

    import httplib
    import urllib
    import json

    from gevent import monkey
    monkey.patch_all()

    from gevent import socket
    from gevent.pool import Pool
    from gevent import queue as g_queue

    input_queue = g_queue.JoinableQueue(1000)

    def queue_ids(ids, start_index=0, count=0):
        limit_index = start_index + count
        for i, id in enumerate(ids):
            if i < start_index:
                continue
            if limit_index != 0 and i >= limit_index:
                break
            id = id.strip()
            input_queue.put((id, i))


    def get_url(url):
        f = urllib.urlopen(url)
        c = f.read()
        f.close()
        return c


    def worker():
        while True:
            id, i = input_queue.get()
            try:
                do_work(id, i)
            finally:
                input_queue.task_done()


    def do_work(id, i):
        j = get_url("http://archive.org/metadata/" + id)
        o = json.loads(j)
        # n.b.: o might be empty if the ID couldn't be fetched for whatever reason
        print "%s %s %s %s %s" % (i, id, o.get('server', ''),
                                  o.get('dir', ''), o.get('item_size', ''))


    for i in range(20): # 20 seems like a reasonable number - don't go nuts!
        gevent.spawn(worker)

    queue_ids(open('ids.txt'), 0, 1000); # just do 1000 lines, starting with 1rst

    input_queue.join()
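
The original file's header says to modify do_work() to make the script do something useful. As an illustration only (not part of the gist), a variant that writes each item's raw metadata JSON to its own file, assuming an out/ directory already exists, might look like this:

import os

def do_work(id, i):
    j = get_url("http://archive.org/metadata/" + id)
    o = json.loads(j)
    if not o:
        # n.b.: o is empty if the ID couldn't be fetched for whatever reason
        print "%s %s: no metadata returned" % (i, id)
        return
    with open(os.path.join("out", id + ".json"), "w") as f:
        f.write(j)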