Commit ba0984e8 authored by Jason Rhinelander's avatar Jason Rhinelander
Browse files

Yet another attempt at a fix; logging improvements

My previous attempts weren't working: stdout was still occasionally
lost.  This redoes it to not require pseudoterminal allocation at all
by instead using a pipe to send input to the remote process, and using
an extra subprocess that blocks for input (which never comes), and
kills the subprocess if it complete (i.e. because of an EOF).

It seems to work: I haven't lost any output yet.

This also adds some various logging improvements, showing
connections/disconnections in the log as well as exit status (plus a
reinterpretation of bash's 128+n as a N terminal signal).
parent 57a248f3
Pipeline #100 skipped
......@@ -286,11 +286,11 @@ void thread_runner(std::promise<void> started, const std::pair<std::string, std:
if (!LOGFILE) {
throw std::system_error(errno, std::system_category(), "Unable to open log file '" + log_file + "'");
}
setbuf(LOGFILE, nullptr);
}
if (first) { first = false; started.set_value(); }
std::chrono::high_resolution_clock::time_point started;
pid_t child;
int open_pipe = -1;
{
std::lock_guard<std::mutex> lock(thr_mutex);
check_date_change();
......@@ -303,79 +303,126 @@ void thread_runner(std::promise<void> started, const std::pair<std::string, std:
if (jobs_active > max_jobs_active) max_jobs_active = jobs_active;
std::string run =
"export PARALLEL_HOST=\"" + hostname + "\"; "
"export PARALLEL_THREAD=" + std::to_string(threadnum) + "; "
"export PARALLEL_JOBNO=" + std::to_string(myjobno) + "; "
// This is complicated. bash won't propagate a signal to
// running task; we could exec, but that would not allow
// running multiple commands. So what we do is fork off the
// commands in a subprocess, then trap the signals in the
// main process and resend them to both the subprocess and
// its children.
"set +m; " // Disable job control
// This is the subprocess: it *also* needs to trap so that
// it knows to exit when it gets a signal (instead of
// running the next command in a list of commands).
"{ trap exit INT TERM HUP; " + remote_cmd + (remote_cmd.back() == ';' ? "" : ";") + " } & "
"subproc=$!; "
// Set up parent bash signal handlers to kill the child and
// child's children:
"trap \"kill -INT \\$subproc; pkill -INT -P \\$subproc\" INT; "
"trap \"kill -TERM \\$subproc; pkill -TERM -P \\$subproc\" TERM; "
"trap \"kill -HUP \\$subproc; pkill -HUP -P \\$subproc\" HUP; "
"wait"; // Waits for child to finish
"export PARALLEL_HOST=\"" + hostname + "\" "
"PARALLEL_THREAD=" + std::to_string(threadnum) + " "
"PARALLEL_JOBNO=" + std::to_string(myjobno) + ";"
// This is complicated.
//
// We want to be able to kill the remote process. ssh won't propagate a signal
// unless allocating a pseudoterminal (with -tt), but that causes problems
// (sometimes we don't get all the remote program output before the process
// terminates).
//
// The approach below, then, uses three processes:
// - the main process starts the command in a subprocess, starts a listener in a
// second subprocess, then waits for the command subprocess to finish.
// - the command subprocess does its stuff, but also respects a TERM signal: if
// received, it stops executing further commands once the current one ends.
// - the listener tries to read a line from the connection's stdin, which is a
// pipe back to the controlling process. This pipe never actually sends
// anything, but as long as it remains open, the listener remains blocked. When
// it terminates, it kills the command subprocess, and any children of the
// command subprocess, so that a disconnection will result in killing the
// running programs.
//
// So the local connection just keeps the pipe open until ssh exits on its own,
// or until the main thread sends it a SIGTERM (after receiving a second
// Ctrl-C).
//
// Command subprocess:
"(trap exit TERM;" + remote_cmd + ")&"
"p=$!;"
// Listener subprocess:
"{ (read -u 3;ifkill $p;pkill -TERM -P $p)& w=$!; } 3<&0;"
// Waits for the subprocess to finish (and return its exit status).
"wait $p";
const char execfailed[] = "Exec /usr/bin/ssh failed";
std::string host_connect = username.empty() ? hostname : username + "@" + hostname;
if ((child = fork()) == 0) {
fprintf(LOGFILE, "Connecting to %s [thr %d, job %d]\n", hostname.c_str(), threadnum, myjobno);
fflush(LOGFILE);
// Set up a pipe to connect to the remote process stdin. We never send anything on it,
// but when we send a SIGTERM to ssh it gets closed, which causes the remote subprocess
// blocking on stdin to get an EOF, in which case it stops blocking and kills the
// running process.
// (so we can tell it to kill
// itself). The above listens to our stdin for commands, and kills itself if it
// it gets EOF.
int remote_input[2];
pipe(remote_input);
const char execfailed[] = "Exec /usr/bin/ssh failed";
if ((child = fork()) == 0) { // Child
close(remote_input[1]);
dup2(fileno(LOGFILE), STDOUT_FILENO);
dup2(fileno(LOGFILE), STDERR_FILENO);
close(STDIN_FILENO);
// Restore the previously-blocked SIGINT so that ssh can be interrupted
pthread_sigmask(SIG_UNBLOCK, &sigint, NULL);
// -t -t forces a tty allocation, which is needed for ssh to
// propagate signals through to the process on the other end of
// the connection. -n and PasswordAuthentication=no should
// ensure that we get an error rather than an ill-fated attempt
// to ask for a password if one is needed.
execl("/usr/bin/ssh", "/usr/bin/ssh", "-t", "-t", "-n", "-o", "PasswordAuthentication=no",
close(fileno(LOGFILE));
dup2(remote_input[0], STDIN_FILENO);
close(remote_input[0]);
// -t -t forces a tty allocation, which is needed for ssh to propagate signals
// through to the process on the other end of the connection. -n and
// PasswordAuthentication=no should ensure that we get an error rather than an
// ill-fated attempt to ask for a password if one is needed. -q suppresses the
// "Connection HOST closed." message, but unfortunately also suppressed connection
// failed messages.
execl("/usr/bin/ssh", "/usr/bin/ssh", "-o", "PasswordAuthentication=no",
host_connect.c_str(),
run.c_str(),
nullptr);
write(STDERR_FILENO, execfailed, sizeof(execfailed));
}
else if (child != 0) {
close(remote_input[0]);
open_pipe = remote_input[1];
ssh_pids.insert(child);
}
else {
std::cerr << "Fork failed: " << std::strerror(errno) << std::endl;
close(remote_input[0]);
close(remote_input[1]);
throw std::system_error(errno, std::system_category(), std::string{"Fork failed: "} + std::strerror(errno));
}
}
int result;
waitpid(child, &result, 0);
std::string bad;
if (WIFSIGNALED(result)) {
bad = std::string{"ssh: "} + strsignal(WTERMSIG(result));
}
else if (WIFEXITED(result) and WEXITSTATUS(result) != 0) {
if (WEXITSTATUS(result) == 255) { bad = "Unable to connect to remote host"; }
else if (WEXITSTATUS(result) > 128) {
// Bash converts signal termination to 128+signal, so conver it back
bad = strsignal(WEXITSTATUS(result) - 128);
}
else { bad = "Process exited with status " + std::to_string(WEXITSTATUS(result)); }
}
// If the exit code is 255, it was *probably* an ssh connection failure, in which case ssh
// already printed an error message, and so we don't want to print another one, and also
// don't want to print "Connection closed" since the connection was probably never opened.
// For anything else (i.e. death-by-signal or non-255 exit), print an error message, if
// appropriate (i.e. not exit status 0), and a Connection closed message.
if (not WIFEXITED(result) or WEXITSTATUS(result) != 255) {
if (not bad.empty()) fprintf(LOGFILE, "%s\n", bad.c_str());
fprintf(LOGFILE, "Connection to %s closed.\n", hostname.c_str());
fflush(LOGFILE);
}
auto finished = std::chrono::high_resolution_clock::now();
double elapsed = std::chrono::duration<double>(finished - started).count();
std::time_t t_finished = std::chrono::system_clock::to_time_t(finished);
std::lock_guard<std::mutex> lock(thr_mutex);
ssh_pids.erase(child);
if (open_pipe != -1) close(open_pipe);
jobs_active--;
std::string bad;
if (WIFSIGNALED(result)) {
bad = "process terminated with signal " + std::to_string(WTERMSIG(result));
}
else if (WIFEXITED(result) and WEXITSTATUS(result) != 0) {
bad = "process exited with status " + std::to_string(WEXITSTATUS(result));
}
else if (result != 0) {
bad = "process exited abnormally (status" + std::to_string(result) + ")";
}
auto &host_stat = host_stats[hostname];
auto &host_succ = std::get<0>(host_stat);
auto &host_fail = std::get<1>(host_stat);
......@@ -424,7 +471,7 @@ inline unsigned long host_jobs(const std::string &host, bool perhost_limiting) {
}
const char first_msg[] = "\n\nCtrl-C caught; waiting for current jobs to finish (Ctrl-C again to abort running jobs)\n";
const char second_msg[] = "\n\nSecond Ctrl-C caught; sending SIGINT to active ssh sessions\n";
const char second_msg[] = "\n\nSecond Ctrl-C caught; sending SIGTERM to active ssh sessions\n";
void sigint_handler(int) {
if (abort_jobs) {
write(STDERR_FILENO, second_msg, sizeof(second_msg));
......@@ -616,11 +663,18 @@ int main(int argc, char *argv[]) {
// The parent thread gets a special signal handler that lets the children threads to know to
// stop starting any new jobs. If triggered a second time, the parent kills any running ssh
// processes, which should make the children finish up.
struct sigaction act;
act.sa_handler = sigint_handler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
sigaction(SIGINT, &act, 0);
{
struct sigaction act;
act.sa_handler = sigint_handler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
sigaction(SIGINT, &act, 0);
// Unblock the SIGINT handler we blocked before spawning the threads:
sigset_t set;
sigaddset(&set, SIGINT);
pthread_sigmask(SIG_UNBLOCK, &set, NULL);
}
// Wait for either the thread list to be emptied out, or kill_jobs to be set: in the former, we
// exit nicely; in the latter, we send SIGTERM to all active ssh jobs.
......@@ -631,7 +685,6 @@ int main(int argc, char *argv[]) {
std::lock_guard<std::mutex> lock(thr_mutex);
if (not ssh_pids.empty()) {
for (const pid_t &pid : ssh_pids) {
std::cerr << "killing " << pid << " with sigint\n";
kill(pid, SIGTERM);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment