Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally read arrays as strings #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

type conn struct {
athena athenaiface.AthenaAPI
db string
OutputLocation string
athena athenaiface.AthenaAPI
db string
OutputLocation string
arraysAsStrings bool

pollFrequency time.Duration
}
Expand Down Expand Up @@ -48,8 +49,9 @@ func (c *conn) runQuery(ctx context.Context, query string) (driver.Rows, error)
}

return newRows(rowsConfig{
Athena: c.athena,
QueryID: queryID,
Athena: c.athena,
QueryID: queryID,
arraysAsStrings: c.arraysAsStrings,
// todo add check for ddl queries to not skip header(#10)
SkipHeader: true,
})
Expand Down
17 changes: 10 additions & 7 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ func (d *Driver) Open(connStr string) (driver.Conn, error) {
}

return &conn{
athena: athena.New(cfg.Session),
db: cfg.Database,
OutputLocation: cfg.OutputLocation,
pollFrequency: cfg.PollFrequency,
athena: athena.New(cfg.Session),
db: cfg.Database,
OutputLocation: cfg.OutputLocation,
arraysAsStrings: cfg.arraysAsStrings,
pollFrequency: cfg.PollFrequency,
}, nil
}

Expand Down Expand Up @@ -112,9 +113,10 @@ func Open(cfg Config) (*sql.DB, error) {

// Config is the input to Open().
type Config struct {
Session *session.Session
Database string
OutputLocation string
Session *session.Session
Database string
OutputLocation string
arraysAsStrings bool

PollFrequency time.Duration
}
Expand All @@ -138,6 +140,7 @@ func configFromConnectionString(connStr string) (*Config, error) {

cfg.Database = args.Get("db")
cfg.OutputLocation = args.Get("output_location")
cfg.arraysAsStrings = args.Get("arrays_as_strings") == "true"

frequencyStr := args.Get("poll_frequency")
if frequencyStr != "" {
Expand Down
21 changes: 12 additions & 9 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ import (
)

type rows struct {
athena athenaiface.AthenaAPI
queryID string
athena athenaiface.AthenaAPI
queryID string
arraysAsStrings bool

done bool
skipHeaderRow bool
out *athena.GetQueryResultsOutput
}

type rowsConfig struct {
Athena athenaiface.AthenaAPI
QueryID string
SkipHeader bool
Athena athenaiface.AthenaAPI
QueryID string
arraysAsStrings bool
SkipHeader bool
}

func newRows(cfg rowsConfig) (*rows, error) {
r := rows{
athena: cfg.Athena,
queryID: cfg.QueryID,
skipHeaderRow: cfg.SkipHeader,
athena: cfg.Athena,
queryID: cfg.QueryID,
skipHeaderRow: cfg.SkipHeader,
arraysAsStrings: cfg.arraysAsStrings,
}

shouldContinue, err := r.fetchNextPage(nil)
Expand Down Expand Up @@ -82,7 +85,7 @@ func (r *rows) Next(dest []driver.Value) error {
// Shift to next row
cur := r.out.ResultSet.Rows[0]
columns := r.out.ResultSet.ResultSetMetadata.ColumnInfo
if err := convertRow(columns, cur.Data, dest); err != nil {
if err := convertRow(columns, cur.Data, dest, r.arraysAsStrings); err != nil {
return err
}

Expand Down
12 changes: 9 additions & 3 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ const (
DateLayout = "2006-01-02"
)

func convertRow(columns []*athena.ColumnInfo, in []*athena.Datum, ret []driver.Value) error {
func convertRow(columns []*athena.ColumnInfo, in []*athena.Datum, ret []driver.Value, arraysAsStrings bool) error {
for i, val := range in {
coerced, err := convertValue(*columns[i].Type, val.VarCharValue)
coerced, err := convertValue(*columns[i].Type, val.VarCharValue, arraysAsStrings)
if err != nil {
return err
}
Expand All @@ -29,7 +29,7 @@ func convertRow(columns []*athena.ColumnInfo, in []*athena.Datum, ret []driver.V
return nil
}

func convertValue(athenaType string, rawValue *string) (interface{}, error) {
func convertValue(athenaType string, rawValue *string, arraysAsStrings bool) (interface{}, error) {
if rawValue == nil {
return nil, nil
}
Expand Down Expand Up @@ -62,6 +62,12 @@ func convertValue(athenaType string, rawValue *string) (interface{}, error) {
return time.Parse(TimestampWithTimeZoneLayout, val)
case "date":
return time.Parse(DateLayout, val)
case "array":
if arraysAsStrings {
return val, nil
}

fallthrough
default:
panic(fmt.Errorf("unknown type `%s` with value %s", athenaType, val))
}
Expand Down