Skip to content

Instantly share code, notes, and snippets.

@dkourilov
Created April 13, 2016 16:03
Show Gist options
  • Save dkourilov/1a3d3347136b0f850647d92f23b49405 to your computer and use it in GitHub Desktop.
Save dkourilov/1a3d3347136b0f850647d92f23b49405 to your computer and use it in GitHub Desktop.
test.cpp (dbtoaster)
#include "program_base.hpp"
#include "hpds/KDouble.hpp"
#include "hash.hpp"
#include "mmap/mmap.hpp"
#include "hpds/pstring.hpp"
#include "hpds/pstringops.hpp"
#define ELEM_SEPARATOR "\n\t\t\t"
namespace dbtoaster {
/* Definitions of auxiliary maps for storing materialized views. */
struct TAXI_entry {
long TAXI_ID; long TAXI_TYPE; DOUBLE_TYPE TAXI_PRICE; DOUBLE_TYPE TAXI_X; DOUBLE_TYPE TAXI_Y; long __av; TAXI_entry* nxt; TAXI_entry* prv;
explicit TAXI_entry() : nxt(nullptr), prv(nullptr) { /*TAXI_ID = 0L; TAXI_TYPE = 0L; TAXI_PRICE = 0.0; TAXI_X = 0.0; TAXI_Y = 0.0; __av = 0L; */ }
explicit TAXI_entry(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4, const long c5) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; __av = c5; }
TAXI_entry(const TAXI_entry& other) : TAXI_ID( other.TAXI_ID ), TAXI_TYPE( other.TAXI_TYPE ), TAXI_PRICE( other.TAXI_PRICE ), TAXI_X( other.TAXI_X ), TAXI_Y( other.TAXI_Y ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {}
FORCE_INLINE TAXI_entry& modify(const long c0, const long c1, const DOUBLE_TYPE c2, const DOUBLE_TYPE c3, const DOUBLE_TYPE c4) { TAXI_ID = c0; TAXI_TYPE = c1; TAXI_PRICE = c2; TAXI_X = c3; TAXI_Y = c4; return *this; }
FORCE_INLINE TAXI_entry& modify1(const long c1) { TAXI_TYPE = c1; return *this; }
template<class Archive>
void serialize(Archive& ar, const unsigned int version) const
{
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, TAXI_ID);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, TAXI_TYPE);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, TAXI_PRICE);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, TAXI_X);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, TAXI_Y);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, __av);
}
};
struct TAXI_mapkey01234_idxfn {
FORCE_INLINE static size_t hash(const TAXI_entry& e) {
size_t h = 0;
hash_combine(h, e.TAXI_ID);
hash_combine(h, e.TAXI_TYPE);
hash_combine(h, e.TAXI_PRICE);
hash_combine(h, e.TAXI_X);
hash_combine(h, e.TAXI_Y);
return h;
}
FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) {
return x.TAXI_ID == y.TAXI_ID && x.TAXI_TYPE == y.TAXI_TYPE && x.TAXI_PRICE == y.TAXI_PRICE && x.TAXI_X == y.TAXI_X && x.TAXI_Y == y.TAXI_Y;
}
};
struct TAXI_mapkey1_idxfn {
FORCE_INLINE static size_t hash(const TAXI_entry& e) {
size_t h = 0;
hash_combine(h, e.TAXI_TYPE);
return h;
}
FORCE_INLINE static bool equals(const TAXI_entry& x, const TAXI_entry& y) {
return x.TAXI_TYPE == y.TAXI_TYPE;
}
};
typedef MultiHashMap<TAXI_entry,long,
HashIndex<TAXI_entry,long,TAXI_mapkey01234_idxfn,true>,
HashIndex<TAXI_entry,long,TAXI_mapkey1_idxfn,false>
> TAXI_map;
typedef HashIndex<TAXI_entry,long,TAXI_mapkey01234_idxfn,true> HashIndex_TAXI_map_01234;
typedef HashIndex<TAXI_entry,long,TAXI_mapkey1_idxfn,false> HashIndex_TAXI_map_1;
struct QUERY_1___SQL_SUM_AGGREGATE_1_entry {
long R1_TYPE; DOUBLE_TYPE __av; QUERY_1___SQL_SUM_AGGREGATE_1_entry* nxt; QUERY_1___SQL_SUM_AGGREGATE_1_entry* prv;
explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry() : nxt(nullptr), prv(nullptr) { /*R1_TYPE = 0L; __av = 0.0; */ }
explicit QUERY_1___SQL_SUM_AGGREGATE_1_entry(const long c0, const DOUBLE_TYPE c1) { R1_TYPE = c0; __av = c1; }
QUERY_1___SQL_SUM_AGGREGATE_1_entry(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& other) : R1_TYPE( other.R1_TYPE ), __av( other.__av ), nxt( nullptr ), prv( nullptr ) {}
FORCE_INLINE QUERY_1___SQL_SUM_AGGREGATE_1_entry& modify(const long c0) { R1_TYPE = c0; return *this; }
template<class Archive>
void serialize(Archive& ar, const unsigned int version) const
{
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, R1_TYPE);
ar << ELEM_SEPARATOR;
DBT_SERIALIZATION_NVP(ar, __av);
}
};
struct QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn {
FORCE_INLINE static size_t hash(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& e) {
size_t h = 0;
hash_combine(h, e.R1_TYPE);
return h;
}
FORCE_INLINE static bool equals(const QUERY_1___SQL_SUM_AGGREGATE_1_entry& x, const QUERY_1___SQL_SUM_AGGREGATE_1_entry& y) {
return x.R1_TYPE == y.R1_TYPE;
}
};
typedef MultiHashMap<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE,
HashIndex<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE,QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn,true>
> QUERY_1___SQL_SUM_AGGREGATE_1_map;
typedef HashIndex<QUERY_1___SQL_SUM_AGGREGATE_1_entry,DOUBLE_TYPE,QUERY_1___SQL_SUM_AGGREGATE_1_mapkey0_idxfn,true> HashIndex_QUERY_1___SQL_SUM_AGGREGATE_1_map_0;
/* Type definition providing a way to access the results of the sql program */
struct tlq_t{
struct timeval t0,t; long tT,tN,tS;
tlq_t(): tN(0), tS(0), QUERY_2___SQL_COUNT_AGGREGATE_2(0L) { gettimeofday(&t0,NULL); }
/* Serialization Code */
template<class Archive>
void serialize(Archive& ar, const unsigned int version) const {
ar << "\n";
const QUERY_1___SQL_SUM_AGGREGATE_1_map& _QUERY_1___SQL_SUM_AGGREGATE_1 = get_QUERY_1___SQL_SUM_AGGREGATE_1();
dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_1___SQL_SUM_AGGREGATE_1), _QUERY_1___SQL_SUM_AGGREGATE_1, "\t");
ar << "\n";
const long _QUERY_2___SQL_COUNT_AGGREGATE_2 = get_QUERY_2___SQL_COUNT_AGGREGATE_2();
dbtoaster::serialize_nvp_tabbed(ar, STRING(QUERY_2___SQL_COUNT_AGGREGATE_2), _QUERY_2___SQL_COUNT_AGGREGATE_2, "\t");
}
/* Functions returning / computing the results of top level queries */
const QUERY_1___SQL_SUM_AGGREGATE_1_map& get_QUERY_1___SQL_SUM_AGGREGATE_1() const {
return QUERY_1___SQL_SUM_AGGREGATE_1;
}
const long get_QUERY_2___SQL_COUNT_AGGREGATE_2() const {
return QUERY_2___SQL_COUNT_AGGREGATE_2;
}
protected:
/* Data structures used for storing / computing top level queries */
QUERY_1___SQL_SUM_AGGREGATE_1_map QUERY_1___SQL_SUM_AGGREGATE_1;
long QUERY_2___SQL_COUNT_AGGREGATE_2;
};
/* Type definition providing a way to incrementally maintain the results of the sql program */
struct data_t : tlq_t{
data_t(): tlq_t() {
}
#ifdef DBT_PROFILE
std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> exec_stats;
std::shared_ptr<dbtoaster::statistics::trigger_exec_stats> ivc_stats;
#endif
/* Registering relations and trigger functions */
ProgramBase* program_base;
void register_data(ProgramBase& pb) {
program_base = &pb;
pb.add_map<TAXI_map>( "TAXI", TAXI );
pb.add_map<QUERY_1___SQL_SUM_AGGREGATE_1_map>( "QUERY_1___SQL_SUM_AGGREGATE_1", QUERY_1___SQL_SUM_AGGREGATE_1 );
pb.add_map<long>( "QUERY_2___SQL_COUNT_AGGREGATE_2", QUERY_2___SQL_COUNT_AGGREGATE_2 );
pb.add_relation("TAXI", true);
pb.add_trigger("TAXI", insert_tuple, std::bind(&data_t::unwrap_insert_TAXI, this, std::placeholders::_1));
#ifdef DBT_PROFILE
exec_stats = pb.exec_stats;
ivc_stats = pb.ivc_stats;
exec_stats->register_probe(0, "system_ready_s0");
exec_stats->register_probe(1, "system_ready_s1");
#endif // DBT_PROFILE
}
/* Trigger functions for table relations */
void on_insert_TAXI(const long taxi_id, const long taxi_type, const DOUBLE_TYPE taxi_price, const DOUBLE_TYPE taxi_x, const DOUBLE_TYPE taxi_y) {
TAXI_entry e(taxi_id, taxi_type, taxi_price, taxi_x, taxi_y, 1L);
TAXI.addOrDelOnZero(e,1L);
}
void unwrap_insert_TAXI(const event_args_t& ea) {
on_insert_TAXI(*(reinterpret_cast<long*>(ea[0].get())), *(reinterpret_cast<long*>(ea[1].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[2].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[3].get())), *(reinterpret_cast<DOUBLE_TYPE*>(ea[4].get())));
}
/* Trigger functions for stream relations */
void on_system_ready_event() {
BEGIN_TRIGGER(exec_stats,"system_ready_event")
BEGIN_TRIGGER(ivc_stats,"system_ready_event")
{ //
QUERY_1___SQL_SUM_AGGREGATE_1.clear();
{ //foreach
TAXI_entry* e1 = TAXI.head;
while(e1){
long r1_id = e1->TAXI_ID;
long r1_type = e1->TAXI_TYPE;
DOUBLE_TYPE r1_price = e1->TAXI_PRICE;
DOUBLE_TYPE r1_x = e1->TAXI_X;
DOUBLE_TYPE r1_y = e1->TAXI_Y;
long v1 = e1->__av;
QUERY_1___SQL_SUM_AGGREGATE_1.addOrDelOnZero(se1.modify(r1_type),(v1 * r1_price));
e1 = e1->nxt;
}
}
long agg1 = 0L;
long l1 = 2L;
{ //slice
const HASH_RES_t h1 = TAXI_mapkey1_idxfn::hash(se3.modify1(l1));
const HashIndex_TAXI_map_1* i2 = static_cast<HashIndex_TAXI_map_1*>(TAXI.index[1]);
HashIndex_TAXI_map_1::IdxNode* n2 = &(i2->buckets_[h1 % i2->size_]);
TAXI_entry* e2;
do if ((e2=n2->obj) && h1 == n2->hash && TAXI_mapkey1_idxfn::equals(se3, *e2)) {
long taxi_id = e2->TAXI_ID;
DOUBLE_TYPE taxi_price = e2->TAXI_PRICE;
DOUBLE_TYPE taxi_x = e2->TAXI_X;
DOUBLE_TYPE taxi_y = e2->TAXI_Y;
long v2 = e2->__av;
agg1 += v2;
} while ((n2=n2->nxt));
}
QUERY_2___SQL_COUNT_AGGREGATE_2 = agg1;
}
END_TRIGGER(exec_stats,"system_ready_event")
END_TRIGGER(ivc_stats,"system_ready_event")
}
private:
/* Sample entries for avoiding recreation of temporary objects */
QUERY_1___SQL_SUM_AGGREGATE_1_entry se1;
TAXI_entry se3;
/* Data structures used for storing materialized views */
TAXI_map TAXI;
};
/* Type definition providing a way to execute the sql program */
class Program : public ProgramBase
{
public:
Program(int argc = 0, char* argv[] = 0) : ProgramBase(argc,argv) {
data.register_data(*this);
/* Specifying data sources */
pair<string,string> source1_adaptor_params[] = { make_pair("fields",","), make_pair("schema","long,long,double,double,double") };
std::shared_ptr<csv_adaptor> source1_adaptor(new csv_adaptor(get_relation_id("TAXI"),2,source1_adaptor_params));
frame_descriptor source1_fd("\n");
std::shared_ptr<dbt_file_source> source1_file(new dbt_file_source("test.csv",source1_fd,source1_adaptor));
add_source(source1_file, true);
}
/* Imports data for static tables and performs view initialization based on it. */
void init() {
//P0_PLACE_HOLDER
table_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, true);
stream_multiplexer.init_source(run_opts->batch_size, run_opts->parallel, false);
process_tables();
data.on_system_ready_event();
//P2_PLACE_HOLDER
}
/* Saves a snapshot of the data required to obtain the results of top level queries. */
snapshot_t take_snapshot(){
tlq_t* d = new tlq_t((tlq_t&)data);
//if (d->tS==0) { gettimeofday(&(d->t),NULL); d->tT=((d->t).tv_sec-(d->t0).tv_sec)*1000000L+((d->t).tv_usec-(d->t0).tv_usec); } printf("SAMPLE=standard,%ld,%ld,%ld\n",d->tT,d->tN,d->tS);
return snapshot_t( d );
}
protected:
data_t data;
};
class Test : public Program
{
public:
Test(int argc = 0, char* argv[] = 0) : Program(argc,argv) {
}
};
}
#include "main.inc.cpp"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment