diff --git a/Project.toml b/Project.toml
index e955313..1f488c7 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,6 +1,6 @@
 name = "ClusterManagers"
 uuid = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
-version = "1.1.0"
+version = "2.0.0"
 
 [deps]
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
diff --git a/README.md b/README.md
index 78c5b42..504b37c 100755
--- a/README.md
+++ b/README.md
@@ -72,34 +72,7 @@ spread across CPU sockets. Default is `BALANCED`.
 
 ### Using `ElasticManager` (dynamically adding workers to a cluster)
 
-The `ElasticManager` is useful in scenarios where we want to dynamically add workers to a cluster.
-It achieves this by listening on a known port on the master. The launched workers connect to this
-port and publish their own host/port information for other workers to connect to.
-
-On the master, you need to instantiate an instance of `ElasticManager`. The constructors defined are:
-
-```julia
-ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
-ElasticManager(port) = ElasticManager(;port=port)
-ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
-ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
-```
-
-You can set `addr=:auto` to automatically use the host's private IP address on the local network, which will allow other workers on this network to connect. You can also use `port=0` to let the OS choose a random free port for you (some systems may not support this). Once created, printing the `ElasticManager` object prints the command which you can run on workers to connect them to the master, e.g.:
-
-```julia
-julia> em = ElasticManager(addr=:auto, port=0)
-ElasticManager:
-  Active workers : []
-  Number of workers to be added : 0
-  Terminated workers : []
-  Worker connect command :
-    /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
-```
-
-By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor.
-
-Once workers are connected, you can print the `em` object again to see them added to the list of active workers.
+For `ElasticManager`, please see the [ElasticClusterManager.jl](https://github.com/JuliaParallel/ElasticClusterManager.jl) package.
 
 ### Sun Grid Engine (SGE)
 
diff --git a/src/ClusterManagers.jl b/src/ClusterManagers.jl
index ce91285..b03f3da 100755
--- a/src/ClusterManagers.jl
+++ b/src/ClusterManagers.jl
@@ -18,6 +18,5 @@ include("scyld.jl")
 include("condor.jl")
 include("slurm.jl")
 include("affinity.jl")
-include("elastic.jl")
 
 end
diff --git a/src/elastic.jl b/src/elastic.jl
deleted file mode 100644
index eb865e6..0000000
--- a/src/elastic.jl
+++ /dev/null
@@ -1,156 +0,0 @@
-# The master process listens on a well-known port
-# Launched workers connect to the master and redirect their STDOUTs to the same
-# Workers can join and leave the cluster on demand.
-
-export ElasticManager, elastic_worker
-
-const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN
-
-struct ElasticManager <: ClusterManager
-    active::Dict{Int, WorkerConfig}         # active workers
-    pending::Channel{TCPSocket}             # to be added workers
-    terminated::Set{Int}                    # terminated worker ids
-    topology::Symbol
-    sockname
-    printing_kwargs
-
-    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
-        Distributed.init_multi()
-        cookie !== nothing && cluster_cookie(cookie)
-
-        # Automatically check for the IP address of the local machine
-        if addr == :auto
-            try
-                addr = Sockets.getipaddr(IPv4)
-            catch
-                error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
-            end
-        end
-
-        l_sock = listen(addr, port)
-
-        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs)
-
-        @async begin
-            while true
-                let s = accept(l_sock)
-                    @async process_worker_conn(lman, s)
-                end
-            end
-        end
-
-        @async process_pending_connections(lman)
-
-        lman
-    end
-end
-
-ElasticManager(port) = ElasticManager(;port=port)
-ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
-ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
-
-
-function process_worker_conn(mgr::ElasticManager, s::TCPSocket)
-    # Socket is the worker's STDOUT
-    wc = WorkerConfig()
-    wc.io = s
-
-    # Validate cookie
-    cookie = read(s, HDR_COOKIE_LEN)
-    if length(cookie) < HDR_COOKIE_LEN
-        error("Cookie read failed. Connection closed by peer.")
-    end
-    self_cookie = cluster_cookie()
-    for i in 1:HDR_COOKIE_LEN
-        if UInt8(self_cookie[i]) != cookie[i]
-            println(i, " ", self_cookie[i], " ", cookie[i])
-            error("Invalid cookie sent by remote worker.")
-        end
-    end
-
-    put!(mgr.pending, s)
-end
-
-function process_pending_connections(mgr::ElasticManager)
-    while true
-        wait(mgr.pending)
-        try
-            addprocs(mgr; topology=mgr.topology)
-        catch e
-            showerror(stderr, e)
-            Base.show_backtrace(stderr, Base.catch_backtrace())
-        end
-    end
-end
-
-function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
-    # The workers have already been started.
-    while isready(mgr.pending)
-        wc=WorkerConfig()
-        wc.io = take!(mgr.pending)
-        push!(launched, wc)
-    end
-
-    notify(c)
-end
-
-function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op == :register
-        mgr.active[id] = config
-    elseif op == :deregister
-        delete!(mgr.active, id)
-        push!(mgr.terminated, id)
-    end
-end
-
-function Base.show(io::IO, mgr::ElasticManager)
-    iob = IOBuffer()
-
-    println(iob, "ElasticManager:")
-    print(iob, "  Active workers : [ ")
-    for id in sort(collect(keys(mgr.active)))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Number of workers to be added : ", Base.n_avail(mgr.pending))
-
-    print(iob, "  Terminated workers : [ ")
-    for id in sort(collect(mgr.terminated))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Worker connect command : ")
-    print(iob, "    ", get_connect_cmd(mgr; mgr.printing_kwargs...))
-
-    print(io, String(take!(iob)))
-end
-
-# Does not return. If executing from a REPL try
-# @async connect_to_cluster(.....)
-# addr, port that a ElasticManager on the master processes is listening on.
-function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
-    c = connect(addr, port)
-    write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
-    stdout_to_master && redirect_stdout(c)
-    start_worker(c, cookie)
-end
-
-function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)
-
-    ip = string(em.sockname[1])
-    port = convert(Int,em.sockname[2])
-    cookie = cluster_cookie()
-    exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
-    project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()
-
-    join([
-        exename,
-        project...,
-        "-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port)'"
-    ]," ")
-
-end
diff --git a/test/elastic.jl b/test/elastic.jl
deleted file mode 100644
index faf2a80..0000000
--- a/test/elastic.jl
+++ /dev/null
@@ -1,25 +0,0 @@
-@testset "ElasticManager" begin
-    TIMEOUT = 10.
-
-    em = ElasticManager(addr=:auto, port=0)
-
-    # launch worker
-    run(`sh -c $(ClusterManagers.get_connect_cmd(em))`, wait=false)
-
-    # wait at most TIMEOUT seconds for it to connect
-    @test :ok == timedwait(TIMEOUT) do
-        length(em.active) == 1
-    end
-
-    wait(rmprocs(workers()))
-
-    @testset "show(io, ::ElasticManager)" begin
-        str = sprint(show, em)
-        lines = strip.(split(strip(str), '\n'))
-        @test lines[1] == "ElasticManager:"
-        @test lines[2] == "Active workers : []"
-        @test lines[3] == "Number of workers to be added : 0"
-        @test lines[4] == "Terminated workers : [ 2]"
-        @test lines[5] == "Worker connect command :"
-    end
-end
diff --git a/test/runtests.jl b/test/runtests.jl
index 57e3bae..334b825 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -9,8 +9,6 @@ using Distributed: workers, nworkers
 using Distributed: procs, nprocs
 using Distributed: remotecall_fetch, @spawnat
 using Test: @testset, @test, @test_skip
-# ElasticManager:
-using ClusterManagers: ElasticManager
 # Slurm:
 using ClusterManagers: addprocs_slurm, SlurmManager
 # SGE:
@@ -24,8 +22,6 @@ slurm_is_installed() = !isnothing(Sys.which("sbatch"))
 qsub_is_installed() = !isnothing(Sys.which("qsub"))
 
 @testset "ClusterManagers.jl" begin
-    include("elastic.jl")
-
     if slurm_is_installed()
         @info "Running the Slurm tests..." Sys.which("sbatch")
         include("slurm.jl")
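
Migration note: downstream code that previously did `using ClusterManagers: ElasticManager` should depend on [ElasticClusterManager.jl](https://github.com/JuliaParallel/ElasticClusterManager.jl) instead. Below is a minimal sketch of the migrated usage, assuming ElasticClusterManager.jl exports the same `ElasticManager` constructor and `elastic_worker` entry point as the code removed in this diff; consult that package's README for the authoritative API.

```julia
# Master side: start a manager that listens for dynamically-joining workers.
# Assumption: ElasticClusterManager exports the same API as the removed elastic.jl.
using ElasticClusterManager   # replaces `using ClusterManagers`

# Bind to the host's private IP (addr=:auto) and let the OS pick a free port (port=0).
em = ElasticManager(addr=:auto, port=0)

# Displaying the manager prints the connect command to run on each worker,
# which in turn calls elastic_worker(cookie, addr, port) against the master.
println(em)
```

Once a worker runs the printed connect command, it should appear under "Active workers" when `em` is displayed again, matching the behavior documented in the README section removed above.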