#!/usr/bin/env python
#
# allreduce_loop.py
#
# Showcase for how to use Gloo collectives from Caffe2.
# For rendezvous, this example can use a shared filesystem, Redis, or MPI.
#
# To use a shared filesystem for rendezvous, use:
#
#   python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport tcp \
#     --distributed_interfaces eth0 \
#     --num_shards 2 \
#     --shard_id 0 \          # Specify a different rank on every machine
#     --file_store_path /path/to/nfs/share \
#     --run_id 12345          # Unique for each separate run
#
# To use Redis for rendezvous, use:
#
#   python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport ibverbs \
#     --distributed_interfaces mlx5_0 \
#     --gpu_direct \
#     --num_shards 2 \
#     --shard_id 0 \          # Specify a different rank on every machine
#     --redis_host some.ip.address \
#     --redis_port 6380 \
#     --run_id 12345          # Unique for each separate run
#
# To use MPI for rendezvous, use:
#
#   mpirun python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport ibverbs \
#     --distributed_interfaces mlx5_0 \
#     --gpu_direct
#

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import logging
import os
import time

# reduce is a builtin in Python 2 but lives in functools in Python 3
from functools import reduce

import numpy as np

from caffe2.python import core, workspace
from caffe2.proto import caffe2_pb2

logging.basicConfig()
log = logging.getLogger("allreduce_loop")
log.setLevel(logging.DEBUG)


def main():
    parser = argparse.ArgumentParser(
        description="Caffe2 rendezvous example",
    )
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id.")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")
    parser.add_argument("--gpu_direct", default=False, action="store_true",
                        help="Use GPUDirect (if using ibverbs transport)")
    parser.add_argument("--iterations", type=int, default=100,
                        help="Number of iterations to run for")

    args = parser.parse_args()

    # Either use the specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(",")]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    num_shards = args.num_shards
    shard_id = args.shard_id
    store_handler = None

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
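    # (With the tcp transport this is a network interface name such as eth0;
    # with ibverbs it is an RDMA device name such as mlx5_0, matching the
    # usage examples at the top of this file.)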
    interfaces = args.distributed_interfaces.split(",")

    # No rendezvous configured yet; one of the branches below must set it
    rendezvous = None

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True)
    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate",
                    [],
                    [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate",
                    [],
                    [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )
        rendezvous = dict(
            kv_handler=store_handler,
            num_shards=num_shards,
            shard_id=shard_id,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0])

    if rendezvous is None:
        raise RuntimeError("No rendezvous mechanism configured!")

    init_net = core.Net("init_net")

    shape = [32, 3, 224, 224]
    num_elements = reduce(lambda x, y: x * y, shape)
    num_bytes = num_elements * 4
    num_kilobytes = num_bytes / 1024.0
    num_megabytes = num_kilobytes / 1024.0
    num_gigabytes = num_megabytes / 1024.0

    # Initialize N blobs, 1 per GPU
    blobs = []
    for gpu in gpus:
        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, gpu)):
            blobs.append(
                init_net.UniformFill(
                    [],
                    [init_net.NextBlob("blob")],
                    shape=shape))

    # Create Gloo common world
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        comm_world = init_net.CreateCommonWorld(
            [store_handler] if store_handler is not None else [],
            [init_net.NextBlob("comm_world")],
            name="first_and_only_common_world",
            size=rendezvous["num_shards"],
            rank=rendezvous["shard_id"],
            engine=rendezvous["engine"],
            transport=rendezvous["transport"],
            interface=rendezvous["interface"],
            mpi_rendezvous=rendezvous.get("mpi_rendezvous", False),
        )

    # Initialize
    workspace.RunNetOnce(init_net)

    # The main net simply loops on Allreduce
    main_net = core.Net("main_net")
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        main_net.Allreduce(
            inputs=[comm_world] + blobs,
            outputs=blobs,
            engine=rendezvous["engine"],
            gpu_direct=args.gpu_direct,
        )

    workspace.CreateNet(main_net)
    for i in range(args.iterations):
        t1 = time.time()
        workspace.RunNet(main_net)
        t2 = time.time()
        if shard_id == 0:
            dt = t2 - t1
            # Throughput is computed from the size of a single buffer
            print("Took {:.3f}s ({:.3f} GB/sec)".format(
                dt, num_gigabytes / dt))


if __name__ == "__main__":
    workspace.GlobalInit(["caffe2", "--caffe2_log_level=2"])
    main()
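
# A quick single-machine smoke test (a hedged sketch, not one of the usage
# examples above): run two shards from two shells on the same host, relying
# on the default tcp transport and the default /tmp file store for
# rendezvous.
#
#   python ./allreduce_loop.py --num_shards 2 --shard_id 0 --run_id test1
#   python ./allreduce_loop.py --num_shards 2 --shard_id 1 --run_id test1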