sql: support query strings containing multiple statements #2707

Open

ligfx wants to merge 2 commits into main from sql_multi_statement
Conversation

@ligfx commented on Jul 17, 2024

Adds support for query strings containing multiple statements to the various SQL components. When a string contains multiple statements, they are all executed in the same database transaction.

Interpolated parameters (?, $1, etc.) may only be used in the first statement, similar to the MySQL and Snowflake drivers, which support this when not using the standard database/sql interfaces.
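
For illustration, here is a minimal Go sketch of the general approach (not the actual code in this change; the helper name and the naive ";" split are assumptions made for the example): every statement runs on a single database/sql transaction, and the interpolated args are bound only to the first statement.

    package multistmt

    import (
        "context"
        "database/sql"
        "strings"
    )

    // execMultiStatement is a hypothetical helper, not code from this PR.
    // It naively splits the query string on ";" (ignoring semicolons inside
    // string literals), runs every statement on one transaction, and binds
    // args only to the first statement.
    func execMultiStatement(ctx context.Context, db *sql.DB, query string, args ...any) error {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        defer tx.Rollback() // safe to call even after a successful Commit

        first := true
        for _, stmt := range strings.Split(query, ";") {
            stmt = strings.TrimSpace(stmt)
            if stmt == "" {
                continue
            }
            if first {
                _, err = tx.ExecContext(ctx, stmt, args...)
                first = false
            } else {
                _, err = tx.ExecContext(ctx, stmt)
            }
            if err != nil {
                return err
            }
        }
        return tx.Commit()
    }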

Affected components:

  • The query argument to the sql_raw input, processor, and output
  • The init_statement argument and init_files contents provided to the sql_raw, sql_select, and sql_write components

This is incredibly useful for more complex use cases, and required whenever multiple statements must be executed in the same transaction. Examples:

  • Create Postgres tables on the fly based on topic names coming in:

    output:
      sql_raw:
        driver: postgres
        dsn: postgres://localhost/postgres
        unsafe_dynamic_query: true
        query: |
          CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
          INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
            ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
        args_mapping: |
          root = [ this.id, this.document.string() ]
    
  • Upsert individual rows into Snowflake:

    output:
      sql_raw:
        driver: snowflake
        dsn: username:[email protected]/mydatabase/public
        args_mapping: root = [ this.id, this.data ]
        init_statement: |
          CREATE TABLE IF NOT EXISTS mytable (
            id STRING,
            data STRING
          );
          CREATE TABLE IF NOT EXISTS mytable_staging (
            id STRING,
            data STRING
          );
        query: |
          INSERT OVERWRITE INTO mytable_staging (id, data) VALUES (?, ?);
          MERGE INTO mytable USING mytable_staging AS incoming
            ON mytable.id = incoming.id
          WHEN MATCHED THEN UPDATE SET
            mytable.id = incoming.id,
            mytable.data = incoming.data
          WHEN NOT MATCHED THEN
            INSERT (id, data) VALUES (
              incoming.id,
              incoming.data
            )
          ;
    
  • Put batches into a Snowflake staging table and then merge them into the main table:

    output:
      broker:
        pattern: fan_out_sequential
        outputs:
        - snowflake_put:
            init_statement: |
              CREATE TABLE IF NOT EXISTS mytable_staging (
                id string,
                data string,
                updated_at timestamptz
              );
            ...
        - sql_raw:
            driver: snowflake
            dsn: username:[email protected]/mydatabase/public
            init_statement: |
              CREATE TABLE IF NOT EXISTS mytable (
                id string,
                data string,
                updated_at timestamptz
              );
            query: |
              MERGE INTO mytable USING (
                SELECT * FROM mytable_staging
                QUALIFY 1 = RANK() OVER (PARTITION BY id ORDER BY updated_at DESC)
              ) mytable_staging
                ON mytable.id = mytable_staging.id
              WHEN MATCHED AND mytable.updated_at < mytable_staging.updated_at THEN UPDATE SET
                mytable.id = mytable_staging.id,
                mytable.data = mytable_staging.data,
                mytable.updated_at = mytable_staging.updated_at
              WHEN NOT MATCHED THEN
                INSERT (id, data, updated_at) VALUES (
                  mytable_staging.id,
                  mytable_staging.data,
                  mytable_staging.updated_at
                )
              ;
              TRUNCATE mytable_staging
              ;
    

@ligfx force-pushed the sql_multi_statement branch from 6fbe5bc to 6136116 on July 17, 2024 at 21:17