#include "program_base.hpp" #include "hpds/KDouble.hpp" #include "hash.hpp" #include "mmap/mmap.hpp" #include "hpds/pstring.hpp" #include "hpds/pstringops.hpp" #define ELEM_SEPARATOR "\n\t\t\t" namespace dbtoaster { /* Definitions of auxiliary maps for storing materialized views. */ struct TAXI_entry { long TAXI_ID; long TAXI_TYPE; DOUBLE_TYPE TAXI_PRICE; DOUBLE_TYPE TAXI_X; DOUBLE_TYPE TAXI_Y; long __av; TAXI_entry* nxt; TAXI_entry* prv; explicit TAXI_entry() : nxt(nullptr), prv(nullptr) { /*TAXI_ID = 0L; TAXI_TYPE = 0L; TAXI_PRICE = 0.0; TAXI_X = 0.0; TAXI_Y = 0.0; __av = 0L; */ } explicit TAXI_entry(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4, const long c5) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; __av = c5; } TAXI_entry(const TAXI_entry& other) : TAXI_ID( other.TAXI_ID ), TAXI_TYPE( other.TAXI_TYPE ), TAXI_PRICE( other.TAXI_PRICE ), TAXI_X( other.TAXI_X ), TAXI_Y( other.TAXI_Y ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {} FORCE_INLINE TAXI_entry& modify(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; return *this; } FORCE_INLINE TAXI_entry& modify1(const long c1) { TAXI_TYPE = c1; return *this; } template void serialize(Archive& ar, const unsigned int version) const { ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, TAXI_ID); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, TAXI_TYPE); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, TAXI_PRICE); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, TAXI_X); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, TAXI_Y); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, __av); } }; struct TAXI_mapkey01234_idxfn { FORCE_INLINE static size_t hash(const TAXI_entry& e) { size_t h = 0; hash_combine(h, e.TAXI_ID); hash_combine(h, e.TAXI_TYPE); hash_combine(h, e.TAXI_PRICE); hash_combine(h, e.TAXI_X); hash_combine(h, e.TAXI_Y); return h; } FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) { return x.TAXI_ID == y.TAXI_ID && x.TAXI_TYPE == y.TAXI_TYPE && x.TAXI_PRICE == y.TAXI_PRICE && x.TAXI_X == y.TAXI_X && x.TAXI_Y == y.TAXI_Y; } }; struct TAXI_mapkey1_idxfn { FORCE_INLINE static size_t hash(const TAXI_entry& e) { size_t h = 0; hash_combine(h, e.TAXI_TYPE); return h; } FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) { return x.TAXI_TYPE == y.TAXI_TYPE; } }; typedef MultiHashMap, HashIndex > TAXI_map; typedef HashIndex HashIndex_TAXI_map_01234; typedef HashIndex HashIndex_TAXI_map_1; struct QUERY_1___SQL_SUM_AGGREGATE_1_entry { long R1_TYPE; DOUBLE_TYPE __av; QUERY_1___SQL_SUM_AGGREGATE_1_entry* nxt; QUERY_1___SQL_SUM_AGGREGATE_1_entry* prv; explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry() : nxt(nullptr), prv(nullptr) { /*R1_TYPE = 0L; __av = 0.0; */ } explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry(const long c0, const DOUBLE_TYPE c1) { R1_TYPE = c0; __av = c1; } QUERY_1___SQL_SUM_AGGREGATE_1_entry(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& other) : R1_TYPE( other.R1_TYPE ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {} FORCE_INLINE QUERY_1___SQL_SUM_AGGREGATE_1_entry& modify(const long c0) { R1_TYPE = c0; return *this; } template void serialize(Archive& ar, const unsigned int version) const { ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, R1_TYPE); ar << ELEM_SEPARATOR; DBT_SERIALIZATION_NVP(ar, __av); } }; struct QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn { FORCE_INLINE static size_t hash(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& e) { size_t h = 0; hash_combine(h, e.R1_TYPE); return h; } FORCE_INLINE static bool equals(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& x, const QUERY_1___SQL_SUM_AGGREGATE_1_entry& y) { return x.R1_TYPE == y.R1_TYPE; } }; typedef MultiHashMap > QUERY_1___SQL_SUM_AGGREGATE_1_map; typedef HashIndex HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0; /* Type definition providing a way to access the results of the sql program */ struct tlq_t{ struct timeval t0,t; long tT,tN,tS; tlq_t(): tN(0), tS(0), QUERY_2___SQL_COUNT_AGGREGATE_2(0L) { gettimeofday(&t0,NULL); } /* Serialization Code */ template void serialize(Archive& ar, const unsigned int version) const { ar << "\n"; const QUERY_1___SQL_SUM_AGGREGATE_1_map& _QUERY_1___SQL_SUM_AGGREGATE_1 = get_QUERY_1___SQL_SUM_AGGREGATE_1(); dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_1___SQL_SUM_AGGREGATE_1), _QUERY_1___SQL_SUM_AGGREGATE_1, "\t"); ar << "\n"; const long _QUERY_2___SQL_COUNT_AGGREGATE_2 = get_QUERY_2___SQL_COUNT_AGGREGATE_2(); dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_2___SQL_COUNT_AGGREGATE_2), _QUERY_2___SQL_COUNT_AGGREGATE_2, "\t"); } /* Functions returning / computing the results of top level queries */ const QUERY_1___SQL_SUM_AGGREGATE_1_map& get_QUERY_1___SQL_SUM_AGGREGATE_1() const { return QUERY_1___SQL_SUM_AGGREGATE_1; } const long get_QUERY_2___SQL_COUNT_AGGREGATE_2() const { return QUERY_2___SQL_COUNT_AGGREGATE_2; } protected: /* Data structures used for storing / computing top level queries */ QUERY_1___SQL_SUM_AGGREGATE_1_map QUERY_1___SQL_SUM_AGGREGATE_1; long QUERY_2___SQL_COUNT_AGGREGATE_2; }; /* Type definition providing a way to incrementally maintain the results of the sql program */ struct data_t : tlq_t{ data_t(): tlq_t() { } #ifdef DBT_PROFILE std::shared_ptr exec_stats; std::shared_ptr ivc_stats; #endif /* Registering relations and trigger functions */ ProgramBase* program_base; void register_data(ProgramBase& pb) { program_base = &pb; pb.add_map( "TAXI", TAXI ); pb.add_map( "QUERY_1___SQL_SUM_AGGREGATE_1", QUERY_1___SQL_SUM_AGGREGATE_1 ); pb.add_map( "QUERY_2___SQL_COUNT_AGGREGATE_2", QUERY_2___SQL_COUNT_AGGREGATE_2 ); pb.add_relation("TAXI", true); pb.add_trigger("TAXI", insert_tuple, std::bind(&data_t::unwrap_insert_TAXI, this, std::placeholders::_1)); #ifdef DBT_PROFILE exec_stats = pb.exec_stats; ivc_stats = pb.ivc_stats; exec_stats->register_probe(0, "system_ready_s0"); exec_stats->register_probe(1, "system_ready_s1"); #endif // DBT_PROFILE } /* Trigger functions for table relations */ void on_insert_TAXI(const long taxi_id, const long taxi_type, const DOUBLE_TYPE taxi_price, const DOUBLE_TYPE taxi_x, const DOUBLE_TYPE taxi_y) { TAXI_entry e(taxi_id, taxi_type, taxi_price, taxi_x, taxi_y, 1L); TAXI.addOrDelOnZero(e,1L); } void unwrap_insert_TAXI(const event_args_t& ea) { on_insert_TAXI(*(reinterpret_cast(ea[0].get())), *(reinterpret_cast(ea[1].get())), *(reinterpret_cast(ea[2].get())), *(reinterpret_cast(ea[3].get())), *(reinterpret_cast(ea[4].get()))); } /* Trigger functions for stream relations */ void on_system_ready_event() { BEGIN_TRIGGER(exec_stats,"system_ready_event") BEGIN_TRIGGER(ivc_stats,"system_ready_event") { // QUERY_1___SQL_SUM_AGGREGATE_1.clear(); { //foreach TAXI_entry* e1 = TAXI.head; while(e1){ long r1_id = e1->TAXI_ID; long r1_type = e1->TAXI_TYPE; DOUBLE_TYPE r1_price = e1->TAXI_PRICE; DOUBLE_TYPE r1_x = e1->TAXI_X; DOUBLE_TYPE r1_y = e1->TAXI_Y; long v1 = e1->__av; QUERY_1___SQL_SUM_AGGREGATE_1.addOrDelOnZero(se1.modify(r1_type),(v1 * r1_price)); e1 = e1->nxt; } } long agg1 = 0L; long l1 = 2L; { //slice const HASH_RES_t h1 = TAXI_mapkey1_idxfn::hash(se3.modify1(l1)); const HashIndex_TAXI_map_1* i2 = static_cast(TAXI.index[1]); HashIndex_TAXI_map_1::IdxNode* n2 = &(i2->buckets_[h1 % i2->size_]); TAXI_entry* e2; do if ((e2=n2->obj) && h1 == n2->hash && TAXI_mapkey1_idxfn::equals(se3, *e2)) { long taxi_id = e2->TAXI_ID; DOUBLE_TYPE taxi_price = e2->TAXI_PRICE; DOUBLE_TYPE taxi_x = e2->TAXI_X; DOUBLE_TYPE taxi_y = e2->TAXI_Y; long v2 = e2->__av; agg1 += v2; } while ((n2=n2->nxt)); } QUERY_2___SQL_COUNT_AGGREGATE_2 = agg1; } END_TRIGGER(exec_stats,"system_ready_event") END_TRIGGER(ivc_stats,"system_ready_event") } private: /* Sample entries for avoiding recreation of temporary objects */ QUERY_1___SQL_SUM_AGGREGATE_1_entry se1; TAXI_entry se3; /* Data structures used for storing materialized views */ TAXI_map TAXI; }; /* Type definition providing a way to execute the sql program */ class Program : public ProgramBase { public: Program(int argc = 0, char* argv[] = 0) : ProgramBase(argc,argv) { data.register_data(*this); /* Specifying data sources */ pair source1_adaptor_params[] = { make_pair("fields",","), make_pair("schema","long,long,double,double,double") }; std::shared_ptr source1_adaptor(new csv_adaptor(get_relation_id("TAXI"),2,source1_adaptor_params)); frame_descriptor source1_fd("\n"); std::shared_ptr source1_file(new dbt_file_source("test.csv",source1_fd,source1_adaptor)); add_source(source1_file, true); } /* Imports data for static tables and performs view initialization based on it. */ void init() { //P0_PLACE_HOLDER table_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, true); stream_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, false); process_tables(); data.on_system_ready_event(); //P2_PLACE_HOLDER } /* Saves a snapshot of the data required to obtain the results of top level queries. */ snapshot_t take_snapshot(){ tlq_t* d = new tlq_t((tlq_t&)data); //if (d->tS==0) { gettimeofday(&(d->t),NULL); d->tT=((d->t).tv_sec-(d->t0).tv_sec)*1000000L+((d->t).tv_usec-(d->t0).tv_usec); } printf("SAMPLE=standard,%ld,%ld,%ld\n",d->tT,d->tN,d->tS); return snapshot_t( d ); } protected: data_t data; }; class Test : public Program { public: Test(int argc = 0, char* argv[] = 0) : Program(argc,argv) { } }; } #include "main.inc.cpp"