Commit fd4a0914 authored by Jason Rhinelander's avatar Jason Rhinelander

creativity-series: add preloading

Mostly copied from creativity-data.
parent 8ee7e194
......@@ -26,6 +26,7 @@ void Series::addOptions() {
("precision", range<3,std::numeric_limits<double>::max_digits10>(double_precision), " Specifies the precision level for floating point values. The default is the minimum required to exactly represent all possible double values without any loss of precision.")
("output-directory,o", value(output_dir), "The directory in which to place series files. If this directory does not exist, it will be created. Each series S is written to a file named `series-S.csv' in this directory. Existing files will be overwritten.")
("threads,j", value(threads), " Maximum number of threads to use for data parsing. 0 (the default) disables data parsing threading entirely.")
("preload,J", value(preload), " Maximum number of simulations to copy from disk into memory before being needed, 0 to disable. If non-zero, preloading happens in a dedicated thread (not included in the -j limit), which preloads the next file from disk whenever fewer than this number are preloaded. Larger values require more memory, but can help reduce starvation among parsing threads. This option also implies --memory-xz.")
("memory-xz,M", value(memory_xz), "If an input file is an xz-compressed file, using this flag causes it to be decompressed into memory instead of writing it to a temporary file.")
("tmpdir", value(tmpdir), "If --memory-xz is not specified, this specifies a temporary directory in which to place temporary decompressed files. If omitted, the file is in the same directory as the input file.")
("help-series,S", value(help_series), "Shows the series that can be generated.")
......
......@@ -51,6 +51,9 @@ class Series : public CmdArgs {
/// Number of threads to use
unsigned int threads = 0;
/// Number of files to preload from disk into memory
unsigned int preload = 0;
/// The input files to load data from
std::vector<std::string> input;
......
......@@ -41,11 +41,63 @@ std::mutex values_mutex; // Guards values, files, error_count
std::mutex input_mutex; // Guards all of the below:
size_t input_index;
std::queue<std::pair<std::string, std::unique_ptr<std::stringstream>>> preload_queue;
bool preload_done = false;
std::condition_variable preload_cv; // Used to wait for preload data to appear
std::condition_variable preload_next_cv; // Used to tell the preload thread that something has been removed from the preload queue
// These have to agree across files:
eris_time_t periods = 0, piracy_begins = 0, policy_begins = 0;
bool allow_unused_periods = false; // Will only be true if --periods is explicitly given
bool need_parameters = true;
void thr_preload(const cmdargs::Series &args) {
std::unique_lock<std::mutex> input_lock(input_mutex);
const unsigned int max_queue = args.preload;
std::cerr << "preload = " << max_queue << "\n";
if (max_queue < 1) throw std::logic_error("Internal error: thr_preload called in non-preload mode");
while ((error_count == 0 or args.ignore_errors) and input_index < args.input.size()) {
preload_next_cv.wait(input_lock, [&max_queue]{ return preload_queue.size() < max_queue; });
while (preload_queue.size() < max_queue && input_index < args.input.size()) {
std::string source(args.input[input_index++]);
input_lock.unlock();
std::unique_ptr<std::stringstream> s(new std::stringstream(
std::ios_base::in | std::ios_base::out | std::ios_base::binary));
s->exceptions(s->failbit | s->badbit);
bool success = false;
std::ifstream f;
try {
f.exceptions(f.badbit | f.failbit);
f.open(source, std::ios_base::in | std::ios_base::binary);
*s << f.rdbuf();
success = true;
}
catch (std::ios_base::failure &e) {
std::lock_guard<std::mutex> g(values_mutex);
std::cerr << "Unable to preload `" << source << "': " << std::strerror(errno) << "\n";
error_count++;
}
catch (std::exception &e) {
std::lock_guard<std::mutex> g(values_mutex);
std::cerr << "Unable to preload `" << source << "': " << e.what() << "\n";
error_count++;
}
input_lock.lock();
if (success) {
std::cout << "preloaded " << source << "\n";
preload_queue.emplace(std::move(source), std::move(s));
preload_cv.notify_all();
}
}
}
std::cout << "preload done\n";
// Input file preloading is done
preload_done = true;
preload_cv.notify_all();
input_lock.unlock();
}
// Parses a file; takes the program arguments and a vector of datum objects used to calculate the
// values of interest, which are stored in `values`.
......@@ -54,25 +106,54 @@ void thr_parse_file(
const std::vector<data::datum> &data) {
std::unique_lock<std::mutex> input_lock(input_mutex);
const bool preload_mode = args.preload > 0;
size_t input_i;
while ((error_count == 0 or args.ignore_errors) and (input_i = input_index++) < args.input.size()) {
while ((error_count == 0 or args.ignore_errors) and (preload_mode || (input_i = input_index++) < args.input.size())) {
std::string source;
std::unique_ptr<std::stringstream> ss;
if (preload_mode) {
preload_cv.wait(input_lock, []{ return preload_done || !preload_queue.empty(); });
if (!preload_queue.empty()) {
{
auto &next = preload_queue.front();
source = std::move(next.first);
ss = std::move(next.second);
}
preload_queue.pop();
preload_next_cv.notify_all();
if (!ss) {
std::lock_guard<std::mutex> g(values_mutex);
std::cerr << "Unable to read preloaded file `" << source << "': stringstream pointer is null\n";
continue;
}
}
else {
// Preloading is finished *and* the queue is empty, so we're done.
break;
}
}
else {
source = args.input[input_i];
}
// Hold the lock if this is the first file and we weren't given all of the needed simulation
// period values: we have to set periods, piracy_begins, policy_begins (the lock also
// protects assigning to these variables) before unlocking; this essentially means that only
// one thread runs until the first thread has determined that initial info.
if (not need_parameters) input_lock.unlock();
const auto &source = args.input[input_i];
{ Locker l(values_mutex); std::cout << "Processing " << source << std::endl; }
#define FAIL(WHY) { Locker l(values_mutex); std::cerr << "Error reading `" << source << "': " << WHY << std::endl; error_count++; continue; }
std::ostringstream output;
output.precision(args.double_precision);
Creativity creativity;
// Filename input
try { creativity.read<FileStorage>(source, FileStorage::Mode::READONLY, args.memory_xz, args.tmpdir); }
try {
if (preload_mode) // Preloaded input:
creativity.read<FileStorage>(std::move(ss), FileStorage::Mode::READONLY);
else
creativity.read<FileStorage>(source, FileStorage::Mode::READONLY, args.memory_xz, args.tmpdir);
}
catch (std::ios_base::failure&) FAIL("I/O error: " << std::strerror(errno))
catch (std::exception &e) FAIL("An error occured: " << e.what())
......@@ -240,20 +321,26 @@ int main(int argc, char *argv[]) {
files.reserve(args.input.size());
std::vector<std::thread> threads;
if (args.preload > 0) {
// Start the preload thread, which loads files into memory:
threads.emplace_back(thr_preload, args);
}
if (args.threads == 0) {
thr_parse_file(args, data);
}
else {
Eigen::initParallel();
std::vector<std::thread> threads;
// Start up the requested number of threads:
for (unsigned t = 0; t < args.threads; t++) {
threads.emplace_back(thr_parse_file, args, data);
}
// Wait for all threads to finish:
for (auto &th : threads) {
if (th.joinable()) th.join();
}
}
// Wait for all threads to finish:
for (auto &th : threads) {
if (th.joinable()) th.join();
}
if (error_count > 0) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment