From e95b91533be46bf6abc8221aa0db61d58627c0a9 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Thu, 16 Jan 2025 16:08:37 -0500 Subject: [PATCH] Squashed commit of the following: commit cd7954379c63484c39bfd14d49e575af6cf2c9ac Author: Stephen Carman Date: Thu Jan 16 16:05:10 2025 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit 5a9ab33fc0bd3c1229de8400170c3443f1b3a5a0 Merge: 9551413d d6e1fbfc Author: Stephen Carman Date: Thu Jan 16 15:48:52 2025 -0500 Merge remote-tracking branch 'mine/unity-catalog-read' into unity-catalog-read # Conflicts: # crates/catalog-unity/Cargo.toml commit 9551413d39ddb75798faee44367c5ec28666ea7a Merge: ed8ac966 7f8b2abe Author: Stephen Carman Date: Thu Jan 16 15:47:41 2025 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit d6e1fbfc9bfd8819d005c4aff82225409452dfb3 Merge: ed8ac966 7f8b2abe Author: Stephen Carman Date: Thu Jan 16 15:47:41 2025 -0500 Merge remote-tracking branch 'mine/unity-catalog-read' into unity-catalog-read # Conflicts: # crates/catalog-unity/Cargo.toml commit ed8ac9664c7ff30c930c74e5d3bbb504d0198dcf Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit bb81a51eb502a98552cf1bdb36644c3ae6c588ce Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit 17a9d1b4e3ed298b61b4b13b3d3e57af6ce404ec Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit 7f8b2abee35fbe25686d0be96231a7989ff6acb2 Merge: 9a6e48e9 24bbcdbc Author: Stephen Carman Date: Thu Dec 26 11:43:29 2024 -0500 Merge branch 'main' into unity-catalog-read commit 9a6e48e9ce87a8bde86749f28ad098ede434d649 Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit 1c3fe85185370fa199996715ab9afd76a4b83104 Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman commit f7cc78be91a9f31f183c571184ab44333848b6d4 Author: Stephen Carman Date: Sun Dec 22 12:34:42 2024 -0500 feat: support reading tables via Unity Catalog provided credentials Signed-off-by: Stephen Carman --- crates/catalog-unity/Cargo.toml | 19 +- crates/catalog-unity/examples/uc_example.rs | 46 ++ .../catalog-unity/src/client/mock_server.rs | 0 crates/catalog-unity/src/datafusion.rs | 110 ++-- crates/catalog-unity/src/error.rs | 38 -- crates/catalog-unity/src/lib.rs | 72 ++- crates/catalog-unity/src/models.rs | 496 ++++++++++++------ crates/catalog-unity/src/prelude.rs | 4 + crates/core/src/operations/add_column.rs | 4 +- crates/core/src/table/builder.rs | 2 +- 10 files changed, 546 insertions(+), 245 deletions(-) create mode 100644 crates/catalog-unity/examples/uc_example.rs delete mode 100644 crates/catalog-unity/src/client/mock_server.rs delete mode 100644 crates/catalog-unity/src/error.rs create mode 100644 crates/catalog-unity/src/prelude.rs diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 99f4426d51..f1d00808ec 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -19,10 +19,12 @@ serde_json.workspace = true thiserror.workspace = true deltalake-core = { version = "0.24.0", path = "../core", features = [ "datafusion", -]} +] } +deltalake-aws = { version = "0.7.0", path = "../aws", optional = true } +deltalake-azure = { version = "0.7.0", path = "../azure", optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" -reqwest-middleware = "0.4.0" +reqwest-middleware = { version = "0.4.0", features = ["json"] } rand = "0.8" futures = { workspace = true } chrono = { workspace = true } @@ -30,13 +32,22 @@ dashmap = "6" tracing = { workspace = true } datafusion = { workspace = true, optional = true } datafusion-common = { workspace = true, optional = true } +moka = { version = "0.12", optional = true, features = ["future"] } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tempfile = "3" httpmock = { version = "0.8.0-alpha.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] -default = [] -datafusion = ["dep:datafusion", "datafusion-common"] +default = ["datafusion", "aws"] +aws = ["deltalake-aws"] +azure = ["deltalake-azure"] +datafusion = ["dep:datafusion", "datafusion-common", "deltalake-core/datafusion", "moka"] + +[[example]] +name = "uc_example" +path = "examples/uc_example.rs" +required-features = ["datafusion", "aws"] diff --git a/crates/catalog-unity/examples/uc_example.rs b/crates/catalog-unity/examples/uc_example.rs new file mode 100644 index 0000000000..bd25b2dd0d --- /dev/null +++ b/crates/catalog-unity/examples/uc_example.rs @@ -0,0 +1,46 @@ +use datafusion::prelude::*; +use deltalake_catalog_unity::prelude::*; +use std::error::Error; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let filter = tracing_subscriber::EnvFilter::builder().parse("deltalake_catalog_unity=info")?; + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber)?; + + let uc = UnityCatalogBuilder::from_env().build()?; + + deltalake_aws::register_handlers(None); + + let catalog = UnityCatalogProvider::try_new(Arc::new(uc), "scarman_sandbox").await?; + let ctx = SessionContext::new(); + ctx.register_catalog("scarman_sandbox", Arc::new(catalog)); + + ctx.sql( + "select hdci.city_name, hdci.country_code, hdci.latitude, hdci.longitude from \ + scarman_sandbox.external_data.historical_hourly_imperial hhi \ + join scarman_sandbox.external_data.historical_daily_calendar_imperial hdci on hdci.country_code = hhi.country_code \ + order by city_name \ + limit 50;" + ) + .await? + .show() + .await?; + + ctx.table("scarman_sandbox.external_data.historical_hourly_imperial") + .await? + .select(vec![ + col("city_name"), + col("country_code"), + col("latitude"), + col("longitude"), + ])? + .show_limit(50) + .await?; + + Ok(()) +} diff --git a/crates/catalog-unity/src/client/mock_server.rs b/crates/catalog-unity/src/client/mock_server.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/crates/catalog-unity/src/datafusion.rs b/crates/catalog-unity/src/datafusion.rs index 23339b0b16..f5e6f2a3de 100644 --- a/crates/catalog-unity/src/datafusion.rs +++ b/crates/catalog-unity/src/datafusion.rs @@ -1,21 +1,24 @@ //! Datafusion integration for UnityCatalog -use std::any::Any; -use std::collections::HashMap; -use std::sync::Arc; - +use chrono::prelude::*; use dashmap::DashMap; use datafusion::catalog::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use datafusion_common::DataFusionError; +use futures::FutureExt; +use moka::future::Cache; +use moka::Expiry; +use std::any::Any; +use std::sync::Arc; +use std::time::{Duration, Instant}; use tracing::error; use super::models::{ GetTableResponse, ListCatalogsResponse, ListSchemasResponse, ListTableSummariesResponse, + TableTempCredentialsResponse, TemporaryTableCredentials, }; -use super::{DataCatalogResult, UnityCatalog}; - +use super::{DataCatalogResult, UnityCatalog, UnityCatalogError}; use deltalake_core::DeltaTableBuilder; /// In-memory list of catalogs populated by unity catalog @@ -27,20 +30,13 @@ pub struct UnityCatalogList { impl UnityCatalogList { /// Create a new instance of [`UnityCatalogList`] - pub async fn try_new( - client: Arc, - storage_options: impl IntoIterator, impl Into)> + Clone, - ) -> DataCatalogResult { + pub async fn try_new(client: Arc) -> DataCatalogResult { let catalogs = match client.list_catalogs().await? { - ListCatalogsResponse::Success { catalogs } => { + ListCatalogsResponse::Success { catalogs, .. } => { let mut providers = Vec::new(); for catalog in catalogs { - let provider = UnityCatalogProvider::try_new( - client.clone(), - &catalog.name, - storage_options.clone(), - ) - .await?; + let provider = + UnityCatalogProvider::try_new(client.clone(), &catalog.name).await?; providers.push((catalog.name, Arc::new(provider) as Arc)); } providers @@ -87,20 +83,15 @@ impl UnityCatalogProvider { pub async fn try_new( client: Arc, catalog_name: impl Into, - storage_options: impl IntoIterator, impl Into)> + Clone, ) -> DataCatalogResult { let catalog_name = catalog_name.into(); let schemas = match client.list_schemas(&catalog_name).await? { ListSchemasResponse::Success { schemas } => { let mut providers = Vec::new(); for schema in schemas { - let provider = UnitySchemaProvider::try_new( - client.clone(), - &catalog_name, - &schema.name, - storage_options.clone(), - ) - .await?; + let provider = + UnitySchemaProvider::try_new(client.clone(), &catalog_name, &schema.name) + .await?; providers.push((schema.name, Arc::new(provider) as Arc)); } providers @@ -127,20 +118,33 @@ impl CatalogProvider for UnityCatalogProvider { } } +struct TokenExpiry; + +impl Expiry for TokenExpiry { + fn expire_after_read( + &self, + _key: &String, + value: &TemporaryTableCredentials, + _read_at: Instant, + _duration_until_expiry: Option, + _last_modified_at: Instant, + ) -> Option { + let time_to_expire = value.expiration_time - Utc::now(); + tracing::info!("Token {} expires in {}", _key, time_to_expire); + time_to_expire.to_std().ok() + } +} + /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog #[derive(Debug)] pub struct UnitySchemaProvider { - /// UnityCatalog Api client client: Arc, - catalog_name: String, - schema_name: String, /// Parent catalog for schemas of interest. table_names: Vec, - - storage_options: HashMap, + token_cache: Cache, } impl UnitySchemaProvider { @@ -149,7 +153,6 @@ impl UnitySchemaProvider { client: Arc, catalog_name: impl Into, schema_name: impl Into, - storage_options: impl IntoIterator, impl Into)>, ) -> DataCatalogResult { let catalog_name = catalog_name.into(); let schema_name = schema_name.into(); @@ -163,17 +166,37 @@ impl UnitySchemaProvider { .collect(), ListTableSummariesResponse::Error(_) => vec![], }; + let token_cache = Cache::builder().expire_after(TokenExpiry).build(); Ok(Self { client, table_names, catalog_name, schema_name, - storage_options: storage_options - .into_iter() - .map(|(key, value)| (key.into(), value.into())) - .collect(), + token_cache, }) } + + async fn get_creds( + &self, + catalog: &str, + schema: &str, + name: &str, + ) -> Result { + tracing::debug!( + "Fetching new credential for: {}.{}.{}", + catalog, + schema, + name + ); + self.client + .get_temp_table_credentials(catalog, schema, name) + .map(|resp| match resp { + Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds), + Ok(TableTempCredentialsResponse::Error(err)) => Err(err.into()), + Err(err) => Err(err.into()), + }) + .await + } } #[async_trait::async_trait] @@ -195,8 +218,23 @@ impl SchemaProvider for UnitySchemaProvider { match maybe_table { GetTableResponse::Success(table) => { + let temp_creds = self + .token_cache + .try_get_with( + table.table_id, + self.get_creds(&self.catalog_name, &self.schema_name, name), + ) + .await + .map_err(|err| DataFusionError::External(err.into()))?; + + let new_storage_opts = temp_creds + .aws_temp_credentials + .ok_or_else(|| { + DataFusionError::External(UnityCatalogError::MissingCredential.into()) + })? + .into(); let table = DeltaTableBuilder::from_uri(table.storage_location) - .with_storage_options(self.storage_options.clone()) + .with_storage_options(new_storage_opts) .load() .await?; Ok(Some(Arc::new(table))) diff --git a/crates/catalog-unity/src/error.rs b/crates/catalog-unity/src/error.rs deleted file mode 100644 index a13c3dc401..0000000000 --- a/crates/catalog-unity/src/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -#[derive(thiserror::Error, Debug)] -pub enum UnityCatalogError { - /// A generic error qualified in the message - #[error("Error in {catalog} catalog: {source}")] - Generic { - /// Name of the catalog - catalog: &'static str, - /// Error message - source: Box, - }, - - /// A generic error qualified in the message - #[error("{source}")] - Retry { - /// Error message - #[from] - source: crate::client::retry::RetryError, - }, - - #[error("Request error: {source}")] - /// Error from reqwest library - RequestError { - /// The underlying reqwest_middleware::Error - #[from] - source: reqwest::Error, - }, - - /// Error caused by missing environment variable for Unity Catalog. - #[error("Missing Unity Catalog environment variable: {var_name}")] - MissingEnvVar { - /// Variable name - var_name: String, - }, - - /// Error caused by invalid access token value - #[error("Invalid Databricks personal access token")] - InvalidAccessToken, -} diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index 816376eba8..1f71c961d2 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -1,12 +1,11 @@ //! Databricks Unity Catalog. -use std::str::FromStr; - use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION}; +use std::str::FromStr; use crate::credential::{AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider}; use crate::models::{ - GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse, - ListTableSummariesResponse, + ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse, + ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest, }; use deltalake_core::data_catalog::DataCatalogResult; @@ -19,8 +18,8 @@ pub mod client; pub mod credential; #[cfg(feature = "datafusion")] pub mod datafusion; -pub mod error; pub mod models; +pub mod prelude; /// Possible errors from the unity-catalog/tables API call #[derive(thiserror::Error, Debug)] @@ -71,6 +70,19 @@ pub enum UnityCatalogError { #[error("Missing or corrupted federated token file for WorkloadIdentity.")] FederatedTokenFile, + + #[cfg(feature = "datafusion")] + #[error("Datafusion error: {0}")] + DatafusionError(#[from] datafusion_common::DataFusionError), +} + +impl From for UnityCatalogError { + fn from(value: ErrorResponse) -> Self { + UnityCatalogError::InvalidTable { + error_code: value.error_code, + message: value.message, + } + } } impl From for DataCatalogError { @@ -189,9 +201,10 @@ impl FromStr for UnityCatalogConfigKey { #[allow(deprecated)] fn from_str(s: &str) -> Result { match s { - "access_token" | "unity_access_token" | "databricks_access_token" => { - Ok(UnityCatalogConfigKey::AccessToken) - } + "access_token" + | "unity_access_token" + | "databricks_access_token" + | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken), "authority_host" | "unity_authority_host" | "databricks_authority_host" => { Ok(UnityCatalogConfigKey::AuthorityHost) } @@ -348,6 +361,7 @@ impl UnityCatalogBuilder { for (os_key, os_value) in std::env::vars_os() { if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { if key.starts_with("UNITY_") || key.starts_with("DATABRICKS_") { + tracing::debug!("Found relevant env: {}", key); if let Ok(config_key) = UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase()) { @@ -620,6 +634,39 @@ impl UnityCatalog { Ok(resp.json().await?) } + + pub async fn get_temp_table_credentials( + &self, + catalog_id: impl AsRef, + database_name: impl AsRef, + table_name: impl AsRef, + ) -> Result { + let token = self.get_credential().await?; + let table_info = self + .get_table(catalog_id, database_name, table_name) + .await?; + let response = match table_info { + GetTableResponse::Success(table) => { + let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ"); + Ok(self + .client + .post(format!( + "{}/temporary-table-credentials", + self.catalog_url() + )) + .header(AUTHORIZATION, token) + .json(&request) + .send() + .await?) + } + GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable { + error_code: err.error_code, + message: err.message, + }), + }?; + + Ok(response.json().await?) + } } #[async_trait::async_trait] @@ -713,8 +760,11 @@ mod tests { let get_table_response = client .get_table("catalog_name", "schema_name", "table_name") - .await - .unwrap(); - assert!(matches!(get_table_response, GetTableResponse::Success(_))); + .await; + dbg!(&get_table_response); + assert!(matches!( + get_table_response.unwrap(), + GetTableResponse::Success(_) + )); } } diff --git a/crates/catalog-unity/src/models.rs b/crates/catalog-unity/src/models.rs index 2066a4ee86..bc1179e499 100644 --- a/crates/catalog-unity/src/models.rs +++ b/crates/catalog-unity/src/models.rs @@ -1,18 +1,33 @@ //! Api models for databricks unity catalog APIs - -use core::fmt; +use chrono::serde::*; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; - -use serde::Deserialize; +use std::fmt; /// Error response from unity API -#[derive(Debug, Deserialize)] +#[derive(Deserialize, Debug)] pub struct ErrorResponse { /// The error code pub error_code: String, /// The error message pub message: String, + #[serde(default)] + pub details: Vec, +} + +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +pub struct ErrorDetails { + #[serde(rename = "@type")] + tpe: String, + reason: String, + domain: String, + metadata: HashMap, + request_id: String, + serving_data: String, } + impl fmt::Display for ErrorResponse { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "[{}] {}", self.error_code, self.message) @@ -21,20 +36,21 @@ impl fmt::Display for ErrorResponse { impl std::error::Error for ErrorResponse {} /// List catalogs response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListCatalogsResponse { /// Successful response Success { /// The schemas within the parent catalog catalogs: Vec, + next_page_token: Option, }, /// Error response Error(ErrorResponse), } /// List schemas response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListSchemasResponse { /// Successful response @@ -47,7 +63,7 @@ pub enum ListSchemasResponse { } /// Get table response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum GetTableResponse { /// Successful response @@ -57,7 +73,7 @@ pub enum GetTableResponse { } /// List schemas response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum GetSchemaResponse { /// Successful response @@ -67,7 +83,7 @@ pub enum GetSchemaResponse { } /// List table summaries response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListTableSummariesResponse { /// Successful response @@ -82,18 +98,25 @@ pub enum ListTableSummariesResponse { Error(ErrorResponse), } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Debug)] +#[serde(untagged)] +pub enum TableTempCredentialsResponse { + Success(TemporaryTableCredentials), + Error(ErrorResponse), +} + +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Whether the current securable is accessible from all workspaces or a specific set of workspaces. -pub enum IsomationMode { +pub enum IsolationMode { #[default] Undefined, Open, Isolated, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// The type of the catalog. @@ -106,67 +129,63 @@ pub enum CatalogType { } /// A catalog within a metastore -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] +#[serde(default)] pub struct Catalog { - /// Username of schema creator. - #[serde(default)] pub created_by: String, - - /// Name of schema, relative to parent catalog. pub name: String, - - /// Username of user who last modified schema. - #[serde(default)] pub updated_by: String, - - #[serde(default)] - /// Whether the current securable is accessible from all workspaces or a specific set of workspaces. - pub isolation_mode: IsomationMode, - - #[serde(default)] - /// The type of the catalog. + pub isolation_mode: IsolationMode, pub catalog_type: CatalogType, - - /// Storage root URL for managed tables within catalog. pub storage_root: String, - - /// The name of delta sharing provider. - /// - /// A Delta Sharing catalog is a catalog that is based on a Delta share on a remote sharing server. - pub provider_name: Option, - - /// Storage Location URL (full path) for managed tables within catalog. + pub provider_name: String, pub storage_location: String, - - /// A map of key-value properties attached to the securable. - #[serde(default)] pub properties: HashMap, - - /// The name of the share under the share provider. - pub share_name: Option, - - /// User-provided free-form text description. - #[serde(default)] + pub share_name: String, pub comment: String, - - /// Time at which this schema was created, in epoch milliseconds. - #[serde(default)] pub created_at: i64, - - /// Username of current owner of schema. - #[serde(default)] pub owner: String, - - /// Time at which this schema was created, in epoch milliseconds. - #[serde(default)] pub updated_at: i64, - - /// Unique identifier of parent metastore. pub metastore_id: String, + pub enabled_predictive_optimization: String, + pub effective_predictive_optimization_flag: EffectivePredictiveOptimizationFlag, + pub connection_name: String, + pub full_name: String, + pub options: HashMap, + pub securable_type: String, + pub provisioning_info: ProvisioningInfo, + pub browse_only: Option, + pub accessible_in_current_workspace: bool, + pub id: String, + pub securable_kind: String, + pub delta_sharing_valid_through_timestamp: u64, +} + +#[allow(unused)] +#[derive(Deserialize, Default, Debug)] +pub struct ProvisioningInfo { + state: ProvisioningState, +} + +#[derive(Deserialize, Debug, Default)] +pub enum ProvisioningState { + #[default] + Provisioning, + Active, + Failed, + Deleting, + Updating, +} + +#[derive(Deserialize, Default, Debug)] +pub struct EffectivePredictiveOptimizationFlag { + pub value: String, + pub inherited_from_type: String, + pub inherited_from_name: String, } /// A schema within a catalog -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] pub struct Schema { /// Username of schema creator. #[serde(default)] @@ -220,10 +239,11 @@ pub struct Schema { pub metastore_id: String, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Possible data source formats for unity tables +#[derive(Clone, PartialEq)] pub enum DataSourceFormat { #[default] Undefined, @@ -236,12 +256,27 @@ pub enum DataSourceFormat { Text, UnityCatalog, Deltasharing, + DatabricksFormat, + MySQLFormat, + PostgreSQLFormat, + RedshiftFormat, + SnowflakeFormat, + SQLDWFormat, + SQLServerFormat, + SalesForceFormat, + BigQueryFormat, + NetSuiteFormat, + WorkdayRAASFormat, + HiveSerde, + HiveCustom, + VectorIndexFormat, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Possible data source formats for unity tables +#[derive(PartialEq, Clone)] pub enum TableType { #[default] Undefined, @@ -252,7 +287,7 @@ pub enum TableType { StreamingTable, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] /// Summary of the table pub struct TableSummary { /// The full name of the table. @@ -262,37 +297,179 @@ pub struct TableSummary { } /// A table within a schema -#[derive(Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Default, Deserialize)] pub struct Table { - /// Username of table creator. - #[serde(default)] - pub created_by: String, + pub name: String, + /// Name of parent catalog. + pub catalog_name: String, + /// Name of parent schema relative to its parent catalog. + pub schema_name: String, + pub table_type: TableType, + pub data_source_format: DataSourceFormat, + /// The array of __ColumnInfo__ definitions of the table's columns. + pub columns: Vec, + /// Storage root URL for table (for **MANAGED**, **EXTERNAL** tables) + pub storage_location: String, + /// User-provided free-form text description. + #[serde(skip_serializing_if = "Option::is_none")] + pub comment: Option, + /// A map of key-value properties attached to the securable. + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, + /// Time at which this table was created, in epoch milliseconds. + #[serde(with = "ts_milliseconds")] + pub created_at: DateTime, + /// Time at which this table was last modified, in epoch milliseconds. + #[serde(with = "ts_milliseconds")] + pub updated_at: DateTime, + /// Unique identifier for the table. + pub table_id: String, +} - /// Name of table, relative to parent schema. +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct ColumnInfo { + /// Name of Column. pub name: String, + /// Full data type specification as SQL/catalogString text. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_text: Option, + /// Full data type specification, JSON-serialized. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_json: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub type_name: Option, + /// Digits of precision; required for DecimalTypes. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_precision: Option, + /// Digits to right of decimal; Required for DecimalTypes. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_scale: Option, + /// Format of IntervalType. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_interval_type: Option, + /// Ordinal position of column (starting at position 0). + pub position: u32, + /// User-provided free-form text description. + #[serde(skip_serializing_if = "Option::is_none")] + pub comment: Option, + /// Whether field may be Null. + pub nullable: bool, + /// Partition index for column. + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_index: Option, +} - /// Username of user who last modified the table. - #[serde(default)] - pub updated_by: String, +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ColumnTypeName { + Boolean, + Byte, + Short, + Int, + Long, + Float, + Double, + Date, + Timestamp, + TimestampNtz, + String, + Binary, + Decimal, + Interval, + Array, + Struct, + Map, + Char, + Null, + UserDefinedType, + TableType, +} - /// List of schemes whose objects can be referenced without qualification. - #[serde(default)] - pub sql_path: String, +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +pub struct DeltaRuntimeProperties { + pub delta_runtime_properties: HashMap, +} - /// Data source format - pub data_source_format: DataSourceFormat, +#[derive(Deserialize, Debug, Clone)] +pub struct TemporaryTableCredentials { + pub aws_temp_credentials: Option, + pub azure_user_delegation_sas: Option, + pub gcp_oauth_token: Option, + pub r2_temp_credentials: Option, + #[serde(with = "chrono::serde::ts_milliseconds")] + pub expiration_time: DateTime, + pub url: String, +} - /// Full name of table, in form of catalog_name.schema_name.table_name - pub full_name: String, +#[derive(Deserialize, Debug, Clone)] +pub struct AwsTempCredentials { + pub access_key_id: String, + pub secret_access_key: String, + pub session_token: Option, + pub access_point: Option, +} - /// Name of parent schema relative to its parent catalog. - pub schema_name: String, +#[cfg(feature = "aws")] +impl From for HashMap { + fn from(value: AwsTempCredentials) -> Self { + let mut result = HashMap::from_iter([ + ( + deltalake_aws::constants::AWS_ACCESS_KEY_ID.to_string(), + value.access_key_id, + ), + ( + deltalake_aws::constants::AWS_SECRET_ACCESS_KEY.to_string(), + value.secret_access_key, + ), + ]); + if let Some(st) = value.session_token { + result.insert(deltalake_aws::constants::AWS_SESSION_TOKEN.to_string(), st); + } + if let Some(ap) = value.access_point { + result.insert(deltalake_aws::constants::AWS_ENDPOINT_URL.to_string(), ap); + } + result + } +} - /// Storage root URL for table (for MANAGED, EXTERNAL tables) - pub storage_location: String, +#[cfg(feature = "azure")] +impl From for HashMap { + fn from(value: AzureUserDelegationSas) -> Self { + HashMap::from_iter([("azure_storage_sas_key".to_string(), value.sas_token)]) + } +} - /// Unique identifier of parent metastore. - pub metastore_id: String, +#[derive(Deserialize, Debug, Clone)] +pub struct AzureUserDelegationSas { + pub sas_token: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct GcpOauthToken { + pub oauth_token: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct R2TempCredentials { + pub access_key_id: String, + pub secret_access_key: String, + pub session_token: String, +} + +#[derive(Serialize, Debug, Clone)] +pub struct TemporaryTableCredentialsRequest { + pub table_id: String, + pub operation: String, +} + +impl TemporaryTableCredentialsRequest { + pub fn new(table_id: &str, operation: &str) -> Self { + Self { + table_id: table_id.to_string(), + operation: operation.to_string(), + } + } } #[cfg(test)] @@ -302,7 +479,8 @@ pub(crate) mod tests { pub(crate) const ERROR_RESPONSE: &str = r#" { "error_code": "404", - "message": "error message" + "message": "error message", + "details": [] } "#; @@ -316,6 +494,7 @@ pub(crate) mod tests { "full_name": "string", "catalog_type": "string", "catalog_name": "string", + "schema_name": "string", "storage_root": "string", "storage_location": "string", "properties": { @@ -326,7 +505,8 @@ pub(crate) mod tests { "created_at": 0, "owner": "string", "updated_at": 0, - "metastore_id": "string" + "metastore_id": "string", + "table_id": "string" } ] }"#; @@ -353,21 +533,55 @@ pub(crate) mod tests { }"#; pub(crate) const GET_TABLE_RESPONSE: &str = r#" - { - "created_by": "string", - "name": "table_name", - "updated_by": "string", - "sql_path": "string", - "data_source_format": "DELTA", - "full_name": "string", - "delta_runtime_properties_kvpairs": { - "delta_runtime_properties": { + { + "name": "string", + "catalog_name": "string", + "schema_name": "string", + "table_type": "MANAGED", + "data_source_format": "DELTA", + "columns": [ + { + "name": "string", + "type_text": "string", + "type_name": "BOOLEAN", + "position": 0, + "type_precision": 0, + "type_scale": 0, + "type_interval_type": "string", + "type_json": "string", + "comment": "string", + "nullable": true, + "partition_index": 0, + "mask": { + "function_name": "string", + "using_column_names": [ + "string" + ] + } + } + ], + "storage_location": "string", + "view_definition": "string", + "view_dependencies": { + "dependencies": [ + { + "table": { + "table_full_name": "string" + }, + "function": { + "function_full_name": "string" + } + } + ] + }, + "sql_path": "string", + "owner": "string", + "comment": "string", + "properties": { "property1": "string", "property2": "string" - } - }, - "catalog_name": "string", - "table_constraints": { + }, + "storage_credential_name": "string", "table_constraints": [ { "primary_key_constraint": { @@ -390,63 +604,38 @@ pub(crate) mod tests { "name": "string" } } - ] - }, - "schema_name": "string", - "storage_location": "string", - "properties": { - "property1": "string", - "property2": "string" - }, - "columns": [ - { - "nullable": "true", - "name": "string", - "type_interval_type": "string", - "mask": { - "function_name": "string", - "using_column_names": [ - "string" - ] - }, - "type_scale": 0, - "type_text": "string", - "comment": "string", - "partition_index": 0, - "type_json": "string", - "position": 0, - "type_name": "BOOLEAN", - "type_precision": 0 - } - ], - "comment": "string", - "table_id": "string", - "table_type": "MANAGED", - "created_at": 0, - "row_filter": { - "name": "string", - "input_column_names": [ - "string" - ] - }, - "owner": "string", - "storage_credential_name": "string", - "updated_at": 0, - "view_definition": "string", - "view_dependencies": [ - { - "table": { - "table_full_name": "string" - }, - "function": { - "function_full_name": "string" + ], + "row_filter": { + "function_name": "string", + "input_column_names": [ + "string" + ] + }, + "enable_predictive_optimization": "DISABLE", + "metastore_id": "string", + "full_name": "string", + "data_access_configuration_id": "string", + "created_at": 0, + "created_by": "string", + "updated_at": 0, + "updated_by": "string", + "deleted_at": 0, + "table_id": "string", + "delta_runtime_properties_kvpairs": { + "delta_runtime_properties": { + "property1": "string", + "property2": "string" } - } - ], - "data_access_configuration_id": "string", - "deleted_at": 0, - "metastore_id": "string" - } + }, + "effective_predictive_optimization_flag": { + "value": "DISABLE", + "inherited_from_type": "CATALOG", + "inherited_from_name": "string" + }, + "access_point": "string", + "pipeline_id": "string", + "browse_only": true + } "#; pub(crate) const LIST_TABLES: &str = r#" @@ -507,6 +696,7 @@ pub(crate) mod tests { let get_table: Result = serde_json::from_str(ERROR_RESPONSE); assert!(get_table.is_ok()); + dbg!(&get_table); assert!(matches!(get_table.unwrap(), GetTableResponse::Error(_))) } } diff --git a/crates/catalog-unity/src/prelude.rs b/crates/catalog-unity/src/prelude.rs new file mode 100644 index 0000000000..48767b2331 --- /dev/null +++ b/crates/catalog-unity/src/prelude.rs @@ -0,0 +1,4 @@ +pub use crate::{UnityCatalog, UnityCatalogBuilder, UnityCatalogConfigKey, UnityCatalogError}; + +#[cfg(feature = "datafusion")] +pub use crate::datafusion::{UnityCatalogList, UnityCatalogProvider, UnitySchemaProvider}; diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index 9da5b86111..e6646edb9c 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -6,7 +6,7 @@ use delta_kernel::schema::StructType; use futures::future::BoxFuture; use itertools::Itertools; -use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; +use super::transaction::{CommitBuilder, CommitProperties}; use super::{CustomExecuteHandler, Operation}; use crate::kernel::{StructField, StructTypeExt}; use crate::logstore::LogStoreRef; @@ -28,7 +28,7 @@ pub struct AddColumnBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for AddColumnBuilder { +impl Operation<()> for AddColumnBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 5631079269..77a9fae20b 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -404,7 +404,7 @@ pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { })?; Url::from_directory_path(path).map_err(|_| { let msg = format!( - "Could not construct a URL from canonicalized path: {}.\n\ + "Could not construct a URL from the canonical path: {}.\n\ Something must be very wrong with the table path.", table_uri );