Commit 59f0070f authored by Jason Rhinelander's avatar Jason Rhinelander
Browse files

Add CONNDELAY feature/option, with default 0.25

If a host supports many simultaneous jobs (e.g. a 20-core system),
establishing all 20 connections simultaneously is a bad idea; this adds
a minimum delay (defaulting to 0.25) between connections to the same
host.
parent 86e47194
......@@ -37,7 +37,8 @@ template <class CharT> std::string put_time(const std::tm *tmb, const CharT *fmt
#define RE_HOSTNAME "[a-zA-Z0-9][a-zA-Z0-9.-]*"
#define RE_HOST "(?:" RE_USERNAME "@)?" RE_HOSTNAME "(?:=\\d+)?"
#define RE_HOSTLIST "\\(?\\s*((?:" RE_HOST "(?=\\s|\\)|$)\\s*?)*)(?:\\s*\\))?\\s*(?:#.*)?"
const std::regex numeric_re{R"(\d+)"},
const std::regex integer_re{R"(\d+)"}, // NB: unsigned
float_re{R"(\d*\.?\d+(?:[eE][-+]?\d+)?)"}, // NB: unsigned
empty_or_comment_re{R"(\s*(?:#.*)?)"},
done_re{R"(\s*#\s*PARALLEL-RUNNER-DONE\s*)"},
hosts_re{"HOSTS([+])?=" RE_HOSTLIST},
......@@ -46,6 +47,7 @@ const std::regex numeric_re{R"(\d+)"},
per_host_re{R"(PER_HOST=(\d+)\s*(?:#.*)?)"},
file_re{"FILE=(.*)"},
first_jobno_re{"FIRST=(\\d+)"},
conn_delay_re{"CONNDELAY=(\\d*\\.?\\d+)"},
help_re{"HELP|--help|-h|-?", std::regex::icase};
std::string log_dir;
......@@ -56,6 +58,7 @@ unsigned long perhost_default = 4;
std::vector<std::pair<std::string, std::string>> hosts; // (host,username) pairs (username can be empty)
unsigned long host_strlen = 0, hostthr_strlen = 0, job_strlen = 0;
std::unordered_map<std::string, unsigned long> host_max_jobs;
double conn_delay = 0.25;
constexpr auto date_format = "%e %B %Y %Z";
constexpr auto time_format = "%r";
constexpr auto datetime_format = "%e %B %Y %r %Z";
......@@ -65,6 +68,7 @@ std::set<pid_t> ssh_pids;
double total_job_time{0};
std::unordered_map<std::string, std::tuple<long, long, double>> host_stats; // #success, #failure, cumul. time
std::time_t last_clock_check;
std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> host_next_conn;
std::mutex thread_list_mutex; // Protects:
std::list<std::thread> thread_list;
......@@ -140,6 +144,18 @@ This specifies the default maximum jobs per host (for any hosts without an
individual '=j' specifier).
CONNDELAY=s
This specifies the minimum time (in seconds, which can be fractional) between
initializing parallel connections to the same host, to avoid too many
simultaneous connections to the host at once. The default is 0.25, i.e. don't
initiate a new connection until at least a quarter of a second after the last
connection was initiated. Since long-running processes don't typically end at
the same time, this typically matters only during startup to prevent a sudden
flood of connections (which particular matters on hosts with large PER_HOST
values).
FIRST=n
Takes an integer, n, which will be the job number of the first job (available to
......@@ -228,7 +244,7 @@ void parse_host_string(std::string::const_iterator begin, std::string::const_ite
}
}
void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_perhost, bool parse_first) {
void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_perhost, bool parse_first, bool parse_delay) {
std::ifstream f(filename);
if (not f.is_open())
throw std::runtime_error("Unable to open " + filename + ": " + strerror(errno));
......@@ -246,19 +262,16 @@ void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_per
// Skip this line
}
else if (std::regex_match(line, m, hosts_re)) {
if (parse_hosts) {
parse_host_string(m[2].first, m[2].second);
}
if (parse_hosts) parse_host_string(m[2].first, m[2].second);
}
else if (std::regex_match(line, m, per_host_re)) {
if (parse_perhost) {
perhost_default = std::stoul(m[1]);
}
if (parse_perhost) perhost_default = std::stoul(m[1]);
}
else if (std::regex_match(line, m, first_jobno_re)) {
if (parse_first) {
jobno = std::stol(m[1]);
}
if (parse_first) jobno = std::stol(m[1]);
}
else if (std::regex_match(line, m, conn_delay_re)) {
if (parse_delay) conn_delay = std::stod(m[1]);
}
else {
std::cerr << "Error: invalid line in " << filename << ", line " << lineno << "\n";
......@@ -334,13 +347,32 @@ void thread_runner(std::promise<void> started, const std::pair<std::string, std:
pid_t child;
int open_pipe = -1;
{
std::lock_guard<std::mutex> lock(thr_mutex);
check_date_change();
std::unique_lock<std::mutex> lock(thr_mutex);
started = std::chrono::high_resolution_clock::now();
if (host_next_conn.count(hostname) > 0) {
auto until = host_next_conn[hostname];
// If we can't start yet (because some other thread started too recently) then
// release the lock and sleep until the appropriate time. Do this repeatedly,
// because when we wake up, some other thread might have already started and pushed
// back the earliest start time.
while (started < until) {
lock.unlock();
std::this_thread::sleep_until(until);
lock.lock();
started = std::chrono::high_resolution_clock::now();
until = host_next_conn[hostname];
}
}
check_date_change();
std::time_t t = std::chrono::system_clock::to_time_t(started);
std::cout << "Starting job " << std::setw(job_strlen) << myjobno <<
" on " << std::setw(hostthr_strlen) << std::left << hostthr << std::right <<
" at " << std::put_time(std::localtime(&t), time_format) << std::endl;
if (conn_delay > 0) {
host_next_conn[hostname] = started + std::chrono::nanoseconds(std::llround(conn_delay * 1e9));
}
jobs_active++;
if (jobs_active > max_jobs_active) max_jobs_active = jobs_active;
......@@ -545,7 +577,8 @@ int main(int argc, char *argv[]) {
// Remove any leading arguments that look like 'HOSTS=...', 'PER_HOST=j', 'FILE=...' or 'N'
// HOSTS= can be repeated; the others can occur only once.
bool host_arg = false, perhost_arg = false, jobs_arg = false, file_arg = false, jobno_arg = false, help = false;
bool host_arg = false, perhost_arg = false, jobs_arg = false, file_arg = false,
jobno_arg = false, delay_arg = false, help = false;
std::string par_host_file;
while (not args.empty()) {
std::smatch m;
......@@ -559,7 +592,7 @@ int main(int argc, char *argv[]) {
perhost_arg = true;
args.pop_front();
}
else if (not jobs_arg and std::regex_match(args.front(), numeric_re)) {
else if (not jobs_arg and std::regex_match(args.front(), integer_re)) {
total_jobs = std::stol(args.front());
jobs_arg = true;
args.pop_front();
......@@ -574,12 +607,17 @@ int main(int argc, char *argv[]) {
jobno_arg = true;
args.pop_front();
}
else if (not delay_arg and std::regex_match(args.front(), m, conn_delay_re)) {
conn_delay = std::stod(m[1]);
delay_arg = true;
args.pop_front();
}
else if (std::regex_match(args.front(), help_re)) {
help = true;
args.pop_front();
}
else {
// Not a host, perhost, or job number argument: stop looking.
// Not a valid argument, so must be the start of a command: stop looking.
break;
}
}
......@@ -597,7 +635,7 @@ int main(int argc, char *argv[]) {
}
if (not perhost_arg and (envval = std::getenv("PARALLEL_PER_HOST"))) {
std::smatch m;
if (std::regex_match(envval, std::regex("\\d+"))) {
if (std::regex_match(envval, integer_re)) {
perhost_default = std::stoul(envval);
perhost_arg = true;
}
......@@ -608,17 +646,25 @@ int main(int argc, char *argv[]) {
file_arg = true;
}
if (not jobno_arg and (envval = std::getenv("PARALLEL_FIRST"))) {
if (std::regex_match(envval, std::regex("\\d+"))) {
if (std::regex_match(envval, integer_re)) {
jobno = std::stol(envval);
jobno_arg = true;
}
else std::cerr << "Warning: PARALLEL_FIRST set but ignored (invalid value)\n";
}
if (not delay_arg and (envval = std::getenv("PARALLEL_CONNDELAY"))) {
if (std::regex_match(envval, float_re)) {
conn_delay = std::stod(envval);
delay_arg = true;
}
else std::cerr << "Warning: PARALLEL_CONNDELAY set but ignored (invalid value)\n";
}
if (help) exit_with_usage(argv[0], "", true);
if (not jobs_arg) exit_with_usage(argv[0], args.empty() ? "No arguments specified!" : "Invalid arguments: invalid options (could not find N, the number of jobs to run)");
if (total_jobs <= 0) exit_with_usage(argv[0], "Invalid N value: " + std::to_string(total_jobs));
if (total_jobs <= 0) exit_with_usage(argv[0], "Invalid N (number of jobs) value: " + std::to_string(total_jobs));
if (perhost_arg and perhost_default <= 0) exit_with_usage(argv[0], "Invalid PER_HOST value: " + std::to_string(perhost_default));
if (delay_arg and not (std::isfinite(delay_arg) and delay_arg >= 0)) exit_with_usage(argv[0], "Invalid CONNDELAY value: " + std::to_string(conn_delay));
if (host_arg and hosts.empty()) exit_with_usage(argv[0], "Invalid HOSTS value: no hosts specified");
if (args.empty()) exit_with_usage(argv[0], "Invalid arguments: CMD is required");
......@@ -626,7 +672,7 @@ int main(int argc, char *argv[]) {
if (file_arg) {
if (not par_host_file.empty()) { // Allow FILE= to explicitly suppress parallel.hosts reading
std::cout << "Loading " << par_host_file << "\n";
try { parse_parallel_hosts(par_host_file, not host_arg, not perhost_arg, not jobno_arg); }
try { parse_parallel_hosts(par_host_file, not host_arg, not perhost_arg, not jobno_arg, not delay_arg); }
catch (const std::exception &e) {
std::cerr << "Unable to read " << par_host_file << ": " << e.what() << "\n";
std::exit(10);
......@@ -651,7 +697,7 @@ int main(int argc, char *argv[]) {
if (stat(loc.c_str(), &statbuf) == 0) {
std::cout << "Loading " << loc << "\n";
try {
parse_parallel_hosts(loc, not host_arg, not perhost_arg, not jobno_arg);
parse_parallel_hosts(loc, not host_arg, not perhost_arg, not jobno_arg, not delay_arg);
}
catch (const std::exception &e) {
std::cerr << "Unable to read " << loc << ": " << e.what() << "\n";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment