Skip to content

Instantly share code, notes, and snippets.

@gbraccialli
Last active March 25, 2020 05:23
Show Gist options
  • Select an option

  • Save gbraccialli/827a2180ffbfecec594483fb3875cf8d to your computer and use it in GitHub Desktop.

Select an option

Save gbraccialli/827a2180ffbfecec594483fb3875cf8d to your computer and use it in GitHub Desktop.

Revisions

  1. gbraccialli revised this gist Mar 25, 2020. No changes.
  2. gbraccialli revised this gist Mar 25, 2020. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions kedro_load_git_emr.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,5 @@
    GIT_PROJECT = "pfj-next-gen-b2c"
    PROJECT = "project_pico"
    GIT_PROJECT = "xxxx"
    PROJECT = "aaaaa"
    USERNAME = "guilherme"
    BRANCH = "develop"
    SPARK_MODE = "local" # local or yarn
  3. gbraccialli revised this gist Mar 25, 2020. 1 changed file with 9 additions and 0 deletions.
    9 changes: 9 additions & 0 deletions kedro_load_git_emr.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,12 @@
    GIT_PROJECT = "pfj-next-gen-b2c"
    PROJECT = "project_pico"
    USERNAME = "guilherme"
    BRANCH = "develop"
    SPARK_MODE = "local" # local or yarn
    %run /home/jupyter/kedro_load.py $GIT_PROJECT $PROJECT $USERNAME $BRANCH $SPARK_MODE

    ######################################################################

    def randomString(stringLength=10):
    import random, string
    """Generate a random string of fixed length """
  4. gbraccialli renamed this gist Mar 25, 2020. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  5. gbraccialli created this gist Mar 25, 2020.
    114 changes: 114 additions & 0 deletions kedro_load_git.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,114 @@
    def randomString(stringLength=10):
    import random, string
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))

    def init_spark(spark_hadoop_path,username):

    import os
    import findspark

    findspark.init(
    spark_hadoop_path
    )

    from pyspark.sql import SparkSession
    if (SPARK_MODE=="yarn"):
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    queue = f"{username}_{randomString(5)}"
    spark = (
    SparkSession.builder
    .master("yarn")
    .config("spark.sql.execution.arrow.enabled", "false")
    .config("spark.yarn.queue",queue)
    .appName(queue)
    .getOrCreate()
    )
    else:
    spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.execution.arrow.enabled", "false")
    .appName(f"app-{username}")
    .getOrCreate()
    )

    return spark

    import logging
    import pathlib
    import datetime
    import os
    import sys
    import getpass
    import importlib

    GIT_PROJECT = sys.argv[1]
    PROJECT = sys.argv[2]
    USERNAME = sys.argv[3]
    BRANCH = sys.argv[4]
    SPARK_MODE = sys.argv[5]

    spark = init_spark("/usr/lib/spark/",USERNAME)

    from pyspark.sql import functions as F
    from pyspark.sql import Window as W
    from pyspark.sql.types import *
    import pandas as pd

    os.system("mkdir /tmp/code")
    os.system("chmod 777 -R /tmp/code")
    os.system(f"rm -rf /tmp/code/{USERNAME}_{PROJECT}/")
    os.system(f"mkdir -p /tmp/code/{USERNAME}_{PROJECT}/")
    os.system("chmod 777 -R /tmp/code")

    sync_command1 = f"cd /tmp/code/{USERNAME}_{PROJECT}/; git clone https://github.com/xxx/{GIT_PROJECT}.git;"
    os.system(sync_command1)
    sync_command2 = f"cd /tmp/code/{USERNAME}_{PROJECT}/{GIT_PROJECT}/; git pull; git checkout {BRANCH};"
    os.system(sync_command2)
    #logging.info(sync_command)
    #print(sync_command1)
    #print(sync_command2)


    LOCAL_BASE_PATH = f'/tmp/code/{USERNAME}_{PROJECT}/{GIT_PROJECT}/'
    CONF_ROOT = "conf"

    PROJECT_BASE_PATH = f'{LOCAL_BASE_PATH}'
    LOGS_DIR = f'{LOCAL_BASE_PATH}/logs/'
    VIZUALIZATION_DIR = f'{LOCAL_BASE_PATH}/conf/visualization'

    pathlib.Path(LOGS_DIR).mkdir(parents=True, exist_ok=True)
    pathlib.Path(VIZUALIZATION_DIR).mkdir(parents=True, exist_ok=True)

    os.chdir(PROJECT_BASE_PATH) # Move to project root

    path_to_add = f"{PROJECT_BASE_PATH}/src/"

    if path_to_add not in sys.path:
    sys.path.insert(0, path_to_add)

    #logging.getLogger().handlers[0].setLevel(logging.ERROR)
    logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
    from kedro.context import KedroContext, load_context

    project_context = load_context(f"{PROJECT_BASE_PATH}")

    io = project_context.io
    catalog = project_context.catalog
    pipeline = project_context.pipeline

    logging.getLogger("kedro").setLevel(logging.ERROR)
    logging.getLogger("kedro.io").setLevel(logging.ERROR)
    logging.getLogger("kedro.pipeline").setLevel(logging.INFO)

    def run_pipeline(pipeline):
    from kedro.runner import SequentialRunner
    SequentialRunner().run(pipeline, io)

    import pandas as pd
    pd.set_option('display.max_columns', None)

    print("kedro loaded at {}".format(datetime.datetime.now()))