68 changes: 68 additions & 0 deletions README.md
@@ -243,6 +243,74 @@ Expected output (SPARQL Results JSON format):
}
```

### Streaming Queries (NDJSON)

For large result sets, streaming queries deliver results as they are produced instead of waiting for the entire result set to be computed. This reduces memory pressure and enables progressive rendering in UIs.

Streaming is enabled by setting the `Accept` header to `application/x-ndjson` (newline-delimited JSON); `application/ndjson` is also accepted:

```bash
curl -X POST http://localhost:8090/fluree/query \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
-d '{
"from": "example/ledger",
"@context": {
"schema": "http://schema.org/",
"ex": "http://example.org/"
},
"select": ["?name", "?email"],
"where": {
"@id": "?person",
"@type": "schema:Person",
"schema:name": "?name",
"schema:email": "?email"
}
}'
```

Expected output (NDJSON format - one JSON value per line):
```
["Alice Johnson","[email protected]"]
["Bob Smith","[email protected]"]
```

**Key differences from buffered queries:**
- Results are streamed as individual items rather than wrapped in an outer array
- Each result is on its own line (newline-delimited), so clients can process the stream line by line, as shown in the example below
- The server returns HTTP 206 (Partial Content) instead of 200
- The stream closes when all results have been emitted; there is no explicit completion marker unless query tracking is enabled (see below)
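
Because each line is a complete JSON value, the stream can be consumed incrementally with any line-oriented tool. A minimal shell sketch, assuming the query body from the example above has been saved as `query.json` (`curl -N` disables output buffering so rows are printed as they arrive):

```bash
# Stream the query and handle each row as soon as it is received.
# query.json is assumed to hold the streaming query shown above.
curl -sN -X POST http://localhost:8090/fluree/query \
  -H "Content-Type: application/json" \
  -H "Accept: application/x-ndjson" \
  -d @query.json \
| while IFS= read -r line; do
    # each line is one JSON value, e.g. ["Alice Johnson", ...]
    echo "row: $line"
  done
```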

**Enable query tracking** to receive fuel, time, and policy statistics as a final metadata line in the stream:

```bash
curl -X POST http://localhost:8090/fluree/query \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
-H "fluree-track-fuel: true" \
-H "fluree-track-time: true" \
-d '{
"from": "example/ledger",
"@context": {
"schema": "http://schema.org/",
"ex": "http://example.org/"
},
"select": ["?name"],
"where": {
"@id": "?person",
"@type": "schema:Person",
"schema:name": "?name"
}
}'
```

Expected output with tracking:
```
["Alice Johnson"]
["Bob Smith"]
{"_fluree-meta":{"status":200,"fuel":245,"time":"3ms"}}
```
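
Because the data rows are JSON arrays (for the tuple-style `select` used here) while the tracking metadata is the only JSON object in the stream, the metadata line can be separated with a small `jq` filter. A sketch, assuming `jq` is installed and the tracked query above is saved as `query.json`:

```bash
# Print only the trailing metadata line; the tuple rows (JSON arrays) are dropped.
curl -sN -X POST http://localhost:8090/fluree/query \
  -H "Content-Type: application/json" \
  -H "Accept: application/x-ndjson" \
  -H "fluree-track-fuel: true" \
  -H "fluree-track-time: true" \
  -d @query.json \
| jq -c 'select(type == "object")'
```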

### Insert More Data

The `/insert` endpoint adds new data to the ledger. If the subject already exists, the operation will merge the new properties with existing ones:
2 changes: 1 addition & 1 deletion deps.edn
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "f2bcf88ddf90c44ef50369da4edcb82f91f023e1"}
:git/sha "74fcc49d1154146cc0e66126322f3ad5b0047969"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "74083536c84d77f8cdd4b686b5661714010baad3"}

6 changes: 3 additions & 3 deletions src/fluree/server/consensus/events.clj
@@ -3,8 +3,8 @@
protocols"
(:require [clojure.string :as str]
[fluree.db.connection :as connection]
[fluree.db.ledger :as ledger]
[fluree.db.track :as-alias track]
[fluree.db.transact :as transact]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.log :as log]))

@@ -107,7 +107,7 @@
[{:keys [commit-catalog] :as _conn} ledger-id event]
(go-try
(if-let [raw-txn (-> event :opts :raw-txn)]
(let [{:keys [address]} (<? (ledger/save-txn! commit-catalog ledger-id raw-txn))]
(let [{:keys [address]} (<? (transact/save-txn! commit-catalog ledger-id raw-txn))]
(-> event
(assoc-in [:opts :raw-txn-address] address)
(update :opts dissoc :raw-txn)))
@@ -122,7 +122,7 @@
(go-try
(if-let [txn (:txn event)]
(let [{:keys [ledger-id]} event
{:keys [address]} (<? (ledger/save-txn! commit-catalog ledger-id txn))
{:keys [address]} (<? (transact/save-txn! commit-catalog ledger-id txn))
event* (-> event
(assoc :txn-address address)
(dissoc :txn))]
4 changes: 2 additions & 2 deletions src/fluree/server/consensus/raft/handlers/new_commit.clj
@@ -1,8 +1,8 @@
(ns fluree.server.consensus.raft.handlers.new-commit
(:require [clojure.core.async :as async]
[fluree.db.ledger :as ledger]
[fluree.db.storage :as storage]
[fluree.db.storage.file :as file-storage]
[fluree.db.transact :as transact]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.bytes :as bytes]
[fluree.db.util.filesystem :as fs]
@@ -28,7 +28,7 @@
(let [commit-json (-> (json/parse json false)
;; address is not yet written into the commit file, add it
(assoc "address" address))]
(ledger/publish-commit conn commit-json)))
(transact/publish-commit conn commit-json)))

(defn store-ledger-files
"Persist both the data-file and commit-file to disk only if redundant
85 changes: 77 additions & 8 deletions src/fluree/server/handlers/ledger.clj
@@ -1,17 +1,86 @@
(ns fluree.server.handlers.ledger
(:require [fluree.db.api :as fluree]
(:require [clojure.core.async :as async]
[clojure.java.io :as io]
[fluree.db.api :as fluree]
[fluree.db.util :as util]
[fluree.db.util.json :as json]
[fluree.db.util.log :as log]
[fluree.server.handler :as-alias handler]
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared]))
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared])
(:import (ring.core.protocols StreamableResponseBody)))

(defn ndjson-streaming-body
"Converts a core.async channel of results into a Ring StreamableResponseBody
that writes NDJSON (newline-delimited JSON) to the output stream.

Handles:
- Normal results: written as JSON objects
- Exceptions: written as error objects with :error and :status keys
- Metadata: {:_fluree-meta {:status 200, :fuel ..., :time ..., :policy ...}}
written as final line when query tracking is enabled

Each item is written as a JSON line followed by newline.
The stream closes when the channel closes."
[result-ch]
(reify StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
(with-open [writer (io/writer output-stream)]
(loop []
(when-some [result (async/<!! result-ch)]
(cond
;; Exception - write error object and close stream
(util/exception? result)
(let [error-data (ex-data result)
status (or (:status error-data) 500)
error-obj {:error (ex-message result)
:status status
:data (dissoc error-data :status)}]
(log/error result "Error during streaming query")
(.write writer (json/stringify error-obj))
(.write writer "\n")
(.flush writer))

;; Metadata (final message from fluree/db) - write as-is
(:_fluree-meta result)
(do
(.write writer (json/stringify result))
(.write writer "\n")
(.flush writer))

;; Normal result - write and continue
:else
(do
(.write writer (json/stringify result))
(.write writer "\n")
(.flush writer)
(recur)))))))))

(defhandler query
[{:keys [fluree/conn fluree/opts] {:keys [body]} :parameters :as _req}]
[{:keys [fluree/conn fluree/opts headers] {:keys [body]} :parameters :as _req}]
(let [query (or (::handler/query body) body)
{:keys [status result] :as query-response}
(deref! (fluree/query-connection conn query opts))]
(log/debug "query handler received query:" query opts)
(shared/with-tracking-headers {:status status, :body result}
query-response)))
accept (get headers "accept")
streaming? (or (= accept "application/x-ndjson")
(= accept "application/ndjson"))]

(if streaming?
;; Streaming response - use NDJSON
(do
(log/debug "Executing streaming query:" query "with opts:" opts)
(let [result-ch (fluree/query-connection-stream conn query opts)]
;; Return 206 Partial Content to avoid response coercion middleware
;; (only 200 responses are coerced against QueryResponse schema)
{:status 206
:headers {"content-type" "application/x-ndjson"
"cache-control" "no-cache"
"x-content-type-options" "nosniff"}
:body (ndjson-streaming-body result-ch)}))

;; Buffered response (existing behavior)
(let [{:keys [status result] :as query-response}
(deref! (fluree/query-connection conn query opts))]
(log/debug "query handler received query:" query opts)
(shared/with-tracking-headers {:status status, :body result}
query-response)))))

(defhandler history
[{:keys [fluree/conn fluree/opts] {{ledger :from :as query} :body} :parameters :as _req}]
173 changes: 173 additions & 0 deletions test/fluree/server/integration/streaming_query_test.clj
@@ -0,0 +1,173 @@
(ns fluree.server.integration.streaming-query-test
(:require [clojure.string :as str]
[clojure.test :refer [deftest is testing use-fixtures]]
[fluree.db.util.json :as json]
[fluree.server.integration.test-system
:as test-system
:refer [api-post create-rand-ledger json-headers run-test-server]]))

(use-fixtures :once run-test-server)

(deftest ^:integration streaming-query-test
(testing "Query with Accept: application/x-ndjson returns NDJSON stream"
(let [ledger-name (create-rand-ledger "streaming-query-test")
;; Insert test data
txn-req {:body
(json/stringify
{"ledger" ledger-name
"@context" test-system/default-context
"insert" [{"id" "ex:alice"
"type" "schema:Person"
"ex:name" "Alice"
"ex:age" 30}
{"id" "ex:bob"
"type" "schema:Person"
"ex:name" "Bob"
"ex:age" 25}
{"id" "ex:charlie"
"type" "schema:Person"
"ex:name" "Charlie"
"ex:age" 35}]})
:headers json-headers}
txn-res (api-post :transact txn-req)
_ (assert (= 200 (:status txn-res)))

;; Streaming query
query-req {:body
(json/stringify
{"@context" test-system/default-context
"from" ledger-name
"select" ["?name" "?age"]
"where" {"id" "?person"
"type" "schema:Person"
"ex:name" "?name"
"ex:age" "?age"}})
:headers (assoc json-headers "accept" "application/x-ndjson")}
query-res (api-post :query query-req)]

(is (= 206 (:status query-res)))
(is (= "application/x-ndjson" (get-in query-res [:headers "content-type"])))

;; Parse NDJSON response
(let [lines (-> query-res :body str/split-lines)
results (map #(json/parse % false) lines)
;; Last line is metadata, rest are data
data-results (butlast results)
meta-result (last results)]
;; Should get 3 data results + 1 metadata result
(is (= 4 (count results)))
;; Data results are vectors (tuples) in streaming mode: [name age]
(is (every? vector? data-results))
;; Metadata result contains :_fluree-meta key
(is (map? meta-result))
(is (contains? meta-result "_fluree-meta"))
(is (= 200 (get-in meta-result ["_fluree-meta" "status"])))
;; Verify all three people are in results (first element of each tuple is name)
(is (= #{"Alice" "Bob" "Charlie"}
(set (map first data-results)))))))

(testing "Query without Accept header returns buffered JSON array"
(let [ledger-name (create-rand-ledger "buffered-query-test")
txn-req {:body
(json/stringify
{"ledger" ledger-name
"@context" test-system/default-context
"insert" {"id" "ex:test"
"type" "schema:Test"
"ex:name" "test"}})
:headers json-headers}
txn-res (api-post :transact txn-req)
_ (assert (= 200 (:status txn-res)))

query-req {:body
(json/stringify
{"@context" test-system/default-context
"from" ledger-name
"select" ["?name"]
"where" {"id" "?person"
"ex:name" "?name"}})
:headers json-headers}
query-res (api-post :query query-req)]

(is (= 200 (:status query-res)))
(is (str/includes? (get-in query-res [:headers "content-type"]) "application/json"))

(let [results (json/parse (:body query-res) false)]
(is (vector? results))
(is (= 1 (count results))))))

(testing "Streaming query with metadata tracking"
(let [ledger-name (create-rand-ledger "streaming-meta-test")
txn-req {:body
(json/stringify
{"ledger" ledger-name
"@context" test-system/default-context
"insert" {"id" "ex:test"
"type" "schema:Test"
"ex:name" "test"}})
:headers json-headers}
txn-res (api-post :transact txn-req)
_ (assert (= 200 (:status txn-res)))

query-req {:body
(json/stringify
{"@context" test-system/default-context
"from" ledger-name
"select" ["?name"]
"where" {"id" "?person"
"ex:name" "?name"}})
:headers (assoc json-headers
"accept" "application/x-ndjson"
"fluree-track-fuel" "true"
"fluree-track-time" "true")}
query-res (api-post :query query-req)]

(is (= 206 (:status query-res)))

(let [lines (-> query-res :body str/split-lines)
results (map #(json/parse % false) lines)
data-results (butlast results)
meta-result (last results)]
;; Last line should be metadata with _fluree-meta key
(is (contains? meta-result "_fluree-meta"))
(is (contains? (get meta-result "_fluree-meta") "fuel"))
(is (contains? (get meta-result "_fluree-meta") "time"))
;; All other lines should be vectors (tuples)
(is (every? vector? data-results)))))

(testing "Streaming query with LIMIT"
(let [ledger-name (create-rand-ledger "streaming-limit-test")
txn-req {:body
(json/stringify
{"ledger" ledger-name
"@context" test-system/default-context
"insert" (for [i (range 10)]
{"id" (str "ex:person" i)
"type" "schema:Person"
"ex:name" (str "Person " i)})})
:headers json-headers}
txn-res (api-post :transact txn-req)
_ (assert (= 200 (:status txn-res)))

query-req {:body
(json/stringify
{"@context" test-system/default-context
"from" ledger-name
"select" ["?name"]
"where" {"id" "?person"
"type" "schema:Person"
"ex:name" "?name"}
"limit" 3})
:headers (assoc json-headers "accept" "application/x-ndjson")}
query-res (api-post :query query-req)]

(is (= 206 (:status query-res)))

(let [lines (-> query-res :body str/split-lines)
results (map #(json/parse % false) lines)
data-results (butlast results)
meta-result (last results)]
;; Should get 3 data results + 1 metadata result (LIMIT 3)
(is (= 4 (count results)))
(is (= 3 (count data-results)))
(is (contains? meta-result "_fluree-meta"))))))