Commit 27d2640b authored by Jason Rhinelander's avatar Jason Rhinelander

Copied progress counter/prctl from data to series

parent 1746ccf0
......@@ -412,9 +412,9 @@ int main(int argc, char *argv[]) {
// Update the process name to something like "crdata [43/123]" (the space and "a" before the [ get
// eliminated if required--we aren't allowed to set a name longer than 15 characters).
std::string progress = "[" + std::to_string(processing_counter) + "/" + std::to_string(input_count) + "]";
std::string name = "crdat";
if (progress.size() <= 9) { name += 'a'; if (progress.size() <= 8) name += ' '; }
name += progress;
std::string name =
std::string("crdata ").substr(0, std::max<int>(5, 15 - (int) progress.size()))
+ progress;
prctl(PR_SET_NAME, name.c_str());
}
......
......@@ -20,6 +20,12 @@
#include <set>
#include <fstream>
#include <sstream>
#include <mutex>
#ifdef __linux__
extern "C" {
#include <sys/prctl.h>
}
#endif
namespace creativity { namespace state { class State; } }
......@@ -36,8 +42,8 @@ std::unordered_set<std::string> series_wanted;
std::unordered_map<std::string, std::vector<std::vector<double>>> values;
// File list, in same order as [fileindex] above:
std::vector<std::string> files;
size_t error_count = 0;
std::mutex values_mutex; // Guards values, files, error_count
size_t error_count = 0, input_count = 0, processing_counter = 0;
std::mutex values_mutex; // Guards the above plus std::cout/cerr
std::mutex input_mutex; // Guards all of the below:
size_t input_index;
......@@ -50,13 +56,28 @@ eris_time_t periods = 0, piracy_begins = 0, policy_begins = 0;
bool allow_unused_periods = false; // Will only be true if --periods is explicitly given
bool need_parameters = true;
#ifdef __linux__
// Machinery for helping the main process managing the threads, so that we can let the parent update
// its status via prctl.
std::mutex thread_mutex;
unsigned threads_running = 0;
std::condition_variable thread_done_cv; // Signals the parent that a thread is done
#define THREAD_START { std::lock_guard<std::mutex> g(thread_mutex); threads_running++; }
#define THREAD_DONE { std::lock_guard<std::mutex> g(thread_mutex); threads_running--; thread_done_cv.notify_all(); }
#else
#define THREAD_START
#define THREAD_DONE
#endif
void thr_preload(const cmdargs::Series &args) {
std::unique_lock<std::mutex> input_lock(input_mutex);
const unsigned int max_queue = args.preload;
if (max_queue < 1) throw std::logic_error("Internal error: thr_preload called in non-preload mode");
while ((error_count == 0 or args.ignore_errors) and input_index < args.input.size()) {
while (input_index < args.input.size()) {
preload_next_cv.wait(input_lock, [&max_queue]{ return preload_queue.size() < max_queue; });
while (preload_queue.size() < max_queue && input_index < args.input.size()) {
while ((error_count == 0 or args.ignore_errors) &&
preload_queue.size() < max_queue && input_index < args.input.size()) {
std::string source(args.input[input_index++]);
input_lock.unlock();
std::unique_ptr<std::stringstream> s(new std::stringstream(
......@@ -89,12 +110,12 @@ void thr_preload(const cmdargs::Series &args) {
}
}
std::cout << "preload done\n";
// Input file preloading is done
preload_done = true;
preload_cv.notify_all();
input_lock.unlock();
THREAD_DONE
}
// Parses a file; takes the program arguments and a vector of datum objects used to calculate the
......@@ -140,7 +161,7 @@ void thr_parse_file(
// one thread runs until the first thread has determined that initial info.
if (not need_parameters) input_lock.unlock();
{ Locker l(values_mutex); std::cout << "Processing " << source << std::endl; }
{ Locker l(values_mutex); std::cout << "Processing [" << ++processing_counter << "/" << input_count << "]: " << source << std::endl; }
#define FAIL(WHY) { Locker l(values_mutex); std::cerr << "Error reading `" << source << "': " << WHY << std::endl; error_count++; continue; }
......@@ -226,6 +247,8 @@ void thr_parse_file(
input_lock.lock();
}
input_lock.unlock();
THREAD_DONE
}
int main(int argc, char *argv[]) {
......@@ -326,23 +349,44 @@ int main(int argc, char *argv[]) {
if (args.policy_begins != 0) policy_begins = args.policy_begins;
else need_parameters = true;
input_count = args.input.size();
std::vector<std::thread> threads;
if (args.preload > 0) {
// Start the preload thread, which loads files into memory:
THREAD_START
threads.emplace_back(thr_preload, args);
}
if (args.threads == 0) {
THREAD_START // Not really, but keeps the counter value correct
thr_parse_file(args, data);
}
else {
Eigen::initParallel();
// Start up the requested number of threads:
for (unsigned t = 0; t < args.threads; t++) {
THREAD_START
threads.emplace_back(thr_parse_file, args, data);
}
}
#ifdef __linux__
std::unique_lock<std::mutex> thr_lock(thread_mutex);
while (threads_running > 0) {
thread_done_cv.wait_for(thr_lock, std::chrono::milliseconds(250), [] { return threads_running == 0; });
std::lock_guard<std::mutex> lg(values_mutex);
// Update the process name to something like "crseries [1/5]" (the "ies " gets chopped away
// as needed--we aren't allowed to set a name longer than 15 characters).
std::string progress = "[" + std::to_string(processing_counter) + "/" + std::to_string(input_count) + "]";
std::string name =
std::string("crseries ").substr(0, std::max<int>(5, 15 - (int) progress.size()))
+ progress;
prctl(PR_SET_NAME, name.c_str());
}
#endif
// Wait for all threads to finish:
for (auto &th : threads) {
if (th.joinable()) th.join();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment