Skip to content

Commit

Permalink
PEERDB_CLICKHOUSE_BINARY_FORMAT (#2407)
Browse files Browse the repository at this point in the history
In #2181 we consolidated on transmitting bytea columns as base64 strings

ClickHouse supports binary data in strings; they need not be valid UTF-8.

This adds PEERDB_CLICKHOUSE_BINARY_FORMAT with 3 formats: raw, hex, & base64
Default to base64 to avoid breaking existing setups

Also add caching logic for non-immediate dynamic settings.
This helps here, where we would otherwise hit the catalog for every bytea field processed.

Included `hex` to demonstrate how other formats would fit in,
but expect this feature to be removed once we move everyone to `raw`
  • Loading branch information
serprex authored Jan 4, 2025
1 parent d365a8b commit a4d6525
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 35 deletions.
56 changes: 40 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,38 +366,62 @@ func (c *ClickHouseConnector) NormalizeRecords(
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
default:
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName,
clickHouseType,
dstColName,
))
projLen := projection.Len()
if colType == qvalue.QValueKindBytes {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, req.Env)
if err != nil {
return model.NormalizeResponse{}, err
}
switch format {
case peerdbenv.BinaryFormatRaw:
projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
colName, dstColName,
))
}
case peerdbenv.BinaryFormatHex:
projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
colName, dstColName,
))
}
}
}

// proceed with default logic if logic above didn't add any sql
if projection.Len() == projLen {
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName, clickHouseType, dstColName,
))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s
if err := ctx.Err(); err != nil {
return numRows.Load(), err
} else {
avroMap, err := avroConverter.Convert(qrecord)
avroMap, err := avroConverter.Convert(ctx, env, qrecord)
if err != nil {
logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err))
return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
Expand Down
61 changes: 60 additions & 1 deletion flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,6 @@ func (s ClickHouseSuite) testNumericFF(ffValue bool) {

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines)
require.NoError(s.t, err)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2)

rows, err := s.GetRows(dstTableName, "c")
Expand Down Expand Up @@ -666,6 +665,66 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
s.testNumericFF(false)
}

// binaryFormatTestcase is a bytea payload containing NUL, control, printable,
// DEL, and 0xFF bytes — deliberately not valid UTF-8 — so each binary format
// (raw, hex, base64) is exercised on data a plain text path would mangle.
const binaryFormatTestcase = "\x00\x010123\x7f\xff"

// testBinaryFormat exercises PEERDB_CLICKHOUSE_BINARY_FORMAT end to end: it
// mirrors a table holding a non-UTF-8 bytea value through both the initial
// snapshot and CDC, then asserts every destination row carries the expected
// encoding for the given format.
func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
	ctx := context.Background()
	dstTable := "binary_format_" + format
	srcTable := s.attachSchemaSuffix(dstTable)

	_, err := s.Conn().Exec(ctx, fmt.Sprintf(`
	CREATE TABLE IF NOT EXISTS %s(
		id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
		val bytea
	);
	`, srcTable))
	require.NoError(s.t, err)

	// Seed one row before the mirror starts so the initial snapshot has data.
	_, err = s.Conn().Exec(ctx, fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcTable), []byte(binaryFormatTestcase))
	require.NoError(s.t, err)

	cfgGen := e2e.FlowConnectionGenerationConfig{
		FlowJobName:      s.attachSuffix("ch_binary_format_" + format),
		TableNameMapping: map[string]string{srcTable: dstTable},
		Destination:      s.Peer().Name,
	}
	cfg := cfgGen.GenerateFlowConnectionConfigs(s.t)
	cfg.DoInitialSnapshot = true
	// The format under test is injected via the per-flow dynamic-setting env.
	cfg.Env = map[string]string{"PEERDB_CLICKHOUSE_BINARY_FORMAT": format}

	env := e2e.ExecutePeerflow(e2e.NewTemporalClient(s.t), peerflow.CDCFlowWorkflow, cfg, nil)
	e2e.SetupCDCFlowStatusQuery(s.t, env, cfg)
	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTable, "id,val", 1)

	// Insert a second row after the snapshot so the CDC path is covered too.
	_, err = s.Conn().Exec(ctx, fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcTable), []byte(binaryFormatTestcase))
	require.NoError(s.t, err)
	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTable, "id,val", 2)

	rows, err := s.GetRows(dstTable, "val")
	require.NoError(s.t, err)
	require.Len(s.t, rows.Records, 2, "expected 2 rows")
	for _, record := range rows.Records {
		require.Len(s.t, record, 1, "expected 1 column")
		require.Equal(s.t, expected, record[0].Value())
	}

	env.Cancel()
	e2e.RequireEnvCanceled(s.t, env)
}

// Test_Binary_Format_Raw: with format "raw", the original bytes must arrive
// on the ClickHouse destination unchanged.
func (s ClickHouseSuite) Test_Binary_Format_Raw() {
	s.testBinaryFormat("raw", binaryFormatTestcase)
}

// Test_Binary_Format_Hex: with format "hex", the destination value is the
// uppercase hex encoding of the test bytes (00 01 30 31 32 33 7F FF).
func (s ClickHouseSuite) Test_Binary_Format_Hex() {
	s.testBinaryFormat("hex", "0001303132337FFF")
}

// Test_Binary_Format_Base64: with format "base64" (the default), the
// destination value is the standard base64 encoding of the test bytes.
func (s ClickHouseSuite) Test_Binary_Format_Base64() {
	s.testBinaryFormat("base64", "AAEwMTIzf/8=")
}

func (s ClickHouseSuite) Test_Types_CH() {
srcTableName := "test_types"
srcFullName := s.attachSchemaSuffix("test_types")
Expand Down
9 changes: 3 additions & 6 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func NewQRecordAvroConverter(
}, nil
}

func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) {
func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) {
m := make(map[string]any, len(qrecord))
for idx, val := range qrecord {
avroVal, err := qvalue.QValueToAvro(
val,
&qac.Schema.Fields[idx],
qac.TargetDWH,
qac.logger,
qac.UnboundedNumericAsString,
ctx, env, val,
&qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString,
)
if err != nil {
return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err)
Expand Down
29 changes: 24 additions & 5 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package qvalue
import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -115,7 +117,11 @@ func GetAvroSchemaFromQValueKind(
case QValueKindBoolean:
return "boolean", nil
case QValueKindBytes:
if targetDWH == protos.DBType_CLICKHOUSE {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
if targetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
return "string", nil
}
return "bytes", nil
Expand Down Expand Up @@ -245,6 +251,7 @@ type QValueAvroConverter struct {
}

func QValueToAvro(
ctx context.Context, env map[string]string,
value QValue, field *QField, targetDWH protos.DBType, logger log.Logger,
unboundedNumericAsString bool,
) (any, error) {
Expand Down Expand Up @@ -377,7 +384,11 @@ func QValueToAvro(
case QValueNumeric:
return c.processNumeric(v.Val), nil
case QValueBytes:
return c.processBytes(v.Val), nil
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
return c.processBytes(v.Val, format), nil
case QValueJSON:
return c.processJSON(v.Val), nil
case QValueHStore:
Expand Down Expand Up @@ -509,9 +520,17 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any {
return rat
}

func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE {
encoded := base64.StdEncoding.EncodeToString(byteData)
func (c *QValueAvroConverter) processBytes(byteData []byte, format peerdbenv.BinaryFormat) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
var encoded string
switch format {
case peerdbenv.BinaryFormatBase64:
encoded = base64.StdEncoding.EncodeToString(byteData)
case peerdbenv.BinaryFormatHex:
encoded = strings.ToUpper(hex.EncodeToString(byteData))
default:
panic(fmt.Sprintf("unhandled binary format: %d", format))
}
if c.Nullable {
return goavro.Union("string", encoded)
}
Expand Down
6 changes: 2 additions & 4 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
if len(v.Val) > 15*1024*1024 {
jsonStruct[col] = "{}"
} else if _, ok := opts.UnnestColumns[col]; ok {
var unnestStruct map[string]interface{}
err := json.Unmarshal([]byte(v.Val), &unnestStruct)
if err != nil {
var unnestStruct map[string]any
if err := json.Unmarshal([]byte(v.Val), &unnestStruct); err != nil {
return nil, err
}

Expand Down Expand Up @@ -185,7 +184,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
}
}
jsonStruct[col] = nullableFloatArr

default:
jsonStruct[col] = v.Value()
}
Expand Down
53 changes: 51 additions & 2 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"strconv"
"strings"
"time"

"github.com/aws/smithy-go/ptr"
Expand Down Expand Up @@ -107,6 +108,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT",
Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
DefaultValue: "base64",
ValueType: protos.DynconfValueType_STRING,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
Expand Down Expand Up @@ -238,6 +247,15 @@ var DynamicIndex = func() map[string]int {
return defaults
}()

// BinaryFormat selects how bytea/binary column values are encoded when
// written to a ClickHouse destination (PEERDB_CLICKHOUSE_BINARY_FORMAT).
type BinaryFormat int

// The original declaration used a bare `= iota`, which made these untyped
// int constants; giving the first one an explicit BinaryFormat type makes
// the whole group typed, so misuse (e.g. passing an arbitrary int) is a
// compile-time error while numeric values stay identical.
const (
	// BinaryFormatInvalid is the zero value, indicating an unrecognized setting.
	BinaryFormatInvalid BinaryFormat = iota
	// BinaryFormatRaw transmits the bytes unchanged.
	BinaryFormatRaw
	// BinaryFormatBase64 encodes the bytes as standard base64 (the default).
	BinaryFormatBase64
	// BinaryFormatHex encodes the bytes as uppercase hexadecimal.
	BinaryFormatHex
)

func dynLookup(ctx context.Context, env map[string]string, key string) (string, error) {
if val, ok := env[key]; ok {
return val, nil
Expand All @@ -249,6 +267,11 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
return "", fmt.Errorf("failed to get catalog connection pool: %w", err)
}

var setting *protos.DynamicSetting
if idx, ok := DynamicIndex[key]; ok {
setting = DynamicSettings[idx]
}

var value pgtype.Text
query := "SELECT config_value FROM dynamic_settings WHERE config_name=$1"
if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil && err != pgx.ErrNoRows {
Expand All @@ -257,12 +280,21 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
}
if !value.Valid {
if val, ok := os.LookupEnv(key); ok {
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = val
}
return val, nil
}
if idx, ok := DynamicIndex[key]; ok {
return DynamicSettings[idx].DefaultValue, nil
if setting != nil {
if env != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = setting.DefaultValue
}
return setting.DefaultValue, nil
}
}
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = value.String
}
return value.String, nil
}

Expand Down Expand Up @@ -398,6 +430,23 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
}

// PeerDBBinaryFormat resolves the PEERDB_CLICKHOUSE_BINARY_FORMAT dynamic
// setting to a BinaryFormat value. Lookup goes through dynLookup (flow env
// map, then catalog, then process environment, then declared default).
// Matching is case-insensitive and ignores surrounding whitespace; an
// unrecognized value yields BinaryFormatInvalid and an error.
func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
	format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
	if err != nil {
		return BinaryFormatInvalid, err
	}
	switch strings.ToLower(strings.TrimSpace(format)) {
	case "raw":
		return BinaryFormatRaw, nil
	case "hex":
		return BinaryFormatHex, nil
	case "base64":
		return BinaryFormatBase64, nil
	default:
		// %q (not %s) so empty or whitespace-only values show up in the error.
		return BinaryFormatInvalid, fmt.Errorf("unknown binary format %q", format)
	}
}

// PeerDBEnableClickHousePrimaryUpdate reports whether the
// PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE dynamic setting is enabled,
// delegating the lookup and boolean parsing to dynamicConfBool.
func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error) {
	return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
}
Expand Down

0 comments on commit a4d6525

Please sign in to comment.