Ignore:
Timestamp:
May 4, 2012, 2:19:07 PM (14 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, Candidate_v1.7.0, Candidate_v1.7.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
e70b9d
Parents:
b9c486
git-author:
Frederik Heber <heber@…> (12/09/11 18:18:14)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:07)
Message:

Added ResultGetter and capabilities to receive calculated results to FragmentController.

  • Added enum (and file) ControllerChoices that defines the state of FragmentScheduler.
  • depending on what is desired the Scheduler switches between these states and either receives or sends information. Requires new member variable choice because receival is of course asynchronous (see note in previous commit).
  • FragmentController has additional functions connect_get() and handle_connect_get() to receive results.
  • connect_calc() and connect_check() now just the choice whereas the actual sending and receiving is done in handle_... functions.
  • handle_FinishOperation() is the common final callback function for all three of these functions.
  • FragmentScheduler contains an internal list of delivered results.
  • FragmentScheduler only initiates worker socket when jobs are present.
  • FIX: FragmentScheduler does only send results that are done and only once.
  • TESTFIX: Removed third Worker that receives NoJob as socket is powered down before because queue has run empty and we haven't add new jobs.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentController.cpp

    rb9c486 r778abb  
    2424
    2525#include <boost/bind.hpp>
     26#include <boost/foreach.hpp>
    2627#include <iostream>
    2728#include <vector>
     
    3233#include "FragmentJob.hpp"
    3334#include "FragmentResult.hpp"
     35#include "ControllerChoices.hpp"
    3436
    3537#include "FragmentController.hpp"
     
    7072  if (!e)
    7173  {
    72     // Successfully established connection. Start operation to read the vector
    73     // of jobs. The connection::async_write() function will automatically
    74     // encode the data that is written to the underlying socket.
    75     LOG(1, "INFO: Sending "+toString(jobs.size())+" jobs ...");
    76     connection_.async_write(jobs,
     74    // Successfully established connection. Give choice.
     75    enum ControllerChoices choice = ReceiveJobs;
     76    connection_.async_write(choice,
    7777      boost::bind(&FragmentController::handle_SendJobs, this,
    7878      boost::asio::placeholders::error));
     
    104104  if (!e)
    105105  {
    106     // Successfully established connection. Start operation to read the vector
    107     // of jobs. The connection::async_write() function will automatically
    108     // encode the data that is written to the underlying socket.
    109     LOG(1, "INFO: Checking number of done jobs ...");
    110     connection_.async_read(doneJobs,
     106    // Successfully established connection. Give choice.
     107    enum ControllerChoices choice = CheckState;
     108    connection_.async_write(choice,
    111109      boost::bind(&FragmentController::handle_ReceiveDoneJobs, this,
    112110      boost::asio::placeholders::error));
     
    127125}
    128126
    129 /** Callback function when jobs have been sent.
    130  *
    131  * \param e error code if something went wrong
    132  */
    133 void FragmentController::handle_SendJobs(const boost::system::error_code& e)
    134 {
    135   Info info(__FUNCTION__);
    136   if (!e)
    137   {
    138     LOG(1, "INFO: Sent jobs ...");
     127/** Handle completion of a connect operation.
     128 *
     129 * \param e error code if something went wrong
     130 * \param endpoint_iterator endpoint of the connection
     131 */
     132void FragmentController::handle_connect_get(const boost::system::error_code& e,
     133    boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
     134{
     135  Info info(__FUNCTION__);
     136  if (!e)
     137  {
     138    // Successfully established connection. Give choice.
     139    enum ControllerChoices choice = SendResults;
     140    connection_.async_write(choice,
     141      boost::bind(&FragmentController::handle_ReceivingResults, this,
     142      boost::asio::placeholders::error));
     143  } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
     144    // Try the next endpoint.
     145    connection_.socket().close();
     146    boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
     147    connection_.socket().async_connect(endpoint,
     148      boost::bind(&FragmentController::handle_connect_check, this,
     149      boost::asio::placeholders::error, ++endpoint_iterator));
     150  } else {
     151    // An error occurred. Log it and return. Since we are not starting a new
     152    // operation the io_service will run out of work to do and the client will
     153    // exit.
     154    Exitflag = ErrorFlag;
     155    ELOG(1, e.message());
     156  }
     157}
     158
     159/** Callback function when operation has been completed.
     160 *
     161 * \param e error code if something went wrong
     162 */
     163void FragmentController::handle_FinishOperation(const boost::system::error_code& e)
     164{
     165  Info info(__FUNCTION__);
     166  if (!e)
     167  {
     168    LOG(1, "INFO: Operation completed.");
    139169  }
    140170  else
     
    149179}
    150180
     181/** Callback function when jobs have been sent.
     182 *
     183 * \param e error code if something went wrong
     184 */
     185void FragmentController::handle_SendJobs(const boost::system::error_code& e)
     186{
     187  Info info(__FUNCTION__);
     188  if (!e)
     189  {
     190    // Successfully established connection. Start operation to read the vector
     191    // of jobs. The connection::async_write() function will automatically
     192    // encode the data that is written to the underlying socket.
     193    LOG(1, "INFO: Sending "+toString(jobs.size())+" jobs ...");
     194    connection_.async_write(jobs,
     195      boost::bind(&FragmentController::handle_FinishOperation, this,
     196      boost::asio::placeholders::error));
     197  }
     198  else
     199  {
     200    // An error occurred.
     201    Exitflag = ErrorFlag;
     202    ELOG(1, e.message());
     203  }
     204
     205  // Since we are not starting a new operation the io_service will run out of
     206  // work to do and the client will exit.
     207}
     208
     209/** Callback function when results have been received.
     210 *
     211 * \param e error code if something went wrong
     212 */
     213void FragmentController::handle_ReceivingResults(const boost::system::error_code& e)
     214{
     215  Info info(__FUNCTION__);
     216  if (!e)
     217  {
     218    // The connection::async_read() function will automatically
     219    // decode the data that is written to the underlying socket.
     220    connection_.async_read(results,
     221      boost::bind(&FragmentController::handle_ReceivedResults, this,
     222      boost::asio::placeholders::error));
     223  }
     224  else
     225  {
     226    // An error occurred.
     227    Exitflag = ErrorFlag;
     228    ELOG(1, e.message());
     229  }
     230
     231  // Since we are not starting a new operation the io_service will run out of
     232  // work to do and the client will exit.
     233}
     234
    151235/** Callback function when doneJobs have been received.
    152236 *
    153237 * \param e error code if something went wrong
    154238 */
     239void FragmentController::handle_ReceivedResults(const boost::system::error_code& e)
     240{
     241  Info info(__FUNCTION__);
     242
     243  LOG(1, "INFO: Received "+toString(results.size())+" results ...");
     244
     245  handle_FinishOperation(e);
     246}
     247
     248/** Callback function when doneJobs have been received.
     249 *
     250 * \param e error code if something went wrong
     251 */
    155252void FragmentController::handle_ReceiveDoneJobs(const boost::system::error_code& e)
    156253{
    157254  Info info(__FUNCTION__);
    158   // Nothing to do.
    159   LOG(1, "INFO: "+toString(doneJobs)+" jobs are currently done.");
     255  if (!e)
     256  {
     257    // The connection::async_read() function will automatically
     258    // decode the data that is written to the underlying socket.
     259    LOG(1, "INFO: Checking number of done jobs ...");
     260    connection_.async_read(doneJobs,
     261      boost::bind(&FragmentController::handle_FinishOperation, this,
     262      boost::asio::placeholders::error));
     263  }
     264  else
     265  {
     266    // An error occurred.
     267    Exitflag = ErrorFlag;
     268    ELOG(1, e.message());
     269  }
    160270}
    161271
     
    211321}
    212322
     323/** Internal function to connect to the endpoint of the server asynchronuously.
     324 *
     325 * We require internal connetion_ and host and service to be set up for this.
     326 */
     327void FragmentController::connect_get()
     328{
     329  Info info(__FUNCTION__);
     330  // Resolve the host name into an IP address.
     331  boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator();
     332  boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
     333
     334  // Start an asynchronous connect operation.
     335  std::cout << "Connecting to endpoint " << endpoint << " to get results " << std::endl;
     336  connection_.socket().async_connect(endpoint,
     337    boost::bind(&FragmentController::handle_connect_get, this,
     338      boost::asio::placeholders::error, ++endpoint_iterator));
     339}
     340
    213341/** Internal function to disconnect connection_ correctly.
    214342 *
     
    225353void FragmentController::addJobs(const std::vector<FragmentJob> &_jobs)
    226354{
    227   jobs = _jobs;
     355  jobs.reserve(jobs.size()+_jobs.size());
     356  BOOST_FOREACH(FragmentJob job, _jobs) {
     357    jobs.push_back(job);
     358  }
    228359}
    229360
     
    250381}
    251382
     383/** Getter for results.
     384 *
     385 * \sa calculateResults()
     386 * \return vector of results for the added jobs (\sa addJobs()).
     387 */
     388std::vector<FragmentResult> FragmentController::getResults()
     389{
     390  Info info(__FUNCTION__);
     391  return results;
     392}
     393
     394/** Function to initiate receival of results.
     395 *
     396 */
     397void FragmentController::obtainResults()
     398{
     399  // connect
     400  connect_get();
     401  //disconnect
     402  disconnect();
     403}
     404
    252405/** Getter for doneJobs.
    253406 *
Note: See TracChangeset for help on using the changeset viewer.