-
Notifications
You must be signed in to change notification settings - Fork 4
/
driver.go
151 lines (137 loc) · 4.59 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package godynamo
import (
"database/sql"
"database/sql/driver"
"github.com/aws/aws-sdk-go-v2/aws"
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/btnguyen2k/consu/reddo"
)
// init runs when the package is imported and registers this driver with
// database/sql under the name "godynamo".
func init() {
	sql.Register("godynamo", new(Driver))
}
// Driver is the AWS DynamoDB implementation of driver.Driver.
//
// It carries no state of its own; the zero value is ready to use.
type Driver struct{}
// parseParamValue resolves one configuration value and converts it to typ.
//
// Sources are consulted in this order:
//  1. the first key of pkeys that is present in params (a key mapped to an
//     empty string still counts as present);
//  2. the first key of ekeys whose environment variable is non-empty.
//
// The first source found decides the outcome: its raw string is converted with
// reddo.Convert and, when validator is non-nil, passed to validator. If the
// conversion fails, yields nil, or the validator rejects the value, defaultVal
// is returned immediately and the remaining keys are not consulted.
// defaultVal is also returned when no source is found at all.
func parseParamValue(params map[string]string, typ reflect.Type, validator func(val interface{}) bool,
	defaultVal interface{}, pkeys []string, ekeys []string) interface{} {
	// resolve converts and validates a single raw value, falling back to defaultVal.
	resolve := func(raw string) interface{} {
		converted, err := reddo.Convert(raw, typ)
		switch {
		case converted == nil, err != nil:
			return defaultVal
		case validator != nil && !validator(converted):
			return defaultVal
		}
		return converted
	}

	// Explicit parameters take precedence over the environment.
	for _, name := range pkeys {
		if raw, found := params[name]; found {
			return resolve(raw)
		}
	}
	// Unset and empty environment variables are treated alike: both are skipped.
	for _, name := range ekeys {
		if raw := os.Getenv(name); raw != "" {
			return resolve(raw)
		}
	}
	return defaultVal
}
// parseConnString splits a connection string of the form
//
//	key1=value1;key2=value2;...
//
// into a map of parameters.
//
// Segments are separated by ';'. Within a segment only the first '=' separates
// key from value, so values may themselves contain '='. Keys are trimmed and
// upper-cased; values are trimmed. A segment without '=' maps its key to the
// empty string. When a key occurs more than once, the last occurrence wins.
//
// Segments whose key is empty (as produced by an empty connection string, a
// trailing ';' or a doubled ";;") are skipped, so the result never contains a
// meaningless "" entry. The returned map is never nil.
func parseConnString(connStr string) map[string]string {
	params := make(map[string]string)
	for _, segment := range strings.Split(connStr, ";") {
		tokens := strings.SplitN(segment, "=", 2)
		key := strings.ToUpper(strings.TrimSpace(tokens[0]))
		if key == "" {
			// Nothing usable to index by; ignore the segment.
			continue
		}
		value := ""
		if len(tokens) == 2 {
			value = strings.TrimSpace(tokens[1])
		}
		params[key] = value
	}
	return params
}
// Open implements driver.Driver/Open.
//
// connStr is expected in the following format:
//
//	Region=<region>;AkId=<aws-key-id>;Secret_Key=<aws-secret-key>[;Endpoint=<dynamodb-endpoint>][;TimeoutMs=<timeout-in-milliseconds>]
//
// Parameter names are matched case-insensitively (the connection string keys
// are upper-cased during parsing). Settings missing from connStr fall back to
// environment variables:
//   - Region: AWS_REGION
//   - AkId: AWS_ACCESS_KEY_ID, AWS_AKID
//   - Secret_Key (or SecretKey): AWS_SECRET_KEY, AWS_SECRET_ACCESS_KEY
//   - Endpoint: AWS_DYNAMODB_ENDPOINT
//
// TimeoutMs has no environment fallback; if it is not supplied, or is not a
// non-negative integer, the default of 10 seconds is used.
//
// When an aws.Config is registered at the time of the call, the client is
// built from that config with the connection-string options merged in via
// mergeDynamoDBOptions. The returned error is always nil.
func (d *Driver) Open(connStr string) (driver.Conn, error) {
	params := parseConnString(connStr)

	// All string settings share the same shape: no validator, "" as default.
	lookup := func(pkeys, ekeys []string) string {
		return parseParamValue(params, reddo.TypeString, nil, "", pkeys, ekeys).(string)
	}
	nonNegative := func(val interface{}) bool {
		return val.(int64) >= 0
	}

	timeoutMs := parseParamValue(params, reddo.TypeInt, nonNegative, int64(10000), []string{"TIMEOUTMS"}, nil).(int64)
	timeout := time.Duration(timeoutMs) * time.Millisecond
	region := lookup([]string{"REGION"}, []string{"AWS_REGION"})
	akid := lookup([]string{"AKID"}, []string{"AWS_ACCESS_KEY_ID", "AWS_AKID"})
	secretKey := lookup([]string{"SECRET_KEY", "SECRETKEY"}, []string{"AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"})

	opts := dynamodb.Options{
		Region:      region,
		Credentials: credentials.NewStaticCredentialsProvider(akid, secretKey, ""),
		HTTPClient:  http.NewBuildableClient().WithTimeout(timeout),
	}

	endpoint := lookup([]string{"ENDPOINT"}, []string{"AWS_DYNAMODB_ENDPOINT"})
	if endpoint != "" {
		opts.BaseEndpoint = aws.String(endpoint)
		// A plain-HTTP endpoint requires HTTPS to be switched off explicitly.
		opts.EndpointOptions.DisableHTTPS = strings.HasPrefix(endpoint, "http://")
	}

	// Start with a client built purely from the connection string, then
	// replace it if a shared aws.Config has been registered.
	client := dynamodb.New(opts)
	awsConfigLock.RLock()
	defer awsConfigLock.RUnlock()
	if registered := awsConfig; registered != nil {
		client = dynamodb.NewFromConfig(*registered, mergeDynamoDBOptions(opts))
	}
	return &Conn{client: client, timeout: timeout}, nil
}
// Package-level state holding an optional AWS configuration.
var (
	// awsConfigLock guards reads and writes of awsConfig.
	awsConfigLock = new(sync.RWMutex)
	// awsConfig is the AWS configuration to be used by the dynamodb client;
	// it is nil while no configuration is registered.
	awsConfig *aws.Config
)
// RegisterAWSConfig registers the aws.Config to be used by the dynamodb client.
// The config is received by value, so the stored copy is independent of the
// caller's variable. A previously registered config is replaced.
//
// The following configurations do not apply even if they are set in aws.Config.
//   - HTTPClient
//
// @Available since v1.3.0
func RegisterAWSConfig(conf aws.Config) {
	registered := &conf
	awsConfigLock.Lock()
	awsConfig = registered
	awsConfigLock.Unlock()
}
// DeregisterAWSConfig removes the registered aws.Config, if any, by resetting
// the stored configuration to nil under the write lock.
//
// @Available since v1.3.0
func DeregisterAWSConfig() {
	awsConfigLock.Lock()
	awsConfig = nil
	awsConfigLock.Unlock()
}
// mergeDynamoDBOptions returns an option function that merges providedOpts
// into the dynamodb.Options it is applied to.
//
// Merge rules, per field of the target options:
//   - HTTPClient: always replaced by providedOpts.HTTPClient.
//   - Region: taken from providedOpts only when the target's is empty.
//   - Credentials: taken from providedOpts only when the target's is nil.
//   - BaseEndpoint: taken from providedOpts only when the target's is nil; in
//     that case EndpointOptions is copied from providedOpts as a whole.
func mergeDynamoDBOptions(providedOpts dynamodb.Options) func(*dynamodb.Options) {
	merge := func(target *dynamodb.Options) {
		// The HTTP client is the only setting that is overridden unconditionally.
		target.HTTPClient = providedOpts.HTTPClient

		if target.Region == "" {
			target.Region = providedOpts.Region
		}
		if target.Credentials == nil {
			target.Credentials = providedOpts.Credentials
		}
		if target.BaseEndpoint == nil {
			target.BaseEndpoint, target.EndpointOptions = providedOpts.BaseEndpoint, providedOpts.EndpointOptions
		}
	}
	return merge
}