diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/Recommendation.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/Recommendation.scala new file mode 100644 index 000000000..38440733b --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/Recommendation.scala @@ -0,0 +1,98 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.DataFrame + +import com.microsoft.hyperspace.index.IndexConfig +import com.microsoft.hyperspace.recommendation.cost.{QueryImpact, WorkloadImpact} + +/** + * Represents recommendation results. + */ +class Recommendation { + + /** + * The map from data frames to their recommended indexes. + * The data frames here do not need be the same as the workload queries. + * In fact, the data frames are either base tables or intermediate + * subqueries (views) on top of which indexes are recommended. + */ + private val dfToIndexConfigs = new mutable.HashMap[DataFrame, ListBuffer[IndexConfig]]() + + /** + * Add a recommended index. + * + * @param df The table or data frame on which the index should be created. + * @param indexConfig The recommended index. + */ + def add(df: DataFrame, indexConfig: IndexConfig): Unit = { + val indexConfigs = dfToIndexConfigs.getOrElseUpdate(df, new ListBuffer[IndexConfig]) + if (!indexConfigs.contains(indexConfig)) { + indexConfigs += indexConfig + } + } + + /** + * Get all recommended indexes for the given table or data frame. + * + * @param df The table or data frame. + * @return The recommended indexes. + */ + def get(df: DataFrame): Seq[IndexConfig] = dfToIndexConfigs(df) + + /** + * Get all recommendations. + * + * @return The map from tables or data frames to their recommended indexes. + */ + def getAll: Map[DataFrame, Seq[IndexConfig]] = dfToIndexConfigs.toMap + + /** + * Get the number of indexes recommended. + * + * @return The number of indexes recommended. + */ + def numIndexes: Int = getAllIndexes.length + + /** + * Get all recommended indexes. + * + * @return A collection of all recommended indexes. + */ + def getAllIndexes: Seq[IndexConfig] = + dfToIndexConfigs.flatMap(x => x._2).toSeq +} + +/** + * Represents recommendation for a single query. + * + * @param recommendation The recommendation. + * @param impact The impact on query execution cost. + */ +case class QueryRecommendation(recommendation: Recommendation, impact: Option[QueryImpact]) + +/** + * Represents recommendation for a workload. + * + * @param recommendation The recommendation. + * @param impact The impact on workload execution cost. + */ +case class WorkloadRecommendation(recommendation: Recommendation, impact: Option[WorkloadImpact]) diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/RecommendationConstants.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/RecommendationConstants.scala new file mode 100644 index 000000000..24a7e4a1d --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/RecommendationConstants.scala @@ -0,0 +1,44 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation + +/** + * Represent constants used in recommendation options and settings. + */ +object RecommendationConstants { + + //////////////////////////////////////////////////////////////////////// + // Index recommendation option constants. + //////////////////////////////////////////////////////////////////////// + + // The maximum number of indexes to recommend. + val INDEX_COUNT_MAX = "index.count.max" + val INDEX_COUNT_MAX_DEFAULT = "50" + + // The index storage budget (in GB). + val INDEX_STORAGE_BUDGET_GB = "index.storage.budget" + val INDEX_STORAGE_BUDGET_GB_DEFAULT = "3000" + + //////////////////////////////////////////////////////////////////////// + // Cost metric option constants. + //////////////////////////////////////////////////////////////////////// + + val COST_METRIC = "cost.metric" + val COST_METRIC_SIZE = "cost.metric.size" + val COST_METRIC_CARD = "cost.metric.card" + val COST_METRIC_DEFAULT: String = COST_METRIC_SIZE +} diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/Workload.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/Workload.scala new file mode 100644 index 000000000..d8a4d59d5 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/Workload.scala @@ -0,0 +1,78 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.{DataFrame, SparkSession} + +/** + * Representation of a Spark SQL query. + * + * @param Id The id of the query + * @param Text The text of the query + */ +case class Query(Id: String, Text: String) + +/** + * Represents a workload, which is a collection of queries. + */ +class Workload { + + // The set of (query, weight) in this workload. + // Each query is associated with a weight that indicates its importance. + // One simple way of assigning weights is to make them proportional to their frequencies. + private val queries = new ListBuffer[(DataFrame, Double)] + + /** + * Add a query and its weight into the workload. + * + * @param df The query in its [[DataFrame]] representation. + * @param weight The weight of the query. + */ + def addQuery(df: DataFrame, weight: Double = 1.0): Unit = { + queries += ((df, weight)) + } + + /** + * Get all queries and their associated weights. + * + * @return A [[Seq]] of queries with their weights. + */ + def getQueries: Seq[(DataFrame, Double)] = { + queries.toList + } +} + +object Workload { + + /** + * Create a workload with the given queries. + * + * @param queries The queries in SQL text format. + * @param spark The Spark session. + * @return The workload created. + */ + def create(queries: Seq[Query], spark: SparkSession): Workload = { + val workload = new Workload + queries.foreach { query => + val df = spark.sql(query.Text) + workload.addQuery(df) + } + workload + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/cost/GenericCost.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/cost/GenericCost.scala new file mode 100644 index 000000000..c497533b0 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/cost/GenericCost.scala @@ -0,0 +1,125 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation.cost + +import org.apache.spark.sql.catalyst.optimizer.Cost + +import com.microsoft.hyperspace.recommendation.RecommendationConstants + +/** + * An interface that all costs should implement. + */ +trait GenericCost { + + // Invalid cost. + val INVALID_COST: Double = -1.0 + + /** + * Get cost as a double. Subclasses should override this. + * + * @return The cost as a double. + */ + def getCost: Double = INVALID_COST +} + +/** + * Simple cost based on the given cost value (i.e., just a wrapper). + * + * @param cost The cost value in double. + */ +case class SimpleCost(cost: Double) extends GenericCost { + + override def getCost: Double = { + // if (cost < 0) INVALID_COST else cost + cost + } +} + +/** + * Cost based on the "size" given by the default Spark [[Cost]]. + * + * @param cost The default Spark [[Cost]]. + */ +case class SizeBasedCost(cost: Cost) extends GenericCost { + + override def getCost: Double = { + // Simply cast the "size" as a double. + cost.size.toDouble + } +} + +/** + * Cost based on the "card" given by the default Spark [[Cost]]. + * + * @param cost The default Spark [[Cost]]. + */ +case class CardBasedCost(cost: Cost) extends GenericCost { + + override def getCost: Double = { + // Simply cast the "card" as a double. + cost.card.toDouble + } +} + +/** + * Improvement of costs. + */ +case class CostImprovement(oldCost: Option[GenericCost], newCost: Option[GenericCost]) { + + override def toString: String = { + if (oldCost.isDefined && newCost.isDefined) { + s"[old cost: ${oldCost.get.getCost}, new cost: ${newCost.get.getCost}, " + + s"improvement: $getImprovement]" + } else { + s"[old cost: N/A, new cost: N/A, improvement: N/A" + } + } + + /** + * Compute improvement in terms of estimated costs. + * + * @return improvement in percentage. + */ + def getImprovement: Option[Double] = { + if (oldCost.isDefined && newCost.isDefined) { + if (oldCost.get.getCost == 0) { + return Some(0.0) + } + // Compute improvement with respect to the old cost. + return Some( + 100 * (oldCost.get.getCost - newCost.get.getCost) / Math.abs(oldCost.get.getCost)) + } + None + } +} + +/** + * A factory for making costs. + */ +object CostFactory { + + def makeCost(costType: String, cost: Cost): GenericCost = { + costType match { + case RecommendationConstants.COST_METRIC_CARD => CardBasedCost(cost) + case _ => SizeBasedCost(cost) + } + } + + def makeCost(cost: Double): GenericCost = { + SimpleCost(cost) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/cost/WorkloadImpact.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/cost/WorkloadImpact.scala new file mode 100644 index 000000000..926246146 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/cost/WorkloadImpact.scala @@ -0,0 +1,101 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation.cost + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.DataFrame + +import com.microsoft.hyperspace.index.IndexConfig +import com.microsoft.hyperspace.recommendation.RecommendationConstants + +/** + * Represents impact on a single query. + * + * @param df The query in its [[DataFrame]] representation. + * @param cost The [[CostImprovement]] of the query. + * @param usedIndexes The indexes referenced by the query (plan). + */ +case class QueryImpact( + df: DataFrame, + cost: CostImprovement, + usedIndexes: Option[Seq[IndexConfig]]) { + + override def toString: String = { + s"cost: ${cost.toString}\nusedIndexes: ${if (usedIndexes.isEmpty) "None" + else "[" + usedIndexes.get.map(x => x.toString).mkString(", ") + "]"}" + } +} + +/** + * Represents impact on a workload. + */ +class WorkloadImpact { + + // The set of (QueryImpact, weight) in this workload. + private val queryImpacts = new ListBuffer[(QueryImpact, Double)] + + /** + * Add a [[QueryImpact]]. + * + * @param impact The impact on the query. + * @param weight The weight of the query (within the workload). + */ + def addQueryImpact(impact: QueryImpact, weight: Double = 1.0): Unit = { + queryImpacts += ((impact, weight)) + } + + /** + * Get all [[QueryImpact]] and their associated weights. + * + * @return A [[Seq]] of [[QueryImpact]] with their associated weights. + */ + def getQueryImpacts: Seq[(QueryImpact, Double)] = { + queryImpacts.toList + } + + override def toString: String = { + val costImprovement = getCostImprovement + s"Old cost: ${if (costImprovement.oldCost.nonEmpty) costImprovement.oldCost.get.getCost + else "None"}, New cost: " + + s"${if (costImprovement.newCost.nonEmpty) costImprovement.newCost.get.getCost + else "None"}, Improvement: " + + s"${if (costImprovement.getImprovement.nonEmpty) costImprovement.getImprovement.get + else "None"}" + } + + /** + * Get [[CostImprovement]] of the whole workload. + * + * @return The cost improvement of the workload. + */ + def getCostImprovement: CostImprovement = { + if (queryImpacts.forall( + impact => impact._1.cost.oldCost.isDefined && impact._1.cost.newCost.isDefined)) { + var oldWorkloadCost, newWorkloadCost = 0.0 + queryImpacts.foreach { x => + oldWorkloadCost += x._1.cost.oldCost.get.getCost * x._2 + newWorkloadCost += x._1.cost.newCost.get.getCost * x._2 + } + CostImprovement( + Some(CostFactory.makeCost(oldWorkloadCost)), + Some(CostFactory.makeCost(newWorkloadCost))) + } else { + CostImprovement(None, None) + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/recommendation/index/IndexRecommender.scala b/src/main/scala/com/microsoft/hyperspace/recommendation/index/IndexRecommender.scala new file mode 100644 index 000000000..a9f6cae22 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/recommendation/index/IndexRecommender.scala @@ -0,0 +1,45 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.recommendation.index + +import scala.collection.mutable + +import org.apache.spark.sql.DataFrame + +import com.microsoft.hyperspace.recommendation._ + +/** + * An interface that all index recommenders should implement. + */ +trait IndexRecommender { + + /** + * Recommend indexes for a single query with respect to the given recommendation options. + * + * @param df The query. + * @return The recommendation for the query. + */ + def recommend(df: DataFrame): QueryRecommendation + + /** + * Recommend indexes for a workload with respect to the given recommendation options. + * + * @param workload The workload. + * @return The recommendation for the workload. + */ + def recommend(workload: Workload): WorkloadRecommendation +}