series.cpp 16.7 KB
Newer Older
1 2 3 4
#include "creativity/Creativity.hpp"
#include "creativity/data/simdata.hpp"
#include "creativity/data/util.hpp"
#include "creativity/state/Storage.hpp"
#include "creativity/state/FileStorage.hpp"
#include "creativity/cmdargs/Series.hpp"
#include <eris/types.hpp>
#include <boost/filesystem/operations.hpp>
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#ifdef __linux__
extern "C" {
#include <sys/prctl.h>
}
#endif
30 31 32 33 34 35 36

namespace creativity { namespace state { class State; } }

using namespace creativity;
using namespace creativity::state;
using namespace eris;

37 38
using Locker = std::lock_guard<std::mutex>;

39 40 41
// The series we want to calculate, as given to --series
std::unordered_set<std::string> series_wanted;

42 43 44 45
// Values container.  values["field_name"][t][fileindex] = value
std::unordered_map<std::string, std::vector<std::vector<double>>> values;
// File list, in same order as [fileindex] above:
std::vector<std::string> files;
46 47
size_t error_count = 0, input_count = 0, processing_counter = 0;
std::mutex values_mutex; // Guards the above plus std::cout/cerr
48 49 50

std::mutex input_mutex; // Guards all of the below:
size_t input_index;
51 52 53 54
std::queue<std::pair<std::string, std::unique_ptr<std::stringstream>>> preload_queue;
bool preload_done = false;
std::condition_variable preload_cv; // Used to wait for preload data to appear
std::condition_variable preload_next_cv; // Used to tell the preload thread that something has been removed from the preload queue
55
// These have to agree across files:
56
eris_time_t periods = 0, piracy_begins = 0, policy_begins = 0;
57 58 59
bool allow_unused_periods = false; // Will only be true if --periods is explicitly given
bool need_parameters = true;

60 61 62 63 64 65 66 67 68 69 70 71 72 73
#ifdef __linux__
// Machinery for helping the main process managing the threads, so that we can let the parent update
// its status via prctl.
std::mutex thread_mutex;
unsigned threads_running = 0;
std::condition_variable thread_done_cv; // Signals the parent that a thread is done
#define THREAD_START { std::lock_guard<std::mutex> g(thread_mutex); threads_running++; }
#define THREAD_DONE { std::lock_guard<std::mutex> g(thread_mutex); threads_running--; thread_done_cv.notify_all(); }
#else
#define THREAD_START
#define THREAD_DONE
#endif


74 75 76 77
// Preload thread body.  Reads each remaining input file in args.input completely into an
// in-memory stringstream and pushes (filename, stream) onto preload_queue for the parser threads,
// keeping at most args.preload entries queued at once.
//
// Stops when all inputs have been handed out, or as soon as an error has been recorded and
// args.ignore_errors is not set.  In either case preload_done is set and preload_cv is notified so
// that waiting parser threads can finish.
//
// Throws std::logic_error if called with args.preload < 1.
void thr_preload(const cmdargs::Series &args) {
    std::unique_lock<std::mutex> input_lock(input_mutex);
    const unsigned int max_queue = args.preload;
    if (max_queue < 1) throw std::logic_error("Internal error: thr_preload called in non-preload mode");

    // error_count is guarded by values_mutex (not input_mutex), so it has to be read under that
    // lock.  This is only ever called with input_mutex held, which keeps the lock order
    // input_mutex -> values_mutex.
    const auto keep_going = [&args]() -> bool {
        if (args.ignore_errors) return true;
        std::lock_guard<std::mutex> g(values_mutex);
        return error_count == 0;
    };

    // The error check has to be part of the *outer* loop as well: otherwise, once an error has been
    // recorded, the inner loop never runs, input_index never advances, and this thread would spin
    // (or block) forever without ever setting preload_done.
    while (keep_going() && input_index < args.input.size()) {
        // Wait for room in the queue.  The wait is timed because parser threads that stop due to an
        // error do not notify preload_next_cv; the timeout lets us re-check keep_going().
        const bool have_room = preload_next_cv.wait_for(
                input_lock, std::chrono::milliseconds(250),
                [max_queue] { return preload_queue.size() < max_queue; });
        if (!have_room) continue;

        while (keep_going() && preload_queue.size() < max_queue && input_index < args.input.size()) {
            std::string source(args.input[input_index++]);

            // Do the (slow) file read without holding the input lock
            input_lock.unlock();
            std::unique_ptr<std::stringstream> s(new std::stringstream(
                        std::ios_base::in | std::ios_base::out | std::ios_base::binary));
            s->exceptions(s->failbit | s->badbit);
            bool success = false;
            std::ifstream f;
            try {
                f.exceptions(f.badbit | f.failbit);
                f.open(source, std::ios_base::in | std::ios_base::binary);
                *s << f.rdbuf();
                success = true;
            }
            catch (const std::ios_base::failure &) {
                // Grab errno before doing anything else (such as locking) that might change it
                const int err = errno;
                std::lock_guard<std::mutex> g(values_mutex);
                std::cerr << "Unable to preload `" << source << "': " << std::strerror(err) << "\n";
                error_count++;
            }
            catch (const std::exception &e) {
                std::lock_guard<std::mutex> g(values_mutex);
                std::cerr << "Unable to preload `" << source << "': " << e.what() << "\n";
                error_count++;
            }

            input_lock.lock();
            if (success) {
                preload_queue.emplace(std::move(source), std::move(s));
                preload_cv.notify_all();
            }
        }
    }

    // Input file preloading is done
    preload_done = true;
    preload_cv.notify_all();
    input_lock.unlock();

    THREAD_DONE
}
121 122 123 124 125 126 127

// Parser thread body.  Repeatedly obtains the next input (from preload_queue when args.preload > 0,
// otherwise directly from args.input via input_index), loads it, checks that it is compatible with
// the required periods/piracy_begins/policy_begins values, evaluates every datum in `data` whose
// name is in `series_wanted` for t = 0..periods, and stores the results in `values` (and the
// filename in `files`).
//
// If any of periods/piracy_begins/policy_begins were not given, they are inferred from the first
// file that loads successfully; until that has happened only one thread proceeds at a time.
//
// A file that cannot be used is reported on std::cerr and counted in error_count.  The loop stops
// as soon as error_count is non-zero unless args.ignore_errors is set.
//
// Locking: input_mutex is held at the top of every loop iteration and when the loop ends; when both
// mutexes are needed, input_mutex is always acquired before values_mutex.
void thr_parse_file(
        const cmdargs::Series &args,
        const std::vector<data::datum> &data) {

    std::unique_lock<std::mutex> input_lock(input_mutex);
    const bool preload_mode = args.preload > 0;
    size_t input_i = 0;

    // error_count is guarded by values_mutex rather than input_mutex, so read it under that lock.
    const auto keep_going = [&args]() -> bool {
        if (args.ignore_errors) return true;
        Locker l(values_mutex);
        return error_count == 0;
    };

    while (keep_going() and (preload_mode || (input_i = input_index++) < args.input.size())) {
        std::string source;
        std::unique_ptr<std::stringstream> ss;
        if (preload_mode) {
            preload_cv.wait(input_lock, []{ return preload_done || !preload_queue.empty(); });
            if (!preload_queue.empty()) {
                {
                    auto &next = preload_queue.front();
                    source = std::move(next.first);
                    ss = std::move(next.second);
                }
                preload_queue.pop();
                preload_next_cv.notify_all();
                if (!ss) {
                    std::lock_guard<std::mutex> g(values_mutex);
                    std::cerr << "Unable to read preloaded file `" << source << "': stringstream pointer is null\n";
                    continue;
                }
            }
            else {
                // Preloading is finished *and* the queue is empty, so we're done.
                break;
            }
        }
        else {
            source = args.input[input_i];
        }

        // Hold the lock if this is the first file and we weren't given all of the needed simulation
        // period values: we have to set periods, piracy_begins, policy_begins (the lock also
        // protects assigning to these variables) before unlocking; this essentially means that only
        // one thread runs until the first thread has determined that initial info.
        if (not need_parameters) input_lock.unlock();

        { Locker l(values_mutex); std::cout << "Processing [" << ++processing_counter << "/" << input_count << "]: " << source << std::endl; }

// Reports a failure for the current file and moves on to the next loop iteration.  The input lock
// may or may not be held at the point of failure, but the top of the loop requires it, so it is
// re-acquired here if needed -- only *after* values_mutex has been released, to respect the
// input_mutex -> values_mutex lock order.
#define FAIL(WHY) { \
            { Locker l(values_mutex); std::cerr << "Error reading `" << source << "': " << WHY << std::endl; error_count++; } \
            if (not input_lock.owns_lock()) input_lock.lock(); \
            continue; }

        Creativity creativity;
        // Filename input
        try {
            if (preload_mode) // Preloaded input:
                creativity.read<FileStorage>(std::move(ss), FileStorage::Mode::READONLY);
            else
                creativity.read<FileStorage>(source, FileStorage::Mode::READONLY, args.memory_xz, args.tmpdir);
        }
        catch (const std::ios_base::failure&) {
            // Grab errno before FAIL takes a lock, which might change it
            const int err = errno;
            FAIL("I/O error: " << std::strerror(err))
        }
        catch (const std::exception &e) FAIL("An error occured: " << e.what())

        auto locked_storage = creativity.storage();
        auto &storage = *locked_storage.first;

        // A file without any periods is unusable; checking here also keeps `storage.size()-1`
        // below from wrapping around.
        if (storage.size() == 0) {
            FAIL("file contains no simulation periods");
        }

        // If parameters were given, use them; otherwise infer them from the first file read
        if (need_parameters) {
            {
                // values_mutex also guards std::cout
                Locker l(values_mutex);
                if (periods == 0) {
                    periods = storage.size()-1;
                    std::cout << "Inferred --periods " << periods << std::endl;
                }
                if (piracy_begins == 0) {
                    piracy_begins = creativity.parameters.piracy_begins;
                    std::cout << "Inferred --piracy-begins " << piracy_begins << std::endl;
                }
                if (policy_begins == 0) {
                    policy_begins = creativity.parameters.policy_begins;
                    std::cout << "Inferred --policy-begins " << policy_begins << std::endl;
                }
            }
            need_parameters = false;
            input_lock.unlock();
        }

        // If we need more than is available, we can't use this file.
        if (allow_unused_periods ? periods+1 > storage.size() : periods+1 != storage.size()) {
            FAIL(periods << " periods required, but file has " << storage.size()-1);
        }
        // Piracy begins doesn't have to match if the requested number of periods is less than the
        // piracy begins value, but otherwise does:
        if (piracy_begins <= periods and piracy_begins != creativity.parameters.piracy_begins) {
            FAIL("simulation piracy begins at t=" << creativity.parameters.piracy_begins << " but t=" << piracy_begins << " is required");
        }
        // Likewise for policy
        if (policy_begins <= periods and policy_begins != creativity.parameters.policy_begins) {
            FAIL("simulation policy begins at t=" << creativity.parameters.policy_begins << " but t=" << policy_begins << " is required");
        }

        // Calculate all the local values, then copy them into the results variable in one shot
        std::unordered_map<std::string, std::vector<double>> local_values;
        for (eris_time_t t = 0; t <= periods; t++) {
            // Cache the period values so it stays in memory and doesn't have to be re-read each
            // time
            std::shared_ptr<const State> cached = storage[t];

            for (const auto &datum : data) {
                if (series_wanted.count(datum.name) > 0) {
                    if (t == 0) local_values[datum.name].resize(periods+1, std::numeric_limits<double>::quiet_NaN());
                    local_values[datum.name][t] = datum.calculate(storage, t, t);
                }
            }
        }

        // Copy values we read into the overall values container
        {
            Locker locker(values_mutex);
            const auto fileindex = files.size();
            files.push_back(source);

            // values["field_name"][t][fileindex] = value
            for (const auto &v : local_values) {
                auto &vstore = values[v.first];
                if (vstore.size() < v.second.size()) vstore.resize(v.second.size());
                for (size_t t = 0; t < v.second.size(); t++) {
                    vstore[t].reserve(args.input.size());
                    if (vstore[t].size() <= fileindex) vstore[t].resize(fileindex+1, std::numeric_limits<double>::quiet_NaN());
                    vstore[t][fileindex] = v.second[t];
                }
            }
        }

        // Re-acquire the input lock for the next loop iteration
        input_lock.lock();
    }
#undef FAIL
    input_lock.unlock();

    THREAD_DONE
}

int main(int argc, char *argv[]) {
    cmdargs::Series args;
    try {
        args.parse(argc, argv);
    }
    catch (const std::exception &e) {
        std::cerr << "\n" << e.what() << "\n\n";
        exit(5);
    }

    // Simulation fields:
    auto data = data::data_fields();

    if (args.help_series) {
        std::cout << args.version() << "\n\n";
        std::cout << "Supported series parameters:\n\n";

        for (auto &d : data) {
            std::cout << "        " << d.name << "\n";
        }

        std::cout << "\n\nNote that some values (e.g. book_quality) can be NaN in certain circumstances\n"
            "(for example, no books written during a period): any such NaN values are excluded\n"
            "from the generated data.  In effect this means that the length of the list of\n"
            "values for each period can change.\n";

        exit(0);
    }

    // Verify the series requested
    {
        std::unordered_set<std::string> series_available;
        for (auto &d : data) {
            series_available.emplace(d.name);
        }

        std::istringstream iss(args.series);
        std::string series;
        bool invalid = false;
        while (std::getline(iss, series, ',')) {
            if (series_available.count(series)) {
                series_wanted.insert(series);
            }
            else {
                std::cerr << "Error: requested variable series `" << series << "' is not a valid simulation variable\n";
                invalid = true;
            }
        }

        if (not invalid and series_wanted.empty()) {
            std::cerr << "Error: no variable series specified\n";
            invalid = true;
        }

        if (invalid) exit(7);
    }


    try {
        boost::filesystem::create_directories(args.output_dir);
    }
    catch (const std::exception &e) {
        std::cerr << "\nUnable to create output directory: " << e.what() << "\n\n";
        exit(1);
    }

321
    if (args.input.empty()) {
322
        std::cerr << "\nError: no simulation input files specified\n\n";
323
        exit(50);
324 325
    }

326 327 328 329 330 331 332 333 334 335 336 337 338 339
    // Check for files specified more than once, and abort if found
    {
        std::unordered_set<std::string> seen;
        for (const auto &source : args.input) {
            auto ins = seen.insert(source);
            if (not ins.second) {
                std::cerr << "\nError: simulation input files contains duplicate entry `" << source << "'; aborting\n\n";
                exit(51);
            }
        }
    }

    files.reserve(args.input.size());

340 341 342 343 344 345 346 347 348 349 350 351 352
    need_parameters = false;
    if (args.periods != 0) {
        periods = args.periods;
        allow_unused_periods = args.allow_unused_periods;
    }
    else need_parameters = true;

    if (args.piracy_begins != 0) piracy_begins = args.piracy_begins;
    else need_parameters = true;

    if (args.policy_begins != 0) policy_begins = args.policy_begins;
    else need_parameters = true;

353
    input_count = args.input.size();
354 355 356
    std::vector<std::thread> threads;
    if (args.preload > 0) {
        // Start the preload thread, which loads files into memory:
357
        THREAD_START
358 359 360
        threads.emplace_back(thr_preload, args);
    }

361
    if (args.threads == 0) {
362
        THREAD_START // Not really, but keeps the counter value correct
363 364 365 366 367 368
        thr_parse_file(args, data);
    }
    else {
        Eigen::initParallel();
        // Start up the requested number of threads:
        for (unsigned t = 0; t < args.threads; t++) {
369
            THREAD_START
370 371
            threads.emplace_back(thr_parse_file, args, data);
        }
372 373
    }

374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
#ifdef __linux__
    std::unique_lock<std::mutex> thr_lock(thread_mutex);
    while (threads_running > 0) {
        thread_done_cv.wait_for(thr_lock, std::chrono::milliseconds(250), [] { return threads_running == 0; });

        std::lock_guard<std::mutex> lg(values_mutex);
        // Update the process name to something like "crseries [1/5]" (the "ies " gets chopped away
        // as needed--we aren't allowed to set a name longer than 15 characters).
        std::string progress = "[" + std::to_string(processing_counter) + "/" + std::to_string(input_count) + "]";
        std::string name =
            std::string("crseries ").substr(0, std::max<int>(5, 15 - (int) progress.size()))
            + progress;

        prctl(PR_SET_NAME, name.c_str());
    }
#endif

391 392 393
    // Wait for all threads to finish:
    for (auto &th : threads) {
        if (th.joinable()) th.join();
394 395 396 397 398 399 400 401 402 403 404 405
    }

    if (error_count > 0) {
        if (args.ignore_errors) {
            std::cerr << "Warning: some files failed to be read or were unsuitable.\n";
        }
        else {
            std::cerr << "Error: encountered unusable file; aborting.\n";
            exit(2);
        }
    }

406
    if (files.empty()) {
407 408 409 410
        std::cerr << "Error: no usable data sources!\n";
        exit(1);
    }

411
    std::cout << "Successfully processed " << files.size() << "/" << (files.size()+error_count) << " simulation files.\n" << std::endl;
412

413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
    std::string header;
    {
        // Maps actual source names into CSV-compatible source names by removing potentially problematic
        // characters, and appending a unique number to resolve any duplicates.
        std::unordered_set<std::string> source_used;
        std::ostringstream headeross;
        headeross << "t";
        for (const auto &source : files) {
            std::string fixed = data::csv_fix(source);
            std::string try_s = fixed;
            int append = 2;
            while (not source_used.insert(try_s).second) {
                try_s = fixed + "-" + std::to_string(append++);
            }
            headeross << "," << try_s;
428
        }
429 430 431
        headeross << "\n";
        header = headeross.str();
    }
432

433
    for (auto &v : values) {
434 435 436
        // Write an output file
        std::string output_file = args.output_dir + "/series-" + v.first + ".csv";
        std::ofstream out(output_file, std::ios::out | std::ios::trunc);
437 438 439
        out.exceptions(out.failbit | out.badbit);
        out << header;

440 441 442 443 444 445 446 447 448
        for (eris_time_t t = 0; t < v.second.size(); t++) {
            out << t;
            for (const auto &val : v.second[t]) {
                out << "," << data::double_str(val, args.double_precision);
            }
            out << "\n";
        }

        out.close();
449
        std::cout << "Wrote '" << v.first << "' series data to " << output_file << std::endl;
450 451 452
    }
}