Commit 86e47194 authored by Jason Rhinelander's avatar Jason Rhinelander
Browse files

Added FIRST; --help; environment var options

FIRST lets you control the first PARALLEL_JOBNO (instead of always
starting from 1).  This is useful when you do something like
'$((PARALLEL_JOBNO % 10))' to select one of 10 different cases in the
job, so that you can restart exactly where you left off, instead of
always having to restart at case '1'.

Added --help argument (also HELP, -h, -?) to display full help.  Bad or
missing arguments now display a short usage message without the help details.

Allowed HOSTS, PER_HOST, and FIRST to be set through environment
variables, prefixed with PARALLEL_, e.g.

    $ export PARALLEL_FIRST=27
    $ parallel-runner 10 'echo hi'

now does the same thing as:

    $ parallel-runner FIRST=27 10 'echo hi'

Command line arguments have highest precedence, then env vars, then
file vars.
parent d217c2e4
Pipeline #108 skipped
......@@ -36,17 +36,21 @@ template <class CharT> std::string put_time(const std::tm *tmb, const CharT *fmt
// Regex fragments for the host-list grammar.  They are macros (rather than constants) so that
// adjacent string literals concatenate at compile time into the full patterns below.
#define RE_USERNAME "[a-zA-Z0-9._][a-zA-Z0-9._-]*"
#define RE_HOSTNAME "[a-zA-Z0-9][a-zA-Z0-9.-]*"
// One host entry: optional "username@", the hostname, and an optional "=j" per-host job limit.
#define RE_HOST "(?:" RE_USERNAME "@)?" RE_HOSTNAME "(?:=\\d+)?"
// A list of host entries, optionally wrapped in parentheses and optionally followed by a
// '#' comment.  Its single capture group is the bare list (without the parentheses/comment).
#define RE_HOSTLIST "\\(?\\s*((?:" RE_HOST "(?=\\s|\\)|$)\\s*?)*)(?:\\s*\\))?\\s*(?:#.*)?"
const std::regex numeric_re{R"(\d+)"},
    empty_or_comment_re{R"(\s*(?:#.*)?)"},
    done_re{R"(\s*#\s*PARALLEL-RUNNER-DONE\s*)"},
    // Group 1 is the '+' of 'HOSTS+=' (when present); group 2 is the host list.
    hosts_re{"HOSTS([+])?=" RE_HOSTLIST},
    // Same host-list grammar without the 'HOSTS=' prefix; group 1 is the host list.
    envhosts_re{RE_HOSTLIST},
    // A single entry of a host list; groups: 1 = username, 2 = hostname, 3 = job limit.
    one_host_re{"\\s*(?:(" RE_USERNAME ")@)?(" RE_HOSTNAME ")(?:=(\\d+))?\\s*"},
    per_host_re{R"(PER_HOST=(\d+)\s*(?:#.*)?)"},
    file_re{"FILE=(.*)"},
    first_jobno_re{"FIRST=(\\d+)"},
    // The '?' must be escaped: an unescaped "-?" is the quantified pattern "optional hyphen",
    // which matches "" and "-" but never the literal "-?" flag.
    help_re{"HELP|--help|-h|-\\?", std::regex::icase};
std::string log_dir;
std::string remote_cmd;
std::atomic<long> jobno{0};
std::atomic<long> jobno{1};
std::atomic<bool> abort_jobs{false}, kill_jobs{false};
unsigned long perhost_default = 4;
std::vector<std::pair<std::string, std::string>> hosts; // (host,username) pairs (username can be empty)
......@@ -56,7 +60,7 @@ constexpr auto date_format = "%e %B %Y %Z";
constexpr auto time_format = "%r";
constexpr auto datetime_format = "%e %B %Y %r %Z";
std::mutex thr_mutex; // Protects std::{cout,cerr} and the following:
long jobs_active{0}, max_jobs_active{0}, total_jobs{0}, successes{0}, failures{0};
long jobs_active{0}, max_jobs_active{0}, total_jobs{0}, last_jobno{0}, successes{0}, failures{0};
std::set<pid_t> ssh_pids;
double total_job_time{0};
std::unordered_map<std::string, std::tuple<long, long, double>> host_stats; // #success, #failure, cumul. time
......@@ -66,10 +70,15 @@ std::mutex thread_list_mutex; // Protects:
std::list<std::thread> thread_list;
// Prints an error message and usage, then exits.
[[noreturn]] void exit_with_usage(std::string progname, std::string error) {
if (error.length() > 0) std::cerr << error << "\n" << std::endl;
std::cout << "Usage: " << progname << R"USAGE( [FILE=/path/to/parallel.hosts] [HOSTS='host1=j host2 ...'] [PER_HOST=J] N CMD
[[noreturn]] void exit_with_usage(std::string progname, std::string error, bool full_usage = false) {
if (error.length() > 0) std::cerr << error << "\n\n" << std::endl;
std::cout << "Usage: " << progname << " [FILE=/path/to/parallel.hosts] [HOSTS='host1=j host2 ...'] [PER_HOST=J] [FIRST=n] N CMD\n";
if (not full_usage) {
std::cout << "Run with --help to see full usage information.\n\n";
}
else {
std::cout << R"USAGE(
Runs CMD N times on remote machines.
The command CMD (including any arguments) is executed via ssh on multiple hosts
......@@ -97,6 +106,7 @@ all initiated jobs.
Program Arguments:
==================
HOSTS='host1 host2.example.com=6 username@host3 username@host4=2'
......@@ -130,6 +140,13 @@ This specifies the default maximum jobs per host (for any hosts without an
individual '=j' specifier).
FIRST=n
Takes an integer, n, which will be the job number of the first job (available to
remote tasks as $PARALLEL_JOBNO). The default, when unspecified, is 1. All
subsequent jobs are incremented from this number.
FILE=/path/to/parallel.hosts
If one or both of the HOSTS= and PER_HOST= arguments are omitted, the HOSTS and
......@@ -166,13 +183,22 @@ command-line. In other words, parallel-runner PER_HOST=2 'some command' with a
parallel.hosts file containing 'HOSTS=(hosta=1 hostb=2 hostc=3)' will run one
job at a time on hosta and two jobs at a time on both hostb and hostc.
Environment variables:
======================
The above arguments can also be provided as environment variables prefixed with
"PARALLEL_", e.g. PARALLEL_PER_HOST=2 or PARALLEL_FIRST=27. Command line
arguments take precedence over environment variables, which take precedent over
file values.
)USAGE";
}
std::exit(1);
}
// Parses the host-list portion, given as the range [begin, end), of a hosts line such as 'HOSTS=a b=2 c' or 'HOSTS=(a b=2 c)' or 'HOSTS=(a b=2 c) # comment'
void parse_host_string(const std::smatch &host_match) {
auto begin = host_match[2].first, end = host_match[2].second;
void parse_host_string(std::string::const_iterator begin, std::string::const_iterator end) {
std::smatch sm;
while (begin != end) {
if (std::regex_search(begin, end, sm, one_host_re, std::regex_constants::match_continuous)) {
......@@ -202,7 +228,7 @@ void parse_host_string(const std::smatch &host_match) {
}
}
void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_perhost) {
void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_perhost, bool parse_first) {
std::ifstream f(filename);
if (not f.is_open())
throw std::runtime_error("Unable to open " + filename + ": " + strerror(errno));
......@@ -221,7 +247,7 @@ void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_per
}
else if (std::regex_match(line, m, hosts_re)) {
if (parse_hosts) {
parse_host_string(m);
parse_host_string(m[2].first, m[2].second);
}
}
else if (std::regex_match(line, m, per_host_re)) {
......@@ -229,6 +255,11 @@ void parse_parallel_hosts(std::string filename, bool parse_hosts, bool parse_per
perhost_default = std::stoul(m[1]);
}
}
else if (std::regex_match(line, m, first_jobno_re)) {
if (parse_first) {
jobno = std::stol(m[1]);
}
}
else {
std::cerr << "Error: invalid line in " << filename << ", line " << lineno << "\n";
std::exit(2);
......@@ -289,7 +320,7 @@ void thread_runner(std::promise<void> started, const std::pair<std::string, std:
FILE *LOGFILE = nullptr;
sigset_t sigint; sigaddset(&sigint, SIGINT);
while (not abort_jobs and (myjobno = ++jobno) <= total_jobs) {
while (not abort_jobs and (myjobno = jobno++) <= last_jobno) {
if (!LOGFILE) {
std::string log_file = log_dir + "/" + hostname + "-" + std::to_string(threadnum) + ".log";
......@@ -514,12 +545,12 @@ int main(int argc, char *argv[]) {
// Remove any leading arguments that look like 'HOSTS=...', 'PER_HOST=j', 'FILE=...', 'FIRST=n', a help
// flag (e.g. '--help'), or 'N'.  HOSTS= and the help flag can be repeated; the others can occur only once.
bool host_arg = false, perhost_arg = false, jobs_arg = false, file_arg = false;
bool host_arg = false, perhost_arg = false, jobs_arg = false, file_arg = false, jobno_arg = false, help = false;
std::string par_host_file;
while (not args.empty()) {
std::smatch m;
if (std::regex_match(args.front(), m, hosts_re)) {
parse_host_string(m);
parse_host_string(m[2].first, m[2].second);
if (!host_arg and not m[1].matched) host_arg = true; // Don't count the host arg if 'HOSTS+=' was used
args.pop_front();
}
......@@ -538,23 +569,64 @@ int main(int argc, char *argv[]) {
par_host_file = m[1];
args.pop_front();
}
else if (not jobno_arg and std::regex_match(args.front(), m, first_jobno_re)) {
jobno = std::stol(m[1]);
jobno_arg = true;
args.pop_front();
}
else if (std::regex_match(args.front(), help_re)) {
help = true;
args.pop_front();
}
else {
// Not a host, perhost, or job number argument: stop looking.
break;
}
}
if (not jobs_arg) exit_with_usage(argv[0], "Invalid arguments: invalid options (could not find N, the number of jobs to run)");
// Check env variables
const char *envval;
if (not host_arg and (envval = std::getenv("PARALLEL_HOSTS"))) {
std::string hosts = envval;
std::smatch m;
if (std::regex_match(hosts, m, envhosts_re)) {
parse_host_string(m[1].first, m[1].second);
host_arg = true;
}
else std::cerr << "Warning: PARALLEL_HOSTS set but ignored (invalid contents)\n";
}
if (not perhost_arg and (envval = std::getenv("PARALLEL_PER_HOST"))) {
std::smatch m;
if (std::regex_match(envval, std::regex("\\d+"))) {
perhost_default = std::stoul(envval);
perhost_arg = true;
}
else std::cerr << "Warning: PARALLEL_PER_HOST set but ignored (invalid value)\n";
}
if (not file_arg and (envval = std::getenv("PARALLEL_FILE"))) {
par_host_file = envval;
file_arg = true;
}
if (not jobno_arg and (envval = std::getenv("PARALLEL_FIRST"))) {
if (std::regex_match(envval, std::regex("\\d+"))) {
jobno = std::stol(envval);
jobno_arg = true;
}
else std::cerr << "Warning: PARALLEL_FIRST set but ignored (invalid value)\n";
}
if (help) exit_with_usage(argv[0], "", true);
if (not jobs_arg) exit_with_usage(argv[0], args.empty() ? "No arguments specified!" : "Invalid arguments: invalid options (could not find N, the number of jobs to run)");
if (total_jobs <= 0) exit_with_usage(argv[0], "Invalid N value: " + std::to_string(total_jobs));
if (perhost_arg and perhost_default <= 0) exit_with_usage(argv[0], "Invalid PER_HOST value: " + std::to_string(perhost_default));
if (host_arg and hosts.empty()) exit_with_usage(argv[0], "Invalid HOSTS value: no hosts specified");
if (args.empty()) exit_with_usage(argv[0], "Invalid arguments: CMD is required");
if (not host_arg or not perhost_arg) {
if (not host_arg or not perhost_arg or not jobno_arg) {
if (file_arg) {
if (not par_host_file.empty()) { // Allow FILE= to explicitly suppress parallel.hosts reading
std::cout << "Loading " << par_host_file << "\n";
try { parse_parallel_hosts(par_host_file, not host_arg, not perhost_arg); }
try { parse_parallel_hosts(par_host_file, not host_arg, not perhost_arg, not jobno_arg); }
catch (const std::exception &e) {
std::cerr << "Unable to read " << par_host_file << ": " << e.what() << "\n";
std::exit(10);
......@@ -579,7 +651,7 @@ int main(int argc, char *argv[]) {
if (stat(loc.c_str(), &statbuf) == 0) {
std::cout << "Loading " << loc << "\n";
try {
parse_parallel_hosts(loc, not host_arg, not perhost_arg);
parse_parallel_hosts(loc, not host_arg, not perhost_arg, not jobno_arg);
}
catch (const std::exception &e) {
std::cerr << "Unable to read " << loc << ": " << e.what() << "\n";
......@@ -645,10 +717,13 @@ int main(int argc, char *argv[]) {
pthread_sigmask(SIG_BLOCK, &set, NULL);
}
// Handle a != 1 first jobno:
last_jobno = total_jobs + jobno - 1;
bool none = false;
for (unsigned j = 0; not none and jobno < total_jobs; j++) {
for (unsigned j = 0; not none and jobno <= last_jobno; j++) {
none = true;
for (unsigned i = 0; i < hosts.size() and jobno < total_jobs; i++) {
for (unsigned i = 0; i < hosts.size() and jobno <= last_jobno; i++) {
if (j < host_jobs(hosts[i].first, perhost_arg and not host_arg)) {
none = false;
// Use a promise/future to synchronize so that we are guaranteed to run at least
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment