import faulthandler import os import re from types import SimpleNamespace import sanitizers import cffi SELF_DIR = os.path.dirname(os.path.abspath(__file__)) TOPLEVEL = os.path.abspath(SELF_DIR + "/../../../../") + "/" PREDEFINITIONS = """ typedef ... regex_t; /* * While pycparser supports __int128, cffi does not. Provide these * fake definitions for __[u]int128_t; they look the same to the ABI * (in as much as there is an ABI for this extension). */ typedef struct { uint64_t lo; uint64_t hi; } __uint128_t; typedef struct { uint64_t lo; int64_t hi; } __int128_t; """ REPLACEMENTS = { r"STAILQ_HEAD\s*\(\s*(\w*)\s*,\s*(\w+)\s*\)": r"struct \1 { struct \2 *stqh_first; struct \2 **stqh_last; }", r"STAILQ_ENTRY\s*\(\s*(\w+)\s*\)": r"struct { struct \1 *stqe_next; }", } PUBLIC_HEADERS = [ # ... ] INTERNAL_HEADERS = [ # ... ] STRIPPED_PREFIXES = [ "crdb_", "CRDB_", ] FFI = cffi.FFI() def read_stripped_header(path): """Returns the contents of a header file without preprocessor directives.""" ret = "" in_directive = False with open(path) as f: for line in f: if in_directive or re.match(r"^\s*#", line): in_directive = line.endswith("\\\n") else: in_directive = False for pattern, repl in REPLACEMENTS.items(): line = re.sub(pattern, repl, line) ret += line return ret FFI.cdef(PREDEFINITIONS) for header in PUBLIC_HEADERS: FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/" + header)) for header in INTERNAL_HEADERS: FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/crdb/" + header)) if sanitizers.ENABLED: C = FFI.dlopen(TOPLEVEL + "_build_asan/crdb/libcrdb.so") else: C = FFI.dlopen(TOPLEVEL + "_build/crdb/libcrdb.so") def _strip_prefixes(lib): """Returns a namespace with every symbol in `lib` resolved eagerly, and with aliases stripped of the `crdb_` prefix (and other entries in STRIPPED_PREFIXES). """ ret = SimpleNamespace() symbols = dir(lib) for symbol in symbols: # This is where we would wrap with BTS hooks. fn = sanitizers.wrap_with_tracking(getattr(lib, symbol)) setattr(ret, symbol, fn) for prefix in STRIPPED_PREFIXES: if symbol.startswith(prefix): suffix = symbol[len(prefix) :] assert ( suffix not in symbols ), "Name collision when stripping %s from %s." % (prefix, symbol,) setattr(ret, suffix, fn) break return ret C = _strip_prefixes(C) def _init_crdb(): error = FFI.new("crdb_error_t *") assert C.init(FFI.NULL, error), "crdb_init: %s %i" % ( FFI.string(error.message), error.error, ) FFI.init_once(_init_crdb, "init_crdb") # Pass in a copy of stderr in case anyone plays redirection tricks. faulthandler.enable(os.dup(2))