Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "f5f76fb2373b712aeee96574e1bbc2cd0277fce1"}
:git/sha "8925f9e677a9965aa7e1a9eb0fed0769f24ea804"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "74083536c84d77f8cdd4b686b5661714010baad3"}

Expand All @@ -23,8 +23,10 @@
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}

;; logging
ch.qos.logback/logback-classic {:mvn/version "1.5.6"}
org.slf4j/slf4j-api {:mvn/version "2.0.13"}
ch.qos.logback/logback-classic {:mvn/version "1.5.6"}
net.logstash.logback/logstash-logback-encoder {:mvn/version "9.0"}
org.slf4j/slf4j-api {:mvn/version "2.0.13"}
com.github.steffan-westcott/clj-otel-api {:mvn/version "0.2.10"}

;; http
;; ring-jetty9-adapter 0.30.x+ uses Jetty 12 & requires JDK 17+
Expand All @@ -50,8 +52,15 @@
:extra-deps {org.clojure/tools.namespace {:mvn/version "1.5.0"}
clj-http/clj-http {:mvn/version "3.13.0"}
criterium/criterium {:mvn/version "0.4.6"}
integrant/repl {:mvn/version "0.3.3"}}
:jvm-opts ["-Djdk.attach.allowAttachSelf"]}
integrant/repl {:mvn/version "0.3.3"}
com.taoensso/trove {:mvn/version "1.1.0"}
com.taoensso/telemere {:mvn/version "1.2.0"}
io.opentelemetry/opentelemetry-api {:mvn/version "1.56.0"}}
:jvm-opts ["-Djdk.attach.allowAttachSelf"
"-javaagent:/home/dan/scratch/learn-otel/opentelemetry-javaagent.jar"
"-Dotel.exporter.otlp.endpoint=http://localhost:4318"
"-Dotel.exporter.otlp.protocol=http/protobuf"
"-Dotel.service.name=fluree-server"]}

:run-dev
{:main-opts ["-m" "fluree.server" "--profile" "dev"]}
Expand Down
26 changes: 13 additions & 13 deletions resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
<configuration>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<!-- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> -->
<!-- <encoder> -->
<!-- <pattern> -->
<!-- %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n -->
<!-- </pattern> -->
<!-- </encoder> -->
<!-- </appender> -->

<logger name="fluree" level="info" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<!-- <logger name="fluree" level="info" additivity="false"> -->
<!-- <appender-ref ref="CONSOLE"/> -->
<!-- </logger> -->

<root level="error">
<appender-ref ref="CONSOLE"/>
</root>
<!-- <root level="error"> -->
<!-- <appender-ref ref="CONSOLE"/> -->
<!-- </root> -->

</configuration>
3 changes: 2 additions & 1 deletion src/fluree/server/broadcast/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[fluree.db.util.log :as log]
[fluree.server.broadcast :refer [Broadcaster]]
[fluree.server.consensus.events :as events]
[ring.adapter.jetty9.websocket :as ws])
[ring.adapter.jetty9.websocket :as ws]
[steffan-westcott.clj-otel.context :as otel-context])

Check warning on line 8 in src/fluree/server/broadcast/subscriptions.clj

View workflow job for this annotation

GitHub Actions / clj-kondo lint

namespace steffan-westcott.clj-otel.context is required but never used
(:import (java.io Closeable IOException)
(java.nio.channels ClosedChannelException)))

Expand Down
21 changes: 14 additions & 7 deletions src/fluree/server/consensus/events.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
[fluree.db.ledger :as ledger]
[fluree.db.track :as-alias track]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.log :as log]))
[fluree.db.util.log :as log]
[steffan-westcott.clj-otel.context :as otel-context]))

(defn event-type
[event]
Expand All @@ -28,6 +29,10 @@
[x]
(string? x))

(defn with-otel-ctx
[evt]
(assoc evt :otel/context (otel-context/dyn)))

(defn with-txn
[evt txn]
(cond (nil? txn)
Expand Down Expand Up @@ -65,14 +70,16 @@
:ledger-id ledger-id
:opts opts
:instant (System/currentTimeMillis)}]
(with-txn evt txn)))
(-> evt
(with-txn txn)
(with-otel-ctx))))

(defn drop-ledger
"Create a new event message to drop an existing ledger."
[ledger-id]
{:type :ledger-drop
:ledger-id ledger-id
:instant (System/currentTimeMillis)})
(with-otel-ctx {:type :ledger-drop
:ledger-id ledger-id
:instant (System/currentTimeMillis)}))

(defn commit-transaction
"Create a new event message to commit a new transaction. The `txn` argument may
Expand All @@ -85,8 +92,8 @@
:opts opts
:instant (System/currentTimeMillis)}]
(if (= :turtle (:format opts))
(with-turtle-txn evt txn)
(with-txn evt txn))))
(with-otel-ctx (with-turtle-txn evt txn))
(with-otel-ctx (with-txn evt txn)))))

(defn get-txn
"Gets the transaction value, either a transaction document or the storage
Expand Down
31 changes: 18 additions & 13 deletions src/fluree/server/consensus/standalone.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,37 @@
[fluree.server.consensus.events :as events]
[fluree.server.consensus.response :as response]
[fluree.server.consensus.shared.create :as shared-create]
[fluree.server.handlers.shared :refer [deref!]]))
[fluree.server.handlers.shared :refer [deref!]]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

(defn create-ledger!
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts] :as _params}]
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts otel/context] :as _params}]
(go-try
(let [commit-result (if txn
(deref! (fluree/create-with-txn conn txn opts))
(let [db (deref! (fluree/create conn ledger-id opts))]
(shared-create/genesis-result db)))]
(let [commit-result
(span/with-span! {:parent context :name ::process-event}
(if txn
(deref! (fluree/create-with-txn conn txn opts))
(let [db (deref! (fluree/create conn ledger-id opts))]
(shared-create/genesis-result db))))]
(response/announce-new-ledger watcher broadcaster ledger-id tx-id commit-result))))

(defn drop-ledger!
[conn watcher broadcaster {:keys [ledger-id] :as _params}]
[conn watcher broadcaster {:keys [ledger-id otel/context] :as _params}]
(go-try
(let [drop-result (deref! (fluree/drop conn ledger-id))]
(let [drop-result (span/with-span! {:name ::process-event :parent context}
(deref! (fluree/drop conn ledger-id)))]
(response/announce-dropped-ledger watcher broadcaster ledger-id drop-result))))

(defn transact!
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts]}]
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts otel/context]}]
(go-try
(let [commit-result (case (:op opts)
:update (deref! (fluree/update! conn ledger-id txn opts))
:upsert (deref! (fluree/upsert! conn ledger-id txn opts))
:insert (deref! (fluree/insert! conn ledger-id txn opts)))]
(let [commit-result (span/with-span! {:name ::process-event :parent context}
(case (:op opts)
:update (deref! (fluree/update! conn ledger-id txn opts))
:upsert (deref! (fluree/upsert! conn ledger-id txn opts))
:insert (deref! (fluree/insert! conn ledger-id txn opts))))]
(response/announce-commit watcher broadcaster ledger-id tx-id commit-result))))

(defn process-event
Expand Down
6 changes: 4 additions & 2 deletions src/fluree/server/handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
[reitit.swagger :as swagger]
[reitit.swagger-ui :as swagger-ui]
[ring.adapter.jetty9 :as http]
[ring.middleware.cors :as rmc])
[ring.middleware.cors :as rmc]
[steffan-westcott.clj-otel.api.trace.http :as trace-http])
(:import (java.io InputStream)))

(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -656,4 +657,5 @@
app-routes (cond-> ["" {:middleware app-middleware} default-fluree-routes]
(seq custom-routes) (conj custom-routes))
router (app-router app-routes)]
(ring/ring-handler router fallback-handler))))
(trace-http/wrap-server-span
(ring/ring-handler router fallback-handler)))))
6 changes: 4 additions & 2 deletions src/fluree/server/handlers/create.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [deref! defhandler]]
[fluree.server.handlers.transact :as srv-tx]
[fluree.server.watcher :as watcher]))
[fluree.server.watcher :as watcher]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -50,7 +51,8 @@
(throw (ex-info "Ledger ID must be provided"
{:status 400 :error :db/invalid-ledger-id})))
opts* (prepare-create-options opts)
commit-event (deref! (create-ledger! consensus watcher ledger-id txn opts*))
commit-event (span/with-span! {:name ::create-handler}
(deref! (create-ledger! consensus watcher ledger-id txn opts*)))
response-body (srv-tx/commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status 201, :body response-body}
commit-event)))
6 changes: 4 additions & 2 deletions src/fluree/server/handlers/drop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [fluree.db.util.log :as log]
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [deref! defhandler]]
[fluree.server.watcher :as watcher]))
[fluree.server.watcher :as watcher]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand All @@ -24,5 +25,6 @@
{:keys [body]} :parameters}]
(log/debug "drop body:" body)
(let [ledger-id (:ledger body)
resp-p (drop-ledger consensus watcher ledger-id)]
resp-p (span/with-span! {:name ::drop-handler}
(drop-ledger consensus watcher ledger-id))]
{:status 200, :body (deref! resp-p)}))
9 changes: 6 additions & 3 deletions src/fluree/server/handlers/ledger.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@
(:require [fluree.db.api :as fluree]
[fluree.db.util.log :as log]
[fluree.server.handler :as-alias handler]
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared]))
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(defhandler query
[{:keys [fluree/conn fluree/opts] {:keys [body path]} :parameters :as _req}]
(let [query (or (::handler/query body) body)
;; supply ledger-alias from path params if not overridden by a header
opts* (update opts :ledger #(or % (:ledger-alias path)))
{:keys [status result] :as query-response}
(deref! (fluree/query-connection conn query opts*))]
(span/with-span! {:name ::query-handler}
(deref! (fluree/query-connection conn query opts*)))]
(log/debug "query handler received query:" query opts*)
(shared/with-tracking-headers {:status status, :body result}
query-response)))

(defhandler history
[{:keys [fluree/conn fluree/opts] {{ledger :from :as query} :body} :parameters :as _req}]
(let [query* (dissoc query :from)
result (deref! (fluree/history conn ledger query* opts))]
result (span/with-span! {:name ::history-handler}
(deref! (fluree/history conn ledger query* opts)))]
(log/debug "history handler received query:" query opts "result:" result)
;; fluree/history may return either raw result or wrapped in {:status :result}
(if (and (map? result) (:status result) (:result result))
Expand Down
12 changes: 8 additions & 4 deletions src/fluree/server/handlers/transact.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
[fluree.json-ld :as json-ld]
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [defhandler deref!]]
[fluree.server.watcher :as watcher]))
[fluree.server.watcher :as watcher]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -63,7 +64,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :did did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id txn-with-ledger opts*))
(span/with-span! {:name ::update-handler}
(deref! (transact! consensus watcher ledger-id txn-with-ledger opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand All @@ -77,7 +79,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :identity did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id insert-txn opts*))
(span/with-span! {:name ::insert-handler}
(deref! (transact! consensus watcher ledger-id insert-txn opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand All @@ -91,7 +94,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :identity did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id upsert-txn opts*))
(span/with-span! {:name ::upsert-handler}
(deref! (transact! consensus watcher ledger-id upsert-txn opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand Down
Loading