Skip to content

Commit 8962852

Browse files
committed
benchmark: add client and refactor a little
1 parent 713b558 commit 8962852

File tree

11 files changed

+191
-64
lines changed

11 files changed

+191
-64
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ erl_crash.dump
2121
/tmp
2222

2323
/log
24+
25+
.elixir_ls

benchmark/.formatter.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Used by "mix format"
22
[
3-
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
3+
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
4+
import_deps: [:protobuf, :grpc]
45
]

benchmark/lib/bench_manager.ex

Lines changed: 0 additions & 38 deletions
This file was deleted.

benchmark/lib/benchmark.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
defmodule Benchmark do
2+
end
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
defmodule Benchmark.ClientManager do
2+
alias Benchmark.ClientWorker
3+
4+
def start_client(%Grpc.Testing.ClientConfig{} = config) do
5+
# get security
6+
_payload_type = Benchmark.Manager.payload_type(config.payload_config)
7+
8+
channels =
9+
0..(config.client_channels - 1)
10+
|> Enum.zip(config.server_targets)
11+
|> Enum.map(fn {_, server} -> new_client(server) end)
12+
13+
perform_rpcs(config, channels)
14+
end
15+
16+
defp new_client(addr) do
17+
{:ok, conn} = GRPC.Stub.connect(addr)
18+
conn
19+
end
20+
21+
def perform_rpcs(config, channels) do
22+
payload = client_payload(config.payload_config)
23+
24+
case config.load_params.load do
25+
{:closed_loop, _} ->
26+
:ok
27+
28+
_ ->
29+
raise GRPC.RPCError, status: :unimplemented
30+
end
31+
32+
rpcs_per_conn = config.outstanding_rpcs_per_channel
33+
34+
case Grpc.Testing.RpcType.key(config.rpc_type) do
35+
:UNARY ->
36+
close_loop_unary(channels, rpcs_per_conn, payload)
37+
38+
_ ->
39+
:ok
40+
end
41+
end
42+
43+
def client_payload(nil), do: %{}
44+
45+
def client_payload(config) do
46+
case config.payload do
47+
{:simple_params, payload} ->
48+
%{req_size: payload.req_size, resp_size: payload.resp_size, type: :protobuf}
49+
50+
_ ->
51+
%{}
52+
end
53+
end
54+
55+
def close_loop_unary(channels, rpcs_per_conn, payload) do
56+
Enum.each(channels, fn conn ->
57+
Enum.each(0..(rpcs_per_conn - 1), fn _idx ->
58+
Task.async(ClientWorker, :unary_loop, [conn, payload])
59+
end)
60+
end)
61+
end
62+
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
defmodule Benchmark.ClientWorker do
2+
require Logger
3+
4+
def unary_loop(channel, input) do
5+
unary_call(channel, input)
6+
end
7+
8+
def unary_call(channel, input) do
9+
payload =
10+
Grpc.Testing.Payload.new(
11+
type: Grpc.Testing.PayloadType.value(:COMPRESSABLE),
12+
body: String.duplicate(<<0>>, input.req_size)
13+
)
14+
15+
req =
16+
Grpc.Testing.SimpleRequest.new(
17+
response_type: payload.type,
18+
response_size: input.resp_size,
19+
payload: payload
20+
)
21+
22+
Logger.debug("Sending rpc #{req}")
23+
Grpc.Testing.BenchmarkService.Stub.unary_call(channel, req)
24+
end
25+
end

benchmark/lib/benchmark/manager.ex

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
defmodule Benchmark.Manager do
2+
def payload_type(nil), do: :protobuf
3+
4+
def payload_type(config) do
5+
case config.payload do
6+
{:bytebuf_params, _} -> :bytebuf
7+
{:simple_params, _} -> :protobuf
8+
{:complex_params, _} -> :complex
9+
_ -> raise GRPC.RPCError, status: :invalid_argument
10+
end
11+
end
12+
13+
def get_cores() do
14+
:erlang.system_info(:schedulers_online)
15+
end
16+
17+
def set_cores(core_limit) do
18+
if core_limit > 0 do
19+
:erlang.system_flag(:schedulers_online, core_limit)
20+
core_limit
21+
else
22+
get_cores()
23+
end
24+
end
25+
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
defmodule Benchmark.ServerManager do
2+
def start_server(%Grpc.Testing.ServerConfig{} = config) do
3+
# get security
4+
payload_type = Benchmark.Manager.payload_type(config.payload_config)
5+
start_server(payload_type, config)
6+
end
7+
8+
def start_server(:protobuf, config) do
9+
{:ok, pid, port} = GRPC.Server.start(Grpc.Testing.BenchmarkService.Server, config.port)
10+
%{port: port, pid: pid}
11+
end
12+
13+
def start_server(_, _), do: raise(GRPC.RPCError, status: :unimplemented)
14+
end
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
defmodule Grpc.Testing.WorkerService.Server do
2+
use GRPC.Server, service: Grpc.Testing.WorkerService.Service
3+
alias GRPC.Server
4+
5+
alias Benchmark.Manager
6+
alias Benchmark.ServerManager
7+
alias Benchmark.ClientManager
8+
9+
require Logger
10+
11+
def run_server(args_enum, stream) do
12+
Enum.each(args_enum, fn args ->
13+
Logger.debug("Got args:")
14+
Logger.debug(inspect(args))
15+
16+
status =
17+
case args.argtype do
18+
{:setup, config} ->
19+
cores = Manager.set_cores(config.core_limit)
20+
server = ServerManager.start_server(config)
21+
Logger.debug("Started server: #{inspect(server)}")
22+
Grpc.Testing.ServerStatus.new(stats: nil, port: server[:port], cores: cores)
23+
24+
{:mark, mark} ->
25+
stats = get_stats(mark.reset)
26+
Grpc.Testing.ServerStatus.new(stats: stats)
27+
end
28+
29+
Server.send_reply(stream, status)
30+
end)
31+
end
32+
33+
def run_client(args_enum, stream) do
34+
Enum.each(args_enum, fn args ->
35+
Logger.debug("Got args:")
36+
Logger.debug(inspect(args))
37+
38+
status =
39+
case args.argtype do
40+
{:setup, client_config} ->
41+
ClientManager.start_client(client_config)
42+
Grpc.Testing.ClientStatus.new()
43+
44+
{:mark, mark} ->
45+
stats = get_stats(mark.reset)
46+
Grpc.Testing.ClientStatus.new(stats: stats)
47+
end
48+
49+
Server.send_reply(stream, status)
50+
end)
51+
end
52+
53+
def core_count(_, _) do
54+
Grpc.Testing.CoreResponse.new(cores: Manager.get_cores())
55+
end
56+
57+
defp get_stats(_reset) do
58+
end
59+
end

0 commit comments

Comments
 (0)