Skip to content

Commit

Permalink
Bug 5356: Update conversation date randomizer with better distributio…
Browse files Browse the repository at this point in the history
…n logic
  • Loading branch information
bkeller108 committed Jul 15, 2024
1 parent d800fcb commit fd7057d
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"cells":[{"cell_type":"code","execution_count":null,"id":"3b73b213-58af-4209-9efd-ac34c9e1e1d7","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# IMPORTANT: This notebook manipulates sample data to guarantee that the Power BI report includes data for the current date, the last two days, and the last seven days. \n","# It is OPTIONAL and is only used to ensure the Power BI report can display data during each deployment."]},{"cell_type":"code","execution_count":null,"id":"33be8396-a753-45f0-9f8a-fd4ea2177d88","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["import pandas as pd\n","from pyspark.sql.functions import col, date_sub, expr, to_date, date_add, explode, split\n","import random\n","import uuid"]},{"cell_type":"code","execution_count":null,"id":"9bf126cc-c972-4460-8eee-2c9440203fcc","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["df = spark.sql(\"SELECT * FROM ckm_conv_processed\")\n","# display(df)"]},{"cell_type":"code","execution_count":null,"id":"09a6a7fc-1701-41d4-a4e2-e9c0d5b6b6c4","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["record_count = df.count()\n","\n","# Print the number of records\n","print(f\"Total number of records in the DataFrame: {record_count}\")"]},{"cell_type":"code","execution_count":null,"id":"430c4303-0cc6-4afb-b375-e6da844ea90d","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# This code manipulates sample data that allocates a percentage of the data to the current date, the previous day, and the last seven days, \n","# while assigning the remaining records to any day within the last 30 days. \n","\n","\n","from pyspark.sql import SparkSession\n","from pyspark.sql.functions import col, max as spark_max, current_timestamp, unix_timestamp, from_unixtime, expr,lit\n","from pyspark.sql.types import TimestampType\n","\n","# Convert string columns to timestamp types\n","df = df.withColumn(\"StartTime\", col(\"StartTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"EndTime\", col(\"EndTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"ConversationDate\", col(\"ConversationDate\").cast(\"timestamp\"))\n","\n","# Calculate the maximum StartTime\n","max_date_df = df.select(spark_max(col(\"StartTime\")).alias(\"max_date\"))\n","max_date = max_date_df.collect()[0][\"max_date\"]\n","\n","# Get current timestamp\n","current_ts = spark.sql(\"SELECT current_timestamp() as current_ts\").collect()[0][\"current_ts\"]\n","\n","print(\"max_date: \", max_date)\n","print(\"current time: \", current_ts)\n","\n","# Calculate the difference in seconds between the current timestamp and the maximum StartTime\n","time_diff_seconds = (current_ts - max_date).total_seconds()\n","\n","# Convert the time difference to days, hours, minutes, and seconds\n","days = int(time_diff_seconds // (24 * 3600))\n","hours = int((time_diff_seconds % (24 * 3600)) // 3600)\n","minutes = int((time_diff_seconds % 3600) // 60)\n","seconds = int(time_diff_seconds % 60)\n","\n","# Total number of records\n","total_records = df.count()\n","\n","# Calculate the number of records for each time range\n","today_count = int(total_records) * .4\n","yesterday_today_count = int(total_records * 0.25)\n","two_days_prior_count = int(total_records * 0.1)\n","last_7_days_count = int(total_records * 0.15)\n","current_month_count = int(total_records * 0.1)\n","prior_month_count = total_records - (yesterday_today_count + two_days_prior_count + current_month_count)\n","\n","# # Assign random dates based on the calculated counts\n","df_temp = df.withColumn(\"row_num\", expr(\n"," f\"\"\"\n"," CASE\n"," WHEN rand() < {today_count / total_records} THEN 1\n"," WHEN rand() < {(yesterday_today_count + today_count) / total_records} THEN 2\n"," WHEN rand() < {(last_7_days_count + yesterday_today_count + today_count) / total_records} THEN 3\n"," ELSE 4\n"," END\n"," \"\"\"\n","))\n","\n","# Generate new dates based on row_num\n","df_temp = df_temp.withColumn(\"NewStartTime\", expr(\n"," \"\"\"\n"," CASE\n"," WHEN row_num = 1 THEN current_date()\n"," WHEN row_num = 2 THEN date_add(current_date(), -1)\n"," WHEN row_num = 3 THEN date_add(current_date(), -cast(rand() * 7 as int))\n"," ELSE date_add(date_add(current_date(), -7), -30 + cast(rand() * 30 as int))\n"," END\n"," \"\"\"\n",").cast('timestamp'))\n","\n","\n","# Combine the new date with the original time part of StartTime\n","df_temp = df_temp.withColumn(\"StartTime\", expr(\"to_timestamp(concat(date_format(NewStartTime, 'yyyy-MM-dd'), ' ', date_format(StartTime, 'HH:mm:ss.SSS')))\"))\n","\n","\n","# Adjust EndTime based on NewStartTime and Duration (Duration is in minutes)\n","interval_str = \"Duration minutes\"\n","df_temp = df_temp.withColumn(\"EndTime\", expr(\"StartTime + make_interval(0, 0, 0, 0, 0, Duration, 0)\"))\n","\n","\n","# Print the time difference in a sentence\n","# print(f\"The difference between the current time and the maximum date is {days} days, {hours} hours, {minutes} minutes, and {seconds} seconds.\")\n","\n","\n","\n","# Combine the new date with the original time part of ConversationDate to form NewConversationDate\n","df_temp = df_temp.withColumn(\"ConversationDate\", expr(\"concat(date_format(StartTime, 'yyyy-MM-dd'), ' ', date_format(ConversationDate, 'HH:mm:ss.SSS'))\"))\n","df_temp = df_temp.withColumn(\"ConversationDate\", col(\"ConversationDate\").cast(\"timestamp\"))\n","\n","\n","# Drop helper columns\n","df_temp = df_temp.drop(\"row_num\", \"NewStartTime\")\n","\n","# display(df_temp)\n","\n"]},{"cell_type":"code","execution_count":null,"id":"f91bb363-38ca-4490-94f1-ec67b457aa0f","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["df_temp.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"true\").saveAsTable('ckm_conv_processed_temp')"]},{"cell_type":"code","execution_count":null,"id":"e8e036de-0d34-4ea5-ab75-b624ddc2e220","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["df = spark.sql(\"SELECT * FROM ckm_conv_processed_temp \")"]},{"cell_type":"code","execution_count":null,"id":"82c35c12-b919-4e55-959a-2300f0412ee0","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"false\").saveAsTable('ckm_conv_processed')"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"e6ad9dad-e3da-4da5-bca6-6572c466b69a","default_lakehouse_name":"ckm_lakehouse","default_lakehouse_workspace_id":"0d98d480-171b-4b4d-a8e7-80fbd031d1a6","known_lakehouses":[{"id":"e6ad9dad-e3da-4da5-bca6-6572c466b69a"}]}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"[email protected]"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
{"cells":[{"cell_type":"code","execution_count":null,"id":"3b73b213-58af-4209-9efd-ac34c9e1e1d7","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# IMPORTANT: This notebook manipulates sample data to guarantee that the Power BI report includes data for the current date, the last two days, and the last seven days. \n","# It is OPTIONAL and is only used to ensure the Power BI report can display data during each deployment."]},{"cell_type":"code","execution_count":null,"id":"e8e036de-0d34-4ea5-ab75-b624ddc2e220","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%%sql\n","--# RUN TO MOVE THE DATES FORWARD TO TODAY\n","UPDATE ckm_conv_processed\n","SET StartTime = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), StartTime),\n","EndTime = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), EndTime),\n","ConversationDate = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), ConversationDate)"]},{"cell_type":"code","execution_count":null,"id":"82c35c12-b919-4e55-959a-2300f0412ee0","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# This code manipulates sample data that allocates a percentage of the data\n","# across a two weeks period to support storytelling and demo\n","\n","import pandas as pd\n","from datetime import date, datetime, timedelta\n","from pyspark.sql.functions import col\n","\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed\")\n","\n","# Convert string columns to timestamp types\n","df = df.withColumn(\"StartTime\", col(\"StartTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"EndTime\", col(\"EndTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"ConversationDate\", col(\"ConversationDate\").cast(\"timestamp\"))\n","\n","dfp = df.toPandas()\n","dfp = dfp.sample(frac=1) # This line randomly shuffles the df for a new distribution and demo percentages\n","\n","# Following list are date weights from Today-0 to Today-13 (two weeks)\n","weights = [30, 26, 5, 5, 5, 5, 15, 2, 2, 1, 1, 1, 1, 1]\n","dfindex = 0 # index loop through all conversations\n","daysback = 0 # start at today and work backwards\n","for row in weights:\n"," numconvos = int((row/100.00) * df.count())\n"," for i in range(numconvos):\n"," dfp.at[dfindex, 'StartTime'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'StartTime'].time())\n"," dfp.at[dfindex, 'EndTime'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'EndTime'].time())\n"," dfp.at[dfindex, 'ConversationDate'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'ConversationDate'].time())\n"," dfindex += 1\n"," daysback += 1\n","df = spark.createDataFrame(dfp)\n","\n","# Write to temp table, then update final results table\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"true\").saveAsTable('ckm_conv_processed_temp')\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed_temp \")\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"false\").saveAsTable('ckm_conv_processed')"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"e6ad9dad-e3da-4da5-bca6-6572c466b69a","default_lakehouse_name":"ckm_lakehouse","default_lakehouse_workspace_id":"0d98d480-171b-4b4d-a8e7-80fbd031d1a6","known_lakehouses":[{"id":"e6ad9dad-e3da-4da5-bca6-6572c466b69a"}]}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"[email protected]"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}

0 comments on commit fd7057d

Please sign in to comment.