Created
April 13, 2016 16:03
-
-
Save dkourilov/1a3d3347136b0f850647d92f23b49405 to your computer and use it in GitHub Desktop.
test.cpp (dbtoaster)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "program_base.hpp" | |
| #include "hpds/KDouble.hpp" | |
| #include "hash.hpp" | |
| #include "mmap/mmap.hpp" | |
| #include "hpds/pstring.hpp" | |
| #include "hpds/pstringops.hpp" | |
| #define ELEM_SEPARATOR "\n\t\t\t" | |
| namespace dbtoaster { | |
| /* Definitions of auxiliary maps for storing materialized views. */ | |
| struct TAXI_entry { | |
| long TAXI_ID; long TAXI_TYPE; DOUBLE_TYPE TAXI_PRICE; DOUBLE_TYPE TAXI_X; DOUBLE_TYPE TAXI_Y; long __av; TAXI_entry* nxt; TAXI_entry* prv; | |
| explicit TAXI_entry() : nxt(nullptr), prv(nullptr) { /*TAXI_ID = 0L; TAXI_TYPE = 0L; TAXI_PRICE = 0.0; TAXI_X = 0.0; TAXI_Y = 0.0; __av = 0L; */ } | |
| explicit TAXI_entry(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4, const long c5) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; __av = c5; } | |
| TAXI_entry(const TAXI_entry& other) : TAXI_ID( other.TAXI_ID ), TAXI_TYPE( other.TAXI_TYPE ), TAXI_PRICE( other.TAXI_PRICE ), TAXI_X( other.TAXI_X ), TAXI_Y( other.TAXI_Y ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {} | |
| FORCE_INLINE TAXI_entry& modify(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; return *this; } | |
| FORCE_INLINE TAXI_entry& modify1(const long c1) { TAXI_TYPE = c1; return *this; } | |
| template<class Archive> | |
| void serialize(Archive& ar, const unsigned int version) const | |
| { | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, TAXI_ID); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, TAXI_TYPE); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, TAXI_PRICE); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, TAXI_X); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, TAXI_Y); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, __av); | |
| } | |
| }; | |
| struct TAXI_mapkey01234_idxfn { | |
| FORCE_INLINE static size_t hash(const TAXI_entry& e) { | |
| size_t h = 0; | |
| hash_combine(h, e.TAXI_ID); | |
| hash_combine(h, e.TAXI_TYPE); | |
| hash_combine(h, e.TAXI_PRICE); | |
| hash_combine(h, e.TAXI_X); | |
| hash_combine(h, e.TAXI_Y); | |
| return h; | |
| } | |
| FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) { | |
| return x.TAXI_ID == y.TAXI_ID && x.TAXI_TYPE == y.TAXI_TYPE && x.TAXI_PRICE == y.TAXI_PRICE && x.TAXI_X == y.TAXI_X && x.TAXI_Y == y.TAXI_Y; | |
| } | |
| }; | |
| struct TAXI_mapkey1_idxfn { | |
| FORCE_INLINE static size_t hash(const TAXI_entry& e) { | |
| size_t h = 0; | |
| hash_combine(h, e.TAXI_TYPE); | |
| return h; | |
| } | |
| FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) { | |
| return x.TAXI_TYPE == y.TAXI_TYPE; | |
| } | |
| }; | |
| typedef MultiHashMap<TAXI_entry,long, | |
| HashIndex<TAXI_entry,long,TAXI_mapkey01234_idxfn,true>, | |
| HashIndex<TAXI_entry,long,TAXI_mapkey1_idxfn,false> | |
| > TAXI_map; | |
| typedef HashIndex<TAXI_entry,long,TAXI_mapkey01234_idxfn,true> HashIndex_TAXI_map_01234; | |
| typedef HashIndex<TAXI_entry,long,TAXI_mapkey1_idxfn,false> HashIndex_TAXI_map_1; | |
| struct QUERY_1___SQL_SUM_AGGREGATE_1_entry { | |
| long R1_TYPE; DOUBLE_TYPE __av; QUERY_1___SQL_SUM_AGGREGATE_1_entry* nxt; QUERY_1___SQL_SUM_AGGREGATE_1_entry* prv; | |
| explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry() : nxt(nullptr), prv(nullptr) { /*R1_TYPE = 0L; __av = 0.0; */ } | |
| explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry(const long c0, const DOUBLE_TYPE c1) { R1_TYPE = c0; __av = c1; } | |
| QUERY_1___SQL_SUM_AGGREGATE_1_entry(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& other) : R1_TYPE( other.R1_TYPE ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {} | |
| FORCE_INLINE QUERY_1___SQL_SUM_AGGREGATE_1_entry& modify(const long c0) { R1_TYPE = c0; return *this; } | |
| template<class Archive> | |
| void serialize(Archive& ar, const unsigned int version) const | |
| { | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, R1_TYPE); | |
| ar << ELEM_SEPARATOR; | |
| DBT_SERIALIZATION_NVP(ar, __av); | |
| } | |
| }; | |
| struct QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn { | |
| FORCE_INLINE static size_t hash(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& e) { | |
| size_t h = 0; | |
| hash_combine(h, e.R1_TYPE); | |
| return h; | |
| } | |
| FORCE_INLINE static bool equals(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& x, const QUERY_1___SQL_SUM_AGGREGATE_1_entry& y) { | |
| return x.R1_TYPE == y.R1_TYPE; | |
| } | |
| }; | |
| typedef MultiHashMap<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE, | |
| HashIndex<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE,QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn,true> | |
| > QUERY_1___SQL_SUM_AGGREGATE_1_map; | |
| typedef HashIndex<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE,QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn,true> HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0; | |
| /* Type definition providing a way to access the results of the sql program */ | |
| struct tlq_t{ | |
| struct timeval t0,t; long tT,tN,tS; | |
| tlq_t(): tN(0), tS(0), QUERY_2___SQL_COUNT_AGGREGATE_2(0L) { gettimeofday(&t0,NULL); } | |
| /* Serialization Code */ | |
| template<class Archive> | |
| void serialize(Archive& ar, const unsigned int version) const { | |
| ar << "\n"; | |
| const QUERY_1___SQL_SUM_AGGREGATE_1_map& _QUERY_1___SQL_SUM_AGGREGATE_1 = get_QUERY_1___SQL_SUM_AGGREGATE_1(); | |
| dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_1___SQL_SUM_AGGREGATE_1), _QUERY_1___SQL_SUM_AGGREGATE_1, "\t"); | |
| ar << "\n"; | |
| const long _QUERY_2___SQL_COUNT_AGGREGATE_2 = get_QUERY_2___SQL_COUNT_AGGREGATE_2(); | |
| dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_2___SQL_COUNT_AGGREGATE_2), _QUERY_2___SQL_COUNT_AGGREGATE_2, "\t"); | |
| } | |
| /* Functions returning / computing the results of top level queries */ | |
| const QUERY_1___SQL_SUM_AGGREGATE_1_map& get_QUERY_1___SQL_SUM_AGGREGATE_1() const { | |
| return QUERY_1___SQL_SUM_AGGREGATE_1; | |
| } | |
| const long get_QUERY_2___SQL_COUNT_AGGREGATE_2() const { | |
| return QUERY_2___SQL_COUNT_AGGREGATE_2; | |
| } | |
| protected: | |
| /* Data structures used for storing / computing top level queries */ | |
| QUERY_1___SQL_SUM_AGGREGATE_1_map QUERY_1___SQL_SUM_AGGREGATE_1; | |
| long QUERY_2___SQL_COUNT_AGGREGATE_2; | |
| }; | |
| /* Type definition providing a way to incrementally maintain the results of the sql program */ | |
| struct data_t : tlq_t{ | |
| data_t(): tlq_t() { | |
| } | |
| #ifdef DBT_PROFILE | |
| std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> exec_stats; | |
| std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> ivc_stats; | |
| #endif | |
| /* Registering relations and trigger functions */ | |
| ProgramBase* program_base; | |
| void register_data(ProgramBase& pb) { | |
| program_base = &pb; | |
| pb.add_map<TAXI_map>( "TAXI", TAXI ); | |
| pb.add_map<QUERY_1___SQL_SUM_AGGREGATE_1_map>( "QUERY_1___SQL_SUM_AGGREGATE_1", QUERY_1___SQL_SUM_AGGREGATE_1 ); | |
| pb.add_map<long>( "QUERY_2___SQL_COUNT_AGGREGATE_2", QUERY_2___SQL_COUNT_AGGREGATE_2 ); | |
| pb.add_relation("TAXI", true); | |
| pb.add_trigger("TAXI", insert_tuple, std::bind(&data_t::unwrap_insert_TAXI, this, std::placeholders::_1)); | |
| #ifdef DBT_PROFILE | |
| exec_stats = pb.exec_stats; | |
| ivc_stats = pb.ivc_stats; | |
| exec_stats->register_probe(0, "system_ready_s0"); | |
| exec_stats->register_probe(1, "system_ready_s1"); | |
| #endif // DBT_PROFILE | |
| } | |
| /* Trigger functions for table relations */ | |
| void on_insert_TAXI(const long taxi_id, const long taxi_type, const DOUBLE_TYPE taxi_price, const DOUBLE_TYPE taxi_x, const DOUBLE_TYPE taxi_y) { | |
| TAXI_entry e(taxi_id, taxi_type, taxi_price, taxi_x, taxi_y, 1L); | |
| TAXI.addOrDelOnZero(e,1L); | |
| } | |
| void unwrap_insert_TAXI(const event_args_t& ea) { | |
| on_insert_TAXI(*(reinterpret_cast<long*>(ea[0].get())), *(reinterpret_cast<long*>(ea[1].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[2].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[3].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[4].get()))); | |
| } | |
| /* Trigger functions for stream relations */ | |
| void on_system_ready_event() { | |
| BEGIN_TRIGGER(exec_stats,"system_ready_event") | |
| BEGIN_TRIGGER(ivc_stats,"system_ready_event") | |
| { // | |
| QUERY_1___SQL_SUM_AGGREGATE_1.clear(); | |
| { //foreach | |
| TAXI_entry* e1 = TAXI.head; | |
| while(e1){ | |
| long r1_id = e1->TAXI_ID; | |
| long r1_type = e1->TAXI_TYPE; | |
| DOUBLE_TYPE r1_price = e1->TAXI_PRICE; | |
| DOUBLE_TYPE r1_x = e1->TAXI_X; | |
| DOUBLE_TYPE r1_y = e1->TAXI_Y; | |
| long v1 = e1->__av; | |
| QUERY_1___SQL_SUM_AGGREGATE_1.addOrDelOnZero(se1.modify(r1_type),(v1 * r1_price)); | |
| e1 = e1->nxt; | |
| } | |
| } | |
| long agg1 = 0L; | |
| long l1 = 2L; | |
| { //slice | |
| const HASH_RES_t h1 = TAXI_mapkey1_idxfn::hash(se3.modify1(l1)); | |
| const HashIndex_TAXI_map_1* i2 = static_cast<HashIndex_TAXI_map_1*>(TAXI.index[1]); | |
| HashIndex_TAXI_map_1::IdxNode* n2 = &(i2->buckets_[h1 % i2->size_]); | |
| TAXI_entry* e2; | |
| do if ((e2=n2->obj) && h1 == n2->hash && TAXI_mapkey1_idxfn::equals(se3, *e2)) { | |
| long taxi_id = e2->TAXI_ID; | |
| DOUBLE_TYPE taxi_price = e2->TAXI_PRICE; | |
| DOUBLE_TYPE taxi_x = e2->TAXI_X; | |
| DOUBLE_TYPE taxi_y = e2->TAXI_Y; | |
| long v2 = e2->__av; | |
| agg1 += v2; | |
| } while ((n2=n2->nxt)); | |
| } | |
| QUERY_2___SQL_COUNT_AGGREGATE_2 = agg1; | |
| } | |
| END_TRIGGER(exec_stats,"system_ready_event") | |
| END_TRIGGER(ivc_stats,"system_ready_event") | |
| } | |
| private: | |
| /* Sample entries for avoiding recreation of temporary objects */ | |
| QUERY_1___SQL_SUM_AGGREGATE_1_entry se1; | |
| TAXI_entry se3; | |
| /* Data structures used for storing materialized views */ | |
| TAXI_map TAXI; | |
| }; | |
| /* Type definition providing a way to execute the sql program */ | |
| class Program : public ProgramBase | |
| { | |
| public: | |
| Program(int argc = 0, char* argv[] = 0) : ProgramBase(argc,argv) { | |
| data.register_data(*this); | |
| /* Specifying data sources */ | |
| pair<string,string> source1_adaptor_params[] = { make_pair("fields",","), make_pair("schema","long,long,double,double,double") }; | |
| std::shared_ptr<csv_adaptor> source1_adaptor(new csv_adaptor(get_relation_id("TAXI"),2,source1_adaptor_params)); | |
| frame_descriptor source1_fd("\n"); | |
| std::shared_ptr<dbt_file_source> source1_file(new dbt_file_source("test.csv",source1_fd,source1_adaptor)); | |
| add_source(source1_file, true); | |
| } | |
| /* Imports data for static tables and performs view initialization based on it. */ | |
| void init() { | |
| //P0_PLACE_HOLDER | |
| table_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, true); | |
| stream_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, false); | |
| process_tables(); | |
| data.on_system_ready_event(); | |
| //P2_PLACE_HOLDER | |
| } | |
| /* Saves a snapshot of the data required to obtain the results of top level queries. */ | |
| snapshot_t take_snapshot(){ | |
| tlq_t* d = new tlq_t((tlq_t&)data); | |
| //if (d->tS==0) { gettimeofday(&(d->t),NULL); d->tT=((d->t).tv_sec-(d->t0).tv_sec)*1000000L+((d->t).tv_usec-(d->t0).tv_usec); } printf("SAMPLE=standard,%ld,%ld,%ld\n",d->tT,d->tN,d->tS); | |
| return snapshot_t( d ); | |
| } | |
| protected: | |
| data_t data; | |
| }; | |
| class Test : public Program | |
| { | |
| public: | |
| Test(int argc = 0, char* argv[] = 0) : Program(argc,argv) { | |
| } | |
| }; | |
| } | |
| #include "main.inc.cpp" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment