Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.recommendation

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.DataFrame

import com.microsoft.hyperspace.index.IndexConfig
import com.microsoft.hyperspace.recommendation.cost.{QueryImpact, WorkloadImpact}

/**
* Represents recommendation results.
*/
class Recommendation {

  /**
   * The map from data frames to their recommended indexes.
   * The data frames here do not need be the same as the workload queries.
   * In fact, the data frames are either base tables or intermediate
   * subqueries (views) on top of which indexes are recommended.
   */
  private val dfToIndexConfigs = new mutable.HashMap[DataFrame, ListBuffer[IndexConfig]]()

  /**
   * Add a recommended index. Adding the same config twice for the same
   * data frame is a no-op (duplicates are ignored).
   *
   * @param df The table or data frame on which the index should be created.
   * @param indexConfig The recommended index.
   */
  def add(df: DataFrame, indexConfig: IndexConfig): Unit = {
    val indexConfigs = dfToIndexConfigs.getOrElseUpdate(df, new ListBuffer[IndexConfig])
    if (!indexConfigs.contains(indexConfig)) {
      indexConfigs += indexConfig
    }
  }

  /**
   * Get all recommended indexes for the given table or data frame.
   *
   * @param df The table or data frame.
   * @return The recommended indexes, or an empty sequence if there is no
   *         recommendation for `df` (the original implementation threw
   *         NoSuchElementException in that case).
   */
  def get(df: DataFrame): Seq[IndexConfig] =
    // Snapshot to an immutable list so callers cannot observe later mutation
    // of the internal ListBuffer.
    dfToIndexConfigs.get(df).map(_.toList).getOrElse(Nil)

  /**
   * Get all recommendations.
   *
   * @return The map from tables or data frames to their recommended indexes.
   */
  def getAll: Map[DataFrame, Seq[IndexConfig]] =
    // Convert each ListBuffer to an immutable list to avoid leaking
    // mutable internal state through the Seq interface.
    dfToIndexConfigs.map { case (df, configs) => df -> (configs.toList: Seq[IndexConfig]) }.toMap

  /**
   * Get the number of indexes recommended.
   *
   * @return The number of indexes recommended.
   */
  def numIndexes: Int = getAllIndexes.length

  /**
   * Get all recommended indexes.
   *
   * @return A collection of all recommended indexes, across all data frames.
   */
  def getAllIndexes: Seq[IndexConfig] =
    dfToIndexConfigs.values.flatten.toList
}

/**
 * Represents the recommendation for a single query.
 *
 * @param recommendation The recommended indexes for the query.
 * @param impact The impact on query execution cost, if it was estimated.
 */
case class QueryRecommendation(recommendation: Recommendation, impact: Option[QueryImpact])

/**
 * Represents the recommendation for an entire workload.
 *
 * @param recommendation The recommended indexes for the workload.
 * @param impact The impact on workload execution cost, if it was estimated.
 */
case class WorkloadRecommendation(recommendation: Recommendation, impact: Option[WorkloadImpact])
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.recommendation

/**
 * Represents constants used in recommendation options and settings.
 */
object RecommendationConstants {

  ////////////////////////////////////////////////////////////////////////
  // Index recommendation option constants.
  ////////////////////////////////////////////////////////////////////////

  // The maximum number of indexes to recommend.
  val INDEX_COUNT_MAX = "index.count.max"
  val INDEX_COUNT_MAX_DEFAULT = "50"

  // The index storage budget (in GB).
  val INDEX_STORAGE_BUDGET_GB = "index.storage.budget"
  val INDEX_STORAGE_BUDGET_GB_DEFAULT = "3000"

  ////////////////////////////////////////////////////////////////////////
  // Cost metric option constants.
  ////////////////////////////////////////////////////////////////////////

  // Option key selecting which cost metric the recommender uses.
  val COST_METRIC = "cost.metric"
  // Cost metric based on the optimizer's "size" estimate (the default).
  val COST_METRIC_SIZE = "cost.metric.size"
  // Cost metric based on the optimizer's "card" (cardinality) estimate.
  val COST_METRIC_CARD = "cost.metric.card"
  val COST_METRIC_DEFAULT: String = COST_METRIC_SIZE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.recommendation

import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Representation of a Spark SQL query.
 *
 * NOTE(review): the field names `Id` and `Text` are UpperCamelCase, which is
 * non-idiomatic for Scala case-class fields (`id`/`text` would be conventional).
 * Renaming would break existing callers (e.g. `query.Text`), so they are kept as-is.
 *
 * @param Id The id of the query.
 * @param Text The SQL text of the query.
 */
case class Query(Id: String, Text: String)

/**
 * Represents a workload, i.e., a collection of weighted queries.
 */
class Workload {

  // Each entry pairs a query (as a DataFrame) with a weight indicating its
  // importance. A simple weighting scheme is to make weights proportional to
  // query frequencies.
  private val weightedQueries = new ListBuffer[(DataFrame, Double)]

  /**
   * Add a query and its weight into the workload.
   *
   * @param df The query in its [[DataFrame]] representation.
   * @param weight The weight of the query.
   */
  def addQuery(df: DataFrame, weight: Double = 1.0): Unit =
    weightedQueries.append((df, weight))

  /**
   * Get all queries and their associated weights.
   *
   * @return A [[Seq]] of queries with their weights, in insertion order.
   */
  def getQueries: Seq[(DataFrame, Double)] = weightedQueries.toList
}

object Workload {

  /**
   * Create a workload with the given queries, each added with default weight.
   *
   * @param queries The queries in SQL text format.
   * @param spark The Spark session used to parse/analyze each query.
   * @return The workload created.
   */
  def create(queries: Seq[Query], spark: SparkSession): Workload = {
    val workload = new Workload
    for (query <- queries) {
      workload.addQuery(spark.sql(query.Text))
    }
    workload
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.recommendation.cost

import org.apache.spark.sql.catalyst.optimizer.Cost

import com.microsoft.hyperspace.recommendation.RecommendationConstants

/**
 * An interface that all costs should implement.
 */
trait GenericCost {

  // Sentinel returned when a concrete cost has not been provided.
  // NOTE(review): negative values are otherwise legal doubles; callers must
  // compare against INVALID_COST to detect the "no cost" case.
  val INVALID_COST: Double = -1.0

  /**
   * Get cost as a double. Subclasses should override this; the default
   * implementation returns the INVALID_COST sentinel.
   *
   * @return The cost as a double.
   */
  def getCost: Double = INVALID_COST
}

/**
 * Simple cost that merely wraps a precomputed cost value.
 *
 * @param cost The cost value in double.
 */
case class SimpleCost(cost: Double) extends GenericCost {

  // Return the wrapped value as-is; negative values pass through unchanged.
  override def getCost: Double = cost
}

/**
 * Cost derived from the "size" field of the default Spark [[Cost]].
 *
 * @param cost The default Spark [[Cost]].
 */
case class SizeBasedCost(cost: Cost) extends GenericCost {

  // Widen the optimizer's size estimate to a double.
  override def getCost: Double = cost.size.toDouble
}

/**
 * Cost derived from the "card" (cardinality) field of the default Spark [[Cost]].
 *
 * @param cost The default Spark [[Cost]].
 */
case class CardBasedCost(cost: Cost) extends GenericCost {

  // Widen the optimizer's cardinality estimate to a double.
  override def getCost: Double = cost.card.toDouble
}

/**
 * Improvement of costs, comparing an optional old cost against an optional new cost.
 *
 * @param oldCost The cost before applying the recommendation, if available.
 * @param newCost The cost after applying the recommendation, if available.
 */
case class CostImprovement(oldCost: Option[GenericCost], newCost: Option[GenericCost]) {

  override def toString: String = {
    (oldCost, newCost) match {
      case (Some(o), Some(n)) =>
        s"[old cost: ${o.getCost}, new cost: ${n.getCost}, " +
          s"improvement: $getImprovement]"
      case _ =>
        // Bug fix: the original message was missing the closing bracket.
        "[old cost: N/A, new cost: N/A, improvement: N/A]"
    }
  }

  /**
   * Compute improvement in terms of estimated costs.
   *
   * @return Improvement in percentage with respect to the old cost, or None
   *         if either cost is missing. A zero old cost yields Some(0.0) to
   *         avoid division by zero.
   */
  def getImprovement: Option[Double] =
    // For-comprehension replaces the original isDefined/.get/return pattern;
    // yields None as soon as either cost is absent.
    for {
      o <- oldCost
      n <- newCost
    } yield {
      val oldValue = o.getCost
      if (oldValue == 0) {
        0.0
      } else {
        // Improvement is measured relative to the magnitude of the old cost.
        100 * (oldValue - n.getCost) / Math.abs(oldValue)
      }
    }
}

/**
 * A factory for making costs.
 */
object CostFactory {

  /**
   * Build a cost from a Spark optimizer [[Cost]], selecting the metric by name.
   * Any metric other than COST_METRIC_CARD falls back to the size-based cost.
   */
  def makeCost(costType: String, cost: Cost): GenericCost =
    if (costType == RecommendationConstants.COST_METRIC_CARD) {
      CardBasedCost(cost)
    } else {
      SizeBasedCost(cost)
    }

  /** Build a cost that simply wraps a precomputed double value. */
  def makeCost(cost: Double): GenericCost = SimpleCost(cost)
}
Loading