@IgorBerman
Created February 10, 2015 13:36
multithreaded-bulk-loader

    package com.myproject.cassandraloading;

    import static com.google.common.collect.Iterables.filter;
    import static com.google.common.collect.Iterables.transform;

    import java.io.File;
    import java.lang.management.ManagementFactory;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.nio.ByteBuffer;
    import java.sql.Array;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.Collection;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.cassandra.auth.IAuthenticator;
    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.config.Config;
    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.config.EncryptionOptions;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.db.SystemKeyspace;
    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.CollectionType;
    import org.apache.cassandra.db.marshal.ColumnToCollectionType;
    import org.apache.cassandra.db.marshal.CompositeType;
    import org.apache.cassandra.db.marshal.EmptyType;
    import org.apache.cassandra.db.marshal.Int32Type;
    import org.apache.cassandra.db.marshal.LongType;
    import org.apache.cassandra.db.marshal.MapType;
    import org.apache.cassandra.db.marshal.ReversedType;
    import org.apache.cassandra.db.marshal.TimestampType;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.io.compress.CompressionParameters;
    import org.apache.cassandra.io.compress.LZ4Compressor;
    import org.apache.cassandra.io.sstable.CQLSSTableWriter;
    import org.apache.cassandra.io.sstable.SSTableLoader;
    import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
    import org.apache.cassandra.streaming.ProgressInfo;
    import org.apache.cassandra.streaming.SessionInfo;
    import org.apache.cassandra.streaming.StreamConnectionFactory;
    import org.apache.cassandra.streaming.StreamEvent;
    import org.apache.cassandra.streaming.StreamEventHandler;
    import org.apache.cassandra.streaming.StreamResultFuture;
    import org.apache.cassandra.streaming.StreamState;
    import org.apache.cassandra.thrift.AuthenticationRequest;
    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.Compression;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.thrift.CqlResult;
    import org.apache.cassandra.thrift.CqlRow;
    import org.apache.cassandra.thrift.ITransportFactory;
    import org.apache.cassandra.thrift.TFramedTransportFactory;
    import org.apache.cassandra.thrift.TokenRange;
    import org.apache.cassandra.tools.BulkLoadConnectionFactory;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.cassandra.utils.OutputHandler;
    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.ibatis.session.SqlSession;
    import org.apache.log4j.Logger;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TTransport;
    import org.joda.time.LocalDate;

    import com.google.common.base.Function;
    import com.google.common.base.Optional;
    import com.google.common.base.Predicate;
    import com.google.common.base.Stopwatch;
    import com.google.common.collect.ImmutableListMultimap;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Multimaps;
    import com.google.common.collect.Sets;
    import com.google.common.io.Files;
    import com.myproject.cassandraloading.dal.MapperFactory;
    import com.myproject.cassandraloading.dal.TSLoaderMapper;
    import com.myproject.cassandraloading.dal.TSLoaderSamplesForDay;


    /*

    CREATE KEYSPACE myproject_gihon_oper WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    USE myproject_gihon_oper;
    CREATE TABLE sample (
    ts_id bigint,
    yr int,
    t timestamp,
    v double,
    tgs map<varchar,varchar>,
    PRIMARY KEY((ts_id,yr), t)
    ) WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}
    and compaction = { 'class' : 'LeveledCompactionStrategy' };


    in cassandra:
    TRUNCATE sample;
    sudo ./cassandra-loading prodlikedb.myproject.dev gihon /mnt/data/gihon


    Try running with these params, otherwise the JVM enters full GC with 100% CPU load at some point:
    -javaagent:jamm-0.2.5.jar -Dorg.postgresql.forcebinary=true
    -Xmx2g -Xms2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
    -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10
    */
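
    /*
    For reference, main() below expects 7-8 positional arguments, while the sample command above only shows the
    first three. A hedged sketch of a full invocation (the seed host name, id range and writer count are
    illustrative, not taken from the original):

    sudo ./cassandra-loading <dbHost> <tenantId> <outputDir> <cassandraSeed> <fromId> <toId> <numOfWriters> [stage]
    e.g. sudo ./cassandra-loading prodlikedb.myproject.dev gihon /mnt/data/gihon cassandra1.myproject.dev 0 100000 4

    stage is optional: 1 = only clean the output dir, 2 = only prepare sstables and stream them; if omitted,
    both steps run.
    */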

    public class TimeSeriesCassandraLoader {
    private static final Logger logger = Logger.getLogger(TimeSeriesCassandraLoader.class.getName());
    private static final String TABLE_NAME = "sample";
    private static final int BUFFER_SIZE = 32;

    private static Function<File, Boolean> deleteFile = new Function<File, Boolean>() {
    @Override
    public Boolean apply(File input) {
    return input.delete();
    }
    };

    private static ImmutableListMultimap<Integer, TSLoaderSamplesForDay> splitByYear(List<TSLoaderSamplesForDay> readSamples) {
    return Multimaps.index(readSamples, byYear);
    }

    final static Function<TSLoaderSamplesForDay, Integer> byYear = new Function<TSLoaderSamplesForDay, Integer>() {
    @Override
    public Integer apply(TSLoaderSamplesForDay input) {
    return LocalDate.fromDateFields(input.dayTimestamp).year().get();
    }
    };

    public static final String SCHEMA =
    "CREATE TABLE %s.%s (ts_id bigint, yr int, t timestamp, v double, tgs map<varchar,varchar>, PRIMARY KEY((ts_id,yr), t)) "
    + " WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}"
    + " and compaction = { 'class' : 'LeveledCompactionStrategy' }";

    public static final String INSERT_STMT = "INSERT INTO %s.%s (ts_id, yr, t, v) VALUES (?,?,?,?)";

    public static void main(String[] args) throws Exception {
    Stopwatch global = Stopwatch.createStarted();
    String dbHost = args[0];
    String tenantId = args[1];
    String outputDirPath = args[2];
    String cassandraSeed = args[3];
    long fromId = Long.parseLong(args[4]);
    long toId = Long.parseLong(args[5]);
    outputDirPath = outputDirPath + "/" + args[4] + "-" + args[5];
    int numOfWriters = Integer.parseInt(args[6]);
    Optional<Integer> stage = Optional.absent();
    if (args.length == 8) {
    stage = Optional.of(Integer.parseInt(args[7]));
    }
    logger.info("The output will be placed in " + outputDirPath);

    String keySpace = String.format("myproject_%s_oper", tenantId);
    logger.info("Reading timeseries from " + keySpace + " for tenant " + tenantId);
    SqlSession session = MapperFactory.createSession(dbHost, keySpace, tenantId);
    TSLoaderMapper mapper = session.getMapper(TSLoaderMapper.class);

    List<Long> tsIds = mapper.readAllTsIds(fromId, toId);
    session.rollback();
    String name = ManagementFactory.getRuntimeMXBean().getName();
    // magic!
    Config.setClientMode(true);
    File outputDir = new File(outputDirPath);

    if (!outputDir.exists() && !outputDir.mkdirs()) {
    throw new RuntimeException("Cannot create output directory: " + outputDir);
    }
    if (!stage.isPresent() || stage.get() == 1) {
    logger.info(name + ": Cleaning output dir");
    initCleanup(outputDir);
    }
    if (!stage.isPresent() || stage.get() == 2) {
    logger.info(name + ": Preparing sstables");
    prepareAndLoadSstables(keySpace, tsIds, outputDir, dbHost, tenantId, name, stage, cassandraSeed,
    numOfWriters, true);
    }

    logger.info(name + ": Done in " + global.elapsed(TimeUnit.MINUTES) + " mins");
    System.exit(0); // some daemon threads would otherwise keep the JVM alive
    }

    private static final class TssReader implements Runnable {
    private final ArrayBlockingQueue<SamplesWithId> queue;
    private final List<Long> tsIds;
    private final String dbHost;
    private final String keySpace;
    private final String tenantId;
    private final CountDownLatch doneSignal;
    private int i;

    private TssReader(int i,
    ArrayBlockingQueue<SamplesWithId> queue,
    List<Long> tsIds,
    String dbHost,
    String keySpace,
    String tenantId,
    CountDownLatch doneSignal) {
    this.i = i;
    this.queue = queue;
    this.tsIds = tsIds;
    this.dbHost = dbHost;
    this.keySpace = keySpace;
    this.tenantId = tenantId;
    this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
    Thread.currentThread().setName("DBReader " + i);
    try {
    String name = ManagementFactory.getRuntimeMXBean().getName();
    int c = 1;
    for (long tsId : tsIds) {
    Connection conn = MapperFactory.createConnection(dbHost, keySpace, tenantId);
    PreparedStatement statement =
    conn.prepareStatement("select day_timestamp as dayTimestamp, times, values from sample where timeseries_id = ?");
    statement.setLong(1, tsId);
    ResultSet rs = statement.executeQuery();
    List<TSLoaderSamplesForDay> readSamples = Lists.newArrayList();
    while (rs.next()) {
    Array tArray = rs.getArray(2);
    double[] times = ArrayUtils.toPrimitive((Double[]) tArray.getArray(), 0);
    Array vArray = rs.getArray(3);
    double[] values = ArrayUtils.toPrimitive((Double[]) vArray.getArray(), 0);
    TSLoaderSamplesForDay samplesForDay = new TSLoaderSamplesForDay(rs.getDate(1), times, values);
    readSamples.add(samplesForDay);
    }
    rs.close();
    statement.close();
    SamplesWithId samplesWithId = new SamplesWithId(readSamples, tsId);
    queue.put(samplesWithId);
    if (c % 10 == 0) {
    logger.info(name + i + ": q size: " + queue.size());
    }
    c++;
    conn.close();
    }
    doneSignal.countDown();
    logger.info(name + i + ": reader done");

    } catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException("Exception ", e);
    }
    }
    }

    private static final class TssCqlWriter implements Callable<File> {
    private final String keySpace;
    private final ArrayBlockingQueue<SamplesWithId> queue;
    private final File baseOutputDir;
    private final int size;
    private final AtomicInteger c;
    private final CountDownLatch doneSignal;
    private final int i;
    private final Optional<Integer> stage;
    private final String cassandraSeed;

    private TssCqlWriter(int i,
    String keySpace,
    ArrayBlockingQueue<SamplesWithId> queue,
    File outputDir,
    int size,
    AtomicInteger c,
    CountDownLatch doneSignal,
    Optional<Integer> stage,
    String cassandraSeed) {
    this.i = i;
    this.keySpace = keySpace;
    this.queue = queue;
    this.baseOutputDir = outputDir;
    this.size = size;
    this.c = c;
    this.doneSignal = doneSignal;
    this.stage = stage;
    this.cassandraSeed = cassandraSeed;
    }

    @Override
    public File call() throws Exception {
    Thread.currentThread().setName("SSTableWriter " + i);
    File outputDir =
    new File(this.baseOutputDir, String.valueOf(i) + File.separator + keySpace + File.separator +
    TABLE_NAME);
    if (!outputDir.exists() && !outputDir.mkdirs()) {
    throw new RuntimeException("Cannot create output directory: " + outputDir);
    }
    String name = ManagementFactory.getRuntimeMXBean().getName();
    try {
    CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
    String schema = String.format(SCHEMA, keySpace, TABLE_NAME);
    builder.inDirectory(outputDir).forTable(schema).using(String.format(INSERT_STMT, keySpace, TABLE_NAME)).withPartitioner(new Murmur3Partitioner()).withBufferSizeInMB(BUFFER_SIZE);
    final CQLSSTableWriter writer = builder.build();
    Stopwatch stopWatch = Stopwatch.createStarted();

    while (true) {
    SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
    if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
    break;
    } else if (samplesWithId == null) {
    continue;
    }

    int c = this.c.getAndIncrement();
    if (c % 10 == 0) {
    long elapsedSecs = stopWatch.elapsed(TimeUnit.SECONDS);
    double tsPerSec = 1.0 * c / elapsedSecs;
    logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec +
    " #/s / " + size + " / " + (size - c) / (tsPerSec * 60) +
    " mins remaining(estimated) ");
    }

    List<TSLoaderSamplesForDay> samples = samplesWithId.samples;

    ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
    for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
    int year = yearSamples.getKey();
    for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
    for (int i = 0; i < daySamples.times.length; i++) {
    double timeInDays = daySamples.times[i];
    long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
    double value = daySamples.values[i];
    Date date = new Date(timestamp);
    writer.addRow(samplesWithId.tsId, year,// yr
    date,// t
    value);
    }
    }
    }
    samplesByYear = null;
    samplesWithId = null;
    }

    logger.info(name + i + ": closing writer");
    writer.close();
    logger.info(name + i + ": writer done");

    } catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException("Unexpected problem ", e);
    }
    if (!stage.isPresent() || stage.get() == 3) {
    logger.info(name + ": Deleting unneccessary files");
    cleanup(outputDir);
    }
    logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
    load(cassandraSeed, outputDir);
    return outputDir;
    }
    }

    private static final class TssUnsortedWriter implements Callable<File> {
    private final String keySpace;
    private final ArrayBlockingQueue<SamplesWithId> queue;
    private final File baseOutputDir;
    private final int size;
    private final AtomicInteger c;
    private final CountDownLatch doneSignal;
    private final int i;
    private final Optional<Integer> stage;
    private final String cassandraSeed;

    private TssUnsortedWriter(int i,
    String keySpace,
    ArrayBlockingQueue<SamplesWithId> queue,
    File outputDir,
    int size,
    AtomicInteger c,
    CountDownLatch doneSignal,
    Optional<Integer> stage,
    String cassandraSeed) {
    this.i = i;
    this.keySpace = keySpace;
    this.queue = queue;
    this.baseOutputDir = outputDir;
    this.size = size;
    this.c = c;
    this.doneSignal = doneSignal;
    this.stage = stage;
    this.cassandraSeed = cassandraSeed;
    }

    @Override
    public File call() throws UnknownHostException {
    Thread.currentThread().setName("SSTableWriter " + i);
    File outputDir =
    new File(this.baseOutputDir, String.valueOf(i) + File.separator + keySpace + File.separator +
    TABLE_NAME);
    if (!outputDir.exists() && !outputDir.mkdirs()) {
    throw new RuntimeException("Cannot create output directory: " + outputDir);
    }
    String name = ManagementFactory.getRuntimeMXBean().getName();
    try {
    CollectionType mapType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance);
    Map<ByteBuffer, CollectionType> tgsTypeMap =
    ImmutableMap.of(UTF8Type.instance.decompose("tgs"), mapType);

    AbstractType<?> columnFComparator =
    CompositeType.getInstance(ReversedType.getInstance(TimestampType.instance), UTF8Type.instance,
    ColumnToCollectionType.getInstance(tgsTypeMap));
    SSTableSimpleUnsortedWriter writer =
    new SSTableSimpleUnsortedWriter(outputDir, new Murmur3Partitioner(), keySpace, TABLE_NAME,
    columnFComparator, null, BUFFER_SIZE, new CompressionParameters(
    LZ4Compressor.create(null)));

    long insTs = System.currentTimeMillis();

    CompositeType rowKeyType = CompositeType.getInstance(LongType.instance, Int32Type.instance);

    CompositeType timeColumnType = CompositeType.getInstance(TimestampType.instance, EmptyType.instance);

    CompositeType valueColumnType = CompositeType.getInstance(TimestampType.instance, UTF8Type.instance);
    Stopwatch stopWatch = Stopwatch.createStarted();

    while (true) {
    SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
    if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
    break;
    } else if (samplesWithId == null) {
    continue;
    }

    int c = this.c.getAndIncrement();
    if (c % 10 == 0) {
    long elapsedSecs = stopWatch.elapsed(TimeUnit.SECONDS);
    double tsPerSec = 1.0 * c / elapsedSecs;
    logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec +
    " #/s / " + size + " / " + (size - c) / (tsPerSec * 60) +
    " mins remaining(estimated) ");
    }

    List<TSLoaderSamplesForDay> samples = samplesWithId.samples;
    long tsId = samplesWithId.tsId;

    ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
    for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
    int year = yearSamples.getKey();
    for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
    for (int i = 0; i < daySamples.times.length; i++) {
    writer.newRow(rowKeyType.decompose(tsId, year));
    double timeInDays = daySamples.times[i];
    long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
    double value = daySamples.values[i];
    Date date = new Date(timestamp);
    writer.addColumn(timeColumnType.decompose(date, null),
    ByteBufferUtil.EMPTY_BYTE_BUFFER, insTs);
    writer.addColumn(valueColumnType.decompose(date, "v"), ByteBufferUtil.bytes(value),
    insTs);
    }
    }
    }
    samplesByYear = null;
    samplesWithId = null;
    }

    logger.info(name + i + ": closing writer");
    writer.close();
    logger.info(name + i + ": writer done");

    } catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException("Unexpected problem ", e);
    }
    if (!stage.isPresent() || stage.get() == 3) {
    logger.info(name + ": Deleting unneccessary files");
    cleanup(outputDir);
    }
    logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
    load(cassandraSeed, outputDir);

    return outputDir;
    }
    }

    static class SamplesWithId {
    public SamplesWithId(List<TSLoaderSamplesForDay> readSamples, long tsId2) {
    this.samples = readSamples;
    this.tsId = tsId2;
    }

    final List<TSLoaderSamplesForDay> samples;
    final long tsId;
    }

    /**
    * This is what the CQLSSTableWriter printed (org.apache.cassandra.config.Schema DEBUG output), so the same
    * definition is used with the unsorted writer:
    *
    * DEBUG > Adding org.apache.cassandra.config.CFMetaData@293e2817[cfId=ab47e018-7eeb-30c5-8802-7e7651d788ef,
    * ksName=myproject_gihon_oper, cfName=sample, cfType=Standard,
    * comparator=org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType),org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(746773:org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))),
    * comment=, readRepairChance=0.0, dclocalReadRepairChance=0.1, replicateOnWrite=true, gcGraceSeconds=864000,
    * defaultValidator=org.apache.cassandra.db.marshal.BytesType,
    * keyValidator=org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.LongType,org.apache.cassandra.db.marshal.Int32Type),
    * minCompactionThreshold=4, maxCompactionThreshold=32,
    * column_metadata={
    *   java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]=ColumnDefinition{name=746773, validator=org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type), type=REGULAR, componentIndex=1, indexName=null, indexType=null},
    *   java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]=ColumnDefinition{name=74735f6964, validator=org.apache.cassandra.db.marshal.LongType, type=PARTITION_KEY, componentIndex=0, indexName=null, indexType=null},
    *   java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]=ColumnDefinition{name=74, validator=org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType), type=CLUSTERING_KEY, componentIndex=0, indexName=null, indexType=null},
    *   java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]=ColumnDefinition{name=7972, validator=org.apache.cassandra.db.marshal.Int32Type, type=PARTITION_KEY, componentIndex=1, indexName=null, indexType=null},
    *   java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]=ColumnDefinition{name=76, validator=org.apache.cassandra.db.marshal.DoubleType, type=REGULAR, componentIndex=1, indexName=null, indexType=null}},
    * compactionStrategyClass=class org.apache.cassandra.db.compaction.LeveledCompactionStrategy, compactionStrategyOptions={},
    * compressionOptions={sstable_compression=org.apache.cassandra.io.compress.LZ4Compressor},
    * bloomFilterFpChance=0.1, memtable_flush_period_in_ms=0, caching=KEYS_ONLY, defaultTimeToLive=0,
    * speculative_retry=99.0PERCENTILE, indexInterval=128, populateIoCacheOnFlush=false, droppedColumns={},
    * triggers={}, isDense=false] to cfIdMap
    */
    private static void prepareAndLoadSstables(final String keySpace,
    final List<Long> tsIds,
    final File outputDir,
    String dbHost,
    String tenantId,
    String name,
    Optional<Integer> stage,
    String cassandraSeed,
    int numOfWriters,
    boolean unsorted) throws Exception {
    Stopwatch sw = Stopwatch.createStarted();

    int writers = numOfWriters;
    int readers = numOfWriters * 2;
    CountDownLatch doneSignal = new CountDownLatch(readers);
    final ArrayBlockingQueue<SamplesWithId> queue = new ArrayBlockingQueue<SamplesWithId>(writers * 2);
    int chunkPerEachReader = (int) (1.0 * tsIds.size() / readers + 0.5);
    List<List<Long>> partitions = Lists.partition(tsIds, chunkPerEachReader);
    ExecutorService pool = Executors.newFixedThreadPool(writers + readers);

    List<Future<?>> readerFutures = Lists.newArrayList();
    for (int i = 0; i < readers; i++) {
    readerFutures.add(pool.submit(new TssReader(i, queue, partitions.get(i), dbHost, keySpace, tenantId,
    doneSignal)));
    }
    AtomicInteger c = new AtomicInteger(1);
    List<Future<File>> writerFutures = Lists.newArrayList();
    for (int i = 0; i < writers; i++) {
    if (unsorted) {
    writerFutures.add(pool.submit(new TssUnsortedWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
    doneSignal, stage, cassandraSeed)));
    } else {
    writerFutures.add(pool.submit(new TssCqlWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
    doneSignal, stage, cassandraSeed)));
    }
    }
    for (Future<?> rf : readerFutures) {
    rf.get();
    }
    List<File> dirs = Lists.newArrayList();
    for (Future<File> wf : writerFutures) {
    dirs.add(wf.get());
    }

    // the sstable writers work with 2.1.2; with 2.0.11 or 2.0.12 I get a ConcurrentModificationException,
    // but they produce a different sstable format that can't be streamed into cassandra
    // if (!stage.isPresent() || stage.get() == 4) {
    // for (File dir : dirs) {
    // logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + dir);
    // load(cassandraSeed, dir);
    // }
    // }

    logger.info(name + ": Done preparing in " + sw.elapsed(TimeUnit.MINUTES) + " mins");
    }

    private static void initCleanup(File outputDir) {
    Lists.newArrayList(transform(Files.fileTreeTraverser().children(outputDir), deleteFile));
    }

    static class ExternalClient extends SSTableLoader.Client {
    private final Map<String, CFMetaData> knownCfs = new HashMap<>();
    private final Set<InetAddress> hosts;
    private final int rpcPort;
    private final String user;
    private final String passwd;
    private final ITransportFactory transportFactory;
    private final int storagePort;
    private final int sslStoragePort;
    private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;

    public ExternalClient(Set<InetAddress> hosts,
    int port,
    String user,
    String passwd,
    ITransportFactory transportFactory,
    int storagePort,
    int sslStoragePort,
    EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) {
    super();
    this.hosts = hosts;
    this.rpcPort = port;
    this.user = user;
    this.passwd = passwd;
    this.transportFactory = transportFactory;
    this.storagePort = storagePort;
    this.sslStoragePort = sslStoragePort;
    this.serverEncOptions = serverEncryptionOptions;
    }

    @Override
    public void init(String keyspace) {
    Iterator<InetAddress> hostiter = hosts.iterator();
    while (hostiter.hasNext()) {
    try {
    // Query endpoint to ranges map and schemas from thrift
    InetAddress host = hostiter.next();
    Cassandra.Client client =
    createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd,
    this.transportFactory);

    setPartitioner(client.describe_partitioner());
    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();

    for (TokenRange tr : client.describe_ring(keyspace)) {
    Range<Token> range =
    new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token),
    getPartitioner());
    for (String ep : tr.endpoints) {
    addRangeForEndpoint(range, InetAddress.getByName(ep));
    }
    }

    String query =
    String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'", Keyspace.SYSTEM_KS,
    SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspace);
    CqlResult result =
    client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE,
    ConsistencyLevel.ONE);
    for (CqlRow row : result.rows) {
    CFMetaData metadata = CFMetaData.fromThriftCqlRow(row);
    knownCfs.put(metadata.cfName, metadata);
    }
    break;
    } catch (Exception e) {
    if (!hostiter.hasNext()) throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
    }
    }
    }

    @Override
    public StreamConnectionFactory getConnectionFactory() {
    return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
    }

    @Override
    public CFMetaData getCFMetaData(String keyspace, String cfName) {
    return knownCfs.get(cfName);
    }

    private static Cassandra.Client createThriftClient(String host,
    int port,
    String user,
    String passwd,
    ITransportFactory transportFactory) throws Exception {
    TTransport trans = transportFactory.openTransport(host, port);
    TProtocol protocol = new TBinaryProtocol(trans);
    Cassandra.Client client = new Cassandra.Client(protocol);
    if (user != null && passwd != null) {
    Map<String, String> credentials = new HashMap<>();
    credentials.put(IAuthenticator.USERNAME_KEY, user);
    credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
    AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
    client.login(authenticationRequest);
    }
    return client;
    }
    }

    // Prints streaming progress per host; loading is complete when every host reaches 100%
    static class ProgressIndicator implements StreamEventHandler {
    private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
    private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();

    private long start;
    private long lastProgress;
    private long lastTime;

    public ProgressIndicator() {
    start = lastTime = System.nanoTime();
    }

    public void onSuccess(StreamState finalState) {}

    public void onFailure(Throwable t) {}

    public void handleStreamEvent(StreamEvent event) {
    if (event.eventType == StreamEvent.Type.STREAM_PREPARED) {
    SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
    sessionsByHost.put(session.peer, session);
    } else if (event.eventType == StreamEvent.Type.FILE_PROGRESS) {
    ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;

    // update progress
    Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
    if (progresses == null) {
    progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
    progressByHost.put(progressInfo.peer, progresses);
    }
    if (progresses.contains(progressInfo)) progresses.remove(progressInfo);
    progresses.add(progressInfo);

    StringBuilder sb = new StringBuilder();
    sb.append("\rprogress: ");

    long totalProgress = 0;
    long totalSize = 0;
    for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet()) {
    SessionInfo session = sessionsByHost.get(entry.getKey());

    long size = session.getTotalSizeToSend();
    long current = 0;
    int completed = 0;
    for (ProgressInfo progress : entry.getValue()) {
    if (progress.currentBytes == progress.totalBytes) completed++;
    current += progress.currentBytes;
    }
    totalProgress += current;
    totalSize += size;
    sb.append("[").append(entry.getKey());
    sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
    sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
    }
    long time = System.nanoTime();
    long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
    lastTime = time;
    long deltaProgress = totalProgress - lastProgress;
    lastProgress = totalProgress;

    sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
    sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
    sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");

    System.out.print(sb.toString());
    }
    }

    private int mbPerSec(long bytes, long timeInMs) {
    double bytesPerMs = ((double) bytes) / timeInMs;
    return (int) ((bytesPerMs * 1000) / (1024 * 1024)); // bytes/ms -> MB/s
    }
    }

    private static void load(String cassandraSeed, File outputDir) throws UnknownHostException {
    // if running from localhost, the standalone sstableloader tool might be used instead:
    // Runtime.getRuntime().exec("/usr/bin/sstableloader -d 127.0.0.1 " + outputDir + "/" + keySpace + "/" +
    // TABLE_NAME);
    // not sure if it can be run concurrently within the same jvm
    ITransportFactory transportFactory = new TFramedTransportFactory();
    EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
    OutputHandler handler = new OutputHandler.SystemOutput(true, false);
    SSTableLoader loader =
    new SSTableLoader(outputDir, new ExternalClient(Sets.newHashSet(InetAddress.getByName(cassandraSeed)),
    9160, null, null, transportFactory, 7000, 7001, serverEncOptions), handler);
    DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(0);
    StreamResultFuture future = null;
    try {
    Set<InetAddress> ignore = Sets.newHashSet();
    future = loader.stream(ignore, new ProgressIndicator());
    } catch (Exception e) {
    logger.error("Problem in streaming", e);
    }
    if (future == null) {
    return; // streaming never started, nothing to wait for
    }

    logger.info(String.format("Streaming session ID: %s", future.planId));

    try {
    future.get();
    } catch (Exception e) {
    logger.error("Streaming to the following hosts failed:");
    logger.error(loader.getFailedHosts());
    logger.error(e);
    }
    }

    private static void cleanup(File outputDir) {
    // due to a bug in the cassandra loader, see
    // http://mail-archives.apache.org/mod_mbox/incubator-cassandra-user/201412.mbox/%3CCALAMvO_7gU1rOKXO0X33rE3fEnw8bKyY_0mBRU0A-2BVAnesiA@mail.gmail.com%3E
    Iterable<File> children = Files.fileTreeTraverser().children(outputDir);
    Iterable<File> toBeDeleted = filter(children, new Predicate<File>() {
    @Override
    public boolean apply(File input) {
    String nameWithoutExtension = Files.getNameWithoutExtension(input.getName());
    return false;//nameWithoutExtension.contains("Statistics");

    /*
    * || nameWithoutExtension.contains("Digest") || nameWithoutExtension.contains("TOC") ||
    * nameWithoutExtension.contains("CRC") || nameWithoutExtension.contains("CompressionInfo");
    */
    }
    });

    Lists.newArrayList(transform(toBeDeleted, deleteFile));

    }

    }