Codebase list votca-xtp / 1b2bccf1-4511-4a27-bd7b-f49ebfd9899c/main src / libxtp / progressobserver.cc
1b2bccf1-4511-4a27-bd7b-f49ebfd9899c/main

Tree @1b2bccf1-4511-4a27-bd7b-f49ebfd9899c/main (Download .tar.gz)

progressobserver.cc @1b2bccf1-4511-4a27-bd7b-f49ebfd9899c/mainraw · history · blame

/*
 *            Copyright 2009-2020 The VOTCA Development Team
 *                       (http://www.votca.org)
 *
 *      Licensed under the Apache License, Version 2.0 (the "License")
 *
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *              http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
/// For an earlier history see ctp repo commit
/// 77795ea591b29e664153f9404c8655ba28dc14e9

// Standard includes
#include <fstream>
#include <unistd.h>

// Third party includes
#include <boost/algorithm/string/replace.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
#include <boost/format.hpp>
#include <sys/types.h>

// Local VOTCA includes
#include "votca/xtp/job.h"
#include "votca/xtp/progressobserver.h"
#include "votca/xtp/qmthread.h"

using boost::format;

namespace votca {
namespace xtp {

template <typename JobContainer>
typename ProgObserver<JobContainer>::Job *
    ProgObserver<JobContainer>::RequestNextJob(QMThread &thread) {

  _lockThread.Lock();
  Job *jobToProc = nullptr;

  XTP_LOG(Log::error, thread.getLogger())
      << "Requesting next job" << std::flush;

  // NEED NEW CHUNK?
  if (_nextjit == _jobsToProc.end() && _moreJobsAvailable) {
    SyncWithProgFile(thread);
    _nextjit = _jobsToProc.begin();
    if (_nextjit == _jobsToProc.end()) {
      _moreJobsAvailable = false;
      XTP_LOG(Log::error, thread.getLogger())
          << "Sync did not yield any new jobs." << std::flush;
    }
  }

  // JOBS EATEN ALL UP?
  if (_nextjit == _jobsToProc.end()) {
    if (_maxJobs == _startJobsCount) {
      XTP_LOG(Log::error, thread.getLogger())
          << "Next job: ID = - (reached maximum for this process)"
          << std::flush;
    } else {
      XTP_LOG(Log::error, thread.getLogger())
          << "Next job: ID = - (none available)" << std::flush;
    }
  }
  // TAKE A BITE
  else {
    jobToProc = *_nextjit;
    ++_nextjit;
    XTP_LOG(Log::error, thread.getLogger())
        << "Next job: ID = " << jobToProc->getId() << std::flush;
  }

  if (!thread.isMaverick() && jobToProc != nullptr) {
    Index idx = jobToProc->getId();
    Index frac = (_jobs.size() >= 10) ? 10 : _jobs.size();
    Index rounded = Index(double(_jobs.size()) / double(frac)) * frac;
    Index tenth = rounded / frac;
    if (idx % tenth == 0) {
      double percent = double(idx) / double(rounded) * 100 + 0.5;
      std::cout << (format("=> [%1$2.0f%%] ") % percent).str() << std::flush;
    }
  }

  _lockThread.Unlock();
  return jobToProc;
}

template <typename JobContainer>
void ProgObserver<JobContainer>::ReportJobDone(Job &job, Result &res,
                                               QMThread &thread) {
  _lockThread.Lock();
  XTP_LOG(Log::error, thread.getLogger())
      << "Reporting job results" << std::flush;
  // RESULTS, TIME, HOST
  job.UpdateFromResult(res);
  job.setTime(GenerateTime());
  job.setHost(GenerateHost());
  // PRINT PROGRESS BAR
  _jobsReported += 1;
  if (!thread.isMaverick()) {
    std::cout << std::endl << thread.getLogger() << std::flush;
  }
  _lockThread.Unlock();
  return;
}

template <typename JobContainer>
std::string ProgObserver<JobContainer>::GenerateHost() {
  char host[128];
  (void)gethostname(host, sizeof host);
  pid_t pid = getpid();
  return (format("%1$s:%2$d") % host % pid).str();
}

template <typename JobContainer>
std::string ProgObserver<JobContainer>::GenerateTime() {
  boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
  return (format("%1$s") % now.time_of_day()).str();
}

template <typename JobContainer>
void ProgObserver<JobContainer>::SyncWithProgFile(QMThread &thread) {

  // INTERPROCESS FILE LOCKING (THREAD LOCK IN ::RequestNextJob)
  this->LockProgFile(thread);

  std::string progFile = _progFile;
  std::string progBackFile = _progFile + "~";

  // LOAD EXTERNAL JOBS FROM SHARED XML & UPDATE INTERNAL JOBS
  XTP_LOG(Log::info, thread.getLogger())
      << "Update internal structures from job file" << std::flush;
  JobContainer jobs_ext = LOAD_JOBS(progFile);
  UPDATE_JOBS(jobs_ext, _jobs, GenerateHost());

  // GENERATE BACK-UP FOR SHARED XML
  XTP_LOG(Log::info, thread.getLogger())
      << "Create job-file back-up" << std::flush;
  WRITE_JOBS(_jobs, progBackFile);

  // ASSIGN NEW JOBS IF AVAILABLE
  XTP_LOG(Log::error, thread.getLogger())
      << "Assign jobs from stack" << std::flush;
  _jobsToProc.clear();

  Index cacheSize = _cacheSize;
  while (int(_jobsToProc.size()) < cacheSize) {
    if (_metajit == _jobs.end() || _startJobsCount == _maxJobs) {
      break;
    }

    bool startJob = false;

    // Start if job available or restart patterns matched
    if ((_metajit->isAvailable()) ||
        (_restartMode && _restart_stats.count(_metajit->getStatusStr())) ||
        (_restartMode && _restart_hosts.count(_metajit->getHost()))) {
      startJob = true;
    }

    if (startJob) {
      _metajit->Reset();
      _metajit->setStatus("ASSIGNED");
      _metajit->setHost(GenerateHost());
      _metajit->setTime(GenerateTime());
      _jobsToProc.push_back(&*_metajit);
      _startJobsCount += 1;
    }

    ++_metajit;
  }

  // UPDATE PROGRESS STATUS FILE
  WRITE_JOBS(_jobs, progFile);

  // RELEASE PROGRESS STATUS FILE
  this->ReleaseProgFile(thread);
  return;
}

template <typename JobContainer>
void ProgObserver<JobContainer>::LockProgFile(QMThread &thread) {
  _flock = std::unique_ptr<boost::interprocess::file_lock>(
      new boost::interprocess::file_lock(_lockFile.c_str()));
  _flock->lock();
  XTP_LOG(Log::warning, thread.getLogger())
      << "Imposed lock on " << _lockFile << std::flush;
  XTP_LOG(Log::warning, thread.getLogger())
      << "Sleep ... " << _lockFile << std::flush;
  XTP_LOG(Log::warning, thread.getLogger())
      << "Wake up ... " << _lockFile << std::flush;
}

template <typename JobContainer>
void ProgObserver<JobContainer>::ReleaseProgFile(QMThread &thread) {
  _flock->unlock();
  XTP_LOG(Log::warning, thread.getLogger())
      << "Releasing " << _lockFile << ". " << std::flush;
}

template <typename JobContainer>
void ProgObserver<JobContainer>::InitCmdLineOpts(
    const boost::program_options::variables_map &optsMap) {

  _lockFile = optsMap["file"].as<std::string>();
  _cacheSize = optsMap["cache"].as<Index>();
  _maxJobs = optsMap["maxjobs"].as<Index>();
  std::string restartPattern = optsMap["restart"].as<std::string>();

  // restartPattern = e.g. host(pckr124:1234) stat(FAILED)
  boost::algorithm::replace_all(restartPattern, " ", "");
  if (restartPattern == "") {
    _restartMode = false;
  } else {
    _restartMode = true;
  }

  tools::Tokenizer toker(restartPattern, "(,)");
  std::vector<std::string> patterns = toker.ToVector();

  std::string category = "";
  for (const std::string &pattern : patterns) {

    if (pattern == "host" || pattern == "stat") {
      category = pattern;

    } else if (category == "host") {
      _restart_hosts[pattern] = true;
    } else if (category == "stat") {
      if (pattern == "ASSIGNED" || pattern == "COMPLETE") {
        std::cout << "Restart if status == " << pattern
                  << "? Not necessarily a good idea." << std::endl;
      }
      _restart_stats[pattern] = true;
    }

    else {
      throw std::runtime_error(
          "Restart pattern ill-defined, format is"
          "[host([HOSTNAME:PID])] [stat([STATUS])]");
    }
  }
  return;
}

template <typename JobContainer>
void ProgObserver<JobContainer>::InitFromProgFile(std::string progFile,
                                                  QMThread &thread) {

  _progFile = progFile;
  _jobsReported = 0;

  XTP_LOG(Log::error, thread.getLogger())
      << "Job file = '" << _progFile << "', ";
  XTP_LOG(Log::info, thread.getLogger())
      << "lock file = '" << _lockFile << "', ";
  XTP_LOG(Log::error, thread.getLogger())
      << "cache size =  " << _cacheSize << std::flush;

  XTP_LOG(Log::error, thread.getLogger())
      << "Initialize jobs from " << progFile << std::flush;
  XTP_LOG(Log::info, thread.getLogger()) << "Lock & load " << std::flush;

  // LOCK, READ INTO XML
  this->LockProgFile(thread);

  // ... Clear container
  _jobs.clear();

  // ... Load new, set availability bool
  _jobs = LOAD_JOBS(progFile);
  _metajit = _jobs.begin();
  WRITE_JOBS(_jobs, progFile + "~");
  XTP_LOG(Log::error, thread.getLogger())
      << "Registered " << _jobs.size() << " jobs." << std::flush;
  if (_jobs.size() > 0) {
    _moreJobsAvailable = true;
  } else {
    _moreJobsAvailable = false;
  }

  // SUMMARIZE OBSERVER VARIABLES: RESTART PATTERN, CACHE, LOCK FILE
  if (_restartMode && _restart_hosts.size()) {
    std::string infostr = "Restart if host == ";
    for (const std::pair<const std::string, bool> &host : _restart_hosts) {
      infostr += host.first + " ";
    }
    XTP_LOG(Log::error, thread.getLogger()) << infostr << std::flush;
  }
  if (_restartMode && _restart_stats.size()) {
    std::string infostr = "Restart if stat == ";
    for (const std::pair<const std::string, bool> &host : _restart_hosts) {
      infostr += host.first + " ";
    }
    XTP_LOG(Log::error, thread.getLogger()) << infostr << std::flush;
  }

  // RELEASE PROGRESS FILE
  this->ReleaseProgFile(thread);
  return;
}

// REGISTER
template class ProgObserver<std::vector<Job> >;

}  // namespace xtp
}  // namespace votca