forked from duckdb/dbt-duckdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
152 lines (124 loc) · 5.37 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import importlib
import os
from typing import Any
from typing import Dict
from typing import Optional
from dbt_common.dataclass_schema import dbtClassMixin
from duckdb import DuckDBPyConnection
from ..credentials import DuckDBCredentials
from ..utils import SourceConfig
from ..utils import TargetConfig
class PluginConfig(dbtClassMixin):
    """Base helper for declaring the configuration settings used by a specific plugin.

    The class adds nothing of its own on top of ``dbtClassMixin``; it exists
    so that plugin configuration classes share a common parent type.
    """
class BasePlugin:
    """
    BasePlugin is the base class for creating plugins. A plugin can be created
    from a module name, an optional configuration, and an alias. Each plugin
    contains a name and its configuration.
    """

    # A set of built-in plugins that are included with dbt-duckdb. It is derived
    # from the entries of this package's directory: the part of each entry name
    # before the first "." is kept, and entries containing an underscore (for
    # example ``__init__.py`` or ``__pycache__``) are skipped.
    _BUILTIN = {
        entry.split(".")[0]
        for entry in os.listdir(os.path.dirname(os.path.abspath(__file__)))
        if "_" not in entry
    }

    @classmethod
    def create(
        cls,
        module: str,
        *,
        config: Optional[Dict[str, Any]] = None,
        alias: Optional[str] = None,
        credentials: Optional[DuckDBCredentials] = None,
    ) -> "BasePlugin":
        """
        Create a plugin from a module name and optional configuration.

        If ``module`` names one of the built-in plugins it is resolved inside
        ``dbt.adapters.duckdb.plugins``; otherwise it is imported as given and
        the plugin name defaults to the last dotted component of the path.

        :param module: A string representing the module name.
        :param config: An optional dictionary with configuration parameters.
            When it is ``None`` and ``credentials`` is given,
            ``credentials.settings`` is used instead; if the result is still
            empty, an empty dictionary is passed to the plugin.
        :param alias: An optional string representing the alias name of the module.
        :param credentials: Optional DuckDB credentials handed to the plugin.
        :return: A Plugin instance.
        :raises TypeError: If the module name is not a string.
        :raises ImportError: If the module cannot be imported or does not
            expose a ``Plugin`` attribute.
        """
        if not isinstance(module, str):
            raise TypeError("Module name must be a string.")

        if module in cls._BUILTIN:
            name = module
            module = f"dbt.adapters.duckdb.plugins.{module}"
        else:
            name = module.split(".")[-1]

        try:
            mod = importlib.import_module(module)
        except ImportError as e:
            # Chain explicitly so the original failure is kept as __cause__.
            raise ImportError(f"Unable to import module '{module}': {e}") from e

        if config is None and credentials is not None:
            config = credentials.settings

        if not hasattr(mod, "Plugin"):
            raise ImportError(f"Module '{module}' does not have a Plugin class.")

        return mod.Plugin(
            name=alias or name, plugin_config=config or {}, credentials=credentials
        )

    def __init__(
        self,
        name: str,
        plugin_config: Dict[str, Any],
        credentials: Optional[DuckDBCredentials] = None,
    ):
        """
        Initialize the BasePlugin instance with a name and its configuration.
        This method should *not* be overridden by subclasses in general; any
        initialization required from the configuration dictionary should be
        defined in the `initialize` method.

        :param name: A string representing the plugin name.
        :param plugin_config: A dictionary representing the plugin configuration.
        :param credentials: The DuckDB credentials
        """
        self.name = name
        self.creds = credentials
        self.plugin_config = plugin_config
        # Runs last so that `initialize` can rely on the attributes set above.
        self.initialize(plugin_config)

    def initialize(self, plugin_config: Dict[str, Any]):
        """
        Initialize the plugin with its configuration dictionary specified in the
        profile. This function may be overridden by subclasses that have
        additional initialization steps. The base implementation does nothing.

        :param plugin_config: A dictionary representing the plugin configuration.
        """
        pass

    def update_connection_config(self, creds: DuckDBCredentials, config: Dict[str, Any]):
        """
        This updates the DuckDB connection config if needed.

        This method should be overridden by subclasses to add any additional
        config options needed on the connection, such as a connection token or
        user agent. The base implementation does nothing.

        :param creds: DuckDB credentials
        :param config: Config dictionary to be passed to duckdb.connect
        """
        pass

    def configure_connection(self, conn: DuckDBPyConnection):
        """
        Configure the DuckDB connection with any necessary extensions and/or settings.
        This method should be overridden by subclasses to provide additional
        configuration needed on the connection, such as user-defined functions.
        The base implementation does nothing.

        :param conn: A DuckDBPyConnection instance to be configured.
        """
        pass

    def load(self, source_config: SourceConfig):
        """
        Load data from a source config and return it as a DataFrame-like object
        that DuckDB can read. This method should be overridden by subclasses that
        support loading data from a source config.

        :param source_config: A SourceConfig instance representing the source data.
        :raises NotImplementedError: If this method is not implemented by a subclass.
        """
        raise NotImplementedError(f"load method not implemented for {self.name}")

    def store(self, target_config: TargetConfig, df=None):
        """
        Store data according to a target config. The base implementation always
        raises; subclasses that support storing are expected to override it.

        :param target_config: A TargetConfig instance describing the target.
        :param df: Optional object to store (presumably the data to be written;
            its expected type is not specified by this base class).
        :raises NotImplementedError: If this method is not implemented by a subclass.
        """
        raise NotImplementedError(f"store method not implemented for {self.name}")

    def configure_cursor(self, cursor):
        """
        Configure each copy of the DuckDB cursor.
        This method should be overridden by subclasses to provide additional
        attributes to the connection which are lost in the copy of the parent
        connection. The base implementation does nothing.

        :param cursor: A DuckDBPyConnection instance to be configured.
        """
        pass

    def default_materialization(self) -> str:
        """
        Return the name of the default materialization for this plugin.

        :return: The string ``"table"`` in the base implementation.
        """
        return "table"