#include #include #include #include #include #include #include #include const int64_t MAP_SIZE = 50LL << 30; const int BATCH_SIZE = 100000; static MDB_env *env; static MDB_dbi dbi; static double get_ts() { static struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec + (double) tv.tv_usec / 1000000; } static void db_open(char *db_path) { int rc = mkdir(db_path, 0755); if (rc && errno != EEXIST) { perror("mkdir"); exit(1); } MDB_txn *txn; int env_opt = MDB_NORDAHEAD | MDB_NOSYNC; rc = mdb_env_create(&env); rc = mdb_env_set_mapsize(env, MAP_SIZE); rc = mdb_env_open(env, db_path, env_opt, 0664); if (rc) { fprintf(stderr, "open error: %s\n", mdb_strerror(rc)); exit(1); } rc = mdb_txn_begin(env, NULL, 0, &txn); rc = mdb_open(txn, NULL, 0, &dbi); rc = mdb_txn_commit(txn); fprintf(stderr, "DB opened\n"); } static void db_close() { if (env) { mdb_env_close(env); env = NULL; } } static void run_bench(char *input) { MDB_txn *txn; MDB_cursor *mc; MDB_val mkey, mval; int rc; int flag = 0; mdb_txn_begin(env, NULL, 0, &txn); mdb_cursor_open(txn, dbi, &mc); FILE *fp = fopen(input, "r"); int line_cnt = 0, put_cnt = 0, tx_cnt = 0; double ts = get_ts(); char buf[100]; uint8_t key[32]; int value[2]; char *line; do { line = fgets(buf, 100, fp); if (line != NULL) { line_cnt++; // Skip @\0*7 char *p = line + 8; int i, j, len = 64; for (i = 0, j = 0; j < len; i++, j += 2) { sscanf(p + j, "%02x", key + i); } sscanf(p + len, "->%d,%d", &value[0], &value[1]); mkey.mv_data = &key; mkey.mv_size = 32; mval.mv_data = &value; mval.mv_size = sizeof(value); rc = mdb_cursor_put(mc, &mkey, &mval, flag); if (rc) { fprintf(stderr, "put error: %s\n", mdb_strerror(rc)); exit(0); } put_cnt++; } if (line_cnt % 100000 == 0) { double new_ts = get_ts(); double elapsed = new_ts - ts; double put_rate = put_cnt / elapsed; fprintf(stderr, "Read %d lines in %.2f seconds, %.2f puts/s\n", line_cnt, elapsed, put_rate); ts = new_ts; put_cnt = 0; } if (line_cnt % BATCH_SIZE == 0) { mdb_cursor_close(mc); mdb_txn_commit(txn); fprintf(stderr, "commit\n"); mdb_txn_begin(env, NULL, 0, &txn); mdb_cursor_open(txn, dbi, &mc); } } while (line != NULL); fclose(fp); mdb_cursor_close(mc); mdb_txn_commit(txn); } int main(int argc, char **argv) { fprintf(stderr, "db_path: %s, input: %s\n", argv[1], argv[2]); db_open(argv[1]); run_bench(argv[2]); db_close(); return 0; }