Samchon Framework for CPP  1.0.0
DistributedSystem.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/parallel/ParallelSystem.hpp>
5 
6 #include <samchon/templates/distributed/DSInvokeHistory.hpp>
7 #include <samchon/templates/distributed/base/DistributedSystemArrayBase.hpp>
8 #include <samchon/templates/distributed/base/DistributedProcessBase.hpp>
9 
10 namespace samchon
11 {
12 namespace templates
13 {
14 namespace distributed
15 {
52  : public virtual parallel::ParallelSystem
53  {
54  private:
56 
57  public:
58  /* ---------------------------------------------------------
59  CONSTRUCTORS
60  --------------------------------------------------------- */
61  using super::super;
62 
63  virtual ~DistributedSystem()
64  {
65  _Set_excluded();
66 
67  // SHIFT PARALLEL INVOKE MESSAGES HAD PROGRESSED TO OTHER SLAVES
68  for (auto it = _Get_progress_list().begin(); it != _Get_progress_list().end(); it++)
69  {
70  // INVOKE MESSAGE AND ITS HISTORY ON PROGRESS
71  std::shared_ptr<protocol::Invoke> invoke = it->second.first;
72  std::shared_ptr<slave::InvokeHistory> history = it->second.second;
73 
74  // SEND THEM BACK
75  _Send_back_history(invoke, history);
76  }
77  };
78 
79  protected:
97  virtual auto createChild(std::shared_ptr<library::XML>) -> external::ExternalSystemRole* override
98  {
99  return nullptr;
100  };
101 
102  public:
103  auto _Compute_average_elapsed_time() const -> double
104  {
105  double sum = 0.0;
106  size_t denominator = 0;
107 
108  for (auto it = _Get_history_list().begin(); it != _Get_history_list().end(); it++)
109  {
110  std::shared_ptr<DSInvokeHistory> history = std::dynamic_pointer_cast<DSInvokeHistory>(it->second);
111  if (history == nullptr)
112  continue;
113 
114  double elapsed_time = history->computeElapsedTime() / history->getWeight();
115 
116  sum += elapsed_time / ((base::DistributedProcessBase*)(history->getProcess()))->getResource();
117  denominator++;
118  }
119 
120  if (denominator == 0)
121  return -1;
122  else
123  return sum / denominator;
124  };
125 
126  public:
127  /* ---------------------------------------------------------
128  INVOKE MESSAGE CHAIN
129  --------------------------------------------------------- */
130  virtual void replyData(std::shared_ptr<protocol::Invoke> invoke) override
131  {
132  // SHIFT TO PROCESSES
133  auto process_map = ((base::DistributedSystemArrayBase*)system_array_)->getProcessMap();
134  for (auto it = process_map.begin(); it != process_map.end(); it++)
135  ((base::DistributedProcessBase*)(it->second.get()))->replyData(invoke);
136 
137  // SHIFT TO MASTER AND SLAVES
138  super::replyData(invoke);
139  };
140 
141  virtual void _Send_back_history(std::shared_ptr<protocol::Invoke> invoke, std::shared_ptr<slave::InvokeHistory> $history)
142  {
143  std::shared_ptr<DSInvokeHistory> history = std::dynamic_pointer_cast<DSInvokeHistory>($history);
144  if (history != nullptr)
145  {
146  // RE-SEND INVOKE MESSAGE TO ANOTHER SLAVE VIA ROLE
147  ((base::DistributedProcessBase*)(history->getProcess()))->sendData(invoke, history->getWeight());
148  }
149 
150  // ERASE THE HISTORY
151  super::_Send_back_history(invoke, history);
152  };
153 
154  protected:
155  virtual void _Report_history(std::shared_ptr<library::XML> xml) override
156  {
157  if (xml->hasProperty("_Piece_first") == true)
158  {
159  //--------
160  // ParallelSystem's history -> PRInvokeHistory
161  //--------
162  super::_Report_history(xml);
163  }
164  else
165  {
166  library::UniqueWriteLock uk(system_array_->getMutex());
167 
168  //--------
169  // DistributedProcess's history -> DSInvokeHistory
170  //--------
171  // CONSTRUCT HISTORY
172  std::shared_ptr<DSInvokeHistory> history(new DSInvokeHistory(this));
173  history->construct(xml);
174 
175  // IF THE HISTORY IS NOT EXIST IN PROGRESS, THEN TERMINATE REPORTING
176  auto progress_it = _Get_progress_list().find(history->getUID());
177  if (progress_it == _Get_progress_list().end())
178  return;
179 
180  history->weight_ = std::dynamic_pointer_cast<DSInvokeHistory>(progress_it->second.second)->getWeight();
181 
182  // ERASE FROM ORDINARY PROGRESS AND MIGRATE TO THE HISTORY
183  _Get_progress_list().erase(progress_it);
184  _Get_history_list().emplace(history->getUID(), history);
185 
186  // ALSO NOTIFY TO THE ROLE
187  if (history->getProcess() != nullptr)
188  ((base::DistributedProcessBase*)(history->getProcess()))->_Report_history(history);
189 
190  // COMPLETE THE HISTORY IN THE BELONGED SYSTEM_ARRAY
191  ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Complete_history(history);
192  }
193  };
194  };
195 };
196 };
197 };
auto computeElapsedTime() const -> long long
Unique lock for writing.
virtual void replyData(std::shared_ptr< protocol::Invoke > invoke) override
virtual auto createChild(std::shared_ptr< library::XML >) -> external::ExternalSystemRole *override
virtual void replyData(std::shared_ptr< protocol::Invoke > invoke) override