-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
- when run in parallel, the generation system for random csets often lands in a non-converging 'loop' in which one cset after the next doesn't converge or only does so very slowly. The exact cause is currently unknown.
debug code to generate the plot below:
################################################################################
# Defaults for the command line options; overridden by the flags parsed below.
config = nothing
configpath = nothing          # --config: path to a YAML configuration file
num_workers = 1               # --num_workers: number of worker processes
num_threads = 1               # --num_threads: Julia threads per process
num_blas_threads = 1          # --num_blas_threads: threads for the BLAS library
chunksize = 1                 # --chunksize: csets generated/written per chunk

args = ARGS
if length(args) == 0
    @warn "No command line arguments provided. Using default configuration."
end

# Return the value following flag `args[i]`, or report the error and exit(1).
# Factored out because all five value-taking flags shared the same body.
function flagvalue(args, i, flag, what)
    if i + 1 <= length(args)
        return args[i+1]
    end
    println("Error: $flag requires $what argument")
    exit(1)
end

for (i, arg) in enumerate(args)
    if arg == "--config"
        global configpath = flagvalue(args, i, "--config", "a file path")
    elseif arg == "--num_workers"
        global num_workers = parse(Int64, flagvalue(args, i, "--num_workers", "an integer"))
    elseif arg == "--num_threads"
        global num_threads = parse(Int64, flagvalue(args, i, "--num_threads", "an integer"))
    elseif arg == "--num_blas_threads"
        global num_blas_threads = parse(Int64, flagvalue(args, i, "--num_blas_threads", "an integer"))
    elseif arg == "--chunksize"
        global chunksize = parse(Int64, flagvalue(args, i, "--chunksize", "an integer"))
    elseif arg == "--help" || arg == "-h"
        println("Usage: julia create_data.jl [options]")
        println("Options:")
        println("    --config <path_to_config_file>     Path to the configuration file.")
        # BUG FIX: the help text previously documented "--num_processes",
        # but the flag actually parsed above is "--num_workers".
        println("    --num_workers <int>                number of worker processes to run on.")
        println("    --num_threads <int>                number of threads **per process**")
        println(
            "    --num_blas_threads <int>           number of threads the blas library should use. This is independent of --num_threads!",
        )
        println(
            "    --chunksize <int>                  number of csets to generate and write in one go. Setting this to something smaller than the total number of csets to be generated will result in the dataset being generated in chunks so that it doesn't have to be kept in memory all at once.",
        )
        println("    --help, -h                         Show this help message.")
        exit(0)
    end
end

# FIX: the message previously read "and $(num_blas_threads) with chunksize",
# dropping the words "BLAS threads".
@info "Starting data generation process with config path: $configpath on $(num_workers) workers, each with $(num_threads) threads and $(num_blas_threads) BLAS threads, with chunksize $chunksize"
################################################################################
# make a new environment for the data generation and load the necessary packages and dependencies
# this is done to keep the dependencies separate from the caller's environment
using Pkg
Pkg.activate(@__DIR__) # Activate the environment located next to this script
# NOTE(review): Pkg.update() hits the network and may silently move package
# versions between runs — consider relying on the Manifest for reproducibility.
Pkg.update()
Pkg.instantiate() # Install dependencies from Project.toml
################################################################################
using Distributed
# Spawn num_workers worker processes, each started with num_threads Julia threads.
addprocs(num_workers; exeflags="--threads=$(num_threads)") # add num_workers worker processes to the current Julia session
# Load every required package on all processes (driver + workers).
@everywhere import QuantumGrav as QG
@everywhere import CausalSets as CS
@everywhere import YAML
@everywhere import Random
@everywhere import Zarr
@everywhere using ProgressMeter
@everywhere import Statistics
@everywhere import LinearAlgebra
@everywhere import Distributions
@everywhere import SparseArrays
@everywhere import StatsBase
@everywhere import JSON
# Broadcast the BLAS thread count to every process; `$` interpolates the
# driver-local value into the expression evaluated remotely.
@everywhere nbt = $num_blas_threads
################################################################################
@everywhere LinearAlgebra.BLAS.set_num_threads($nbt)
################################################################################
# Integer codes used to tag each causal-set generator type in the output data.
# The code is the 1-based position of the name in the list below.
@everywhere encode_csettype = Dict(
    name => code for (code, name) in enumerate([
        "polynomial",
        "layered",
        "random",
        "grid",
        "destroyed",
        "destroyed_ambiguous",
        "merged",
        "merged_ambiguous",
        "complex_topology",
    ])
)
# Generate one causal set according to the factory's configuration and return
# a (currently empty) Dict{String,Any}. The cset itself is discarded — this
# pared-down debug version only exercises the generation path for benchmarking.
@everywhere function make_cset_data(fm::QG.CsetFactory)::Dict{String,Any}
config = fm.conf
rng = fm.rng
# Generate a unique seed from the worker's RNG to ensure reproducibility.
# NOTE(review): a random number (1..200) of UInt draws is burned from fm.rng
# before the sample seed is taken — presumably to decorrelate concurrent
# consumers of the same RNG state; confirm this is intentional.
iter = rand(rng, 1:200)
for i in 1:iter
rand(fm.rng, UInt)
end
sample_seed = rand(fm.rng, UInt)
# NOTE(review): this seeds the *global* default RNG, not fm.rng; the generator
# called below presumably draws from the global RNG internally — verify.
Random.seed!(sample_seed)
# Number of atoms for this cset, drawn from the configured distribution.
n = rand(rng, fm.npoint_distribution)
@debug " Generating cset data with $n atoms"
cset_type = config["cset_type"]
cset = nothing
counter = 0
# Retry up to 100 times: the generator may fail to converge and throw.
while isnothing(cset) && counter < 100
try
cset, _ = fm(cset_type, n, rng)
catch e
# Broad catch: any generator failure is logged and the attempt retried.
@warn "cset generator threw an exception: $(e)"
cset = nothing
end
counter += 1
end
if isnothing(cset)
throw(ErrorException("Couldn't create a working cset"))
end
return Dict()
end
#
# Load the default configuration, then overlay the user-provided config file
# (if any) on top of it.
defaultconfigpath =
    joinpath(dirname(@__DIR__), "configs", "createdata_config_default.yaml")
default_config = YAML.load_file(defaultconfigpath)

if configpath === nothing
    config = default_config
else
    if isfile(configpath)
        loaded_config = YAML.load_file(configpath)
        # merge: entries from the loaded file replace the overlapping entries
        # in the default config. (The original comment here had an error-log
        # paste spliced into the middle of it; restored.)
        config = merge(default_config, loaded_config)
        @debug "loaded config: $(JSON.json(config, 4))"
        # FIX: removed a stray no-op `make_cset_data` expression left here,
        # apparently a paste artifact.
    else
        # FIX: report a fatal condition with @error rather than @info.
        @error "Config file not found at $configpath"
        exit(1)
    end
end
# Create the debug log CSV with its header row if it does not exist yet.
# FIX: `isfile(...) == false` replaced with the idiomatic `!isfile(...)`.
if !isfile("debug_log.csv")
    open("debug_log.csv", "w") do io
        write(io, "num_threads,num_processes,sequential,threads,processes,sequential_random,threads_random,processes_random\n")
    end
end
# Run cset creation. Build one independent factory functor, random-cset maker,
# and RNG per worker process, each seeded from the driver's global RNG.
feature_makers_processes = Dict{Int,QG.CsetFactory}()
random_makers_processes = Dict{Int,QG.RandomCsetMaker}()
rngs_processes = Dict{Int,Random.MersenneTwister}()
# Benchmark setup: force the "random" generator with a fixed cset size of 512.
config["cset_type"] = "random"
config["csetsize_distr_args"] = [512, 513]
for worker_id in workers()
    worker_conf = deepcopy(config)
    worker_seed = rand(1:100000000000)
    worker_conf["seed"] = worker_seed
    feature_makers_processes[worker_id] = QG.CsetFactory(worker_conf)
    random_makers_processes[worker_id] = QG.RandomCsetMaker(worker_conf["random"])
    rngs_processes[worker_id] = Random.MersenneTwister(worker_seed)
end
# Same per-consumer setup as above, but one entry per thread (keys 1:nthreads).
feature_makers_threads = Dict{Int,QG.CsetFactory}()
random_makers_threads = Dict{Int,QG.RandomCsetMaker}()
rngs_threads = Dict{Int,Random.MersenneTwister}()
for tid in 1:Threads.nthreads()
    thread_conf = deepcopy(config)
    thread_seed = rand(1:100000000000)
    thread_conf["seed"] = thread_seed
    feature_makers_threads[tid] = QG.CsetFactory(thread_conf)
    random_makers_threads[tid] = QG.RandomCsetMaker(thread_conf["random"])
    rngs_threads[tid] = Random.MersenneTwister(thread_seed)
end
# Single makers for the sequential baseline, seeded from the config.
rng = Random.MersenneTwister(config["seed"])
random_maker = QG.RandomCsetMaker(config["random"])
feature_maker = QG.CsetFactory(config)
make_cset_data(feature_maker) # warm up: trigger compilation before timing
random_maker(512, rng) # warm up the random maker as well
# --- sequential baseline ------------------------------------------------------
ts = @elapsed for i in 1:150
    data = make_cset_data(feature_maker)
end
tsr = @elapsed for i in 1:150
    cset = random_maker(512, rng)
end
# --- threads ------------------------------------------------------------------
# BUG FIX: the per-thread dicts are keyed 1:Threads.nthreads() (see their
# construction above), but the lookups used `Threads.threadid() - 1`, which is
# 0 for thread 1 and raises a KeyError. Index with threadid() directly.
# NOTE(review): threadid() is not guaranteed stable across a task under task
# migration; for strictly per-thread state prefer one task per chunk — left
# as-is here to keep the benchmark's structure.
tt = @elapsed Threads.@threads for i in 1:150
    data = make_cset_data(feature_makers_threads[Threads.threadid()])
end
ttr = @elapsed Threads.@threads for i in 1:150
    cset = random_makers_threads[Threads.threadid()](512, rngs_threads[Threads.threadid()])
end
# --- processes ----------------------------------------------------------------
# On a worker, myid() is that worker's id, matching the keys set from workers().
tp = @elapsed pmap(1:150) do i
    data = make_cset_data(feature_makers_processes[myid()])
end
tpr = @elapsed pmap(1:150) do i
    cset = random_makers_processes[myid()](512, rngs_processes[myid()])
end
# Report the timings and append one CSV row to the debug log.
println("sequential time: $ts seconds")
println("threads time: $tt seconds")
println("processes time: $tp seconds")
println("sequential random time: $tsr seconds")
println("threads random time: $ttr seconds")
println("processes random time: $tpr seconds")
open("debug_log.csv", "a") do io
    # BUG FIX: the row previously began "$(num_workers),$(num_threads)",
    # swapped relative to the header's "num_threads,num_processes" columns.
    write(io, "$(num_threads),$(num_workers),$ts,$tt,$tp,$tsr,$ttr,$tpr\n")
end
# ################################################################################
# close the processes and clean up
@info "finished data generation"
rmprocs(Distributed.workers()) # remove all worker processes so the script can exit cleanly
@info "removed all worker processes"
which results in the following data when plotting debug_log.csv:
y-axis shows runtime in seconds, xaxis shows experiment index (50 per class).
from this plot, it seems that sequential runs always work fine, but parallel runs — whether using threads or processes — create outliers that take massive amounts of time.
Metadata
Metadata
Assignees
Labels
No labels