79686165

Date: 2025-07-01 13:51:13
Score: 0.5
Natty:
Report link

import json

import snowflake.connector

def load_config(file_path):

"""Load the Snowflake account configuration from a JSON file."""

with open(file_path, 'r') as file:

return json.load(file)

def connect_to_snowflake(account, username, password, role):

"""Establish a connection to a Snowflake account."""

try:

conn = snowflake.connector.connect(

user=username,

password=password,

account=account,

role=role

)

return conn

except Exception as e:

print(f"Error connecting to Snowflake: {e}")

return None

def fetch_tags(conn):

"""Fetch tag_list and tag_defn from the Snowflake account."""

cursor = conn.cursor()

try:

cursor.execute("""

SELECT tag_list, tag_defn

FROM platform_common.tags;

""")

return cursor.fetchall() # Return all rows from the query

finally:

cursor.close()

def generate_sql_statements(source_tags, target_tags):

"""Generate SQL statements based on the differences in tag_list values."""

sql_statements = []

# Create a set for target tag_list values for easy lookup

target_tags_set = {tag[0]: tag[1] for tag in target_tags} # {tag_list: tag_defn}

# Check for new tags in source that are not in target

for tag in source_tags:

tag_list = tag[0]

tag_defn = tag[1]

if tag_list not in target_tags_set:

# Create statement for the new tag

create_statement = f"INSERT INTO platform_common.tags (tag_list, tag_defn) VALUES ('{tag_sql_statements.append(create_statement)

return sql_statements

def write_output_file(statements, output_file):

"""Write the generated SQL statements to an output file."""

with open(output_file, 'w') as file:

for statement in statements:

file.write(statement + '\n')

def main():

# Load configuration from JSON file

config = load_config('snowflake_config.json')

# Connect to source Snowflake account

source_conn = connect_to_snowflake(

config['source']['account'],

config['source']['username'],

config['source']['password'],

config['source']['role']

)

# Connect to target Snowflake account

target_conn = connect_to_snowflake(

config['target']['account'],

config['target']['username'],

config['target']['password'],

config['target']['role']

)

if source_conn and target_conn:

# Fetch tags from both accounts

source_tags = fetch_tags(source_conn)

target_tags = fetch_tags(target_conn)

# Generate SQL statements based on the comparison

sql_statements = generate_sql_statements(source_tags, target_tags)

# Write the output to a file

write_output_file(sql_statements, 'execution_plan.sql')

print("Execution plan has been generated in 'execution_plan.sql'.")

# Close connections

if source_conn:

source_conn.close()

if target_conn:

target_conn.close()

if _name_ == "_main_":

main()

Reasons:
  • Long answer (-1):
  • No code block (0.5):
  • Low reputation (1):
Posted by: Velmurugan Shanmugam