Skip to content

Commit

Permalink
added more documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Sep 18, 2019
1 parent 3024046 commit 7583705
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 34 deletions.
2 changes: 1 addition & 1 deletion lib/bson.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule BSON do

@type t :: document | String.t | atom | number | boolean | BSON.Binary.t |
BSON.ObjectId.t | BSON.Regex.t |
BSON.JavaScript.t | BSON.Timestamp.t | [t]
BSON.JavaScript.t | BSON.Timestamp.t | BSON.LongNumber.t | [t]
@type document :: %{atom => BSON.t} | %{String.t => BSON.t} |
[{atom, BSON.t}] | [{String.t, BSON.t}]

Expand Down
140 changes: 107 additions & 33 deletions lib/mongo/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ defmodule Mongo.Session do

@moduledoc """
This module implements the details of the transactions api ([see specs](https://github.com/mongodb/specifications/blob/master/source/transactions/transactions.rst#committransaction)).
It uses the `:gen_statem` behaviour ([A nice tutorial](https://andrealeopardi.com/posts/connection-managers-with-gen_statem/)) to manage the different states.
It uses the `:gen_statem` behaviour ([a nice introduction](https://andrealeopardi.com/posts/connection-managers-with-gen_statem/)) to manage the different states.
In case of MongoDB 3.6 or greater the driver uses sessions for each operation. If no session is created the driver will create a so-called implict session. A session is a UUID-Number which
is added to some operations. The sessions are used to manage the transaction state as well. In most situation you need not to create a session instance, so the interface of the driver is not changed.
In case of MongoDB 3.6 or greater the driver uses sessions for each operation. If no session is created the driver will create a so-called implicit session. A session is a UUID-Number which
is added to some operations. The sessions are used to manage the transaction state as well. In most situation you need not to create a session instance, so the api of the driver is not changed.
In case of multiple insert statemantes you can use transaction (MongoDB 4.x) to be sure that all operations are grouped like a single operation. Prerequisites for transactions are:
MongoDB 4.x must be used as replica set or cluster deployment. The collection used in the operations must already exist. Some operation are not allowed (For example: create index or call count).
Expand All @@ -24,7 +24,7 @@ defmodule Mongo.Session do
:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)
First you start a explicit session and a transactions. Use need to use the session for each insert statement as an options with key `:session` otherwise the insert statement won't be
First you start a explicit session and a transactions. Use the session for each insert statement as an options with key `:session` otherwise the insert statement won't be
executed in the transaction. After that you commit the transaction and end the session by calling `end_session`.
## Convenient API for Transactions
Expand All @@ -42,7 +42,59 @@ defmodule Mongo.Session do
{:ok, [id1, id2, id3]}
end, w: 1)
If the callback is successfull then it returns a tupel with the keyword `:ok` and a used defined result like `{:ok, [id1, id2, id3]}`
If the callback is successfull then it returns a tupel with the keyword `:ok` and a used defined result like `{:ok, [id1, id2, id3]}`. In this example we use
the write concern `w: 1`. The write concern used in the insert operation will be removed by the driver. It is applied in the commit transaction command.
## Implicit vs explicit sessions
In most cases the driver will create implicit sessions for you. Each time when you run a query or a command the driver
executes the following functions:
with {:ok, session} <- Session.start_implicit_session(topology_pid, type, opts),
result <- exec_command_session(session, new_cmd, opts),
:ok <- Session.end_implict_session(topology_pid, session) do
...
This behaviour is specified by the mongodb specification for [drivers](https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#explicit-vs-implicit-sessions).
If you use the `:causal_consistency` flag, then you need to create an explicit session:
alias Mongo.Session
{:ok, session} = Session.start_session(top, :write, causal_consistency: true)
Mongo.delete_many(top, "dogs", %{"Greta"}, session: session)
{:ok, 0} = Mongo.count(top, "dogs", %{name: "Greta"}, session: session)
:ok = Session.end_session(top, session)
For more information about causal consistency see the [officially documentation](https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency).
If you want to use transaction, then you need to create a session as well:
alias Mongo.Session
{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)
Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)
:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)
You can shorten this code by using the `with_transaction` function:
alias Mongo.Session
{:ok, ids} = Session.with_transaction(top, fn opts ->
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
{:ok, [id1, id2, id3]}
end, w: 1)
"""

@behaviour :gen_statem
Expand Down Expand Up @@ -101,18 +153,11 @@ defmodule Mongo.Session do
end
end

@doc """
Start a new transation.
"""
@spec start_transaction(Session.t) :: :ok | {:error, term()}
def start_transaction(pid) do
:gen_statem.call(pid, {:start_transaction})
end

@doc """
Start a new implicit session only if no explicit session exists. It returns the session in the `opts` keyword list or
creates a new one.
"""
@spec start_implicit_session(GenServer.server, atom, keyword()) :: {:ok, Session.t} | {:error, term()}
def start_implicit_session(topology_pid, type, opts) do
case Keyword.get(opts, :session, nil) do
nil ->
Expand All @@ -128,22 +173,33 @@ defmodule Mongo.Session do
end

@doc """
Commit the current transation
Start a new transation.
"""
@spec start_transaction(Session.t) :: :ok | {:error, term()}
def start_transaction(pid) do
:gen_statem.call(pid, {:start_transaction})
end

@doc """
Commit the current transation.
"""
@spec commit_transaction(Session.t) :: :ok | {:error, term()}
def commit_transaction(pid) do
:gen_statem.call(pid, {:commit_transaction})
end

@doc """
Abort the current transation and rollback all updates.
Abort the current transation and rollback all changes.
"""
@spec abort_transaction(Session.t) :: :ok | {:error, term()}
def abort_transaction(pid) do
:gen_statem.call(pid, {:abort_transaction})
end

@doc """
Merge the session / transaction data into the cmd.
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.
"""
@spec bind_session(Session.t, BSON.document) :: :ok | {:error, term()}
def bind_session(nil, cmd) do
cmd
end
Expand All @@ -152,8 +208,10 @@ defmodule Mongo.Session do
end

@doc """
Update the `operationTime` for causally consistent read commands
Update the `operationTime` for causally consistent read commands. There is no need to call this function directly. It is called automatically.
"""
@spec update_session(Session.t, %{key: BSON.Timestamp.t}, keyword()) :: BSON.document
def update_session(pid, doc, opts \\ [])
def update_session(pid, %{"operationTime" => operationTime} = doc, opts) do
case opts |> write_concern() |> acknowledged?() do
true -> advance_operation_time(pid, operationTime)
Expand All @@ -168,13 +226,15 @@ defmodule Mongo.Session do
@doc """
Advance the `operationTime` for causally consistent read commands
"""
@spec advance_operation_time(Session.t, BSON.Timestamp.t) :: none()
def advance_operation_time(pid, timestamp) do
:gen_statem.cast(pid, {:advance_operation_time, timestamp})
end

@doc """
End implicit session
End implicit session. There is no need to call this function directly. It is called automatically.
"""
@spec end_implict_session(GenServer.server, Session.t) :: :ok | :error
def end_implict_session(topology_pid, session) do
with {:ok, session_server} <- :gen_statem.call(session, {:end_implicit_session}) do
Topology.checkin_session(topology_pid, session_server)
Expand All @@ -185,17 +245,30 @@ defmodule Mongo.Session do
end

@doc """
End explicit session
End explicit session.
"""
@spec end_session(GenServer.server, Session.t) :: :ok | :error
def end_session(topology_pid, session) do
with {:ok, session_server} <- :gen_statem.call(session, {:end_session}) do
Topology.checkin_session(topology_pid, session_server)
end
end

@doc """
Convient function for running multiple write commands in a transaction
Convenient function for running multiple write commands in a transaction.
## Example
alias Mongo.Session
{:ok, ids} = Session.with_transaction(top, fn opts ->
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
{:ok, [id1, id2, id3]}
end, w: 1)
"""
@spec with_transaction(Session.t, (keyword() -> {:ok, any()} | :error)) :: {:ok, any()} | :error | {:error, term}
def with_transaction(topology_pid, fun, opts \\ []) do

with {:ok, session} <- Session.start_session(topology_pid, :write, opts),
Expand Down Expand Up @@ -232,26 +305,30 @@ defmodule Mongo.Session do
end

@doc """
Return the connection used in the session
Return the connection used in the session.
"""
@spec connection(Session.t) :: pid
def connection(pid) do
:gen_statem.call(pid, {:connection})
end

@doc """
Return the server session used in the session
Return the server session used in the session.
"""
@spec server_session(Session.t) :: ServerSession.t
def server_session(pid) do
:gen_statem.call(pid, {:server_session})
end

@doc"""
Check if the session is alive
Check if the session is alive.
"""
@spec server_session(Session.t) :: boolean()
def alive?(nil), do: false
def alive?(pid), do: Process.alive?(pid)

@impl true
# Initialization of the state machine
def init({conn, server_session, type, wire_version, opts}) do
data = %Session{conn: conn,
server_session: server_session,
Expand All @@ -263,6 +340,7 @@ defmodule Mongo.Session do
end

@impl true
# start the transaction
def handle_event({:call, from},
{:start_transaction},
transaction,
Expand Down Expand Up @@ -345,7 +423,6 @@ defmodule Mongo.Session do
def handle_event({:call, from}, {:end_implicit_session}, _state, %Session{implicit: false}) do
{:keep_state_and_data, {:reply, from, :noop}}
end

def handle_event({:call, from}, {:server_session}, _state, %Session{server_session: session_server, implicit: implicit}) do
{:keep_state_and_data, {:reply, from, {:ok, session_server, implicit}}}
end
Expand All @@ -368,6 +445,9 @@ defmodule Mongo.Session do
Logger.debug("Terminating because of #{inspect reason}")
end

##
# Run the commit transaction command.
#
defp run_commit_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do

Logger.debug("Running commit transaction")
Expand All @@ -387,12 +467,12 @@ defmodule Mongo.Session do

_doc = Mongo.exec_command(conn, cmd, database: "admin")

# {:ok, %{"$clusterTime" => %{"clusterTime" => #BSON.Timestamp<1567853627:8>,
# "signature" => %{"hash" => #BSON.Binary<0000000000000000000000000000000000000000>, "keyId" => 0}},
# "ok" => 1.0, "operationTime" => #BSON.Timestamp<1567853627:6>}}
:ok
end

##
# Run the abort transaction command.
#
defp run_abort_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do

Logger.debug("Running abort transaction")
Expand All @@ -407,12 +487,6 @@ defmodule Mongo.Session do

_doc = Mongo.exec_command(conn, cmd, database: "admin")

#
# doc:
# %{"$clusterTime" => %{"clusterTime" => #BSON.Timestamp<1567853164:4>,
# "signature" => %{"hash" => #BSON.Binary<0000000000000000000000000000000000000000>, "keyId" => 0}},
#"ok" => 1.0, "operationTime" => #BSON.Timestamp<1567853164:4>}

:ok
end

Expand Down

0 comments on commit 7583705

Please sign in to comment.