Conversation

tanmaya-panda1
Contributor

@tanmaya-panda1 tanmaya-panda1 commented Sep 1, 2025

This pull request adds support for streaming ingestion to Azure Kusto in the Fluent Bit output plugin. Streaming ingestion sends data directly to the Kusto engine, bypassing blob storage and the ingestion queues; each request is limited to a 4 MB payload and no local buffering is performed. The changes cover configuration options, endpoint management, ingestion logic, and resource cleanup.
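The checklist below asks for an example configuration; a minimal classic-mode sketch for this change follows. The credential and endpoint option names follow the existing out_azure_kusto plugin, `streaming_ingestion_enabled` is the flag added by this PR, and all placeholder values are illustrative.

```
[OUTPUT]
    name                         azure_kusto
    match                        *
    tenant_id                    <tenant-id>
    client_id                    <client-id>
    client_secret                <client-secret>
    ingestion_endpoint           https://ingest-<cluster>.<region>.kusto.windows.net
    database_name                mydatabase
    table_name                   mytable
    streaming_ingestion_enabled  true
    buffering_enabled            false
```

Note that the PR validates that `streaming_ingestion_enabled` and `buffering_enabled` are not both true, and that each flushed payload must fit within the 4 MB streaming limit.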


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this change affects packaging of containers or native binaries, please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires a maintainer).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0; by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Optional streaming ingestion mode for Azure Data Explorer output, sending data directly to the cluster with a 4 MB per-request limit and no local buffering.
  • Configuration

    • Added streaming_ingestion_enabled (default: false).
    • Validation prevents enabling both streaming_ingestion_enabled and buffering_enabled simultaneously.
    • Improved logs clarify the selected ingestion mode.
  • Stability

    • Improved resource cleanup on shutdown when streaming mode is used.

coderabbitai bot commented Sep 1, 2025

Walkthrough

Adds optional streaming ingestion to the Azure Kusto output plugin. Introduces a config flag, a new upstream for the cluster endpoint, a streaming request flow with a 4 MB limit, conditional routing in flush, config validation against buffering, cleanup on exit, a new streaming API, and a signature change to queued ingestion in the implementation.

Changes

Cohort / File(s) Summary of Changes
Core plugin logic & lifecycle
plugins/out_azure_kusto/azure_kusto.c
Adds streaming_ingestion_enabled flag handling; derives/creates cluster upstream (ctx->u_cluster) from ingestion endpoint; routes flush to streaming path with 4MB limit; falls back to existing error path on failure; retains queued ingestion when disabled; cleans up u_cluster on exit; exposes flag in config map.
Public context struct
plugins/out_azure_kusto/azure_kusto.h
Adds public fields: int streaming_ingestion_enabled; struct flb_upstream *u_cluster.
Configuration & validation
plugins/out_azure_kusto/azure_kusto_conf.c
Validates mutual exclusivity: errors if buffering_enabled and streaming_ingestion_enabled are both true; logs selected mode; early return on invalid config.
Ingestion implementation
plugins/out_azure_kusto/azure_kusto_ingest.c
Adds azure_kusto_streaming_ingestion(...) implementing HTTP streaming ingestion using cluster upstream and OAuth2; enforces headers/encoding and success codes; updates azure_kusto_queued_ingestion signature to include struct azure_kusto_file *upload_file.
Ingestion header/API
plugins/out_azure_kusto/azure_kusto_ingest.h
Declares new function: int azure_kusto_streaming_ingestion(...). Existing queued ingestion declaration remains as-is.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant FLB as Fluent Bit Output
  participant Conf as Config
  participant AK as Azure Kusto Plugin
  participant U_ing as Ingestion Endpoint
  participant U_clu as Cluster (Streaming)
  participant Auth as OAuth2

  Note over Conf,AK: Initialization
  Conf->>AK: Parse config (buffering_enabled, streaming_ingestion_enabled)
  alt streaming enabled
    AK->>AK: Derive cluster endpoint (remove "ingest-" if present)
    AK->>U_clu: Create upstream (ctx.u_cluster)
  else streaming disabled
    AK->>U_ing: Use ingestion upstream
  end

  Note over FLB,AK: Flush
  FLB->>AK: flush(tag, payload)
  alt streaming_ingestion_enabled
    AK->>AK: Enforce 4MB payload limit
    AK->>Auth: Get access token
    AK->>U_clu: HTTP POST streaming ingest
    alt 200/204
      U_clu-->>AK: Success
    else error
      U_clu-->>AK: Failure
      AK->>FLB: Return error (retry via existing path)
    end
  else queued_ingestion
    AK->>AK: Load ingestion resources
    AK->>U_ing: Queued ingestion (blob/queue path)
  end

  Note over AK: Shutdown
  AK->>U_clu: Destroy ctx.u_cluster (if created)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I tug my ears at streaming’s gleam,
A 4MB hop—so swift, so clean!
If buffers bloat, I thump a warning,
Choose one burrow, night or morning.
New tunnels carved, queues still true—
I whisk the bytes from me to you. 🐇✨


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (12)
plugins/out_azure_kusto/azure_kusto.h (3)

111-113: Use boolean type and document default.

Prefer flb_boolean (FLB_TRUE/FLB_FALSE) for flags; clarify default behavior in a brief comment (default: disabled).

-    /* streaming ingestion mode */
-    int streaming_ingestion_enabled;
+    /* streaming ingestion mode (default: disabled) */
+    int streaming_ingestion_enabled; /* consider flb_boolean */

173-175: Clarify ownership and configure timeouts for u_cluster.

Document lifecycle: who creates/destroys u_cluster; ensure it inherits TLS/timeout settings and is freed on exit/errors.

-    /* Upstream connection to the main Kusto cluster for streaming ingestion */
-    struct flb_upstream *u_cluster;
+    /* Upstream connection to the main Kusto cluster for streaming ingestion.
+     * Ownership: created by init; must be destroyed on exit/error. */
+    struct flb_upstream *u_cluster;

188-189: Header guard naming consistency.

Minor: this header uses FLB_OUT_AZURE_KUSTO while ingest.h uses FLB_OUT_AZURE_KUSTO_INGEST_H. If the repo prefers _H suffix, align.

plugins/out_azure_kusto/azure_kusto_conf.c (1)

806-815: Clarify mode logs and 4MB constraint location.

Good logging. Consider stating that the 4MB limit applies to the request body (post-compression) and will be pre-validated.

plugins/out_azure_kusto/azure_kusto_ingest.h (1)

29-31: API contract: document 4MB limit and return semantics.

Add a brief comment noting payload must be ≤ 4 MiB (per-request), whether size is before/after compression, and what return codes mean.

-int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
-                                    size_t tag_len, flb_sds_t payload, size_t payload_size);
+/* Returns 0 on success, -1 on transport/HTTP error.
+ * Payload must respect Kusto streaming 4 MiB limit (effective on-the-wire size). */
+int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
+                                    size_t tag_len, flb_sds_t payload, size_t payload_size);
plugins/out_azure_kusto/azure_kusto_ingest.c (4)

548-555: Honor configured timeouts for streaming upstream.

Align cluster upstream timeouts with ingestion settings to avoid long hangs.

-    u_conn = flb_upstream_conn_get(ctx->u_cluster);
+    ctx->u_cluster->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout;
+    ctx->u_cluster->base.net.io_timeout = ctx->io_timeout;
+    u_conn = flb_upstream_conn_get(ctx->u_cluster);

575-588: URI building: guard against oversized database/table names.

flb_sds_snprintf auto-expands, but log the final length or validate against reasonable upper bounds to catch misconfigurations early.


595-612: Reduce payload logging to avoid PII leakage.

Even at debug, 200 chars may leak sensitive data. Trim further or behind a verbose flag.

-        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload);
+        flb_plg_trace(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 64 chars): %.64s", (char*)payload);

622-636: Accept HTTP 202 as success (defensive).

Some services use 202 for accepted streaming writes. Consider treating 200/202/204 as success.

-            if (c->resp.status == 200 || c->resp.status == 204) {
+            if (c->resp.status == 200 || c->resp.status == 202 || c->resp.status == 204) {
                 ret = 0;
plugins/out_azure_kusto/azure_kusto.c (3)

1461-1471: Consider fallback from streaming to queued ingestion on failure

When streaming ingestion fails, the code returns FLB_RETRY which will retry with the same streaming method. Consider falling back to queued ingestion for more resilient operation.

         /* Perform streaming ingestion to Kusto */
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
         ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret);
 
         if (ret != 0) {
-            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");
-            ret = FLB_RETRY;
-            goto error;
+            flb_plg_warn(ctx->ins, "[FLUSH_STREAMING] WARNING: Streaming ingestion failed, falling back to queued ingestion");
+            
+            /* Fallback to queued ingestion */
+            ret = azure_kusto_load_ingestion_resources(ctx, config);
+            if (ret != 0) {
+                flb_plg_error(ctx->ins, "cannot load ingestion resources for fallback");
+                ret = FLB_RETRY;
+                goto error;
+            }
+            
+            ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
+            if (ret != 0) {
+                flb_plg_error(ctx->ins, "fallback queued ingestion also failed");
+                ret = FLB_RETRY;
+                goto error;
+            }
         } else {
             flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully");
         }

1449-1459: Add metrics/monitoring for streaming ingestion

Consider adding metrics to track streaming ingestion success/failure rates and payload sizes to help with monitoring and debugging in production.

+        /* Track metrics for streaming ingestion */
+        if (ctx->metrics) {
+            flb_metrics_add(ctx->metrics->streaming_attempts, 1);
+            flb_metrics_add(ctx->metrics->streaming_bytes, final_payload_size);
+        }
+        
         /* Check payload size limit for streaming ingestion (4MB) */
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
         if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
             flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+            if (ctx->metrics) {
+                flb_metrics_add(ctx->metrics->streaming_size_exceeded, 1);
+            }
             ret = FLB_ERROR;
             goto error;
         }

982-983: Consider configuration validation for invalid endpoint formats

The warning when the ingestion endpoint doesn't contain "ingest-" might not be appropriate for all deployment scenarios. Some users might intentionally use direct cluster endpoints.

     } else {
-        flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
+        flb_plg_info(ctx->ins, "Using ingestion endpoint directly as cluster endpoint (no 'ingest-' prefix found)");
         /* Use ingestion endpoint directly as cluster endpoint */
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1132279 and bf150d8.

📒 Files selected for processing (5)
  • plugins/out_azure_kusto/azure_kusto.c (4 hunks)
  • plugins/out_azure_kusto/azure_kusto.h (3 hunks)
  • plugins/out_azure_kusto/azure_kusto_conf.c (1 hunks)
  • plugins/out_azure_kusto/azure_kusto_ingest.c (2 hunks)
  • plugins/out_azure_kusto/azure_kusto_ingest.h (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
plugins/out_azure_kusto/azure_kusto_ingest.h (1)
plugins/out_azure_kusto/azure_kusto_ingest.c (1)
  • azure_kusto_streaming_ingestion (520-658)
plugins/out_azure_kusto/azure_kusto_ingest.c (5)
include/fluent-bit/flb_compat.h (1)
  • gmtime_r (75-81)
src/flb_upstream.c (2)
  • flb_upstream_conn_get (700-833)
  • flb_upstream_conn_release (851-936)
plugins/out_azure_kusto/azure_kusto.c (1)
  • get_azure_kusto_token (114-161)
src/flb_sds.c (3)
  • flb_sds_create_size (92-95)
  • flb_sds_destroy (389-399)
  • flb_sds_snprintf (405-428)
src/flb_http_client.c (4)
  • flb_http_client (800-845)
  • flb_http_add_header (949-981)
  • flb_http_do (1522-1571)
  • flb_http_client_destroy (1627-1634)
plugins/out_azure_kusto/azure_kusto.c (3)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/flb_upstream.c (2)
  • flb_upstream_create_url (416-481)
  • flb_upstream_destroy (649-687)
plugins/out_azure_kusto/azure_kusto_ingest.c (1)
  • azure_kusto_streaming_ingestion (520-658)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (3)
plugins/out_azure_kusto/azure_kusto_ingest.c (1)

646-655: Cleanup is correct; ensure u_conn non-NULL prior to release.

Looks fine since early returns happen before assignment; keep as-is.

plugins/out_azure_kusto/azure_kusto.c (2)

1580-1583: LGTM! Proper cleanup of cluster upstream

The cleanup of the cluster upstream connection in the exit callback is correctly implemented.


1649-1653: LGTM! Well-documented configuration option

The streaming ingestion configuration option is properly documented with clear limitations (4MB limit, no buffering support).

Comment on lines +799 to +805
/* Validate mutual exclusivity between buffering and streaming ingestion */
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}


🛠️ Refactor suggestion

Don’t hard-fail when both buffering and streaming are enabled; resolve deterministically.

Current error contradicts the log (“streaming… automatically disabled”). Either disable buffering or streaming and continue with a warning.

-    if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
-        flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
-        flb_azure_kusto_conf_destroy(ctx);
-        return NULL;
-    }
+    if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
+        /* Prefer queued ingestion when buffering is explicitly enabled */
+        ctx->streaming_ingestion_enabled = FLB_FALSE;
+        flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
+    }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto_conf.c around lines 799-805, the code
currently hard-fails when both buffering_enabled and streaming_ingestion_enabled
are true; change this to deterministically resolve the conflict: replace the
error+destroy+return with a warning log stating streaming ingestion will be
disabled, set ctx->streaming_ingestion_enabled = FLB_FALSE (or 0) so buffering
remains active, and continue initialization without freeing ctx so operation
proceeds with buffering only.

Comment on lines +534 to +541
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");

now = time(NULL);
gmtime_r(&now, &tm);
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp);


🛠️ Refactor suggestion

Pre-validate the 4 MiB streaming limit (avoid 413s and wasted I/O).

Check payload_size upfront (and compressed size when applicable) before opening a connection.

     flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
                  (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");
+
+    /* Kusto streaming hard limit: 4 MiB per request payload */
+    if (payload_size > (4 * 1024 * 1024)) {
+        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] payload_size=%zu exceeds 4MiB limit", payload_size);
+        return -1;
+    }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto_ingest.c around lines 534-541, add an
upfront size validation before any network/connection work: check payload_size
against the 4 MiB streaming limit and, if compression is enabled, compute or
obtain the compressed size and validate that too; if the size exceeds the limit,
log an informative error (including tag, db/table, sizes) and abort/return early
with the appropriate error code instead of proceeding to open a connection or
send data.

Comment on lines +946 to +993
/*
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
*/
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_sds_t cluster_endpoint = NULL;

/* Check if ingestion endpoint contains "ingest-" prefix */
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
/* Create cluster endpoint by removing "ingest-" prefix */
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
if (!cluster_endpoint) {
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
return -1;
}

/* Replace "ingest-" with empty string to get cluster endpoint */
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
if (ingest_pos) {
/* Move the rest of the string to remove "ingest-" */
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
}

flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);

/* Create upstream connection to cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
flb_sds_destroy(cluster_endpoint);
return -1;
}

flb_sds_destroy(cluster_endpoint);
} else {
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
/* Use ingestion endpoint directly as cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed");
return -1;
}
}

flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
}


⚠️ Potential issue

Fix potential memory leak when cluster endpoint creation fails

When flb_sds_create fails (line 956), the function returns -1 without proper cleanup. The upstream connection ctx->u created earlier (line 935) will leak.

Apply this fix to properly clean up on failure:

 if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
     flb_sds_t cluster_endpoint = NULL;
 
     /* Check if ingestion endpoint contains "ingest-" prefix */
     if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
         /* Create cluster endpoint by removing "ingest-" prefix */
         cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
         if (!cluster_endpoint) {
             flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 946-993, the code returns
-1 when flb_sds_create(cluster_endpoint) fails but does not clean up the
previously created upstream (ctx->u), causing a memory/resource leak; modify the
error path to check if ctx->u is non-NULL and properly destroy it (e.g., call
the appropriate flb_upstream_destroy or cleanup function) before returning, and
apply the same cleanup pattern for any other early returns in this block
(including failures creating ctx->u_cluster) so all allocated upstreams are
released on error.
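The cleanup pattern the prompt describes can be sketched in isolation. The snippet below is a minimal, self-contained illustration, not the Fluent Bit API: the `upstream` struct and helper names are stand-ins, and the point is only the single error label that releases whatever was created so far.

```c
#include <stdlib.h>

/* Stand-ins for the plugin's upstream handles (not the Fluent Bit API) */
struct upstream { int id; };

static struct upstream *upstream_create(int id, int fail)
{
    if (fail) {
        return NULL;        /* simulate upstream creation failing */
    }
    struct upstream *u = malloc(sizeof(*u));
    if (u) {
        u->id = id;
    }
    return u;
}

static void upstream_destroy(struct upstream *u)
{
    free(u);
}

/* All failure paths funnel through one error label that releases
 * whatever was created so far, so no early return leaks a handle. */
static int create_upstreams(struct upstream **ingest,
                            struct upstream **cluster, int fail_cluster)
{
    *ingest = NULL;
    *cluster = NULL;

    *ingest = upstream_create(1, 0);
    if (!*ingest) {
        goto error;
    }

    *cluster = upstream_create(2, fail_cluster);
    if (!*cluster) {
        goto error;
    }
    return 0;

error:
    upstream_destroy(*ingest);
    *ingest = NULL;
    return -1;
}
```

With this shape, adding a third upstream later only means adding one more create/check pair; the error label already covers it.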

Comment on lines +950 to +969
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_sds_t cluster_endpoint = NULL;

/* Check if ingestion endpoint contains "ingest-" prefix */
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
/* Create cluster endpoint by removing "ingest-" prefix */
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
if (!cluster_endpoint) {
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
return -1;
}

/* Replace "ingest-" with empty string to get cluster endpoint */
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
if (ingest_pos) {
/* Move the rest of the string to remove "ingest-" */
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
}

🛠️ Refactor suggestion

Improve URL prefix removal for better reliability

The current implementation uses strstr which finds "ingest-" anywhere in the URL, not just in the hostname. This could match unintended parts like path segments. Additionally, the memmove operation assumes the prefix is at the exact position found by strstr.

Consider using a more robust approach that specifically targets the hostname:

-    /* Check if ingestion endpoint contains "ingest-" prefix */
-    if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
+    /* Check if hostname starts with "ingest-" prefix */
+    const char *prefix = "ingest-";
+    const char *schema_end = strstr(ctx->ingestion_endpoint, "://");
+    const char *hostname_start = schema_end ? schema_end + 3 : ctx->ingestion_endpoint;
+    
+    if (strncmp(hostname_start, prefix, strlen(prefix)) == 0) {
         /* Create cluster endpoint by removing "ingest-" prefix */
         cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
         if (!cluster_endpoint) {
             flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
             return -1;
         }
 
-        /* Replace "ingest-" with empty string to get cluster endpoint */
-        char *ingest_pos = strstr(cluster_endpoint, "ingest-");
-        if (ingest_pos) {
-            /* Move the rest of the string to remove "ingest-" */
-            memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
-            flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
-        }
+        /* Find the position in our copy and remove the prefix */
+        char *copy_hostname = strstr(cluster_endpoint, "://");
+        if (copy_hostname) {
+            copy_hostname += 3;
+            memmove(copy_hostname, copy_hostname + strlen(prefix), 
+                    strlen(copy_hostname + strlen(prefix)) + 1);
+            flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - strlen(prefix));
+        }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 950 to 969, the code uses
strstr on the whole ingestion_endpoint and memmove assuming the found position
is the hostname prefix; instead parse or isolate the URL host portion, verify
the host starts with "ingest-" and then remove only that leading prefix from the
hostname (not from the entire URL). Update the logic to: extract the host
substring (or use a URL parse helper), check if host[0..6] == "ingest-", create
a new SDS for the cluster endpoint by copying the original URL and replacing
only the host portion with host+7 (or build the URL from scheme +
host_without_prefix + path), and adjust SDS length safely with flb_sds_len_set
using the actual removed byte count; ensure bounds checks and null checks are
present and avoid memmove on the whole URL.
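A standalone sketch of the hostname-only prefix removal the suggestion describes. It uses plain C string handling rather than the `flb_sds` API, and leaves buffer sizing to the caller (`out` must hold at least `strlen(url) + 1` bytes); the function name is illustrative.

```c
#include <string.h>

/* Strip a leading "ingest-" from the hostname only, never from a path
 * segment or any other part of the URL. */
static void strip_ingest_prefix(const char *url, char *out)
{
    const char *prefix = "ingest-";
    const char *scheme_end = strstr(url, "://");
    const char *host = scheme_end ? scheme_end + 3 : url;
    size_t head = (size_t) (host - url);

    if (strncmp(host, prefix, strlen(prefix)) == 0) {
        memcpy(out, url, head);                    /* keep "https://" */
        strcpy(out + head, host + strlen(prefix)); /* drop "ingest-"  */
    }
    else {
        strcpy(out, url);   /* prefix not at hostname start: unchanged */
    }
}
```

Because the comparison is anchored at the character after `://`, a URL such as `https://example.com/ingest-data` is left untouched, which is exactly the case the bare `strstr` approach gets wrong.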

Comment on lines +973 to +978
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
flb_sds_destroy(cluster_endpoint);
return -1;
}
⚠️ Potential issue

Add cleanup for ingestion upstream on cluster upstream creation failure

If cluster upstream creation fails, the function returns -1 but doesn't clean up the previously created ctx->u upstream connection.

         ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
         if (!ctx->u_cluster) {
             flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
             flb_sds_destroy(cluster_endpoint);
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 973-978, when
flb_upstream_create_url for ctx->u_cluster fails the code currently returns -1
without cleaning up the previously created ctx->u upstream; modify the error
path to call flb_upstream_destroy(ctx->u) (and set ctx->u = NULL) before
destroying cluster_endpoint and returning -1 so the earlier upstream is properly
released; keep the flb_sds_destroy(cluster_endpoint) and the error log as-is.

Comment on lines +984 to +988
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed");
return -1;
}
⚠️ Potential issue

Add cleanup for ingestion upstream when cluster creation fails (alternate path)

Similar cleanup issue in the alternate code path when the ingestion endpoint doesn't contain "ingest-" prefix.

         ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
         if (!ctx->u_cluster) {
             flb_plg_error(ctx->ins, "cluster upstream creation failed");
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 984-988, when creating the
cluster upstream fails in this alternate path you must clean up the previously
created ingestion upstream to avoid resource leaks; if ctx->u is non-NULL call
flb_upstream_destroy(ctx->u), set ctx->u to NULL, optionally log the cleanup,
and then return -1.

Comment on lines +1447 to +1459
/* Check if streaming ingestion is enabled */
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
    flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);

    /* Check payload size limit for streaming ingestion (4MB) */
    flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
    if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
        flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
        ret = FLB_ERROR;
        goto error;
    }
    flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size);
}
else {
    /* Perform queued ingestion to Kusto */
    ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
    flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
    if (ret != 0) {
        flb_plg_error(ctx->ins, "cannot perform queued ingestion");
        ret = FLB_RETRY;
        goto error;
    }
}


💡 Verification agent

🧩 Analysis chain

Validate payload size before compression for streaming ingestion

The 4MB limit check happens after compression, but the error message and documentation suggest the limit applies to the uncompressed payload. This could lead to confusion and unexpected failures.

Clarify the documentation and consider checking the size before compression:

     /* Check if streaming ingestion is enabled */
     if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
 
-        /* Check payload size limit for streaming ingestion (4MB) */
-        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+        /* Check payload size limit for streaming ingestion (4MB after compression if enabled) */
+        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking %s payload size: %zu bytes against 4MB limit", 
+                     is_compressed ? "compressed" : "uncompressed", final_payload_size);
         if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
-            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: %s payload size %zu bytes exceeds 4MB limit for streaming ingestion", 
+                          is_compressed ? "Compressed" : "Uncompressed", final_payload_size);
             ret = FLB_ERROR;
             goto error;
         }

Let me verify the streaming ingestion size limit behavior:


🌐 Web query:

Azure Kusto streaming ingestion 4MB size limit compressed or uncompressed

💡 Result:

Short answer: The 4 MB streaming-ingestion limit is on the uncompressed data size. Microsoft docs and guidance treat the 4 MB streaming request limit as uncompressed; if the uncompressed size exceeds 4 MB the service will route to queued ingestion (and some SDKs explicitly state data >4 MB is moved to queued ingestion regardless of compression). [1][2][3][4]

Sources:

  • Azure Data Explorer — Configure streaming ingestion (data size limit: 4 MB). [1]
  • Azure Synapse / Data Explorer blog: “maximum data size per request is 4MB (uncompressed).” [2]
  • ManagedStreamingIngestionClient guidance: “Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression” and SDK payload note. [3]
  • Ingestion FAQ: batching/data-size is calculated on uncompressed data (compressed uploads are evaluated by uncompressed size). [4]

Validate uncompressed payload size before compression

The 4 MB streaming-ingestion limit applies to the uncompressed data size; current checks against the post-compression size can allow payloads that exceed 4 MB uncompressed to be sent. Update the code to measure and enforce the 4 MB limit on the raw buffer before applying any compression, and adjust log messages accordingly.

     /* Check if streaming ingestion is enabled */
     if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);

-        /* Check payload size limit for streaming ingestion (4MB) */
-        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+        /* Check payload size limit for streaming ingestion (4MB uncompressed) */
+        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking uncompressed payload size: %zu bytes against 4MB limit", uncompressed_size);
         if (uncompressed_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
-            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Uncompressed payload size %zu bytes exceeds 4MB limit for streaming ingestion", uncompressed_size);
             ret = FLB_ERROR;
             goto error;
         }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 1447 to 1459, the code
currently enforces the 4 MB streaming ingestion limit on the final
(post-compression) payload; change this to measure and check the
uncompressed/raw buffer size before any compression occurs, so the 4MB check
uses the original buffer length (e.g., raw_buf_size) instead of
final_payload_size, update the log messages to reflect "uncompressed payload
size" when logging and erroring, and only proceed to compression if the raw size
passes the 4MB check (return FLB_ERROR and goto error when raw size > 4194304).
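The pre-compression guard the prompt describes amounts to a one-line check on the raw buffer length. A minimal sketch follows; the constant and function names are illustrative, not taken from the plugin:

```c
#include <stddef.h>

#define STREAMING_LIMIT ((size_t) 4 * 1024 * 1024)  /* 4 MB, uncompressed */

/* Returns 0 if the raw (pre-compression) payload may be streamed,
 * -1 if it must be rejected or rerouted to queued ingestion.
 * The limit is enforced before any compression runs, matching the
 * service's definition of the 4 MB streaming request limit. */
static int check_streaming_size(size_t raw_size)
{
    if (raw_size > STREAMING_LIMIT) {
        return -1;
    }
    return 0;
}
```

Calling this on the buffer before `flb_gzip_compress` (or whatever compressor is configured) keeps the check aligned with the documented uncompressed limit, regardless of how well the data compresses.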
