Skip to content

Instantly share code, notes, and snippets.

@dkourilov
Created April 13, 2016 16:03
Show Gist options
  • Save dkourilov/1a3d3347136b0f850647d92f23b49405 to your computer and use it in GitHub Desktop.
Save dkourilov/1a3d3347136b0f850647d92f23b49405 to your computer and use it in GitHub Desktop.

Revisions

  1. dkourilov created this gist Apr 13, 2016.
    284 changes: 284 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,284 @@
    #include "program_base.hpp"
    #include "hpds/KDouble.hpp"
    #include "hash.hpp"
    #include "mmap/mmap.hpp"
    #include "hpds/pstring.hpp"
    #include "hpds/pstringops.hpp"
    #define ELEM_SEPARATOR "\n\t\t\t"

    namespace dbtoaster {

    /* Definitions of auxiliary maps for storing materialized views. */
    struct TAXI_entry {
    long TAXI_ID; long TAXI_TYPE; DOUBLE_TYPE TAXI_PRICE; DOUBLE_TYPE TAXI_X; DOUBLE_TYPE TAXI_Y; long __av; TAXI_entry* nxt; TAXI_entry* prv;
    explicit TAXI_entry() : nxt(nullptr), prv(nullptr) { /*TAXI_ID = 0L; TAXI_TYPE = 0L; TAXI_PRICE = 0.0; TAXI_X = 0.0; TAXI_Y = 0.0; __av = 0L; */ }
    explicit TAXI_entry(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4, const long c5) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; __av = c5; }
    TAXI_entry(const TAXI_entry& other) : TAXI_ID( other.TAXI_ID ), TAXI_TYPE( other.TAXI_TYPE ), TAXI_PRICE( other.TAXI_PRICE ), TAXI_X( other.TAXI_X ), TAXI_Y( other.TAXI_Y ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {}
    FORCE_INLINE TAXI_entry& modify(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; return *this; }
    FORCE_INLINE TAXI_entry& modify1(const long c1) { TAXI_TYPE = c1; return *this; }
    template<class Archive>
    void serialize(Archive& ar, const unsigned int version) const
    {
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, TAXI_ID);
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, TAXI_TYPE);
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, TAXI_PRICE);
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, TAXI_X);
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, TAXI_Y);
    ar << ELEM_SEPARATOR;
    DBT_SERIALIZATION_NVP(ar, __av);
    }
    };
    /**
     * Hash/equality functions for the TAXI index keyed on all five columns
     * (TAXI_ID, TAXI_TYPE, TAXI_PRICE, TAXI_X, TAXI_Y). `__av` takes no part.
     */
    struct TAXI_mapkey01234_idxfn {
        /** Folds the five columns, in declaration order, into one hash value. */
        FORCE_INLINE static size_t hash(const TAXI_entry& e) {
            size_t seed = 0;
            hash_combine(seed, e.TAXI_ID);
            hash_combine(seed, e.TAXI_TYPE);
            hash_combine(seed, e.TAXI_PRICE);
            hash_combine(seed, e.TAXI_X);
            hash_combine(seed, e.TAXI_Y);
            return seed;
        }

        /** True only when every one of the five columns compares equal with `==`. */
        FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) {
            // Same left-to-right, short-circuit order as a chained && expression.
            if (!(x.TAXI_ID == y.TAXI_ID)) return false;
            if (!(x.TAXI_TYPE == y.TAXI_TYPE)) return false;
            if (!(x.TAXI_PRICE == y.TAXI_PRICE)) return false;
            if (!(x.TAXI_X == y.TAXI_X)) return false;
            return x.TAXI_Y == y.TAXI_Y;
        }
    };

    /**
     * Hash/equality functions for the TAXI index keyed on TAXI_TYPE alone.
     * All other fields of the entry are ignored.
     */
    struct TAXI_mapkey1_idxfn {
        /** Hash derived from TAXI_TYPE only. */
        FORCE_INLINE static size_t hash(const TAXI_entry& e) {
            size_t seed = 0;
            hash_combine(seed, e.TAXI_TYPE);
            return seed;
        }

        /** Two entries match when their TAXI_TYPE values are equal. */
        FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) {
            const bool same_type = (x.TAXI_TYPE == y.TAXI_TYPE);
            return same_type;
        }
    };

    /*
     * Container types for the TAXI view.
     *   HashIndex_TAXI_map_01234 - index over all five columns.
     *   HashIndex_TAXI_map_1     - index over TAXI_TYPE only.
     *   TAXI_map                 - map of TAXI_entry -> long holding both
     *                              indexes, in that order.
     * NOTE(review): the trailing bool template argument is presumably the
     * "unique key" flag of HashIndex - confirm against its definition.
     */
    using HashIndex_TAXI_map_01234 = HashIndex<TAXI_entry, long, TAXI_mapkey01234_idxfn, true>;
    using HashIndex_TAXI_map_1 = HashIndex<TAXI_entry, long, TAXI_mapkey1_idxfn, false>;
    using TAXI_map = MultiHashMap<TAXI_entry, long, HashIndex_TAXI_map_01234, HashIndex_TAXI_map_1>;

    /**
     * Entry type stored in QUERY_1___SQL_SUM_AGGREGATE_1_map: the key column
     * R1_TYPE, the DOUBLE_TYPE value `__av` kept for that key, and the
     * `nxt`/`prv` link pointers.
     */
    struct QUERY_1___SQL_SUM_AGGREGATE_1_entry {
        long R1_TYPE;
        DOUBLE_TYPE __av;
        QUERY_1___SQL_SUM_AGGREGATE_1_entry* nxt;
        QUERY_1___SQL_SUM_AGGREGATE_1_entry* prv;

        /**
         * Creates an unlinked entry. R1_TYPE and `__av` are left uninitialized,
         * as in the generated original (its zeroing code was commented out —
         * presumably to keep construction cheap; TODO confirm). Call modify()
         * before reading R1_TYPE.
         */
        explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry() : nxt(nullptr), prv(nullptr) {}

        /**
         * Creates an unlinked entry from an explicit key and value.
         *
         * @param c0 R1_TYPE
         * @param c1 __av
         *
         * The link pointers are nulled here as well, so no constructor leaves
         * `nxt`/`prv` holding indeterminate values.
         */
        explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry(const long c0, const DOUBLE_TYPE c1)
            : R1_TYPE(c0), __av(c1), nxt(nullptr), prv(nullptr) {}

        /** Copies the key and value; the copy is never linked to the source's neighbours. */
        QUERY_1___SQL_SUM_AGGREGATE_1_entry(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& other)
            : R1_TYPE(other.R1_TYPE), __av(other.__av), nxt(nullptr), prv(nullptr) {}

        /** Overwrites the key column R1_TYPE (not `__av`) and returns *this. */
        FORCE_INLINE QUERY_1___SQL_SUM_AGGREGATE_1_entry& modify(const long c0) {
            R1_TYPE = c0;
            return *this;
        }

        /**
         * Writes R1_TYPE followed by `__av` to `ar`, each preceded by
         * ELEM_SEPARATOR. `version` is not used.
         */
        template<class Archive>
        void serialize(Archive& ar, const unsigned int version) const
        {
            ar << ELEM_SEPARATOR;
            DBT_SERIALIZATION_NVP(ar, R1_TYPE);
            ar << ELEM_SEPARATOR;
            DBT_SERIALIZATION_NVP(ar, __av);
        }
    };
    /**
     * Hash/equality functions for the QUERY_1 result index, keyed on R1_TYPE.
     * The stored value `__av` takes no part in hashing or comparison.
     */
    struct QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn {
        /** Hash derived from R1_TYPE only. */
        FORCE_INLINE static size_t hash(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& e) {
            size_t seed = 0;
            hash_combine(seed, e.R1_TYPE);
            return seed;
        }

        /** Two entries match when their R1_TYPE values are equal. */
        FORCE_INLINE static bool equals(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& x, const QUERY_1___SQL_SUM_AGGREGATE_1_entry& y) {
            const bool same_key = (x.R1_TYPE == y.R1_TYPE);
            return same_key;
        }
    };

    /*
     * Container types for the QUERY_1 result.
     *   HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0 - index over R1_TYPE.
     *   QUERY_1___SQL_SUM_AGGREGATE_1_map             - map of entry -> DOUBLE_TYPE
     *                                                   using that single index.
     * NOTE(review): the trailing bool template argument is presumably the
     * "unique key" flag of HashIndex - confirm against its definition.
     */
    using HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0 = HashIndex<QUERY_1___SQL_SUM_AGGREGATE_1_entry, DOUBLE_TYPE, QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn, true>;
    using QUERY_1___SQL_SUM_AGGREGATE_1_map = MultiHashMap<QUERY_1___SQL_SUM_AGGREGATE_1_entry, DOUBLE_TYPE, HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0>;



    /* Type definition providing a way to access the results of the sql program */
    struct tlq_t{
    struct timeval t0,t; long tT,tN,tS;
    tlq_t(): tN(0), tS(0), QUERY_2___SQL_COUNT_AGGREGATE_2(0L) { gettimeofday(&t0,NULL); }

    /* Serialization Code */
    template<class Archive>
    void serialize(Archive& ar, const unsigned int version) const {

    ar << "\n";
    const QUERY_1___SQL_SUM_AGGREGATE_1_map& _QUERY_1___SQL_SUM_AGGREGATE_1 = get_QUERY_1___SQL_SUM_AGGREGATE_1();
    dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_1___SQL_SUM_AGGREGATE_1), _QUERY_1___SQL_SUM_AGGREGATE_1, "\t");
    ar << "\n";
    const long _QUERY_2___SQL_COUNT_AGGREGATE_2 = get_QUERY_2___SQL_COUNT_AGGREGATE_2();
    dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_2___SQL_COUNT_AGGREGATE_2), _QUERY_2___SQL_COUNT_AGGREGATE_2, "\t");

    }

    /* Functions returning / computing the results of top level queries */
    const QUERY_1___SQL_SUM_AGGREGATE_1_map& get_QUERY_1___SQL_SUM_AGGREGATE_1() const {
    return QUERY_1___SQL_SUM_AGGREGATE_1;

    }
    const long get_QUERY_2___SQL_COUNT_AGGREGATE_2() const {
    return QUERY_2___SQL_COUNT_AGGREGATE_2;

    }

    protected:

    /* Data structures used for storing / computing top level queries */
    QUERY_1___SQL_SUM_AGGREGATE_1_map QUERY_1___SQL_SUM_AGGREGATE_1;
    long QUERY_2___SQL_COUNT_AGGREGATE_2;

    };

    /* Type definition providing a way to incrementally maintain the results of the sql program */
    struct data_t : tlq_t{
    data_t(): tlq_t() {

    }

    #ifdef DBT_PROFILE
    std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> exec_stats;
    std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> ivc_stats;
    #endif

    /* Registering relations and trigger functions */
    ProgramBase* program_base;
    void register_data(ProgramBase& pb) {
    program_base = &pb;

    pb.add_map<TAXI_map>( "TAXI", TAXI );
    pb.add_map<QUERY_1___SQL_SUM_AGGREGATE_1_map>( "QUERY_1___SQL_SUM_AGGREGATE_1", QUERY_1___SQL_SUM_AGGREGATE_1 );
    pb.add_map<long>( "QUERY_2___SQL_COUNT_AGGREGATE_2", QUERY_2___SQL_COUNT_AGGREGATE_2 );

    pb.add_relation("TAXI", true);

    pb.add_trigger("TAXI", insert_tuple, std::bind(&data_t::unwrap_insert_TAXI, this, std::placeholders::_1));



    #ifdef DBT_PROFILE
    exec_stats = pb.exec_stats;
    ivc_stats = pb.ivc_stats;
    exec_stats->register_probe(0, "system_ready_s0");
    exec_stats->register_probe(1, "system_ready_s1");
    #endif // DBT_PROFILE

    }

    /* Trigger functions for table relations */
    void on_insert_TAXI(const long taxi_id, const long taxi_type, const DOUBLE_TYPE taxi_price, const DOUBLE_TYPE taxi_x, const DOUBLE_TYPE taxi_y) {
    TAXI_entry e(taxi_id, taxi_type, taxi_price, taxi_x, taxi_y, 1L);
    TAXI.addOrDelOnZero(e,1L);
    }

    void unwrap_insert_TAXI(const event_args_t& ea) {
    on_insert_TAXI(*(reinterpret_cast<long*>(ea[0].get())), *(reinterpret_cast<long*>(ea[1].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[2].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[3].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[4].get())));
    }



    /* Trigger functions for stream relations */
    void on_system_ready_event() {
    BEGIN_TRIGGER(exec_stats,"system_ready_event")
    BEGIN_TRIGGER(ivc_stats,"system_ready_event")
    { //
    QUERY_1___SQL_SUM_AGGREGATE_1.clear();
    { //foreach
    TAXI_entry* e1 = TAXI.head;
    while(e1){
    long r1_id = e1->TAXI_ID;
    long r1_type = e1->TAXI_TYPE;
    DOUBLE_TYPE r1_price = e1->TAXI_PRICE;
    DOUBLE_TYPE r1_x = e1->TAXI_X;
    DOUBLE_TYPE r1_y = e1->TAXI_Y;
    long v1 = e1->__av;
    QUERY_1___SQL_SUM_AGGREGATE_1.addOrDelOnZero(se1.modify(r1_type),(v1 * r1_price));
    e1 = e1->nxt;
    }
    }
    long agg1 = 0L;
    long l1 = 2L;
    { //slice
    const HASH_RES_t h1 = TAXI_mapkey1_idxfn::hash(se3.modify1(l1));
    const HashIndex_TAXI_map_1* i2 = static_cast<HashIndex_TAXI_map_1*>(TAXI.index[1]);
    HashIndex_TAXI_map_1::IdxNode* n2 = &(i2->buckets_[h1 % i2->size_]);
    TAXI_entry* e2;
    do if ((e2=n2->obj) && h1 == n2->hash && TAXI_mapkey1_idxfn::equals(se3, *e2)) {
    long taxi_id = e2->TAXI_ID;
    DOUBLE_TYPE taxi_price = e2->TAXI_PRICE;
    DOUBLE_TYPE taxi_x = e2->TAXI_X;
    DOUBLE_TYPE taxi_y = e2->TAXI_Y;
    long v2 = e2->__av;
    agg1 += v2;
    } while ((n2=n2->nxt));
    }
    QUERY_2___SQL_COUNT_AGGREGATE_2 = agg1;
    }
    END_TRIGGER(exec_stats,"system_ready_event")
    END_TRIGGER(ivc_stats,"system_ready_event")
    }

    private:

    /* Sample entries for avoiding recreation of temporary objects */
    QUERY_1___SQL_SUM_AGGREGATE_1_entry se1;
    TAXI_entry se3;

    /* Data structures used for storing materialized views */
    TAXI_map TAXI;



    };

    /* Type definition providing a way to execute the sql program */
    class Program : public ProgramBase
    {
    public:
    Program(int argc = 0, char* argv[] = 0) : ProgramBase(argc,argv) {
    data.register_data(*this);

    /* Specifying data sources */

    pair<string,string> source1_adaptor_params[] = { make_pair("fields",","), make_pair("schema","long,long,double,double,double") };
    std::shared_ptr<csv_adaptor> source1_adaptor(new csv_adaptor(get_relation_id("TAXI"),2,source1_adaptor_params));
    frame_descriptor source1_fd("\n");
    std::shared_ptr<dbt_file_source> source1_file(new dbt_file_source("test.csv",source1_fd,source1_adaptor));
    add_source(source1_file, true);

    }

    /* Imports data for static tables and performs view initialization based on it. */
    void init() {
    //P0_PLACE_HOLDER
    table_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, true);
    stream_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, false);
    process_tables();
    data.on_system_ready_event();
    //P2_PLACE_HOLDER
    }

    /* Saves a snapshot of the data required to obtain the results of top level queries. */
    snapshot_t take_snapshot(){
    tlq_t* d = new tlq_t((tlq_t&)data);
    //if (d->tS==0) { gettimeofday(&(d->t),NULL); d->tT=((d->t).tv_sec-(d->t0).tv_sec)*1000000L+((d->t).tv_usec-(d->t0).tv_usec); } printf("SAMPLE=standard,%ld,%ld,%ld\n",d->tT,d->tN,d->tS);
    return snapshot_t( d );
    }

    protected:
    data_t data;
    };
    /**
     * Thin subclass of Program that adds no state or behavior; it only
     * forwards its constructor arguments.
     */
    class Test : public Program
    {
    public:
        /** Passes the command line straight through to Program. */
        Test(int argc = 0, char* argv[] = nullptr) : Program(argc,argv) {}
    };

    }
    #include "main.inc.cpp"