Created September 25, 2012 22:28
Revisions
mikemccabe revised this gist
Oct 3, 2012 — 1 changed file with 121 additions and 79 deletions.
@@ -4,37 +4,26 @@
# It requires gevent; see http://www.gevent.org/intro.html#installation

# This is callable from the command line; call with --help for a summary.
# If you use it as a library, the main entry point is
# metadata_record_iterator(); see main() for an example.

import gevent
import urllib
import json
import sys
import random
import os
from itertools import islice
from gevent import queue as g_queue
from gevent import monkey
monkey.patch_all()

# Globals!
done_queueing_input = False
queued_count = 0

hosts = (
    "www16",
@@ -55,11 +44,9 @@ def get_url(url):
    "www31" )

skips = []

def md_getter(input_queue, json_queue, recache):
    while True:
        i, id = input_queue.get()
        host = hosts[random.randrange(len(hosts))]
        while host in skips:
            host = hosts[random.randrange(len(hosts))]
@@ -73,8 +60,7 @@ def md_getter(recache):
            j = get_url("http://%s.us.archive.org/metadata/%s%s" % (host, id, recache_str))
            if len(j) < 100:
                print >> sys.stderr, "got short string " + str(j) + " for " + id + " - error?"
            json_queue.put((i, id, j))
        except IOError:
            print >> sys.stderr, host + " failed"
@@ -84,65 +70,93 @@ def md_getter(recache):
            input_queue.task_done()

def queue_input(ids, input_queue):
    global queued_count
    global done_queueing_input
    for i, id in enumerate(ids):
        id = id.strip()
        input_queue.put((i, id))
        queued_count += 1
    done_queueing_input = True

def get_url(url):
    f = urllib.urlopen(url)
    if f.getcode() != 200:
        print >> sys.stderr, "get failed for " + url
        c = '{}'
    else:
        c = f.read()
    f.close()
    return c

def metadata_record_iterator(ids, workers=20, sorted=False, recache=False):
    input_queue = g_queue.JoinableQueue(1000)
    json_queue = g_queue.Queue(1000)
    gevent.spawn(queue_input, ids, input_queue)
    for i in range(workers):
        gevent.spawn(md_getter, input_queue, json_queue, recache)

    def metadata_iterator_helper():
        got_count = 0
        while True:
            if done_queueing_input and got_count == queued_count:
                break
            yield json_queue.get()
            got_count += 1

    def sorted_iterator(results):
        current_i = 0
        pq = g_queue.PriorityQueue()
        results_remain = True
        while True:
            if done_queueing_input and current_i == queued_count:
                break
            while True:
                if results_remain:
                    try:
                        tup = results.next()
                        pq.put(tup)
                    except StopIteration:
                        results_remain = False
                tup = pq.get()
                i, _, _ = tup
                if i == current_i:
                    yield tup
                    current_i += 1
                    break
                else:
                    pq.put(tup)

    if sorted:
        return sorted_iterator(metadata_iterator_helper())
    else:
        return metadata_iterator_helper()

def info_callback(i, id, md_json):
    o = json.loads(md_json)
    print "%s %s %s %s %s" % (i, id, o.get('server', ''), o.get('dir', ''), o.get('item_size', ''))

def printing_callback(i, id, md_json):
    print md_json

def idonly_callback(i, id, md_json):
    print "%s %s %s" % (i, id, len(md_json))

def printing_with_ids_callback(i, id, md_json):
    print str(i), id, md_json

def main(argv):
    import optparse
    parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                   version='%prog 0.1',
                                   description='get archive metadata for archive ids. Prints JSON to stdout by default.')
    parser.add_option('--start', action='store', type='int',
@@ -162,10 +176,29 @@ def main(argv):
                      action='store_true', default=False,
                      help='Recache when fetching')
    parser.add_option('--idonly',
                      action='store_true', default=False,
                      help='Print "index id len(json) - for testing"')
    parser.add_option('--withids',
                      action='store_true', default=False,
                      help='Print "index id json"')
    parser.add_option('--altformat',
                      action='store_true', default=False,
                      help='Print "index id server dir item_size": parses json')
    parser.add_option('--ujson',
                      action='store_true', default=False,
                      help='use ujson instead of json')
    parser.add_option('--sorted',
                      action='store_true', default=False,
                      help='Produce results in sorted order')
    opts, args = parser.parse_args(argv)
@@ -183,18 +216,27 @@ def main(argv):
    parser.destroy()

    if opts.ujson:
        import ujson
        global json
        json = ujson

    callback = printing_callback
    if opts.altformat:
        callback = info_callback
    if opts.idonly:
        callback = idonly_callback
    if opts.withids:
        callback = printing_with_ids_callback

    stop = opts.start + opts.count if opts.count is not 0 else None
    ids = islice(ids, opts.start, stop)
    results = metadata_record_iterator(ids, opts.workers, opts.sorted, opts.recache)
    for i, id, md_json in results:
        callback(i, id, md_json)

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
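A minimal sketch of how this revision might be used as a library, following the header comment that names metadata_record_iterator() as the entry point. The module name parallel_md_get, the identifier list, and the worker count below are assumptions for illustration; the gist does not name its file.

# Hypothetical usage sketch (Python 2, matching the gist); assumes the script
# above is saved as parallel_md_get.py somewhere on the import path.
from parallel_md_get import metadata_record_iterator

ids = ["nasa", "stats", "prelinger"]  # any iterable of archive identifiers
# sorted=True routes results through sorted_iterator(), so they come back
# in input order rather than completion order.
for i, id, md_json in metadata_record_iterator(ids, workers=5, sorted=True):
    print i, id, len(md_json)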
mikemccabe revised this gist
Sep 27, 2012 — 1 changed file with 159 additions and 23 deletions.
@@ -4,13 +4,19 @@
# It requires gevent; see http://www.gevent.org/intro.html#installation

# This is callable from the command line. To use it as a library, call
# walk_mds with an iterator that returns archive ids, and a callback
# that takes id_index, id, id_json.

import gevent
import httplib
import urllib
import json
import sys
import random
import os
from itertools import islice
from gevent import monkey
monkey.patch_all()
@@ -20,16 +26,7 @@ from gevent import queue as g_queue
input_queue = g_queue.JoinableQueue(1000)
json_queue = g_queue.JoinableQueue(1000)

def get_url(url):
@@ -39,26 +36,165 @@ def get_url(url):
    return c

hosts = ( "www16", "www17", "www18", "www19", "www20", "www21", "www22", "www23",
          "www24", "www25", "www26", "www27", "www28", "www29", "www30", "www31" )

skips = []

def md_getter(recache):
    while True:
        i, id = input_queue.get()
        # if i % 100 == 0:
        #     print >> sys.stderr, str(i) + ' got'
        host = hosts[random.randrange(len(hosts))]
        while host in skips:
            host = hosts[random.randrange(len(hosts))]
        if recache:
            recache_str = '?reCache=1'
        else:
            recache_str = ''
        try:
            j = get_url("http://%s.us.archive.org/metadata/%s%s" % (host, id, recache_str))
            if len(j) < 100:
                print >> sys.stderr, "got " + str(j) + " for " + id + " - error?"
            # print str(i) + " got"
            json_queue.put((i, id, j))
        except IOError:
            print >> sys.stderr, host + " failed"
            skips.append(host)
            input_queue.put((i, id))
        finally:
            input_queue.task_done()

def json_processor(md_callback):
    while True:
        i, id, md_json = json_queue.get()
        try:
            md_callback(i, id, md_json)
        finally:
            pass
        json_queue.task_done()

def info_callback(i, id, md_json):
    o = json.loads(md_json)
    print "%s %s %s %s %s" % (i, id, o.get('server', ''), o.get('dir', ''), o.get('item_size', ''))

def printing_callback(i, id, md_json):
    print md_json

def qr(ids, start, count):
    stop = start + count if count is not None else None
    for i, id in islice(enumerate(ids), start, stop):
        id = id.strip()
        input_queue.put((i, id))

def yieldr(ids, start=0, count=0, getters=20, recache=False):
    for i in range(getters):
        gevent.spawn(md_getter, recache)
    gevent.spawn(qr, ids, start, count)
    gevent.sleep(1)
    while True and not json_queue.empty():
        gevent.sleep(0)
        yield json_queue.get()

def walk_mds_yieldwise(ids, md_callback, start=0, count=0, getters=20, recache=False):
    for i, id, md_json in yieldr(ids, start, count, getters, recache):
        md_callback(i, id, md_json)

def walk_mds(ids, md_callback, start=0, count=0, getters=20, recache=False):
    for i in range(getters):
        gevent.spawn(md_getter, recache)
    gevent.spawn(json_processor, md_callback)
    stop = start + count if count is not None else None
    for i, id in islice(enumerate(ids), start, stop):
        id = id.strip()
        input_queue.put((i, id))
    input_queue.join()
    json_queue.join()

def main(argv):
    import optparse
    parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                   version='%prog 0.1',
                                   description='get archive metadata')
    parser.add_option('--start', action='store', type='int', default=0,
                      help='Index of first id to fetch')
    parser.add_option('--count', action='store', type='int', default=0,
                      help='Count of ids to fetch')
    parser.add_option('--workers', action='store', type='int', default=20,
                      help='How many metadata fetch workers to spawn')
    parser.add_option('--recache', action='store_true', default=False,
                      help='Recache when fetching')
    parser.add_option('--altformat', action='store_true', default=False,
                      help='Print results in a different format')
    opts, args = parser.parse_args(argv)
    if len(args) != 1:
        parser.print_usage()
        sys.exit(1)
    if os.path.exists(args[0]):
        ids = open(args[0])
    else:
        if args[0] == '-':
            ids = sys.stdin
        else:
            ids = [args[0]]
    parser.destroy()
    callback = info_callback if opts.altformat else printing_callback
    walk_mds(ids, callback, opts.start, opts.count, opts.workers, opts.recache)

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

# Todo:
# see these for maybe having it return sorted results
# http://stackoverflow.com/questions/4098179/anyone-know-this-python-data-structure
# http://pypi.python.org/pypi/blist/
# also priorityqueue, heap
# json_queue = g_queue.PriorityQueue(1000)
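As a rough illustration of this intermediate revision's library interface, a caller would pass walk_mds an id iterator and a callback taking (id_index, id, id_json), as the header comment describes. The module name, input file, and callback below are assumptions, not part of the gist.

# Hypothetical usage sketch (Python 2); assumes this revision is importable
# as parallel_md_get and that ids.txt holds one archive identifier per line.
from parallel_md_get import walk_mds

def show(i, id, md_json):
    # callback signature from the header comment: id_index, id, id_json
    print i, id, len(md_json)

# fetch the first 100 ids with 10 concurrent getters
walk_mds(open('ids.txt'), show, start=0, count=100, getters=10)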
mikemccabe created this gist
Sep 25, 2012.
@@ -0,0 +1,64 @@
# This demonstrates doing multiple metadata fetches in parallel.
# It seems to be fast enough that the json decoding cost becomes
# a significant proportion of the execution time.
# It requires gevent; see http://www.gevent.org/intro.html#installation

# To make this do something useful, modify do_work().

import gevent
import httplib
import urllib
import json

from gevent import monkey
monkey.patch_all()

from gevent import socket
from gevent.pool import Pool
from gevent import queue as g_queue

input_queue = g_queue.JoinableQueue(1000)

def queue_ids(ids, start_index=0, count=0):
    limit_index = start_index + count
    for i, id in enumerate(ids):
        if i < start_index:
            continue
        if limit_index != 0 and i >= limit_index:
            break
        id = id.strip()
        input_queue.put((id, i))

def get_url(url):
    f = urllib.urlopen(url)
    c = f.read()
    f.close()
    return c

def worker():
    while True:
        id, i = input_queue.get()
        try:
            do_work(id, i)
        finally:
            input_queue.task_done()

def do_work(id, i):
    j = get_url("http://archive.org/metadata/" + id)
    o = json.loads(j)
    # n.b.: o might be empty if the ID couldn't be fetched for whatever reason
    print "%s %s %s %s %s" % (i, id, o.get('server', ''),
                              o.get('dir', ''), o.get('item_size', ''))

for i in range(20):  # 20 seems like a reasonable number - don't go nuts!
    gevent.spawn(worker)

queue_ids(open('ids.txt'), 0, 1000)  # just do 1000 lines, starting with the 1st
input_queue.join()
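Since the original revision's header says to modify do_work() to make it useful, one possible variation is sketched below; the 'files' field it pulls out of the metadata JSON is an illustrative assumption, not something the gist relies on.

# Hypothetical do_work() replacement: count files instead of printing
# server/dir/item_size. Assumes the metadata response carries a 'files' list,
# which may be absent or empty if the fetch failed.
def do_work(id, i):
    j = get_url("http://archive.org/metadata/" + id)
    o = json.loads(j)
    files = o.get('files', [])
    print "%s %s has %d files" % (i, id, len(files))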