Skip to content

Commit 88d0169

Browse files
committed
Add NDJSON streaming support for queries and update related tests
- Introduced a new `ndjson-streaming-body` function to handle streaming responses in NDJSON format. - Updated the query handler to support streaming queries based on the `Accept` header. - Modified transaction handling to use the `transact` namespace instead of `ledger`. - Added integration tests for streaming queries, including metadata tracking and limit functionality. - Updated dependencies and fixed related test cases.
1 parent e130e17 commit 88d0169

File tree

7 files changed

+325
-15
lines changed

7 files changed

+325
-15
lines changed

README.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,74 @@ Expected output (SPARQL Results JSON format):
243243
}
244244
```
245245

246+
### Streaming Queries (NDJSON)
247+
248+
For large result sets, you can use streaming queries to receive results as they are produced rather than waiting for the entire result set. This reduces memory pressure and enables progressive rendering in UIs.
249+
250+
Streaming is enabled by setting the `Accept` header to `application/x-ndjson` (Newline-Delimited JSON):
251+
252+
```bash
253+
curl -X POST http://localhost:8090/fluree/query \
254+
-H "Content-Type: application/json" \
255+
-H "Accept: application/x-ndjson" \
256+
-d '{
257+
"from": "example/ledger",
258+
"@context": {
259+
"schema": "http://schema.org/",
260+
"ex": "http://example.org/"
261+
},
262+
"select": ["?name", "?email"],
263+
"where": {
264+
"@id": "?person",
265+
"@type": "schema:Person",
266+
"schema:name": "?name",
267+
"schema:email": "?email"
268+
}
269+
}'
270+
```
271+
272+
Expected output (NDJSON format - one JSON array per line):
273+
```
274+
["Alice Johnson","alice@example.com"]
275+
["Bob Smith","bob@example.com"]
276+
```
277+
278+
**Key differences from buffered queries:**
279+
- Results are returned as tuples (arrays) instead of maps for efficiency
280+
- Each result is on its own line (newline-delimited)
281+
- Server returns HTTP 206 (Partial Content) instead of 200
282+
- Stream closes when all results are emitted (no explicit completion marker by default)
283+
284+
**Enable query tracking** to get fuel, time, and policy statistics in the metadata:
285+
286+
```bash
287+
curl -X POST http://localhost:8090/fluree/query \
288+
-H "Content-Type: application/json" \
289+
-H "Accept: application/x-ndjson" \
290+
-H "fluree-track-fuel: true" \
291+
-H "fluree-track-time: true" \
292+
-d '{
293+
"from": "example/ledger",
294+
"@context": {
295+
"schema": "http://schema.org/",
296+
"ex": "http://example.org/"
297+
},
298+
"select": ["?name"],
299+
"where": {
300+
"@id": "?person",
301+
"@type": "schema:Person",
302+
"schema:name": "?name"
303+
}
304+
}'
305+
```
306+
307+
Expected output with tracking:
308+
```
309+
["Alice Johnson"]
310+
["Bob Smith"]
311+
{"_fluree-meta":{"status":200,"fuel":245,"time":"3ms"}}
312+
```
313+
246314
### Insert More Data
247315

248316
The `/insert` endpoint adds new data to the ledger. If the subject already exists, the operation will merge the new properties with existing ones:

deps.edn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
22
org.clojure/core.async {:mvn/version "1.6.681"}
33
com.fluree/db {:git/url "https://github.com/fluree/db.git"
4-
:git/sha "f2bcf88ddf90c44ef50369da4edcb82f91f023e1"}
4+
:git/sha "74fcc49d1154146cc0e66126322f3ad5b0047969"}
55
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
66
:git/sha "74083536c84d77f8cdd4b686b5661714010baad3"}
77

src/fluree/server/consensus/events.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
protocols"
44
(:require [clojure.string :as str]
55
[fluree.db.connection :as connection]
6-
[fluree.db.ledger :as ledger]
76
[fluree.db.track :as-alias track]
7+
[fluree.db.transact :as transact]
88
[fluree.db.util.async :refer [<? go-try]]
99
[fluree.db.util.log :as log]))
1010

@@ -107,7 +107,7 @@
107107
[{:keys [commit-catalog] :as _conn} ledger-id event]
108108
(go-try
109109
(if-let [raw-txn (-> event :opts :raw-txn)]
110-
(let [{:keys [address]} (<? (ledger/save-txn! commit-catalog ledger-id raw-txn))]
110+
(let [{:keys [address]} (<? (transact/save-txn! commit-catalog ledger-id raw-txn))]
111111
(-> event
112112
(assoc-in [:opts :raw-txn-address] address)
113113
(update :opts dissoc :raw-txn)))
@@ -122,7 +122,7 @@
122122
(go-try
123123
(if-let [txn (:txn event)]
124124
(let [{:keys [ledger-id]} event
125-
{:keys [address]} (<? (ledger/save-txn! commit-catalog ledger-id txn))
125+
{:keys [address]} (<? (transact/save-txn! commit-catalog ledger-id txn))
126126
event* (-> event
127127
(assoc :txn-address address)
128128
(dissoc :txn))]

src/fluree/server/consensus/raft/handlers/new_commit.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
(ns fluree.server.consensus.raft.handlers.new-commit
22
(:require [clojure.core.async :as async]
3-
[fluree.db.ledger :as ledger]
43
[fluree.db.storage :as storage]
54
[fluree.db.storage.file :as file-storage]
5+
[fluree.db.transact :as transact]
66
[fluree.db.util.async :refer [<? go-try]]
77
[fluree.db.util.bytes :as bytes]
88
[fluree.db.util.filesystem :as fs]
@@ -28,7 +28,7 @@
2828
(let [commit-json (-> (json/parse json false)
2929
;; address is not yet written into the commit file, add it
3030
(assoc "address" address))]
31-
(ledger/publish-commit conn commit-json)))
31+
(transact/publish-commit conn commit-json)))
3232

3333
(defn store-ledger-files
3434
"Persist both the data-file and commit-file to disk only if redundant

src/fluree/server/handlers/ledger.clj

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,86 @@
11
(ns fluree.server.handlers.ledger
2-
(:require [fluree.db.api :as fluree]
2+
(:require [clojure.core.async :as async]
3+
[clojure.java.io :as io]
4+
[fluree.db.api :as fluree]
5+
[fluree.db.util :as util]
6+
[fluree.db.util.json :as json]
37
[fluree.db.util.log :as log]
48
[fluree.server.handler :as-alias handler]
5-
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared]))
9+
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared])
10+
(:import (ring.core.protocols StreamableResponseBody)))
11+
12+
(defn ndjson-streaming-body
13+
"Converts a core.async channel of results into a Ring StreamableResponseBody
14+
that writes NDJSON (newline-delimited JSON) to the output stream.
15+
16+
Handles:
17+
- Normal results: written as JSON objects
18+
- Exceptions: written as error objects with :error and :status keys
19+
- Metadata: {:_fluree-meta {:status 200, :fuel ..., :time ..., :policy ...}}
20+
written as final line when query tracking is enabled
21+
22+
Each item is written as a JSON line followed by newline.
23+
The stream closes when the channel closes."
24+
[result-ch]
25+
(reify StreamableResponseBody
26+
(write-body-to-stream [_ _ output-stream]
27+
(with-open [writer (io/writer output-stream)]
28+
(loop []
29+
(when-some [result (async/<!! result-ch)]
30+
(cond
31+
;; Exception - write error object and close stream
32+
(util/exception? result)
33+
(let [error-data (ex-data result)
34+
status (or (:status error-data) 500)
35+
error-obj {:error (ex-message result)
36+
:status status
37+
:data (dissoc error-data :status)}]
38+
(log/error result "Error during streaming query")
39+
(.write writer (json/stringify error-obj))
40+
(.write writer "\n")
41+
(.flush writer))
42+
43+
;; Metadata (final message from fluree/db) - write as-is
44+
(:_fluree-meta result)
45+
(do
46+
(.write writer (json/stringify result))
47+
(.write writer "\n")
48+
(.flush writer))
49+
50+
;; Normal result - write and continue
51+
:else
52+
(do
53+
(.write writer (json/stringify result))
54+
(.write writer "\n")
55+
(.flush writer)
56+
(recur)))))))))
657

758
(defhandler query
8-
[{:keys [fluree/conn fluree/opts] {:keys [body]} :parameters :as _req}]
59+
[{:keys [fluree/conn fluree/opts headers] {:keys [body]} :parameters :as _req}]
960
(let [query (or (::handler/query body) body)
10-
{:keys [status result] :as query-response}
11-
(deref! (fluree/query-connection conn query opts))]
12-
(log/debug "query handler received query:" query opts)
13-
(shared/with-tracking-headers {:status status, :body result}
14-
query-response)))
61+
accept (get headers "accept")
62+
streaming? (or (= accept "application/x-ndjson")
63+
(= accept "application/ndjson"))]
64+
65+
(if streaming?
66+
;; Streaming response - use NDJSON
67+
(do
68+
(log/debug "Executing streaming query:" query "with opts:" opts)
69+
(let [result-ch (fluree/query-connection-stream conn query opts)]
70+
;; Return 206 Partial Content to avoid response coercion middleware
71+
;; (only 200 responses are coerced against QueryResponse schema)
72+
{:status 206
73+
:headers {"content-type" "application/x-ndjson"
74+
"cache-control" "no-cache"
75+
"x-content-type-options" "nosniff"}
76+
:body (ndjson-streaming-body result-ch)}))
77+
78+
;; Buffered response (existing behavior)
79+
(let [{:keys [status result] :as query-response}
80+
(deref! (fluree/query-connection conn query opts))]
81+
(log/debug "query handler received query:" query opts)
82+
(shared/with-tracking-headers {:status status, :body result}
83+
query-response)))))
1584

1685
(defhandler history
1786
[{:keys [fluree/conn fluree/opts] {{ledger :from :as query} :body} :parameters :as _req}]
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
(ns fluree.server.integration.streaming-query-test
2+
(:require [clojure.string :as str]
3+
[clojure.test :refer [deftest is testing use-fixtures]]
4+
[fluree.db.util.json :as json]
5+
[fluree.server.integration.test-system
6+
:as test-system
7+
:refer [api-post create-rand-ledger json-headers run-test-server]]))
8+
9+
(use-fixtures :once run-test-server)
10+
11+
(deftest ^:integration streaming-query-test
12+
(testing "Query with Accept: application/x-ndjson returns NDJSON stream"
13+
(let [ledger-name (create-rand-ledger "streaming-query-test")
14+
;; Insert test data
15+
txn-req {:body
16+
(json/stringify
17+
{"ledger" ledger-name
18+
"@context" test-system/default-context
19+
"insert" [{"id" "ex:alice"
20+
"type" "schema:Person"
21+
"ex:name" "Alice"
22+
"ex:age" 30}
23+
{"id" "ex:bob"
24+
"type" "schema:Person"
25+
"ex:name" "Bob"
26+
"ex:age" 25}
27+
{"id" "ex:charlie"
28+
"type" "schema:Person"
29+
"ex:name" "Charlie"
30+
"ex:age" 35}]})
31+
:headers json-headers}
32+
txn-res (api-post :transact txn-req)
33+
_ (assert (= 200 (:status txn-res)))
34+
35+
;; Streaming query
36+
query-req {:body
37+
(json/stringify
38+
{"@context" test-system/default-context
39+
"from" ledger-name
40+
"select" ["?name" "?age"]
41+
"where" {"id" "?person"
42+
"type" "schema:Person"
43+
"ex:name" "?name"
44+
"ex:age" "?age"}})
45+
:headers (assoc json-headers "accept" "application/x-ndjson")}
46+
query-res (api-post :query query-req)]
47+
48+
(is (= 206 (:status query-res)))
49+
(is (= "application/x-ndjson" (get-in query-res [:headers "content-type"])))
50+
51+
;; Parse NDJSON response
52+
(let [lines (-> query-res :body str/split-lines)
53+
results (map #(json/parse % false) lines)
54+
;; Last line is metadata, rest are data
55+
data-results (butlast results)
56+
meta-result (last results)]
57+
;; Should get 3 data results + 1 metadata result
58+
(is (= 4 (count results)))
59+
;; Data results are vectors (tuples) in streaming mode: [name age]
60+
(is (every? vector? data-results))
61+
;; Metadata result contains :_fluree-meta key
62+
(is (map? meta-result))
63+
(is (contains? meta-result "_fluree-meta"))
64+
(is (= 200 (get-in meta-result ["_fluree-meta" "status"])))
65+
;; Verify all three people are in results (first element of each tuple is name)
66+
(is (= #{"Alice" "Bob" "Charlie"}
67+
(set (map first data-results)))))))
68+
69+
(testing "Query without Accept header returns buffered JSON array"
70+
(let [ledger-name (create-rand-ledger "buffered-query-test")
71+
txn-req {:body
72+
(json/stringify
73+
{"ledger" ledger-name
74+
"@context" test-system/default-context
75+
"insert" {"id" "ex:test"
76+
"type" "schema:Test"
77+
"ex:name" "test"}})
78+
:headers json-headers}
79+
txn-res (api-post :transact txn-req)
80+
_ (assert (= 200 (:status txn-res)))
81+
82+
query-req {:body
83+
(json/stringify
84+
{"@context" test-system/default-context
85+
"from" ledger-name
86+
"select" ["?name"]
87+
"where" {"id" "?person"
88+
"ex:name" "?name"}})
89+
:headers json-headers}
90+
query-res (api-post :query query-req)]
91+
92+
(is (= 200 (:status query-res)))
93+
(is (str/includes? (get-in query-res [:headers "content-type"]) "application/json"))
94+
95+
(let [results (json/parse (:body query-res) false)]
96+
(is (vector? results))
97+
(is (= 1 (count results))))))
98+
99+
(testing "Streaming query with metadata tracking"
100+
(let [ledger-name (create-rand-ledger "streaming-meta-test")
101+
txn-req {:body
102+
(json/stringify
103+
{"ledger" ledger-name
104+
"@context" test-system/default-context
105+
"insert" {"id" "ex:test"
106+
"type" "schema:Test"
107+
"ex:name" "test"}})
108+
:headers json-headers}
109+
txn-res (api-post :transact txn-req)
110+
_ (assert (= 200 (:status txn-res)))
111+
112+
query-req {:body
113+
(json/stringify
114+
{"@context" test-system/default-context
115+
"from" ledger-name
116+
"select" ["?name"]
117+
"where" {"id" "?person"
118+
"ex:name" "?name"}})
119+
:headers (assoc json-headers
120+
"accept" "application/x-ndjson"
121+
"fluree-track-fuel" "true"
122+
"fluree-track-time" "true")}
123+
query-res (api-post :query query-req)]
124+
125+
(is (= 206 (:status query-res)))
126+
127+
(let [lines (-> query-res :body str/split-lines)
128+
results (map #(json/parse % false) lines)
129+
data-results (butlast results)
130+
meta-result (last results)]
131+
;; Last line should be metadata with _fluree-meta key
132+
(is (contains? meta-result "_fluree-meta"))
133+
(is (contains? (get meta-result "_fluree-meta") "fuel"))
134+
(is (contains? (get meta-result "_fluree-meta") "time"))
135+
;; All other lines should be vectors (tuples)
136+
(is (every? vector? data-results)))))
137+
138+
(testing "Streaming query with LIMIT"
139+
(let [ledger-name (create-rand-ledger "streaming-limit-test")
140+
txn-req {:body
141+
(json/stringify
142+
{"ledger" ledger-name
143+
"@context" test-system/default-context
144+
"insert" (for [i (range 10)]
145+
{"id" (str "ex:person" i)
146+
"type" "schema:Person"
147+
"ex:name" (str "Person " i)})})
148+
:headers json-headers}
149+
txn-res (api-post :transact txn-req)
150+
_ (assert (= 200 (:status txn-res)))
151+
152+
query-req {:body
153+
(json/stringify
154+
{"@context" test-system/default-context
155+
"from" ledger-name
156+
"select" ["?name"]
157+
"where" {"id" "?person"
158+
"type" "schema:Person"
159+
"ex:name" "?name"}
160+
"limit" 3})
161+
:headers (assoc json-headers "accept" "application/x-ndjson")}
162+
query-res (api-post :query query-req)]
163+
164+
(is (= 206 (:status query-res)))
165+
166+
(let [lines (-> query-res :body str/split-lines)
167+
results (map #(json/parse % false) lines)
168+
data-results (butlast results)
169+
meta-result (last results)]
170+
;; Should get 3 data results + 1 metadata result (LIMIT 3)
171+
(is (= 4 (count results)))
172+
(is (= 3 (count data-results)))
173+
(is (contains? meta-result "_fluree-meta"))))))

0 commit comments

Comments
 (0)