Ignore:
Timestamp:
May 4, 2012, 2:19:06 PM (14 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, Candidate_v1.7.0, Candidate_v1.7.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
083490
Parents:
630a6e
git-author:
Frederik Heber <heber@…> (11/27/11 18:38:26)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:06)
Message:

Server and Worker now also correctly exchange result.

  • Server uses FragmentQueue which also stores the obtained result.
  • Worker calls received FragmentJob::Work() function and returns the thereby calculated FragmentResult.
  • Regression test for Server/Worker now checks whether job #1 is listed twice, this basically checks whether the result has been exchanged.

There have been some concepts to understand to get this working and these

shall be briefly recorded here:

  • asynchronous communication can only work on objects that live beyond the scope of where they have been called, e.g. therefore FragmentScheduler contains a FragmentResult and FragmentWorker a FragmentJob instance. They receive this object and need the write access in the scope of the aync. comm. and not of the caller's scope. This is probably because the initial argument is only used to set up a buffer of correct length. However, the received instance is created/deserialized first when the communication is completed. And this may well be after the caller's scope has been left.
  • This is different to read operations as probably there the object to send is immediately serialized and placed into an internal buffer such that later access is only to this buffer and not to the original instance which therefore does not need to exist anymore. That's why the above Schedulder and Worker do not have the "other" instance as class members as well.
  • chaining asynchronous communications, e.g. a write after a read has been performed, can only be done by using the callback functions that the async_write/read gets as parameters. They are called when the one operation has finished and therein the next operation can then be launched. This way a successful chain is executed.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    r630a6e ref2767  
    3535#include "CodePatterns/Log.hpp"
    3636#include "FragmentJob.hpp"
     37#include "JobId.hpp"
    3738
    3839#include "FragmentScheduler.hpp"
     
    4344  acceptor_(io_service,
    4445      boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)
    45   )
     46  ),
     47  result(JobId::NoJob)
    4648{
    4749  Info info(__FUNCTION__);
     
    6062{
    6163  Info info(__FUNCTION__);
    62   std::cout << "handle_accept called." << std::endl;
    6364  if (!e)
    6465  {
     
    6768    if (JobsQueue.isJobPresent()) {
    6869      // pop a job and send it to the client.
    69       FragmentJob s(JobsQueue.popJob());
     70      FragmentJob job(JobsQueue.popJob());
    7071      // The connection::async_write() function will automatically
    7172      // serialize the data structure for us.
    72       conn->async_write(s,
     73      LOG(1, "INFO: Sending job #" << job.getId() << ".");
     74      conn->async_write(job,
    7375        boost::bind(&FragmentScheduler::handle_write, this,
    7476        boost::asio::placeholders::error, conn));
     
    8587        boost::bind(&FragmentScheduler::handle_write, this,
    8688        boost::asio::placeholders::error, conn));
     89
     90      // then there must be no read necesary
    8791
    8892      ELOG(2, "There is currently no job present in the queue.");
     
    102106{
    103107    Info info(__FUNCTION__);
    104   // Nothing to do. The socket will be closed automatically when the last
    105   // reference to the connection object goes away.
     108    LOG(1, "INFO: Job sent.");
     109    // obtain result
     110    LOG(1, "INFO: Receiving result for a job ...");
     111    conn->async_read(result,
     112      boost::bind(&FragmentScheduler::handle_read, this,
     113      boost::asio::placeholders::error, conn));
    106114}
    107115
     116/// Handle completion of a read operation.
     117void FragmentScheduler::handle_read(const boost::system::error_code& e, connection_ptr conn)
     118{
     119    Info info(__FUNCTION__);
     120    // nothing to do
     121    LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
     122    // and push into queue
     123    ASSERT(result.getId() != JobId::NoJob,
     124        "FragmentScheduler::handle_write() - result received has NoJob id.");
     125    ASSERT(result.getId() != JobId::IllegalJob,
     126        "FragmentScheduler::handle_write() - result received has IllegalJob id.");
     127    if ((result.getId() != JobId::NoJob) && (result.getId() != JobId::IllegalJob))
     128      JobsQueue.pushResult(result);
     129    // erase result
     130    result = FragmentResult(JobId::NoJob);
     131}
     132
Note: See TracChangeset for help on using the changeset viewer.