flink connector redis for flink sql
maven dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
CREATE TABLE redis_table (
`key` STRING,
`map_key_sid` INT,
`map_key_install_date` STRING,
PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '127.0.0.1'
);
-- insert
INSERT INTO redis_table
SELECT `key`, `map_key_sid`, `map_key_install_date` FROM T;
-- query lookup join
INSERT INTO console_output
SELECT
s.ouid,
dim.map_key_sid,
dim.map_key_install_date
FROM kafka_source as s
left join redis_table FOR SYSTEM_TIME AS OF s.proctime AS dim
on s.ouid = dim.key
where s.ouid is not null
字段 | Required | 默认值 | 类型 | 说明 |
---|---|---|---|---|
connector | required | (none) | String | redis |
host | required | (none) | String | Redis IP |
port | optional | 6379 | Integer | Redis 端口 |
password | optional | null | String | 如果没有设置,则为 null |
db | optional | 0 | Integer | 默认使用 db0 |
pool-size | optional | 10 | Integer | 连接池大小 |
timeout | optional | 3000 | Integer | 连接超时时间,单位 ms,默认 1s |
test-on-borrow | optional | false | Boolean | 测试连接是否有效 |
mode | optional | hash | String | redis 操作模式,使用 hash 或 string |
sink.parallelism | optional | (none) | Integer | 默认并行度 |
sink.key-ttl | optional | 0s | Integer | 设置的 key 的超时时间,单位秒 |
sink.max-retry | optional | 3 | Integer | 设置写入的重试次数 |
lookup.cache.max-rows | optional | 10000 | Integer | lookup 缓存大小 |
lookup.cache.ttl | optional | 0 | Integer | 缓存过期时间 |
lookup.max-retries | optional | 3 | Integer | lookup 失败重试次数 |
mode 对应的存储模式,如果是
hash
则按照字段存储,如果是string
则使用 k1=v1,k2=v3,... 存储
Flink Sql Type | Redis conversion |
---|---|
CHAR | Store as string |
VARCHAR | Store as string |
STRING | Store as string |
BOOLEAN | String toString(boolean val) boolean toBoolean(String str) |
BINARY | String toString(byte[] val) byte[] toBytes(String str), Store as Base64 encoder string |
VARBINARY | String toString(byte[] val) byte[] toBytes(String str), Store as Base64 encoder string |
DECIMAL | String toString(BigDecimal val) BigDecimal toDecimal(String str) |
TINYINT | String toString(byte val) byte toByte(String str), Store as Base64 encoder string |
SMALLINT | String toString(short val) short toShort(String str) |
INT | String toString(int val) int toInt(String str) |
BIGINT | String toString(long val) long toLong(String str) |
FLOAT | String toString(float val) float toFloat(String str) |
DOUBLE | String toString(double val) double toDouble(String str) |
DATE | Store the string type: '2021-10-10' |
TIME | Store the string type: '10:10' |
TIMESTAMP | Store the milliseconds since epoch as long value |
ARRAY | Not supported |
MAP | Not supported |
MULTISET | Not supported |
ROW | Not supported |
issue | [email protected]