Replies: 1 comment
-
Please refer to this doc for how to encode a jsonb value as a JSON type: https://docs.risingwave.com/docs/next/create-sink-kafka/#jsonbhandlingmode
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
-- Sink SINK_PROJECT_MANAGER_PERSON
--
-- Emits one upsert record per row of the join below to the Kafka topic
-- 'PROJECT.PROJECT_MANAGER_PERSON', keyed by _id. Each record carries:
--   _id / certno     : basic."CERTNO"
--   projectInfo      : jsonb object of { 'text': [...], 'value': [...] } pairs
--   empInfo          : jsonb object of employee attributes plus a 'work_info' array
--   performanceInfo  : CDC_PROJECT_MANAGER_PERFORMANCE.payload, passed through
--
-- NOTE(review): the output aliases projectInfo / empInfo / performanceInfo are
-- unquoted; engines that fold unquoted identifiers to lower case will emit them
-- lower-cased. Quote them if the mixed case is required downstream -- confirm.
CREATE SINK SINK_PROJECT_MANAGER_PERSON AS
SELECT
    -- Qualified with the driving table so the reference cannot become ambiguous
    -- if a joined relation gains a "CERTNO" column.
    basic."CERTNO" AS _id,
    basic."CERTNO" AS certno,
    jsonb_build_object(
        'name', jsonb_build_object(
            'text', jsonb_build_array(basic."NAME"),
            'value', jsonb_build_array(basic."NAME")
        ),
        'working_coid_name', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_work.WORKING_COID_NAMES),
            'value', jsonb_build_array(VARIADIC t_work.WORKING_COID_ORULES)
        ),
        'working_dept_name', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_work.WORKING_DEPT_NAMES),
            'value', jsonb_build_array(VARIADIC t_work.WORKING_DEPT_ORULES)
        ),
        'birthday', jsonb_build_object(
            'text', jsonb_build_array(basic."BIRTHDAY"),
            'value', jsonb_build_array(basic."BIRTHDAY")
        ),
        'nativeplace', jsonb_build_object(
            'text', jsonb_build_array(basic_detail.nativeplace_name),
            'value', jsonb_build_array(basic."NATIVEPLACE")
        ),
        'post', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_work.posts),
            'value', jsonb_build_array(VARIADIC t_work.posts)
        ),
        'positiongrade', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_work.POSITIONGRADENORM_NAME),
            'value', jsonb_build_array(VARIADIC t_work.POSITIONGRADENORM_CODE)
        ),
        'bestpropost', jsonb_build_object(
            'text', jsonb_build_array(basic_detail.bestpropost_name),
            'value', jsonb_build_array(basic."BESTPROPOST")
        ),
        'highest', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_eduinfo.EDUCATION_NAME),
            'value', jsonb_build_array(VARIADIC t_eduinfo.EDUCATION_CODE)
        ),
        'bestdegree', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_eduinfo.DEGREE_NAME),
            'value', jsonb_build_array(VARIADIC t_eduinfo.DEGREE_CODE)
        ),
        'college', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_eduinfo.COLLEGE_NAME),
            'value', jsonb_build_array(VARIADIC t_eduinfo.COLLEGE_CODE)
        ),
        'specialty', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_eduinfo.PROFNAME),
            'value', jsonb_build_array(VARIADIC t_eduinfo.PROFNAME)
        ),
        'constructor_major', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_quacert.MAINPROF_NAME),
            'value', jsonb_build_array(VARIADIC t_quacert.MAINPROF_CODE)
        ),
        -- The next three entries are emitted as constant empty strings.
        'complete_projects', jsonb_build_object(
            'text', jsonb_build_array(''),
            'value', jsonb_build_array('')
        ),
        'accumulated_years', jsonb_build_object(
            'text', jsonb_build_array(''),
            'value', jsonb_build_array('')
        ),
        'overseas_years', jsonb_build_object(
            'text', jsonb_build_array(''),
            'value', jsonb_build_array('')
        ),
        'safe_pro_certificate', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_quacert.SAFE_CODE),
            'value', jsonb_build_array(VARIADIC t_quacert.SAFE_CODE)
        ),
        'other_qualification', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_quacert.OTHER_CODE),
            'value', jsonb_build_array(VARIADIC t_quacert.OTHER_CODE)
        )
    ) AS projectInfo,
    jsonb_build_object(
        'empid', jsonb_build_object(
            'text', jsonb_build_array(basic."EMPID"),
            'value', jsonb_build_array(basic."EMPID")
        ),
        'bestpropost_speciality', jsonb_build_object(
            'text', jsonb_build_array(VARIADIC t_proftech.PROF),
            'value', jsonb_build_array(VARIADIC t_proftech.PROF)
        ),
        'name', jsonb_build_object(
            'text', jsonb_build_array(basic."NAME"),
            'value', jsonb_build_array(basic."NAME")
        ),
        'sex', jsonb_build_object(
            'text', jsonb_build_array(basic."SEX"),
            'value', jsonb_build_array(basic."SEX")
        ),
        'age', jsonb_build_object(
            'text', jsonb_build_array(basic."AGE"),
            'value', jsonb_build_array(basic."AGE")
        ),
        'politicsfaces', jsonb_build_object(
            'text', jsonb_build_array(basic."POLITICSFACES"),
            'value', jsonb_build_array(basic."POLITICSFACES")
        ),
        -- work_info is the per-employee array aggregated in t_workinfo below.
        'work_info', jsonb_build_array(VARIADIC t_workinfo.work_info)
    ) AS empInfo,
    CDC_PROJECT_MANAGER_PERFORMANCE.payload AS performanceInfo
FROM CDC_HR_EMP_BASICINFO AS basic
LEFT JOIN mv_hr_empinfo_detail AS basic_detail
    ON basic."EMPID" = basic_detail.empid
LEFT JOIN mv_hr_empinfo_work_detail AS t_work
    ON basic."EMPID" = t_work.EMPID
-- NOTE(review): t_bestpropost is joined but no column of it is selected. It is
-- kept because dropping it could change the row count if the view has several
-- rows per EMPID -- confirm and remove if it is truly redundant.
LEFT JOIN mv_hr_empinfo_propost_detail AS t_bestpropost
    ON basic."EMPID" = t_bestpropost.EMPID
LEFT JOIN mv_hr_empinfo_eduinfo_detail AS t_eduinfo
    ON basic."EMPID" = t_eduinfo.EMPID
LEFT JOIN mv_hr_empinfo_quacert_detail AS t_quacert
    ON basic."EMPID" = t_quacert.EMPID
LEFT JOIN mv_hr_empinfo_proftech_detail AS t_proftech
    ON basic."EMPID" = t_proftech.EMPID
LEFT JOIN CDC_PROJECT_MANAGER_PERFORMANCE
    ON CDC_PROJECT_MANAGER_PERFORMANCE._id = basic."CERTNO"
LEFT JOIN (
    -- One row per employee: every post/work record that has a matching
    -- organisation row, collected into an array of jsonb objects.
    SELECT
        t_postwork."EMPID" AS EMPID_KEY,
        array_agg(
            jsonb_build_object(
                'wpwid', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."WPWID"),
                    'value', jsonb_build_array(t_postwork."WPWID")
                ),
                'empid', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."EMPID"),
                    'value', jsonb_build_array(t_postwork."EMPID")
                ),
                'officedepid', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."OFFICEDEPID"),
                    'value', jsonb_build_array(t_postwork."OFFICEDEPID")
                ),
                'orule', jsonb_build_object(
                    'text', jsonb_build_array(t_org_detail.orule),
                    'value', jsonb_build_array(t_org_detail.orule)
                ),
                'corprule', jsonb_build_object(
                    'text', jsonb_build_array(t_org_detail.corule),
                    'value', jsonb_build_array(t_org_detail.corule)
                ),
                'officetype', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."OFFICETYPE"),
                    'value', jsonb_build_array(t_postwork."OFFICETYPE")
                ),
                'posttype', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."POSTTYPE"),
                    'value', jsonb_build_array(t_postwork."POSTTYPE")
                ),
                'post', jsonb_build_object(
                    'text', jsonb_build_array(t_postwork."POST"),
                    'value', jsonb_build_array(t_postwork."POST")
                )
            )
        ) AS work_info
    FROM CDC_HR_WORK_POSTWORKINFO AS t_postwork
    -- Explicit inner join replaces the former comma join + WHERE predicate;
    -- the result set is the same.
    INNER JOIN mv_waf_ac_organ_detail AS t_org_detail
        ON t_postwork."OFFICEDEPID" = t_org_detail.oid
    GROUP BY
        t_postwork."EMPID"
) AS t_workinfo
    ON basic."EMPID" = t_workinfo.EMPID_KEY
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka-stream-kafka-bootstrap.kafka-cluster:9092',
    topic = 'PROJECT.PROJECT_MANAGER_PERSON',
    primary_key = '_id'
)
-- NOTE(review): RisingWave's Kafka sink documentation describes a jsonb
-- handling mode setting that controls how jsonb columns are encoded. Check it
-- if projectInfo / empInfo must arrive as JSON values rather than strings.
-- The encoding is left at its default here.
FORMAT UPSERT ENCODE JSON;
Currently there are the following issues:
Beta Was this translation helpful? Give feedback.
All reactions