diff --git a/guides/introduction/installation.md b/guides/introduction/installation.md index 58cc527..51d91ae 100644 --- a/guides/introduction/installation.md +++ b/guides/introduction/installation.md @@ -6,7 +6,7 @@ #mix.exs def deps do [ - {:bigtable, "~> 0.1.0"}, + {:bigtable, "~> 0.7.0"}, ] end ``` @@ -18,11 +18,14 @@ end ```elixir #dev.exs config :bigtable, - project: "project_id", - instance: "instance_id", - table: "table_name", - endpoint: "localhost:8086" + project: "project", + instance: "instance", + table: "table_name", # Default table name to use in requests + endpoint: "localhost:9035", ssl: false + +config :goth, + disabled: true ``` #### Production Configuration @@ -32,5 +35,10 @@ config :bigtable, config :bigtable, project: "project_id", instance: "instance_id", - table: "table_name" + # Default table name to use in requests + table: "table_name", + # Optional connection pool size. Defaults to 128 + pool_size: 128, + # Optional connection pool overflow when size is exceeded + pool_overflow: 128 ``` diff --git a/lib/admin/table_admin/gc_rule.ex b/lib/admin/table_admin/gc_rule.ex index ee4f528..2488554 100644 --- a/lib/admin/table_admin/gc_rule.ex +++ b/lib/admin/table_admin/gc_rule.ex @@ -1,4 +1,7 @@ defmodule Bigtable.Admin.GcRule do + @moduledoc """ + Provides functions for creating garbage collection rules + """ alias Google.Bigtable.Admin.V2 alias Google.Protobuf.Duration alias V2.GcRule.{Intersection, Union} diff --git a/lib/admin/table_admin/modification.ex b/lib/admin/table_admin/modification.ex index 55229c2..bcc8cb4 100644 --- a/lib/admin/table_admin/modification.ex +++ b/lib/admin/table_admin/modification.ex @@ -1,4 +1,5 @@ defmodule Bigtable.Admin.Modification do - def create(id) do + @moduledoc false + def create(_id) do end end diff --git a/lib/admin/table_admin/table.ex b/lib/admin/table_admin/table.ex index a39b158..137867a 100644 --- a/lib/admin/table_admin/table.ex +++ b/lib/admin/table_admin/table.ex @@ -1,4 +1,8 @@ defmodule Bigtable.Admin.Table do + @moduledoc """ + Provides functionality for building `Google.Bigtable.Admin.V2.Table`. + """ + alias Google.Bigtable.Admin.V2 def build(column_families) when is_map(column_families) do diff --git a/lib/admin/table_admin/table_admin.ex b/lib/admin/table_admin/table_admin.ex index 42a21d5..e4135e2 100644 --- a/lib/admin/table_admin/table_admin.ex +++ b/lib/admin/table_admin/table_admin.ex @@ -2,14 +2,15 @@ defmodule Bigtable.Admin.TableAdmin do @moduledoc """ Provides functions to build `Google.Bigtable.Admin.V2.ListTablesRequest` and submit them to Bigtable. """ - alias Bigtable.Utils + alias Bigtable.{Request, Utils} alias Google.Bigtable.Admin.V2 alias V2.BigtableTableAdmin.Stub def list_tables(opts \\ []) do - Keyword.put_new(opts, :parent, Utils.configured_instance_name()) + opts + |> Keyword.put_new(:parent, Utils.configured_instance_name()) |> V2.ListTablesRequest.new() - |> Utils.process_request(&Stub.list_tables/3) + |> Request.process_request(&Stub.list_tables/3) end def create_table(table, table_id, opts \\ []) do @@ -19,16 +20,16 @@ defmodule Bigtable.Admin.TableAdmin do table: table, initial_splits: Keyword.get(opts, :initial_splits, []) ) - |> Utils.process_request(&Stub.create_table/3) + |> Request.process_request(&Stub.create_table/3) end def delete_table(name) do V2.DeleteTableRequest.new(name: name) - |> Utils.process_request(&Stub.delete_table/3) + |> Request.process_request(&Stub.delete_table/3) end def get_table(name) do V2.GetTableRequest.new(name: name) - |> Utils.process_request(&Stub.get_table/3) + |> Request.process_request(&Stub.get_table/3) end end diff --git a/lib/connection/auth.ex b/lib/auth.ex similarity index 97% rename from lib/connection/auth.ex rename to lib/auth.ex index d0f59cb..b3ca805 100644 --- a/lib/connection/auth.ex +++ b/lib/auth.ex @@ -1,4 +1,4 @@ -defmodule Bigtable.Connection.Auth do +defmodule Bigtable.Auth do @moduledoc false @scopes [ diff --git a/lib/bigtable.ex b/lib/bigtable.ex index 0a81211..c4441c4 100644 --- a/lib/bigtable.ex +++ b/lib/bigtable.ex @@ -6,8 +6,16 @@ defmodule Bigtable do @doc false def start(_type, _args) do + poolboy_config = [ + {:name, {:local, :connection_pool}}, + {:worker_module, Bigtable.Connection.Worker}, + {:size, Application.get_env(:bigtable, :pool_size, 128)}, + {:max_overflow, Application.get_env(:bigtable, :pool_overflow, 0)} + ] + children = [ - Bigtable.Supervisor + Bigtable.Supervisor, + :poolboy.child_spec(:connection_pool, poolboy_config, []) ] opts = [strategy: :one_for_one, name: Bigtable] diff --git a/lib/connection.ex b/lib/connection.ex new file mode 100644 index 0000000..a482ac4 --- /dev/null +++ b/lib/connection.ex @@ -0,0 +1,77 @@ +defmodule Bigtable.Connection do + @moduledoc false + + use GenServer + @default_endpoint "bigtable.googleapis.com:443" + + ## Client API + def start_link(_opts) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @doc """ + Connects to Bigtable and returns a `GRPC.Channel`. + """ + @spec connect() :: GRPC.Channel.t() + def connect do + GenServer.call(__MODULE__, :connect) + end + + @doc """ + Disconnects from the provided `GRPC.Channel`. + """ + @spec disconnect(GRPC.Channel.t()) :: :ok + def disconnect(channel) do + GenServer.cast(__MODULE__, {:disconnect, channel}) + end + + # Server Callbacks + @spec init(:ok) :: {:ok, map()} + def init(:ok) do + {:ok, %{endpoint: get_endpoint(), opts: build_opts()}} + end + + def handle_call(:connect, _from, %{endpoint: endpoint, opts: opts} = state) do + {:ok, channel} = + GRPC.Stub.connect( + endpoint, + opts + ) + + {:reply, channel, state} + end + + def handle_cast({:disconnect, channel}, state) do + GRPC.Stub.disconnect(channel) + {:noreply, state} + end + + def handle_info(_msg, state) do + {:noreply, state} + end + + @spec build_opts() :: list() + defp build_opts do + if Application.get_env(:bigtable, :ssl, true) do + [ + cred: %GRPC.Credential{ + ssl: [] + } + ] + else + [] + end + end + + @spec get_endpoint() :: binary() + def get_endpoint do + emulator = System.get_env("BIGTABLE_EMULATOR_HOST") + endpoint = Application.get_env(:bigtable, :endpoint, @default_endpoint) + + if emulator != nil do + emulator + else + endpoint + end + end +end diff --git a/lib/connection/connection.ex b/lib/connection/connection.ex deleted file mode 100644 index 83052cf..0000000 --- a/lib/connection/connection.ex +++ /dev/null @@ -1,93 +0,0 @@ -defmodule Bigtable.Connection do - @moduledoc """ - Holds the configured gRPC connection to Bigtable - """ - use GenServer - - alias Bigtable.Connection.Auth - - @default_host "bigtable.googleapis.com:443" - @default_opts [] - - ## Client API - @doc false - - def start_link(_opts) do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) - end - - @doc """ - Returns the configured `GRPC.Channel` - """ - @spec get_connection() :: GRPC.Channel.t() - def get_connection do - GenServer.call(__MODULE__, :get_connection) - end - - @spec get_metadata() :: Keyword.t() - def get_metadata do - token = Auth.get_token() - metadata = %{authorization: "Bearer #{token.token}"} - [metadata: metadata, content_type: "application/grpc", return_headers: true] - end - - # Server Callbacks - @doc false - @spec init(:ok) :: {:ok, GRPC.Channel.t()} - def init(:ok) do - # Fetches the url to use for Bigtable gRPC connection - endpoint = - case get_custom_endpoint() do - nil -> @default_host - custom -> custom - end - - opts = build_opts() - - # Connects the stub to the Bigtable gRPC server - - {:ok, channel} = - GRPC.Stub.connect( - endpoint, - opts - ) - - {:ok, channel} - end - - def handle_call(:get_connection, _from, state) do - {:reply, state, state} - end - - def handle_info(_msg, state) do - {:noreply, state} - end - - defp build_opts do - case Application.get_env(:bigtable, :ssl, true) do - true -> - @default_opts ++ - [ - cred: %GRPC.Credential{ - ssl: [] - } - ] - - false -> - @default_opts - end - end - - def get_custom_endpoint do - env = System.get_env("BIGTABLE_EMULATOR_HOST") - custom = Application.get_env(:bigtable, :endpoint, nil) - - case env do - nil -> - custom - - endpoint -> - endpoint - end - end -end diff --git a/lib/connection/worker.ex b/lib/connection/worker.ex new file mode 100644 index 0000000..4eca6de --- /dev/null +++ b/lib/connection/worker.ex @@ -0,0 +1,40 @@ +defmodule Bigtable.Connection.Worker do + @moduledoc false + alias Bigtable.Connection + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, []) + end + + def get_connection(pid) do + GenServer.call(pid, :get_connection) + end + + def init(_) do + Process.flag(:trap_exit, true) + {:ok, Connection.connect()} + end + + def handle_call(:get_connection, _from, state) do + {:reply, state, state} + end + + def handle_info({:EXIT, _from, reason}, state) do + disconnect(state) + {:stop, reason, state} + end + + def handle_info(_msg, state) do + {:noreply, state} + end + + def terminate(_reason, state) do + disconnect(state) + state + end + + defp disconnect(connection) do + Connection.disconnect(connection) + end +end diff --git a/lib/data/check_and_mutate_row.ex b/lib/data/check_and_mutate_row.ex index 9b641ae..4248781 100644 --- a/lib/data/check_and_mutate_row.ex +++ b/lib/data/check_and_mutate_row.ex @@ -1,15 +1,17 @@ defmodule Bigtable.CheckAndMutateRow do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.ReadRowsRequest` and submit them to Bigtable. + Provides functionality for building and submitting a `Google.Bigtable.V2.CheckAndMutateRowRequest`. """ - alias Bigtable.Utils + alias Bigtable.{Request, Utils} alias Google.Bigtable.V2 alias V2.Bigtable.Stub + @type entries() :: V2.MutateRowsRequest.Entry | [V2.MutateRowsRequest.Entry] + @doc """ - Builds a `Google.Bigtable.V2.CheckAndMutateRowRequest` given a row_key and optional custom table name. + Builds a `Google.Bigtable.V2.CheckAndMutateRowRequest` given a row key and optional custom table name. - Defaults to configured table name. + Defaults to the configured table name if none is provided. ## Examples @@ -25,19 +27,19 @@ defmodule Bigtable.CheckAndMutateRow do } ### Custom Table - iex> table_name = "projects/[project_id]/instances/[instnace_id]/tables/[table_name]" + iex> table_name = "projects/project-id/instances/instance-id/tables/table-name" iex> Bigtable.CheckAndMutateRow.build(table_name, "Test#123") %Google.Bigtable.V2.CheckAndMutateRowRequest{ app_profile_id: "", false_mutations: [], predicate_filter: nil, row_key: "Test#123", - table_name: "projects/[project_id]/instances/[instnace_id]/tables/[table_name]", + table_name: "projects/project-id/instances/instance-id/tables/table-name", true_mutations: [] } """ @spec build(binary(), binary()) :: V2.CheckAndMutateRowRequest.t() - def build(table_name \\ Bigtable.Utils.configured_table_name(), row_key) + def build(table_name \\ Utils.configured_table_name(), row_key) when is_binary(table_name) and is_binary(row_key) do V2.CheckAndMutateRowRequest.new(table_name: table_name, app_profile_id: "", row_key: row_key) end @@ -48,30 +50,16 @@ defmodule Bigtable.CheckAndMutateRow do %{request | predicate_filter: filter} end - @spec if_true(V2.CheckAndMutateRowRequest.t(), [V2.Mutation.t()]) :: - V2.CheckAndMutateRowRequest.t() - def if_true(%V2.CheckAndMutateRowRequest{} = request, mutations) when is_list(mutations) do + @spec if_true(V2.CheckAndMutateRowRequest.t(), entries) :: V2.CheckAndMutateRowRequest.t() + def if_true(%V2.CheckAndMutateRowRequest{} = request, mutations) do %{request | true_mutations: extract_mutations(mutations)} end - @spec if_true(V2.CheckAndMutateRowRequest.t(), V2.Mutation.t()) :: - V2.CheckAndMutateRowRequest.t() - def if_true(%V2.CheckAndMutateRowRequest{} = request, mutation) do - if_true(request, [mutation]) - end - - @spec if_false(V2.CheckAndMutateRowRequest.t(), [V2.Mutation.t()]) :: - V2.CheckAndMutateRowRequest.t() - def if_false(%V2.CheckAndMutateRowRequest{} = request, mutations) when is_list(mutations) do + @spec if_false(V2.CheckAndMutateRowRequest.t(), entries()) :: V2.CheckAndMutateRowRequest.t() + def if_false(%V2.CheckAndMutateRowRequest{} = request, mutations) do %{request | false_mutations: extract_mutations(mutations)} end - @spec if_false(V2.CheckAndMutateRowRequest.t(), V2.Mutation.t()) :: - V2.CheckAndMutateRowRequest.t() - def if_false(%V2.CheckAndMutateRowRequest{} = request, mutation) do - if_false(request, [mutation]) - end - @doc """ Submits a `Google.Bigtable.V2.CheckAndMutateRowRequest` to Bigtable. """ @@ -79,11 +67,13 @@ defmodule Bigtable.CheckAndMutateRow do {:ok, [V2.CheckAndMutateRowResponse]} | {:error, binary()} def mutate(%V2.CheckAndMutateRowRequest{} = request) do request - |> Utils.process_request(&Stub.check_and_mutate_row/3, single: true) + |> Request.process_request(&Stub.check_and_mutate_row/3, single: true) end - defp extract_mutations(mutations) do - mutations + @spec extract_mutations(entries()) :: [V2.Mutation.t()] + defp extract_mutations(entries) do + entries + |> List.wrap() |> Enum.flat_map(&Map.get(&1, :mutations)) end end diff --git a/lib/data/chunk_reader.ex b/lib/data/chunk_reader.ex index 9b7841e..7375e6d 100644 --- a/lib/data/chunk_reader.ex +++ b/lib/data/chunk_reader.ex @@ -46,7 +46,7 @@ defmodule Bigtable.ChunkReader do @typedoc """ A map containging lists of `Bigtable.ChunkReader.ReadCell` keyed by row key. """ - @type chunk_reader_result :: %{optional(binary()) => [Bigtable.ChunkReader.ReadCell.t()]} + @type chunk_reader_result :: %{optional(binary()) => [ReadCell.t()]} def start_link(_) do GenServer.start_link(__MODULE__, %ReaderState{}, []) @@ -56,7 +56,7 @@ defmodule Bigtable.ChunkReader do Opens a `Bigtable.ChunkReader`. """ @spec open() :: :ignore | {:error, any()} | {:ok, pid()} | {:ok, pid(), any()} - def open() do + def open do DynamicSupervisor.start_child(__MODULE__.Supervisor, __MODULE__) end @@ -243,7 +243,8 @@ defmodule Bigtable.ChunkReader do Map.get(cc, :labels, "") end - Map.put(cr, :cur_val, next_value) + cr + |> Map.put(:cur_val, next_value) |> Map.put(:cur_label, next_label) |> Map.put(:state, :cell_in_progress) end @@ -263,7 +264,8 @@ defmodule Bigtable.ChunkReader do Map.get(cc, :labels, "") end - Map.put(cr, :cur_val, next_value) + cr + |> Map.put(:cur_val, next_value) |> Map.put(:cur_label, next_label) |> finish_cell(cc) end diff --git a/lib/data/mutate_row.ex b/lib/data/mutate_row.ex index 5e1440d..5355cfe 100644 --- a/lib/data/mutate_row.ex +++ b/lib/data/mutate_row.ex @@ -1,17 +1,20 @@ defmodule Bigtable.MutateRow do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.MutateRowRequest` and submit them to Bigtable. + Provides functionality for building and submitting a `Google.Bigtable.V2.MutateRowRequest`. """ - alias Bigtable.Utils + alias Bigtable.{Request, Utils} alias Google.Bigtable.V2 alias V2.Bigtable.Stub alias V2.MutateRowsRequest.Entry + @type response :: {:ok, V2.MutateRowResponse.t()} | {:error, any()} + @doc """ - Builds a `Google.Bigtable.V2.MutateRowRequest` with a provided table name and `Google.Bigtable.V2.MutateRowsRequest.Entry`. + Builds a `Google.Bigtable.V2.MutateRowRequest` given a `Google.Bigtable.V2.MutateRowsRequest.Entry` and optional table name. """ @spec build(V2.MutateRowsRequest.Entry.t(), binary()) :: V2.MutateRowRequest.t() - def build(%Entry{} = row_mutations, table_name) when is_binary(table_name) do + def build(%Entry{} = row_mutations, table_name \\ Utils.configured_table_name()) + when is_binary(table_name) do V2.MutateRowRequest.new( table_name: table_name, row_key: row_mutations.row_key, @@ -19,31 +22,21 @@ defmodule Bigtable.MutateRow do ) end - @doc """ - Builds a `Google.Bigtable.V2.MutateRowRequest` with default table name and provided `Google.Bigtable.V2.MutateRowsRequest.Entry`. - """ - @spec build(V2.MutateRowsRequest.Entry.t()) :: V2.MutateRowRequest.t() - def build(%Entry{} = row_mutations) do - build(row_mutations, Bigtable.Utils.configured_table_name()) - end - @doc """ Submits a `Google.Bigtable.V2.MutateRowRequest` given either a `Google.Bigtable.V2.MutateRowsRequest.Entry` or a `Google.Bigtable.V2.MutateRowRequest`. - Returns a `Google.Bigtable.V2.MutateRowResponse` + Returns a `Google.Bigtable.V2.MutateRowResponse`. """ - @spec mutate(V2.MutateRowRequest.t()) :: {:ok, V2.MutateRowResponse.t()} | {:error, binary()} + @spec mutate(V2.MutateRowRequest.t()) :: response() def mutate(%V2.MutateRowRequest{} = request) do request - |> Utils.process_request(&Stub.mutate_row/3, single: true) + |> Request.process_request(&Stub.mutate_row/3, single: true) end - @spec mutate(V2.MutateRowsRequest.Entry.t()) :: - {:ok, V2.MutateRowResponse.t()} | {:error, binary()} - def mutate(%Entry{} = row_mutations) do - request = build(row_mutations) - - request + @spec mutate(V2.MutateRowsRequest.Entry.t()) :: response() + def mutate(%Entry{} = entry) do + entry + |> build() |> mutate() end end diff --git a/lib/data/mutate_rows.ex b/lib/data/mutate_rows.ex index 3a96818..9800d9d 100644 --- a/lib/data/mutate_rows.ex +++ b/lib/data/mutate_rows.ex @@ -1,32 +1,25 @@ defmodule Bigtable.MutateRows do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.MutateRowsRequest` and submit them to Bigtable. + Provides functionality for building and submitting a `Google.Bigtable.V2.MutateRowsRequest`. """ - alias Bigtable.Utils + alias Bigtable.{Request, Utils} alias Google.Bigtable.V2 alias V2.Bigtable.Stub + @type response :: {:ok, V2.MutateRowsResponse.t()} | {:error, any()} + @doc """ - Builds a `Google.Bigtable.V2.MutateRowsRequest` with a provided table name and a list of `Google.Bigtable.V2.MutateRowsRequest.Entry`. + Builds a `Google.Bigtable.V2.MutateRowsRequest` given a `Google.Bigtable.V2.MutateRowsRequest.Entry` and optional table name. """ @spec build(list(V2.MutateRowsRequest.Entry.t()), binary()) :: V2.MutateRowsRequest.t() - def build(entries, table_name) when is_binary(table_name) and is_list(entries) do + def build(entries, table_name \\ Utils.configured_table_name()) + when is_binary(table_name) and is_list(entries) do V2.MutateRowsRequest.new( table_name: table_name, entries: entries ) end - @doc """ - Builds a `Google.Bigtable.V2.MutateRowsRequest` request with a list of `Google.Bigtable.V2.MutateRowsRequest.Entry`. - - Uses the configured table name. - """ - @spec build(list(V2.MutateRowsRequest.Entry.t())) :: V2.MutateRowsRequest.t() - def build(entries) when is_list(entries) do - build(entries, Bigtable.Utils.configured_table_name()) - end - @doc """ Submits a `Google.Bigtable.V2.MutateRowsRequest` to Bigtable. @@ -34,18 +27,16 @@ defmodule Bigtable.MutateRows do Returns a `Google.Bigtable.V2.MutateRowsResponse` """ - @spec mutate(V2.MutateRowsRequest.t()) :: {:ok, V2.MutateRowsResponse.t()} | {:error, binary()} + @spec mutate(V2.MutateRowsRequest.t()) :: response() def mutate(%V2.MutateRowsRequest{} = request) do request - |> Utils.process_request(&Stub.mutate_rows/3, stream: true) + |> Request.process_request(&Stub.mutate_rows/3, stream: true) end - @spec mutate([V2.MutateRowsRequest.Entry.t()]) :: - {:ok, V2.MutateRowsResponse.t()} | {:error, binary()} + @spec mutate([V2.MutateRowsRequest.Entry.t()]) :: response() def mutate(entries) when is_list(entries) do - request = build(entries) - - request + entries + |> build() |> mutate end end diff --git a/lib/data/read_modify_write_row.ex b/lib/data/read_modify_write_row.ex index 76e16f4..b8ab806 100644 --- a/lib/data/read_modify_write_row.ex +++ b/lib/data/read_modify_write_row.ex @@ -1,8 +1,8 @@ defmodule Bigtable.ReadModifyWriteRow do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.ReadModifyWriteRowRequest` and submit them to Bigtable. + Provides functionality for building and submitting a `Google.Bigtable.V2.ReadModifyWriteRowRequest`. """ - alias Bigtable.Utils + alias Bigtable.Request alias Google.Bigtable.V2 alias V2.Bigtable.Stub @@ -12,8 +12,10 @@ defmodule Bigtable.ReadModifyWriteRow do ReadModifyWriteRule } + @type response :: {:ok, ReadModifyWriteRowResponse.t()} | {:error, binary()} + @doc """ - Builds a `Google.Bigtable.V2.ReadModifyWriteRowRequest` with a provided table name and row key`. + Builds a `Google.Bigtable.V2.ReadModifyWriteRowRequest` given a row key and optional table name. """ @spec build(binary(), binary()) :: ReadModifyWriteRowRequest.t() def build(table_name \\ Bigtable.Utils.configured_table_name(), row_key) @@ -58,11 +60,10 @@ defmodule Bigtable.ReadModifyWriteRow do |> add_rule(request) end - @spec mutate(ReadModifyWriteRowRequest.t()) :: - {:ok, ReadModifyWriteRowResponse.t()} | {:error, binary()} + @spec mutate(ReadModifyWriteRowRequest.t()) :: response() def mutate(%ReadModifyWriteRowRequest{} = request) do request - |> Utils.process_request(&Stub.read_modify_write_row/3, single: true) + |> Request.process_request(&Stub.read_modify_write_row/3, single: true) end @spec add_rule(ReadModifyWriteRule.t(), ReadModifyWriteRowRequest.t()) :: diff --git a/lib/data/read_rows.ex b/lib/data/read_rows.ex index 9ca3291..6819c8c 100644 --- a/lib/data/read_rows.ex +++ b/lib/data/read_rows.ex @@ -1,114 +1,85 @@ defmodule Bigtable.ReadRows do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.ReadRowsRequest` and submit them to Bigtable. + Provides functionality for to building and submitting a `Google.Bigtable.V2.ReadRowsRequest`. """ - alias Bigtable.{ChunkReader, Utils} + alias Bigtable.ChunkReader + alias Bigtable.{Request, Utils} alias Google.Bigtable.V2 alias V2.Bigtable.Stub + @type response :: {:ok, ChunkReader.chunk_reader_result()} | {:error, any()} + @doc """ - Builds a `Google.Bigtable.V2.ReadRowsRequest` with a provided table name. + Builds a `Google.Bigtable.V2.ReadRowsRequest` given an optional table name. + + Defaults to the configured table name if none is provided. ## Examples - iex> table_name = "projects/[project_id]/instances/[instnace_id]/tables/[table_name]" + iex> table_name = "projects/project-id/instances/instance-id/tables/table-name" iex> Bigtable.ReadRows.build(table_name) %Google.Bigtable.V2.ReadRowsRequest{ app_profile_id: "", filter: nil, rows: nil, rows_limit: 0, - table_name: "projects/[project_id]/instances/[instnace_id]/tables/[table_name]" + table_name: "projects/project-id/instances/instance-id/tables/table-name" } """ @spec build(binary()) :: V2.ReadRowsRequest.t() - def build(table_name) when is_binary(table_name) do + def build(table_name \\ Utils.configured_table_name()) when is_binary(table_name) do V2.ReadRowsRequest.new(table_name: table_name, app_profile_id: "") end - @doc """ - Builds a `Google.Bigtable.V2.ReadRowsRequest` with the configured table name. - - ## Examples - iex> Bigtable.ReadRows.build() - %Google.Bigtable.V2.ReadRowsRequest{ - app_profile_id: "", - filter: nil, - rows: nil, - rows_limit: 0, - table_name: "projects/dev/instances/dev/tables/test" - } - """ - @spec build() :: V2.ReadRowsRequest.t() - def build do - build(Bigtable.Utils.configured_table_name()) - end - @doc """ Submits a `Google.Bigtable.V2.ReadRowsRequest` to Bigtable. - Can be called with either a `Google.Bigtable.V2.ReadRowsRequest` or a table name to read all rows from a non-configured table. - - Returns a list of `{:ok, %Google.Bigtable.V2.ReadRowsResponse{}}`. + Can be called with either a `Google.Bigtable.V2.ReadRowsRequest` or an optional table name. """ - @spec read(V2.ReadRowsRequest.t()) :: - {:error, any()} - | {:ok, ChunkReader.chunk_reader_result()} - def read(%V2.ReadRowsRequest{} = request) do - result = - request - |> Utils.process_request(&Stub.read_rows/3, stream: true) - - case result do - {:error, _} -> - result + @spec read(V2.ReadRowsRequest.t() | binary()) :: response() + def read(table_name \\ Utils.configured_table_name()) - {:ok, response} -> - process_response(response) - end + def read(%V2.ReadRowsRequest{} = request) do + request + |> Request.process_request(&Stub.read_rows/3, stream: true) + |> handle_response() end - @spec read(binary()) :: - {:error, any()} - | {:ok, ChunkReader.chunk_reader_result()} def read(table_name) when is_binary(table_name) do - request = build(table_name) - - request + table_name + |> build() |> read() end - @doc """ - Submits a `Google.Bigtable.V2.ReadRowsRequest` to Bigtable. + defp handle_response({:error, _} = response), do: response - Without arguments, `Bigtable.ReadRows.read` will read all rows from the configured table. + defp handle_response({:ok, response}) do + response + |> Enum.filter(&contains_chunks?/1) + |> Enum.flat_map(fn {:ok, resp} -> resp.chunks end) + |> process_chunks() + end - Returns a list of `{:ok, %Google.Bigtable.V2.ReadRowsResponse{}}`. - """ - @spec read() :: - {:error, GRPC.RPCError.t()} - | [ok: V2.ReadRowsResponse.t()] - def read do - request = build() + defp process_chunks(chunks) do + {:ok, cr} = ChunkReader.open() - request - |> read + chunks + |> process_chunks(nil, cr) end - defp process_response(response) do - {:ok, cr} = ChunkReader.open() + defp process_chunks([], _result, chunk_reader) do + ChunkReader.close(chunk_reader) + end - response - |> Enum.filter(&contains_chunks?/1) - |> Enum.flat_map(fn {:ok, resp} -> resp.chunks end) - |> Enum.reduce({:ok, %{}}, fn chunk, accum -> - if match?({:error, _}, accum) do - accum - else - ChunkReader.process(cr, chunk) - end - end) - - ChunkReader.close(cr) + defp process_chunks(_chunks, {:error, _}, chunk_reader) do + ChunkReader.close(chunk_reader) + end + + defp process_chunks([h | t], _result, chunk_reader) do + result = + chunk_reader + |> ChunkReader.process(h) + + process_chunks(t, result, chunk_reader) end defp contains_chunks?({:ok, response}), do: !Enum.empty?(response.chunks) diff --git a/lib/row_filter/row_filter.ex b/lib/data/row_filter.ex similarity index 90% rename from lib/row_filter/row_filter.ex rename to lib/data/row_filter.ex index 6036ca3..ca3fee7 100644 --- a/lib/row_filter/row_filter.ex +++ b/lib/data/row_filter.ex @@ -1,6 +1,7 @@ defmodule Bigtable.RowFilter do - alias Bigtable.RowFilter.ColumnRange - alias Google.Bigtable.V2.{ReadRowsRequest, RowFilter, TimestampRange} + alias Google.Bigtable.V2.{ColumnRange, ReadRowsRequest, RowFilter, TimestampRange} + + @type column_range :: {binary(), binary(), boolean()} | {binary(), binary()} @moduledoc """ Provides functions for creating `Google.Bigtable.V2.RowFilter` and applying them to a `Google.Bigtable.V2.ReadRowsRequest` or `Google.Bigtable.V2.RowFilter.Chain`. @@ -330,7 +331,7 @@ defmodule Bigtable.RowFilter do @spec column_range(binary(), {binary(), binary(), boolean()} | {binary(), binary()}) :: RowFilter.t() def column_range(family_name, range) do - range = ColumnRange.create_range(family_name, range) + range = create_range(family_name, range) {:column_range_filter, range} |> build_filter() @@ -398,7 +399,6 @@ defmodule Bigtable.RowFilter do @doc """ Adds a pass all `Google.Bigtable.V2.RowFilter` a `Google.Bigtable.V2.ReadRowsRequest`. - ## Examples iex> request = Bigtable.ReadRows.build() |> Bigtable.RowFilter.pass_all() iex> with %Google.Bigtable.V2.ReadRowsRequest{} <- request, do: request.filter @@ -426,7 +426,7 @@ defmodule Bigtable.RowFilter do } """ @spec pass_all() :: RowFilter.t() - def pass_all() do + def pass_all do {:pass_all_filter, true} |> build_filter() end @@ -434,7 +434,6 @@ defmodule Bigtable.RowFilter do @doc """ Adds a block all `Google.Bigtable.V2.RowFilter` a `Google.Bigtable.V2.ReadRowsRequest`. - ## Examples iex> request = Bigtable.ReadRows.build() |> Bigtable.RowFilter.block_all() iex> with %Google.Bigtable.V2.ReadRowsRequest{} <- request, do: request.filter @@ -462,14 +461,13 @@ defmodule Bigtable.RowFilter do } """ @spec block_all() :: RowFilter.t() - def block_all() do + def block_all do {:block_all_filter, true} |> build_filter() end @doc """ - Adds a strip value transformer Google.Bigtable.V2.RowFilter` a `Google.Bigtable.V2.ReadRowsRequest`. - + Adds a strip value transformer `Google.Bigtable.V2.RowFilter` a `Google.Bigtable.V2.ReadRowsRequest`. ## Examples iex> request = Bigtable.ReadRows.build() |> Bigtable.RowFilter.strip_value_transformer() @@ -489,7 +487,6 @@ defmodule Bigtable.RowFilter do @doc """ Creates a strip value transformer `Google.Bigtable.V2.RowFilter`. - ## Examples iex> Bigtable.RowFilter.strip_value_transformer() %Google.Bigtable.V2.RowFilter{ @@ -497,14 +494,13 @@ defmodule Bigtable.RowFilter do } """ @spec strip_value_transformer() :: RowFilter.t() - def strip_value_transformer() do + def strip_value_transformer do {:strip_value_transformer, true} |> build_filter() end @doc """ - Adds an apply label transformer Google.Bigtable.V2.RowFilter` a `Google.Bigtable.V2.ReadRowsRequest`. - + Adds an apply label transformer `Google.Bigtable.V2.RowFilter` to a `Google.Bigtable.V2.ReadRowsRequest`. ## Examples iex> request = Bigtable.ReadRows.build() |> Bigtable.RowFilter.apply_label_transformer("label") @@ -546,4 +542,41 @@ defmodule Bigtable.RowFilter do defp apply_filter(%RowFilter{} = filter, %ReadRowsRequest{} = request) do %{request | filter: filter} end + + @spec create_range(binary(), column_range) :: ColumnRange.t() + def create_range(family_name, {start_qualifier, end_qualifier, inclusive}) do + range = translate_range(start_qualifier, end_qualifier, inclusive) + + range + |> Keyword.put(:family_name, family_name) + |> ColumnRange.new() + end + + def create_range(family_name, {start_qualifier, end_qualifier}) do + create_range(family_name, {start_qualifier, end_qualifier, true}) + end + + @spec translate_range(binary(), binary(), boolean()) :: Keyword.t() + defp translate_range(start_qualifier, end_qualifier, inclusive) do + case inclusive do + true -> inclusive_range(start_qualifier, end_qualifier) + false -> exclusive_range(start_qualifier, end_qualifier) + end + end + + @spec exclusive_range(binary(), binary()) :: Keyword.t() + defp exclusive_range(start_qualifier, end_qualifier) do + [ + start_qualifier: {:start_qualifier_open, start_qualifier}, + end_qualifier: {:end_qualifier_open, end_qualifier} + ] + end + + @spec inclusive_range(binary(), binary()) :: Keyword.t() + defp inclusive_range(start_qualifier, end_qualifier) do + [ + start_qualifier: {:start_qualifier_closed, start_qualifier}, + end_qualifier: {:end_qualifier_closed, end_qualifier} + ] + end end diff --git a/lib/data/sample_row_keys.ex b/lib/data/sample_row_keys.ex index 6509c1e..3950693 100644 --- a/lib/data/sample_row_keys.ex +++ b/lib/data/sample_row_keys.ex @@ -1,13 +1,13 @@ defmodule Bigtable.SampleRowKeys do @moduledoc """ - Provides functions to build `Google.Bigtable.V2.SampleRowKeysRequest` and submit them to Bigtable. + Provides functionality for building and submitting `Google.Bigtable.V2.SampleRowKeysRequest`. """ - alias Bigtable.Utils + alias Bigtable.Request alias Google.Bigtable.V2 alias V2.Bigtable.Stub @doc """ - Builds a `Google.Bigtable.V2.SampleRowKeysRequest` given a row_key and optional custom table name. + Builds a `Google.Bigtable.V2.SampleRowKeysRequest` given a row_key and optional table name. Defaults to configured table name. @@ -21,11 +21,11 @@ defmodule Bigtable.SampleRowKeys do } ### Custom Table - iex> table_name = "projects/[project_id]/instances/[instance_id]/tables/[table_name]" + iex> table_name = "projects/project-id/instances/instance-id/tables/table-name" iex> Bigtable.SampleRowKeys.build(table_name) %Google.Bigtable.V2.SampleRowKeysRequest{ app_profile_id: "", - table_name: "projects/[project_id]/instances/[instance_id]/tables/[table_name]", + table_name: "projects/project-id/instances/instance-id/tables/table-name", } """ @spec build(binary()) :: V2.SampleRowKeysRequest.t() @@ -38,14 +38,8 @@ defmodule Bigtable.SampleRowKeys do Submits a `Google.Bigtable.V2.SampleRowKeysRequest` to Bigtable. """ @spec read(V2.SampleRowKeysRequest.t()) :: {:ok, V2.SampleRowKeysResponse} | {:error, any()} - def read(%V2.SampleRowKeysRequest{} = request) do + def read(%V2.SampleRowKeysRequest{} = request \\ build()) do request - |> Utils.process_request(&Stub.sample_row_keys/3, stream: true) - end - - @spec read() :: {:ok, V2.SampleRowKeysResponse} | {:error, any()} - def read() do - build() - |> read() + |> Request.process_request(&Stub.sample_row_keys/3, stream: true) end end diff --git a/lib/grpc/client_stub.ex b/lib/grpc/client_stub.ex index 9a07f2f..8f40d09 100644 --- a/lib/grpc/client_stub.ex +++ b/lib/grpc/client_stub.ex @@ -1,4 +1,4 @@ -defmodule Bigtable.Data.Service do +defmodule Bigtable.Service do @moduledoc false use GRPC.Service, name: "google.bigtable.v2.Bigtable" @@ -18,7 +18,7 @@ defmodule Bigtable.Data.Service do ) end -defmodule Bigtable.Data.Stub do +defmodule Bigtable.Stub do @moduledoc false - use GRPC.Stub, service: Bigtable.Data.Service + use GRPC.Stub, service: Bigtable.Service end diff --git a/lib/request.ex b/lib/request.ex new file mode 100644 index 0000000..f7da3df --- /dev/null +++ b/lib/request.ex @@ -0,0 +1,62 @@ +defmodule Bigtable.Request do + @moduledoc false + alias Bigtable.{Auth, Connection} + alias Connection.Worker + + @spec process_request(any(), function(), list()) :: {:ok, any()} | {:error, any()} + def process_request(request, request_fn, opts \\ []) do + response = + :poolboy.transaction( + :connection_pool, + fn pid -> + token = Auth.get_token() + + pid + |> Worker.get_connection() + |> request_fn.(request, get_metadata(token)) + end, + 10_000 + ) + + handle_response(response, opts) + end + + @spec handle_response(any(), list()) :: {:ok, any()} | {:error, any()} + defp handle_response({:ok, response, _headers}, opts) do + if Keyword.get(opts, :stream, false) do + processed = + response + |> process_stream() + + {:ok, processed} + else + {:ok, response} + end + end + + defp handle_response(error, _opts) do + case error do + {:error, _msg} -> + error + + msg -> + {:error, msg} + end + end + + @spec process_stream(Enumerable.t()) :: [{:ok | :error, any}] + defp process_stream(stream) do + stream + |> Stream.take_while(&remaining_resp?/1) + |> Enum.to_list() + end + + @spec remaining_resp?({:ok | :error | :trailers, any()}) :: boolean() + defp remaining_resp?({status, _}), do: status != :trailers + + @spec get_metadata(map()) :: Keyword.t() + defp get_metadata(%{token: token}) do + metadata = %{authorization: "Bearer #{token}"} + [metadata: metadata, content_type: "application/grpc", return_headers: true] + end +end diff --git a/lib/row_filter/column_range.ex b/lib/row_filter/column_range.ex deleted file mode 100644 index 9a0d08e..0000000 --- a/lib/row_filter/column_range.ex +++ /dev/null @@ -1,43 +0,0 @@ -defmodule Bigtable.RowFilter.ColumnRange do - @moduledoc false - alias Google.Bigtable.V2.ColumnRange - - @type column_range :: {binary(), binary(), boolean()} | {binary(), binary()} - - @spec create_range(binary(), column_range) :: ColumnRange.t() - def create_range(family_name, {start_qualifier, end_qualifier, inclusive}) do - range = translate_range(start_qualifier, end_qualifier, inclusive) - - range - |> Keyword.put(:family_name, family_name) - |> ColumnRange.new() - end - - def create_range(family_name, {start_qualifier, end_qualifier}) do - create_range(family_name, {start_qualifier, end_qualifier, true}) - end - - @spec translate_range(binary(), binary(), boolean()) :: Keyword.t() - defp translate_range(start_qualifier, end_qualifier, inclusive) do - case inclusive do - true -> inclusive_range(start_qualifier, end_qualifier) - false -> exclusive_range(start_qualifier, end_qualifier) - end - end - - @spec exclusive_range(binary(), binary()) :: Keyword.t() - defp exclusive_range(start_qualifier, end_qualifier) do - [ - start_qualifier: {:start_qualifier_open, start_qualifier}, - end_qualifier: {:end_qualifier_open, end_qualifier} - ] - end - - @spec inclusive_range(binary(), binary()) :: Keyword.t() - defp inclusive_range(start_qualifier, end_qualifier) do - [ - start_qualifier: {:start_qualifier_closed, start_qualifier}, - end_qualifier: {:end_qualifier_closed, end_qualifier} - ] - end -end diff --git a/lib/utils.ex b/lib/utils.ex index c7e5cfb..29d5aab 100644 --- a/lib/utils.ex +++ b/lib/utils.ex @@ -1,64 +1,28 @@ defmodule Bigtable.Utils do @moduledoc false - alias Bigtable.Connection - - def process_request(request, request_fn, opts \\ []) do - metadata = Connection.get_metadata() - - connection = Connection.get_connection() - - result = - connection - |> request_fn.(request, metadata) - - case result do - {:ok, response, _} -> - if Keyword.get(opts, :stream, false) do - processed = - response - |> process_stream() - - {:ok, processed} - else - {:ok, response} - end - - {:error, error} when is_map(error) -> - {:error, Map.get(error, :message, "unknown error")} - - _ -> - {:error, "unknown error"} - end - end + @spec configured_table_name() :: binary() def configured_table_name do - project = get_project() - instance = get_instance() + instance = configured_instance_name() table = Application.get_env(:bigtable, :table) - "projects/#{project}/instances/#{instance}/tables/#{table}" + "#{instance}/tables/#{table}" end + @spec configured_instance_name() :: binary() def configured_instance_name do project = get_project() instance = get_instance() "projects/#{project}/instances/#{instance}" end - defp get_project() do + @spec get_project() :: binary() + defp get_project do Application.get_env(:bigtable, :project) end - defp get_instance() do + @spec get_instance() :: binary() + defp get_instance do Application.get_env(:bigtable, :instance) end - - @spec process_stream(Enumerable.t()) :: [{atom(), any}] - defp process_stream(stream) do - stream - |> Stream.take_while(&remaining_resp?/1) - |> Enum.to_list() - end - - defp remaining_resp?({status, _}), do: status != :trailers end diff --git a/mix.exs b/mix.exs index 8a86563..77fa1f7 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,9 @@ defmodule Bigtable.MixProject do use Mix.Project - @version "0.6.1" + alias Bigtable.{Admin, Connection} + + @version "0.7.0" def project do [ @@ -39,7 +41,7 @@ defmodule Bigtable.MixProject do def application do [ mod: {Bigtable, []}, - extra_applications: [:logger, :grpc] + extra_applications: [:logger, :grpc, :poolboy] ] end @@ -54,7 +56,10 @@ defmodule Bigtable.MixProject do formatters: ["html", "epub"], groups_for_modules: groups_for_modules(), extras: extras(), - groups_for_extras: groups_for_extras() + groups_for_extras: groups_for_extras(), + nest_modules_by_prefix: [ + Bigtable.ChunkReader + ] ] end @@ -62,26 +67,38 @@ defmodule Bigtable.MixProject do [ "guides/introduction/overview.md", "guides/introduction/installation.md" - # "guides/operations/read_rows.md" ] end defp groups_for_extras do [ Introduction: ~r/guides\/introduction\/.?/ - # Operations: ~r/guides\/operations\/.?/ ] end defp groups_for_modules do [ - "Typed Bigtable": [ - Bigtable.Schema + Admin: [ + Admin.GcRule, + Admin.Modification, + Admin.Table, + Admin.TableAdmin ], - Operations: [ - Bigtable.ReadRows, + Connection: [ + Connection + ], + Data: [ + Bigtable.CheckAndMutateRow, + Bigtable.ChunkReader, + Bigtable.ChunkReader.ReadCell, Bigtable.MutateRow, - Bigtable.MutateRows + Bigtable.MutateRows, + Bigtable.Mutations, + Bigtable.ReadModifyWriteRow, + Bigtable.ReadRows, + Bigtable.RowFilter, + Bigtable.RowSet, + Bigtable.SampleRowKeys ] ] end @@ -97,16 +114,19 @@ defmodule Bigtable.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:poison, "~> 3.1"}, - {:lens, "~> 0.8.0"}, + {:google_protos, "~> 0.1"}, {:goth, "~> 0.11.0"}, + {:grpc, "~> 0.3.1"}, + {:lens, "~> 0.8.0"}, + {:poison, "~> 3.1"}, + {:poolboy, "~> 1.5"}, + {:protobuf, "~> 0.5.3"}, + # Dev Deps {:credo, "~> 1.0.0", only: [:dev, :test, :ci], runtime: false}, + {:dialyxir, "~> 1.0.0-rc.6", only: [:dev], runtime: false}, {:excoveralls, "~> 0.10", only: [:dev, :test, :ci]}, {:ex_doc, "~> 0.19", only: :dev, runtime: false}, - {:mix_test_watch, "~> 0.8", only: :dev, runtime: false}, - {:protobuf, "~> 0.5.3"}, - {:google_protos, "~> 0.1"}, - {:grpc, "~> 0.3.1"} + {:mix_test_watch, "~> 0.8", only: :dev, runtime: false} ] end end diff --git a/mix.lock b/mix.lock index a6d2428..cdf6674 100644 --- a/mix.lock +++ b/mix.lock @@ -5,9 +5,11 @@ "cowboy": {:hex, :cowboy, "2.5.0", "4ef3ae066ee10fe01ea3272edc8f024347a0d3eb95f6fbb9aed556dacbfc1337", [:rebar3], [{:cowlib, "~> 2.6.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.6.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, "cowlib": {:hex, :cowlib, "2.6.0", "8aa629f81a0fc189f261dc98a42243fa842625feea3c7ec56c48f4ccdb55490f", [:rebar3], [], "hexpm"}, "credo": {:hex, :credo, "1.0.0", "aaa40fdd0543a0cf8080e8c5949d8c25f0a24e4fc8c1d83d06c388f5e5e0ea42", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, + "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"}, "diver": {:hex, :diver, "0.2.0", "8a2a85e97c7b1a989db501fe2ce2a5a0e82109b0d8b02e04c0c1a8a10b53795e", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm"}, "elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"}, + "erlex": {:hex, :erlex, "0.2.1", "cee02918660807cbba9a7229cae9b42d1c6143b768c781fa6cee1eaf03ad860b", [:mix], [], "hexpm"}, "erlport": {:hex, :erlport, "0.10.0", "2436ec2f4ed62538c6e9c52f523f9315b6002ee7e298d9bd10b35abc3f6b32e7", [:rebar3], [], "hexpm"}, "ex_doc": {:hex, :ex_doc, "0.19.2", "6f4081ccd9ed081b6dc0bd5af97a41e87f5554de469e7d76025fba535180565f", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, "excoveralls": {:hex, :excoveralls, "0.10.4", "b86230f0978bbc630c139af5066af7cd74fd16536f71bc047d1037091f9f63a9", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, @@ -33,6 +35,7 @@ "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, "pre_commit": {:hex, :pre_commit, "0.3.4", "e2850f80be8090d50ad8019ef2426039307ff5dfbe70c736ad0d4d401facf304", [:mix], [], "hexpm"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"}, "ranch": {:hex, :ranch, "1.6.2", "6db93c78f411ee033dbb18ba8234c5574883acb9a75af0fb90a9b82ea46afa00", [:rebar3], [], "hexpm"}, diff --git a/test/admin/gc_rule_test.exs b/test/admin/gc_rule_test.exs index de00c97..ece2f37 100644 --- a/test/admin/gc_rule_test.exs +++ b/test/admin/gc_rule_test.exs @@ -16,9 +16,12 @@ defmodule GcRuleTest do describe("Bigtagble.Admin.GcRule.max_age/1") do test "should create a table with a max age gc rule", context do - Table.build(%{ + cf = %{ "cf1" => GcRule.max_age(2_592_000_500) - }) + } + + cf + |> Table.build() |> TableAdmin.create_table("gc_rule") expected = %{ @@ -37,9 +40,12 @@ defmodule GcRuleTest do describe("Bigtable.Admin.GcRule.max_num_versions/1") do test "should create a table with a max version gc rule", context do - Table.build(%{ + cf = %{ "cf1" => GcRule.max_num_versions(1) - }) + } + + cf + |> Table.build() |> TableAdmin.create_table("gc_rule") expected = %{ @@ -62,9 +68,12 @@ defmodule GcRuleTest do GcRule.max_age(3000) ] - Table.build(%{ + cf = %{ "cf1" => GcRule.union(rules) - }) + } + + cf + |> Table.build() |> TableAdmin.create_table("gc_rule") expected = %{ @@ -91,9 +100,12 @@ defmodule GcRuleTest do GcRule.max_age(3000) ] - Table.build(%{ + cf = %{ "cf1" => GcRule.intersection(rules) - }) + } + + cf + |> Table.build() |> TableAdmin.create_table("gc_rule") expected = %{ diff --git a/test/admin/table.exs b/test/admin/table.exs index 88c81ad..6efdfea 100644 --- a/test/admin/table.exs +++ b/test/admin/table.exs @@ -50,9 +50,12 @@ defmodule TableAdminTest do context.table_name ) - Table.build(%{ - "cf1" => GcRule.max_age(30000) - }) + cf = %{ + "cf1" => GcRule.max_age(30_000) + } + + cf + |> Table.build() |> TableAdmin.create_table("created") {:ok, after_insert} = TableAdmin.list_tables() diff --git a/test/connection/connection_test.exs b/test/connection/connection_test.exs index 4f74782..f060516 100644 --- a/test/connection/connection_test.exs +++ b/test/connection/connection_test.exs @@ -7,7 +7,7 @@ defmodule ConnectionTest do describe "Connection.get_connection() " do test "should return a Channel struct" do [host, port] = - Connection.get_custom_endpoint() + Connection.get_endpoint() |> String.split(":") expected = %GRPC.Channel{ @@ -19,7 +19,7 @@ defmodule ConnectionTest do scheme: "http" } - connection = Connection.get_connection() + connection = Connection.connect() result = %{ connection diff --git a/test/data/check_and_mutate_row_test.exs b/test/data/check_and_mutate_row_test.exs index f64431f..a2cfce3 100644 --- a/test/data/check_and_mutate_row_test.exs +++ b/test/data/check_and_mutate_row_test.exs @@ -14,13 +14,14 @@ defmodule CheckAndMutateRowTest do qualifier = "column" {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", qualifier, "value", 0) |> MutateRow.build() |> MutateRow.mutate() on_exit(fn -> - mutation = Mutations.build(row_key) |> Mutations.delete_from_row() + mutation = row_key |> Mutations.build() |> Mutations.delete_from_row() mutation |> MutateRow.mutate() end) @@ -34,10 +35,11 @@ defmodule CheckAndMutateRowTest do describe "CheckAndMutateRow.mutate/2" do test "should apply a single true mutation when no predicate set and row exists", context do mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.if_true(mutation) |> CheckAndMutateRow.mutate() @@ -69,13 +71,14 @@ defmodule CheckAndMutateRowTest do test "should apply a multiple true mutation when no predicate set and row exists", context do mutation1 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) mutation2 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "alsoTruthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "alsoTruthy", "true", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.if_true([mutation1, mutation2]) |> CheckAndMutateRow.mutate() @@ -116,7 +119,7 @@ defmodule CheckAndMutateRowTest do test "should not apply a true mutation when no predicate set and row does not exist", context do mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) {:ok, _result} = CheckAndMutateRow.build("Doesnt#Exist") @@ -145,10 +148,11 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex(context.qualifier) mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_true(mutation) |> CheckAndMutateRow.mutate() @@ -183,13 +187,14 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex(context.qualifier) mutation1 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) mutation2 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "alsoTruthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "alsoTruthy", "true", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_true([mutation1, mutation2]) |> CheckAndMutateRow.mutate() @@ -232,10 +237,11 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex("doesntexist") mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "truthy", "true", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "truthy", "true", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_true(mutation) |> CheckAndMutateRow.mutate() @@ -262,10 +268,11 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex("doesntexist") mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "false", "false", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "false", "false", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_false(mutation) |> CheckAndMutateRow.mutate() @@ -300,13 +307,14 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex("doesntexist") mutation1 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "false", "false", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "false", "false", 0) mutation2 = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "false2", "false2", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "false2", "false2", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_false([mutation1, mutation2]) |> CheckAndMutateRow.mutate() @@ -349,10 +357,11 @@ defmodule CheckAndMutateRowTest do filter = RowFilter.column_qualifier_regex(context.qualifier) mutation = - Mutations.build(context.row_key) |> Mutations.set_cell("cf1", "false", "false", 0) + context.row_key |> Mutations.build() |> Mutations.set_cell("cf1", "false", "false", 0) {:ok, _result} = - CheckAndMutateRow.build(context.row_key) + context.row_key + |> CheckAndMutateRow.build() |> CheckAndMutateRow.predicate(filter) |> CheckAndMutateRow.if_false(mutation) |> CheckAndMutateRow.mutate() diff --git a/test/data/mutate_row_test.exs b/test/data/mutate_row_test.exs index 95b6313..154fc0c 100644 --- a/test/data/mutate_row_test.exs +++ b/test/data/mutate_row_test.exs @@ -1,6 +1,5 @@ defmodule MutateRowTest do @moduledoc false - # TODO: Integration tests including errors alias Bigtable.{MutateRow, Mutations} use ExUnit.Case diff --git a/test/data/mutate_rows_test.exs b/test/data/mutate_rows_test.exs index 085ed3a..033a2b6 100644 --- a/test/data/mutate_rows_test.exs +++ b/test/data/mutate_rows_test.exs @@ -1,9 +1,7 @@ defmodule MutateRowsTest do @moduledoc false - # TODO: Integration tests including errors alias Bigtable.{MutateRows, Mutations} - use ExUnit.Case setup do diff --git a/test/data/read_modify_write_row_test.exs b/test/data/read_modify_write_row_test.exs index 8090704..4634577 100644 --- a/test/data/read_modify_write_row_test.exs +++ b/test/data/read_modify_write_row_test.exs @@ -1,7 +1,6 @@ defmodule ReadModifyWriteRowTest do @moduledoc false - alias Bigtable.{ReadModifyWriteRow, MutateRow, Mutations, ReadRows} - + alias Bigtable.{MutateRow, Mutations, ReadModifyWriteRow, ReadRows, RowFilter} use ExUnit.Case doctest ReadModifyWriteRow @@ -12,7 +11,7 @@ defmodule ReadModifyWriteRowTest do row_key = "Test#123" on_exit(fn -> - mutation = Mutations.build(row_key) |> Mutations.delete_from_row() + mutation = row_key |> Mutations.build() |> Mutations.delete_from_row() mutation |> MutateRow.mutate() end) @@ -29,12 +28,14 @@ defmodule ReadModifyWriteRowTest do val = <<1::integer-signed-64>> {:ok, _result} = - Mutations.build(context.row_key) + context.row_key + |> Mutations.build() |> Mutations.set_cell(context.family, qual, val, 0) |> MutateRow.mutate() {:ok, _result} = - ReadModifyWriteRow.build(context.row_key) + context.row_key + |> ReadModifyWriteRow.build() |> ReadModifyWriteRow.increment_amount(context.family, qual, 1) |> ReadModifyWriteRow.mutate() @@ -42,10 +43,10 @@ defmodule ReadModifyWriteRowTest do {:ok, result} = ReadRows.build() - |> Bigtable.RowFilter.cells_per_column(1) + |> RowFilter.cells_per_column(1) |> ReadRows.read() - new_value = Map.values(result) |> List.flatten() |> List.first() |> Map.get(:value) + new_value = result |> Map.values() |> List.flatten() |> List.first() |> Map.get(:value) assert new_value == expected end @@ -54,7 +55,8 @@ defmodule ReadModifyWriteRowTest do qual = "num" {:ok, _result} = - ReadModifyWriteRow.build(context.row_key) + context.row_key + |> ReadModifyWriteRow.build() |> ReadModifyWriteRow.increment_amount(context.family, qual, 3) |> ReadModifyWriteRow.mutate() @@ -62,10 +64,10 @@ defmodule ReadModifyWriteRowTest do {:ok, result} = ReadRows.build() - |> Bigtable.RowFilter.cells_per_column(1) + |> RowFilter.cells_per_column(1) |> ReadRows.read() - new_value = Map.values(result) |> List.flatten() |> List.first() |> Map.get(:value) + new_value = result |> Map.values() |> List.flatten() |> List.first() |> Map.get(:value) assert new_value == expected end @@ -75,12 +77,14 @@ defmodule ReadModifyWriteRowTest do val = "hello" {:ok, _result} = - Mutations.build(context.row_key) + context.row_key + |> Mutations.build() |> Mutations.set_cell(context.family, qual, val, 0) |> MutateRow.mutate() {:ok, _result} = - ReadModifyWriteRow.build(context.row_key) + context.row_key + |> ReadModifyWriteRow.build() |> ReadModifyWriteRow.append_value(context.family, qual, "world") |> ReadModifyWriteRow.mutate() @@ -88,10 +92,10 @@ defmodule ReadModifyWriteRowTest do {:ok, result} = ReadRows.build() - |> Bigtable.RowFilter.cells_per_column(1) + |> RowFilter.cells_per_column(1) |> ReadRows.read() - new_value = Map.values(result) |> List.flatten() |> List.first() |> Map.get(:value) + new_value = result |> Map.values() |> List.flatten() |> List.first() |> Map.get(:value) assert new_value == expected end @@ -100,7 +104,8 @@ defmodule ReadModifyWriteRowTest do qual = "string" {:ok, _result} = - ReadModifyWriteRow.build(context.row_key) + context.row_key + |> ReadModifyWriteRow.build() |> ReadModifyWriteRow.append_value(context.family, qual, "world") |> ReadModifyWriteRow.mutate() @@ -108,10 +113,10 @@ defmodule ReadModifyWriteRowTest do {:ok, result} = ReadRows.build() - |> Bigtable.RowFilter.cells_per_column(1) + |> RowFilter.cells_per_column(1) |> ReadRows.read() - new_value = Map.values(result) |> List.flatten() |> List.first() |> Map.get(:value) + new_value = result |> Map.values() |> List.flatten() |> List.first() |> Map.get(:value) assert new_value == expected end diff --git a/test/data/read_rows_test.exs b/test/data/read_rows_test.exs index 23ca75d..67f2484 100644 --- a/test/data/read_rows_test.exs +++ b/test/data/read_rows_test.exs @@ -1,7 +1,5 @@ defmodule ReadRowsTest do - # TODO: Integration tests including errors - alias Bigtable.{MutateRow, MutateRows, Mutations, ReadRows} - + alias Bigtable.{ChunkReader, MutateRow, MutateRows, Mutations, ReadRows} use ExUnit.Case doctest ReadRows @@ -86,7 +84,7 @@ defmodule ReadRowsTest do for row_key <- row_keys, into: %{} do {row_key, [ - %Bigtable.ChunkReader.ReadCell{ + %ChunkReader.ReadCell{ family_name: %Google.Protobuf.StringValue{value: context.column_family}, label: "", qualifier: %Google.Protobuf.BytesValue{value: context.column_qualifier}, diff --git a/test/data/row_filter_integration_test.exs b/test/data/row_filter_integration_test.exs index 293f531..2838143 100644 --- a/test/data/row_filter_integration_test.exs +++ b/test/data/row_filter_integration_test.exs @@ -157,14 +157,16 @@ defmodule RowFilterIntegration do test "should properly filter multiple rows based on value" do first_mutation = - Mutations.build("Test#1") + "Test#1" + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "foo", 0) |> Mutations.set_cell("cf1", "column2", "bar", 0) |> Mutations.set_cell("cf2", "column1", "bar", 0) |> Mutations.set_cell("cf2", "column2", "foo", 0) second_mutation = - Mutations.build("Test#2") + "Test#2" + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "foo", 0) |> Mutations.set_cell("cf1", "column2", "bar", 0) |> Mutations.set_cell("cf2", "column1", "bar", 0) @@ -1029,7 +1031,8 @@ defmodule RowFilterIntegration do [row_key | _rest] = context.row_keys {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column", "value", 0) |> MutateRow.mutate() @@ -1060,7 +1063,8 @@ defmodule RowFilterIntegration do [row_key | _rest] = context.row_keys {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "value", 0) |> Mutations.set_cell("cf1", "column2", "value", 0) |> Mutations.set_cell("cf1", "column3", "value", 0) @@ -1109,7 +1113,8 @@ defmodule RowFilterIntegration do [row_key | _rest] = context.row_keys {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "value", 4000) |> Mutations.set_cell("cf2", "column2", "value", 1000) |> Mutations.set_cell("cf1", "column2", "value", 1000) @@ -1136,7 +1141,8 @@ defmodule RowFilterIntegration do [row_key | _rest] = context.row_keys {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "value", 0) |> Mutations.set_cell("cf1", "column2", "value", 0) |> Mutations.set_cell("cf1", "column3", "value", 0) @@ -1192,7 +1198,7 @@ defmodule RowFilterIntegration do expected = %{ "Test#1" => [ - %Bigtable.ChunkReader.ReadCell{ + %ReadCell{ family_name: %Google.Protobuf.StringValue{value: "cf1"}, label: "", qualifier: %Google.Protobuf.BytesValue{value: "column"}, @@ -1226,7 +1232,8 @@ defmodule RowFilterIntegration do defp seed_timestamp_range(row_key) do {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "value1", 1000) |> Mutations.set_cell("cf1", "column1", "value2", 2000) |> Mutations.set_cell("cf1", "column1", "value3", 3000) @@ -1243,7 +1250,8 @@ defmodule RowFilterIntegration do defp seed_range(row_key) do {:ok, _} = - Mutations.build(row_key) + row_key + |> Mutations.build() |> Mutations.set_cell("cf1", "column1", "value1", 0) |> Mutations.set_cell("cf1", "column2", "value2", 0) |> Mutations.set_cell("cf1", "column3", "value3", 0) @@ -1258,7 +1266,8 @@ defmodule RowFilterIntegration do defp seed_values(context) do Enum.each(context.row_keys, fn key -> {:ok, _} = - Mutations.build(key) + key + |> Mutations.build() |> Mutations.set_cell("cf1", "column", "value", 0) |> MutateRow.build() |> MutateRow.mutate() diff --git a/test/data/row_filter_test.exs b/test/data/row_filter_test.exs index 4cc7103..6619fa4 100644 --- a/test/data/row_filter_test.exs +++ b/test/data/row_filter_test.exs @@ -36,7 +36,7 @@ defmodule RowFilterTest do } ] - expected = expected_chain(filters) |> expected_request() + expected = filters |> expected_chain() |> expected_request() assert RowFilter.chain(context.request, filters) == expected end diff --git a/test/google_acceptance/read_rows_acceptance_test.exs b/test/google_acceptance/read_rows_acceptance_test.exs index 9cdcd4c..6ae3038 100644 --- a/test/google_acceptance/read_rows_acceptance_test.exs +++ b/test/google_acceptance/read_rows_acceptance_test.exs @@ -42,11 +42,7 @@ defmodule GoogleAcceptanceTest do true -> converted = processed_result - |> Enum.flat_map(fn {row_key, read_items} -> - read_items - |> Enum.map(&TestResult.from_chunk(row_key, &1)) - |> Enum.reverse() - end) + |> convert_result() assert converted == expected end @@ -105,4 +101,13 @@ defmodule ReadRowsAcceptanceTest do end defp results_error?(results), do: Enum.any?(results, &Map.get(&1, :error, false)) + + defp convert_result(result) do + result + |> Enum.flat_map(fn {row_key, read_items} -> + read_items + |> Enum.map(&TestResult.from_chunk(row_key, &1)) + |> Enum.reverse() + end) + end end