Commit 8ee7e194 authored by Jason Rhinelander's avatar Jason Rhinelander

creativity-data: fix prctl status

prctl needs to run in the main thread to show up in top (since top
shows the main thread's name when not showing individual threads),
this commit adds machinery to handle that via a wait loop in the main
thread.
parent f1331fc4
......@@ -57,7 +57,7 @@ std::string double_str(double d, unsigned precision) {
}
unsigned int processing_counter = 0, output_count = 0, input_count = 0;
std::mutex output_mutex; // Guards std::cout, std::cerr, output_count
std::mutex output_mutex; // Guards std::cout, std::cerr, and the above counters
decltype(cmdargs::Data::input.cbegin()) input_it, input_it_end;
std::queue<std::pair<std::string, std::unique_ptr<std::stringstream>>> preload_queue;
bool preload_done = false;
......@@ -65,6 +65,19 @@ std::mutex input_mutex; // Guards both the above iterator and preload_queue
std::condition_variable preload_cv; // Used to wait for preload data to appear
std::condition_variable preload_next_cv; // Used to tell the preload thread that something has been removed from the preload queue
#ifdef __linux__
// Machinery for helping the main process managing the threads, so that we can let the parent update
// its status via prctl.
std::mutex thread_mutex;
unsigned threads_running = 0;
std::condition_variable thread_done_cv; // Signals the parent that a thread is done
#define THREAD_START { std::lock_guard<std::mutex> g(thread_mutex); threads_running++; }
#define THREAD_DONE { std::lock_guard<std::mutex> g(thread_mutex); threads_running--; thread_done_cv.notify_all(); }
#else
#define THREAD_START
#define THREAD_DONE
#endif
void thr_preload(const cmdargs::Data &args) {
std::unique_lock<std::mutex> input_lock(input_mutex);
const unsigned int max_queue = args.preload;
......@@ -106,6 +119,8 @@ void thr_preload(const cmdargs::Data &args) {
preload_done = true;
preload_cv.notify_all();
input_lock.unlock();
THREAD_DONE
}
void thr_parse_file(
......@@ -149,18 +164,6 @@ void thr_parse_file(
processing_counter++;
std::cerr << "Processing [" << processing_counter << "/" << input_count << "]: " << source << "\n";
#ifdef __linux__
// Update the process name to something like "crdata [43/123]" (the space and "a" before the [ get
// eliminated if required--we aren't allowed to set a name longer than 15 characters).
{
std::string progress = "[" + std::to_string(processing_counter) + "/" + std::to_string(input_count) + "]";
std::string name = "crdat";
if (progress.size() <= 9) { name += 'a'; if (progress.size() <= 8) name += ' '; }
name += progress;
prctl(PR_SET_NAME, name.c_str());
}
#endif
output_mutex.unlock();
std::ostringstream output;
......@@ -341,6 +344,8 @@ void thr_parse_file(
input_lock.lock();
}
input_lock.unlock();
THREAD_DONE
}
int main(int argc, char *argv[]) {
......@@ -381,20 +386,40 @@ int main(int argc, char *argv[]) {
std::vector<std::thread> threads;
if (args.preload > 0) {
// Start the preload thread, which loads files into memory:
THREAD_START
threads.emplace_back(thr_preload, args);
}
if (args.threads == 0) {
// Not doing threaded loading (aside from, possibly, preloading while parsing):
THREAD_START // Do this anyway (so the "threads_running" value stays correct)
thr_parse_file(args, initial_data, data, longest_name);
}
else {
Eigen::initParallel();
for (unsigned t = 0; t < args.threads; t++) {
THREAD_START
threads.emplace_back(thr_parse_file, args, initial_data, data, longest_name);
}
}
#ifdef __linux__
std::unique_lock<std::mutex> thr_lock(thread_mutex);
while (threads_running > 0) {
thread_done_cv.wait_for(thr_lock, std::chrono::milliseconds(250), [] { return threads_running == 0; });
std::lock_guard<std::mutex> lg(output_mutex);
// Update the process name to something like "crdata [43/123]" (the space and "a" before the [ get
// eliminated if required--we aren't allowed to set a name longer than 15 characters).
std::string progress = "[" + std::to_string(processing_counter) + "/" + std::to_string(input_count) + "]";
std::string name = "crdat";
if (progress.size() <= 9) { name += 'a'; if (progress.size() <= 8) name += ' '; }
name += progress;
prctl(PR_SET_NAME, name.c_str());
}
#endif
// Wait for preload and/or parse threads to finish:
for (auto &th : threads) {
if (th.joinable()) th.join();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment