import sys
from awsglue.transforms import *  # noqa: F401,F403 -- not referenced by this script
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import json

## @params: [JOB_NAME, SECRET_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SECRET_NAME'])

# Passed through to replicate_table(); the S3 write there does not use it.
glue_database = 'default'

# Base S3 location; each table is written under "<S3_BASE_PATH>/<table_name>/".
S3_BASE_PATH = "s3://glue-replicate-hdi"  # Replace with your S3 bucket

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Initialize Secrets Manager client
secrets_manager_client = boto3.client('secretsmanager')

# Retrieve the HANA service key from Secrets Manager.
# The secret is expected to be a JSON object whose keys are usable directly as
# JDBC connection options and which contains at least a "driver" entry.
secret_name = args['SECRET_NAME']
secret_response = secrets_manager_client.get_secret_value(SecretId=secret_name)
secret_string = secret_response['SecretString']
hana_service_key = json.loads(secret_string)  # Parse the JSON from the secret


def query(query, connection_type="custom.jdbc"):
    """Run a SQL statement against HANA and return the result as a DynamicFrame.

    Args:
        query: SQL text sent verbatim as the JDBC "query" option.
        connection_type: Glue connection type; defaults to "custom.jdbc".

    Returns:
        The DynamicFrame produced by ``create_dynamic_frame.from_options``.
    """
    # Copy so the shared service key is never mutated between calls.
    connection_options = hana_service_key.copy()
    connection_options["query"] = query
    connection_options["className"] = hana_service_key["driver"]
    return glueContext.create_dynamic_frame.from_options(
        connection_type=connection_type,
        connection_options=connection_options,
    )


# Show current schema
current_schema = query("select CURRENT_SCHEMA from DUMMY")
current_schema.show()

# List tables in the current schema
tables = query("select TABLE_NAME from TABLES where SCHEMA_NAME in ( select CURRENT_SCHEMA from DUMMY )")
tables.show()
tables_list = [row['TABLE_NAME'] for row in tables.toDF().collect()]


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def replicate_table(table_name, glue_database, s3_base_path=S3_BASE_PATH):
    """Copy one table from the current HANA schema to Parquet files in S3.

    Args:
        table_name: Exact table name as returned by the TABLES listing
            (case-sensitive).
        glue_database: Currently unused. This function writes to S3 only; it
            does not create or update any Glue Data Catalog table.
        s3_base_path: S3 prefix; data is written to
            "<s3_base_path>/<table_name>/".
    """
    print(f"Replicating table: {table_name}")

    # Quote the name. Unquoted identifiers are upper-cased by HANA and cannot
    # contain characters such as '.' or ':', so the raw catalog name would not
    # reliably resolve to the same table.
    table_data = query(f"SELECT * FROM {_quote_identifier(table_name)}")

    # NOTE(review): count() evaluates the whole frame, and the write below
    # evaluates it again (nothing is cached here), so each table is presumably
    # read from HANA twice -- consider a cheaper emptiness check if this is slow.
    if table_data.count() == 0:
        print(f"Skipping table '{table_name}' as it has no data.")
        return

    # Write the data to S3 as Parquet (no Data Catalog update happens here).
    # NOTE(review): the target prefix is not cleared first; re-running the job
    # presumably leaves earlier files in place -- consider
    # glueContext.purge_s3_path() if a full refresh is intended.
    glueContext.write_dynamic_frame.from_options(
        frame=table_data,
        connection_type="s3",
        connection_options={
            "path": f"{s3_base_path}/{table_name}/",
        },
        format="parquet",
    )


# Replicate each table to S3
for table in tables_list:
    replicate_table(table, glue_database)

# Commit the job
job.commit()