Determine the schema of the JSON files so that it can be used during table creation.
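Both DuckDB and Spark can infer the schema directly from the files. A minimal sketch, assuming a local file named `data.json` (the path is a placeholder):

```python
import duckdb
from pyspark.sql import SparkSession

# DuckDB: DESCRIBE the result of reading the JSON file to see column names and types
duckdb.sql("DESCRIBE SELECT * FROM read_json_auto('data.json')").show()

# Spark: read the file and print the inferred schema
spark = SparkSession.builder.getOrCreate()
spark.read.json("data.json").printSchema()
```

Either output can then be translated into the column definitions of the CREATE TABLE statement.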
gcloud compute instances create duckdb-vs-spark \
--project=PROJECT_ID \
--zone=europe-west4-a \
{{
    config(enabled=false)
}}

WITH raw_customers AS (
    SELECT
        id AS customer_id,
        first_name,
        last_name,
WITH
prod_table AS (
    SELECT *, <column_name> AS primary_key FROM <prod_table_name>
),
dev_table AS (
    SELECT *, <column_name> AS primary_key FROM <dev_table_name>
),
import json

def read_json(filename='data.json'):
    with open(filename, 'r') as f:
        data = json.loads(f.read())
    return data

def write_json(data, filename='data.json'):
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4)
| echo "Starting setup" | |
| # install xcode CLI | |
| xcode-select —-install | |
| # install brew | |
| /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" | |
| # Update homebrew recipes | |
| brew update |
import os
import asyncio
import contextlib
from pprint import pprint
from dotenv import load_dotenv
from netsuite import NetSuite, Config, TokenAuth

load_dotenv()  # take environment variables from .env

config = Config(
    account=os.getenv("NETSUITE_ACCOUNT"),  # env var names here are assumptions; match your .env
    auth=TokenAuth(
        consumer_key=os.getenv("NETSUITE_CONSUMER_KEY"),
        consumer_secret=os.getenv("NETSUITE_CONSUMER_SECRET"),
        token_id=os.getenv("NETSUITE_TOKEN_ID"),
        token_secret=os.getenv("NETSUITE_TOKEN_SECRET"),
    ),
)
| date | url | |
|---|---|
| 2020-01-01 | github.com | |
| 2020-01-02 | google.com |
from kafka import KafkaProducer
from kafka.errors import KafkaError
import logging

logging.basicConfig(level=logging.DEBUG)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic_name = 'raw_data'

def on_send_success(record_metadata):
    # log where the message landed: topic, partition and offset
    logging.info("Delivered to %s[%s] at offset %s",
                 record_metadata.topic, record_metadata.partition, record_metadata.offset)

# the callback is attached when sending, e.g.:
# producer.send(topic_name, payload_bytes).add_callback(on_send_success)
import json

import numpy as np
import pandas as pd

# data_frame - input dataset with a JSON-encoded "user_json" column
def parse_and_clean(data_frame: pd.DataFrame) -> pd.DataFrame:
    # parse json: expand the parsed fields into their own columns
    df = data_frame.join(data_frame["user_json"].apply(json.loads).apply(pd.Series))
    df["user_json"] = df["user_json"].apply(lambda x: x.replace('\n', ''))
    # explode visits: one row per (uid, site) pair
    df2 = pd.DataFrame({
        "uid": df.uid.repeat(df.visits.str.len()),
        "sites": np.concatenate(df.visits.values),
    })
    return df2