-
Notifications
You must be signed in to change notification settings - Fork 5
/
frosty_gen.py
358 lines (328 loc) · 17 KB
/
frosty_gen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import streamlit as st
import pandas as pd
import random
import uuid
import snowflake.connector
import numpy as np
from datetime import datetime, timedelta
from snowflake.snowpark import Session, FileOperation
from snowflake.connector.pandas_tools import write_pandas
from streamlit_modal import Modal
# FrostyGen - Random Data Generator
# Author: Matteo Consoli
# v.1.0 - 18-09-2023
### --------------------------- ###
### Header & Config ###
### --------------------------- ###
# Page-level settings: browser title and icon, wide layout, sidebar open on load.
_page_settings = {
    "page_title": "FrostyGen - Random Data Generator ",
    "page_icon": "logo.png",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_page_settings)
st.title("FrostyGen - Random Data Generator ")
st.subheader("#Generate #Iced? #Data")
# Stays False until a dataframe has been generated, so the preview code never
# reads `df` before it exists.
df_exists = False
### --------------------------- ###
### Snowflake Connection ###
### --------------------------- ###
def get_snowflake_connection():
    """Return the Snowflake connection cached in the Streamlit session state.

    A connection is opened from the ``snowflake_*_s`` session-state values when
    none is cached yet (or the cached entry is ``None``). Failures are shown
    with ``st.error`` and ``None`` is returned instead of raising.
    """
    required_keys = (
        'snowflake_username_s',
        'snowflake_password_s',
        'snowflake_account_s',
        'snowflake_warehouse_s',
        'snowflake_role_s',
    )
    try:
        state = st.session_state
        # Every credential must have been stored in the session state first.
        if any(not hasattr(state, key) for key in required_keys):
            raise ValueError("Snowflake connection parameters are missing in session state")
        # Reuse the cached connection; connect only when there is nothing usable.
        if getattr(state, 'snowflake_connection', None) is None:
            state.snowflake_connection = snowflake.connector.connect(
                user=state.snowflake_username_s,
                password=state.snowflake_password_s,
                account=state.snowflake_account_s,
                warehouse=state.snowflake_warehouse_s,
                role=state.snowflake_role_s,
            )
        return state.snowflake_connection
    except ValueError as err:
        st.error(f"Error: {err}")
    except Exception as err:
        st.error(f"Error connecting to Snowflake: {err}")
    return None
def close_snowflake_connection():
    """Close the Snowflake connection cached in the session state and forget it.

    Does nothing when no connection entry exists.

    Returns:
        Always ``1``, whether or not a connection was open.
    """
    if hasattr(st.session_state, 'snowflake_connection'):
        connection = st.session_state.snowflake_connection
        try:
            # The cached entry may be None; only a real connection can be closed.
            if connection is not None:
                connection.close()
        finally:
            # Drop the entry even if close() raised, so a broken connection
            # is never handed out again.
            del st.session_state["snowflake_connection"]
    return 1
### --------------------------- ###
### Main Page - Formats Config ###
### --------------------------- ###
# Four side-by-side inputs controlling the size and format of the dataset.
records_col, fields_col, separator_col, header_col = st.columns(4)
with records_col:
    num_records = st.number_input(
        "# of Records to Generate", min_value=1, max_value=1000000, value=10, step=1
    )
with fields_col:
    num_fields = st.number_input(
        "# of Fields to Create", min_value=2, max_value=20, step=1
    )
with separator_col:
    file_separator = st.selectbox(
        "Field Separator", [",", ";", ":", "|", "#"], key="separator"
    )
with header_col:
    include_header = st.selectbox("Include Header?", ["Yes", "No"])
st.markdown("""----""")
### ------------------------------- ###
### Main Page - Data Fields Config ###
### ------------------------------- ###
st.header("Define Data Fields")
# Candidate values per field name, read back when records are generated:
# user_string_values holds "Text" fields, table_string_values holds
# "DatabaseColumn" fields.
user_string_values = {}
table_string_values = {}


def _quote_identifier(identifier):
    """Return *identifier* as a double-quoted SQL identifier.

    Quoting keeps names with spaces, lowercase letters or other special
    characters valid in the generated SQL; embedded quotes are doubled.
    """
    return '"' + str(identifier).replace('"', '""') + '"'


def _render_database_column_lookup(cursor, field_index):
    """Render the database/schema/table/column pickers for one field.

    Args:
        cursor: Open Snowflake cursor used for the metadata and value queries.
        field_index: Index of the field, used to build unique widget keys.

    Returns:
        The lines of the "Extracted Values" text area as a list of strings, or
        ``None`` when the selection is not complete yet.
    """
    cursor.execute("SHOW DATABASES")
    databases = [row[1] for row in cursor]
    selected_database = st.selectbox("Select Database", databases, key=f"selected_db_name_{field_index}")
    if not selected_database:
        return None

    database_ref = _quote_identifier(selected_database)
    cursor.execute(f"SHOW SCHEMAS IN DATABASE {database_ref}")
    schemas = [row[1] for row in cursor]
    selected_schema = st.selectbox("Select Schema", schemas, index=0, key=f"selected_schema_name_{field_index}")
    if selected_schema is None:
        return None

    schema_ref = f"{database_ref}.{_quote_identifier(selected_schema)}"
    cursor.execute(f"SHOW TABLES IN SCHEMA {schema_ref}")
    tables = [row[1] for row in cursor]
    selected_table = st.selectbox("Select Table", tables, key=f"selected_table_name_{field_index}")
    if selected_table is None:
        return None

    table_ref = f"{schema_ref}.{_quote_identifier(selected_table)}"
    cursor.execute(f"DESCRIBE TABLE {table_ref}")
    columns = [row[0] for row in cursor]
    selected_column = st.selectbox("Select Column", columns, key=f"selected_column_name_{field_index}")
    select_limit_distinct = st.number_input("# of distinct records to extract", min_value=1, value=10, max_value=1000, key=f"selected_limit_name_{field_index}")
    if not selected_column:
        return None

    # Pull a bounded sample of distinct values and let the user edit it.
    cursor.execute(
        f"SELECT DISTINCT {_quote_identifier(selected_column)} "
        f"FROM {table_ref} LIMIT {int(select_limit_distinct)}"
    )
    distinct_values = [row[0] for row in cursor]
    result_text = "\n".join(str(value) for value in distinct_values)
    table_field_values = st.text_area("Extracted Values for Field", value=result_text, key=f"table_field_values_{field_index}")
    return table_field_values.split('\n')


# One group of widgets per requested field; each field's settings are
# collected in field_config for the export step.
field_config = []
for i in range(num_fields):
    cols_field = st.columns(2)
    with cols_field[0]:
        # Names are normalised: spaces removed, upper-cased.
        field_name = st.text_input(f"Field {i+1}: Name", f"FIELD_NAME_{i}", key=f"field_name{i}").replace(" ", "").upper()
    with cols_field[1]:
        field_type = st.selectbox(f"Field {i+1}: Type", ["Integer", "Text", "DateTime", "Double", "UUID", "DatabaseColumn"], key=f"field_type_{i}")
    #--------------
    #DateTime Config
    #--------------
    if field_type == "DateTime":
        selected_date = st.date_input("Select a date", datetime.today(), key=f"field_date{i}")
        num_days = st.number_input("Range of days for random date", min_value=1, value=7, key=f"field_date_number_input{i}")
    else:
        selected_date = None
        num_days = 0
    #--------------
    #Integer Config
    #--------------
    if field_type == "Integer":
        min_int_value = st.number_input("Min Random Value", min_value=0, value=1, key=f"min_value_integer_{i}")
        max_int_value = st.number_input("Max Random Value", min_value=1, value=100, key=f"max_value_integer_{i}")
        if min_int_value > max_int_value:
            # random.randint raises ValueError when min > max, so swap them.
            st.warning("Min Random Value is greater than Max Random Value; the two bounds have been swapped.")
            min_int_value, max_int_value = max_int_value, min_int_value
    else:
        min_int_value = 0
        max_int_value = 0
    #--------------
    #Database column lookup
    #--------------
    if field_type == "DatabaseColumn":
        if hasattr(st.session_state, 'snowflake_connection'):
            conn = get_snowflake_connection()
            # get_snowflake_connection() reports its own error and returns None.
            if conn is not None:
                cursor = conn.cursor()
                extracted_values = None
                try:
                    extracted_values = _render_database_column_lookup(cursor, i)
                except snowflake.connector.Error as e:
                    # e.g. missing privileges: report it instead of crashing the page.
                    st.error(f"Error reading from Snowflake: {e}")
                finally:
                    cursor.close()
                if extracted_values is not None:
                    table_string_values[field_name] = extracted_values
        else:
            st.warning("Connect to Snowflake first.")
    #--------------
    #Text data type Configuration
    #--------------
    if field_type == "Text":
        text_input_option = st.radio("Text Input Option", ["Write Values (One per line)", "Auto-generate based on Length, Prefix, and Suffix"], key=f"text_input_option_{i}")
        if text_input_option == "Write Values (One per line)":
            field_values = st.text_area("Values for Field", key=f"field_values_{i}")
            user_string_values[field_name] = field_values.split('\n')
        else:
            string_length = st.number_input("String Length", min_value=1, step=1, key=f"string_length_{i}")
            prefix = st.text_input("Prefix", key=f"prefix_{i}")
            suffix = st.text_input("Suffix", key=f"suffix_{i}")
            category = st.selectbox("Category", ["Letters", "Digits", "Alphanumeric"], key=f"category_{i}")
            if category == "Letters":
                chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
            elif category == "Digits":
                chars = "0123456789"
            else:
                chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
            # Generate 100 random strings; records later pick from this pool.
            random_strings = [f"{prefix}{''.join(random.choice(chars) for _ in range(string_length))}{suffix}" for _ in range(100)]
            user_string_values[field_name] = random_strings
    st.markdown("""----""")
    #--------------
    #Other data types need no further configuration; they are generated directly at export time.
    #--------------
    field_config.append({"name": field_name, "type": field_type, "selected_date": selected_date, "num_days": num_days, "min_int_value": min_int_value, "max_int_value": max_int_value})
### ---------------------------- ###
### Sidebar - Configurations ###
### ---------------------------- ###
st.sidebar.image("logo.png")
### ---------------------------- ###
### Sidebar - Connection Configs ###
### ---------------------------- ###
st.sidebar.header("Connect to Snowflake")
snowflake_account = st.sidebar.text_input("Snowflake Account")
# Credentials are laid out two per row.
user_col, password_col = st.sidebar.columns(2)
with user_col:
    snowflake_user = st.text_input("Snowflake User")
with password_col:
    snowflake_password = st.text_input("Snowflake Password", type="password")
role_col, warehouse_col = st.sidebar.columns(2)
with role_col:
    snowflake_role = st.text_input("Snowflake Role")
with warehouse_col:
    snowflake_warehouse = st.text_input("Snowflake Warehouse")
# Store the current inputs in the session state, where
# get_snowflake_connection() reads them.
for _state_key, _state_value in (
    ("snowflake_username_s", snowflake_user),
    ("snowflake_password_s", snowflake_password),
    ("snowflake_account_s", snowflake_account),
    ("snowflake_warehouse_s", snowflake_warehouse),
    ("snowflake_role_s", snowflake_role),
):
    st.session_state[_state_key] = _state_value
connect_col, _unused_col = st.sidebar.columns(2)
with connect_col:
    # The button is disabled once a connection is stored in the session state.
    already_connected = hasattr(st.session_state, 'snowflake_connection')
    if st.sidebar.button("Connect", key='openConnection', disabled=already_connected):
        if get_snowflake_connection():
            st.sidebar.success("You are connected! Happy Coding!")
### --------------------------- ###
### Sidebar - ExportOptions ###
### --------------------------- ###
def _compact_upper(text):
    """Return *text* with spaces removed and letters upper-cased."""
    return text.replace(" ", "").upper()


st.sidebar.header("Export Your Dataset")
export_option = st.sidebar.selectbox("Export Options", ["Save to File", "Export to Snowflake Stage", "Export to Snowflake Table"])
_targets_snowflake = export_option in ("Export to Snowflake Stage", "Export to Snowflake Table")
if _targets_snowflake and not hasattr(st.session_state, 'snowflake_connection'):
    # Both Snowflake targets need a connection before their inputs are shown.
    st.sidebar.warning("You are not connected to Snowflake yet.")
elif export_option == "Export to Snowflake Stage":
    snowflake_stage = _compact_upper(st.sidebar.text_input("Snowflake Stage"))
    # Snowflake export details
    file_prefix = _compact_upper(st.sidebar.text_input("File Prefix", "data.csv"))
    file_suffix = ""
elif export_option == "Export to Snowflake Table":
    database_name = _compact_upper(st.sidebar.text_input("Database Name"))
    schema_name = _compact_upper(st.sidebar.text_input("Schema Name"))
    table_name = _compact_upper(st.sidebar.text_input("Table Name"))
    table_strategy = st.sidebar.selectbox("Table Strategy", ["CREATE IF NOT EXISTS", "CREATE OR REPLACE"], index=0)
else:
    # File export details
    file_prefix = st.sidebar.text_input("File Prefix", "data.csv")
    file_suffix = ""
    # Not implemented yet: splitting the output into multiple files.
    #max_records_per_file = st.sidebar.number_input("Max Records per File", min_value=1)
### --------------------------- ###
### Sidebar - Export Engine ###
### --------------------------- ###
def _build_field_sampler(field):
    """Return a zero-argument callable producing one random value for *field*.

    Everything that does not change between records (value pools, integer
    bounds, the date range) is computed once here instead of once per record.

    Args:
        field: One entry of ``field_config``.

    Returns:
        A callable, or ``None`` for an unrecognised field type.
    """
    field_name = field["name"]
    field_type = field["type"]
    if field_type == "Text":
        text_pool = user_string_values[field_name]
        return lambda: random.choice(text_pool)
    if field_type == "DatabaseColumn":
        column_pool = table_string_values[field_name]
        return lambda: random.choice(column_pool)
    if field_type == "Integer":
        # random.randint raises ValueError when min > max, so order the bounds.
        low, high = sorted((field["min_int_value"], field["max_int_value"]))
        return lambda: random.randint(low, high)
    if field_type == "DateTime":
        field_selected_date = field["selected_date"]
        field_num_days = field["num_days"]
        date_range = pd.date_range(start=field_selected_date - pd.DateOffset(days=field_num_days),
                                   end=field_selected_date + pd.DateOffset(days=field_num_days))
        return lambda: np.random.choice(date_range)
    if field_type == "Double":
        return lambda: round(random.uniform(0, 1), 2)
    if field_type == "UUID":
        return lambda: str(uuid.uuid4())
    return None


# Generate data and create CSV or export to Snowflake
if st.sidebar.button("Export Data"):
    # DatabaseColumn fields only have candidate values once a column was
    # selected; sampling without them would raise KeyError.
    fields_missing_values = [
        field["name"] for field in field_config
        if field["type"] == "DatabaseColumn" and not table_string_values.get(field["name"])
    ]
    if num_records <= 0:
        st.sidebar.error("Number of records must be greater than 0.")
    elif num_fields <= 0:
        st.sidebar.error("Number of fields must be greater than 0.")
    elif fields_missing_values:
        st.sidebar.error(
            "No values available for DatabaseColumn field(s): "
            + ", ".join(fields_missing_values)
            + ". Connect to Snowflake and select a column first."
        )
    else:
        # Generate random values at this point.
        samplers = [(field["name"], _build_field_sampler(field)) for field in field_config]
        data = [
            {name: sampler() for name, sampler in samplers if sampler is not None}
            for _ in range(num_records)
        ]
        # Finally, the dataframe is ready!
        df = pd.DataFrame(data)
        df_exists = True
        # The selectbox yields the strings "Yes"/"No", which are both truthy;
        # to_csv needs a real boolean.
        write_header = include_header == "Yes"
        snowflake_connected = hasattr(st.session_state, 'snowflake_connection')
        ### --------------------------- ###
        ### Sidebar - Export Logic      ###
        ### --------------------------- ###
        if export_option == "Save to File":
            # Not implemented yet: splitting the data into multiple CSV files.
            file_name = f"{file_prefix}{file_suffix}"
            file_csv = df.to_csv(index=False, header=write_header, sep=file_separator, encoding='utf-8')
            st.sidebar.success(f"Data generated. Click the button to download {file_name}")
            st.sidebar.download_button(
                label="Download Locally",
                data=file_csv,
                file_name=file_name,
                mime='text/csv',
            )
        elif not snowflake_connected:
            # The stage/table inputs are only rendered once connected, so
            # their variables do not exist yet.
            st.sidebar.error("You are not connected to Snowflake yet.")
        elif export_option == "Export to Snowflake Stage":
            try:
                file_name = f"{file_prefix}{file_suffix}"
                # Write the CSV to a local file so PUT can upload it.
                df.to_csv(file_name, index=False, header=write_header, sep=file_separator)
                conn = get_snowflake_connection()
                # get_snowflake_connection() reports its own error and returns None.
                if conn is not None:
                    cursor = conn.cursor()
                    try:
                        # Create internal stage if it does not exist
                        cursor.execute(f"create stage if not exists {snowflake_stage} ")
                        cursor.execute("PUT file://" + file_name + " @" + snowflake_stage + " OVERWRITE = TRUE")
                    finally:
                        cursor.close()
                    st.sidebar.success(f"Data exported to Snowflake stage {snowflake_stage}")
            except Exception as e:
                st.sidebar.error(f"Error exporting data to Snowflake: {str(e)}")
        elif export_option == "Export to Snowflake Table":
            if not (database_name and schema_name and table_name):
                st.sidebar.error("Database, schema and table names are required.")
            else:
                conn = get_snowflake_connection()
                if conn is not None:
                    table_name_full = f"{database_name}.{schema_name}.{table_name}"
                    # Every column is created as VARCHAR(100).
                    column_ddl = ", ".join(f"{column} VARCHAR(100)" for column in df.columns)
                    if table_strategy == "CREATE OR REPLACE":
                        create_clause = "CREATE OR REPLACE TABLE"
                    else:
                        create_clause = "CREATE TABLE IF NOT EXISTS"
                    cursor = conn.cursor()
                    try:
                        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database_name}")
                        cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {database_name}.{schema_name}")
                        cursor.execute(f"{create_clause} {table_name_full} ( {column_ddl})")
                        cursor.execute(f"USE DATABASE {database_name};")
                        write_pandas(conn=conn, df=df, database=database_name, schema=schema_name, table_name=table_name)
                        st.sidebar.success(f"Table '{table_name}' created successfully in Snowflake!")
                    except Exception as e:
                        # Top-level UI boundary: show the failure instead of crashing.
                        st.sidebar.error(f"Error while writing on Database: {e}")
                    finally:
                        cursor.close()
### --------------------------- ###
### Footer and Preview ###
### --------------------------- ###
st.sidebar.text("Author: Matteo Consoli")
# Preview the first rows of the generated data, or hint at how to produce it.
if not df_exists:
    st.warning("Select export option and click the export button to see a preview.")
else:
    st.header("Generated Data Sample")
    preview_rows = df.head(10)
    st.table(preview_rows)