import json
import snowflake.connector
def load_config(file_path):
    """Load the Snowflake account configuration from a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)
def connect_to_snowflake(account, username, password, role):
    """Establish a connection to a Snowflake account.

    Args:
        account: Snowflake account identifier.
        username: Login name, passed to the connector as ``user``.
        password: Password for ``username``.
        role: Role to use for the session.

    Returns:
        The connection object returned by ``snowflake.connector.connect``,
        or ``None`` if the attempt failed for any reason.
    """
    try:
        return snowflake.connector.connect(
            user=username,
            password=password,
            account=account,
            role=role,
        )
    except Exception as e:
        # Deliberately best-effort: the failure is reported and signalled
        # with None so the caller can decide whether to continue.
        print(f"Error connecting to Snowflake: {e}")
        return None
def fetch_tags(conn):
    """Fetch tag_list and tag_defn from the Snowflake account.

    Args:
        conn: An open connection exposing a DB-API style ``cursor()``.

    Returns:
        Every row of ``platform_common.tags`` as returned by the cursor's
        ``fetchall()``; each row holds ``tag_list`` then ``tag_defn``.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT tag_list, tag_defn
            FROM platform_common.tags;
        """)
        return cursor.fetchall()
    finally:
        # Release the cursor whether or not the query succeeded.
        cursor.close()
def _sql_literal(value):
    """Render a Python value as a single-quoted SQL string literal (None -> NULL)."""
    if value is None:
        return "NULL"
    # Backslash is an escape character inside Snowflake single-quoted
    # strings, so it must be doubled before the quotes are doubled.
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def generate_sql_statements(source_tags, target_tags):
    """Generate SQL statements based on the differences in tag_list values.

    Args:
        source_tags: Rows from the source account; element 0 is the
            tag_list value and element 1 the tag_defn value.
        target_tags: Rows from the target account, in the same layout.

    Returns:
        A list of INSERT statements, one for each source row whose tag_list
        value does not occur in ``target_tags``, in source order.
    """
    # Only membership is needed, so keep just the target tag_list values.
    existing_tag_lists = {row[0] for row in target_tags}

    sql_statements = []
    for row in source_tags:
        tag_list, tag_defn = row[0], row[1]
        if tag_list in existing_tag_lists:
            continue
        # The statements are written to a file for later execution, so the
        # values must be embedded as properly escaped literals.
        sql_statements.append(
            "INSERT INTO platform_common.tags (tag_list, tag_defn) "
            f"VALUES ({_sql_literal(tag_list)}, {_sql_literal(tag_defn)});"
        )
    return sql_statements
def write_output_file(statements, output_file):
    """Write the generated SQL statements to an output file.

    Any existing file at ``output_file`` is overwritten. Each statement is
    written on its own line.

    Args:
        statements: Iterable of SQL statement strings.
        output_file: Path of the file to create.

    Raises:
        OSError: If the file cannot be opened for writing.
    """
    # Explicit UTF-8 so tag values outside ASCII are written the same way
    # on every platform.
    with open(output_file, 'w', encoding='utf-8') as file:
        file.writelines(statement + '\n' for statement in statements)
def main():
    """Compare tags between two Snowflake accounts and write an execution plan.

    Reads connection settings from ``snowflake_config.json``, connects to the
    ``source`` and ``target`` accounts, and writes the generated SQL to
    ``execution_plan.sql``. Nothing is written unless both connections
    succeed. Any connection that was opened is closed before returning,
    even if a later step raises.
    """
    # Load configuration from JSON file
    config = load_config('snowflake_config.json')

    def connect(section):
        # Both accounts use the same four settings under their own section.
        settings = config[section]
        return connect_to_snowflake(
            settings['account'],
            settings['username'],
            settings['password'],
            settings['role'],
        )

    source_conn = None
    target_conn = None
    try:
        source_conn = connect('source')
        target_conn = connect('target')

        if source_conn is not None and target_conn is not None:
            # Fetch tags from both accounts
            source_tags = fetch_tags(source_conn)
            target_tags = fetch_tags(target_conn)

            # Generate SQL statements based on the comparison
            sql_statements = generate_sql_statements(source_tags, target_tags)

            # Write the output to a file
            write_output_file(sql_statements, 'execution_plan.sql')
            print("Execution plan has been generated in 'execution_plan.sql'.")
    finally:
        # Close whatever was opened; the nested finally ensures the target
        # is closed even if closing the source raises.
        try:
            if source_conn is not None:
                source_conn.close()
        finally:
            if target_conn is not None:
                target_conn.close()
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()