Samchon Framework for CPP  1.0.0
ParallelSystem.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/external/ExternalSystem.hpp>
5 #include <samchon/protocol/IListener.hpp>
6 
7 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp>
8 
9 #include <thread>
10 #include <samchon/HashMap.hpp>
11 #include <samchon/templates/parallel/PRInvokeHistory.hpp>
12 
13 namespace samchon
14 {
15 namespace templates
16 {
17 namespace parallel
18 {
59  : public virtual external::ExternalSystem,
60  public virtual protocol::IListener
61  {
62  private:
64 
65  HashMap<size_t, std::pair<std::shared_ptr<protocol::Invoke>, std::shared_ptr<slave::InvokeHistory>>> progress_list_;
67 
68  double performance_{ 1.0 };
69  bool enforced_{ false };
70  bool excluded_{ false };
71 
72  public:
73  /* ---------------------------------------------------------
74  CONSTRUCTORS
75  --------------------------------------------------------- */
76  // TAKES SUPER'S CONSTRUCTORS
77  using super::super;
78 
91  virtual ~ParallelSystem()
92  {
93  excluded_ = true;
94 
95  // SHIFT PARALLEL INVOKE MESSAGES HAD PROGRESSED TO OTHER SLAVES
96  for (auto it = progress_list_.begin(); it != progress_list_.end(); it++)
97  {
98  // INVOKE MESSAGE AND ITS HISTORY ON PROGRESS
99  std::shared_ptr<protocol::Invoke> invoke = it->second.first;
100  std::shared_ptr<slave::InvokeHistory> history = it->second.second;
101 
102  // SEND THEM BACK
103  _Send_back_history(invoke, history);
104  }
105  };
106 
107  virtual void construct(std::shared_ptr<library::XML> xml) override
108  {
109  super::construct(xml);
110 
111  performance_ = xml->getProperty<double>("performance");
112  enforced_ = xml->getProperty<bool>("enforced");
113  };
114 
115  /* ---------------------------------------------------------
116  ACCESSORS
117  --------------------------------------------------------- */
144  auto getPerformance() const -> double
145  {
146  return performance_;
147  };
148 
173  void setPerformance(double val)
174  {
175  performance_ = val;
176  enforced_ = false;
177  };
178 
203  void enforcePerformance(double val)
204  {
205  performance_ = val;
206  enforced_ = true;
207  };
208 
209  /* ---------------------------------------------------------
210  INVOKE MESSAGE CHAIN - PERFORMANCE ESTIMATION
211  --------------------------------------------------------- */
212  protected:
213  virtual void _Reply_data(std::shared_ptr<protocol::Invoke> invoke) override
214  {
215  if (invoke->getListener() == "_Report_history")
216  _Report_history(invoke->front()->getValueAsXML());
217  else if (invoke->getListener() == "_Send_back_history")
218  {
219  size_t uid = invoke->front()->getValue<size_t>();
220  auto it = progress_list_.find(uid);
221 
222  if (it != progress_list_.end())
223  _Send_back_history(it->second.first, it->second.second);
224  }
225  else
226  replyData(invoke);
227  };
228 
229  virtual void _Report_history(std::shared_ptr<library::XML> xml)
230  {
231  library::UniqueWriteLock uk(system_array_->getMutex());
232 
233  //--------
234  // CONSTRUCT HISTORY
235  //--------
236  std::shared_ptr<PRInvokeHistory> history(new PRInvokeHistory());
237  history->construct(xml);
238 
239  // IF THE HISTORY IS NOT EXIST IN PROGRESS, THEN TERMINATE REPORTING
240  auto progress_it = progress_list_.find(history->getUID());
241  if (progress_it == progress_list_.end())
242  return;
243 
244  // ARCHIVE FIRST AND LAST INDEX
245  history->first_ = std::dynamic_pointer_cast<PRInvokeHistory>(progress_it->second.second)->getFirst();
246  history->last_ = std::dynamic_pointer_cast<PRInvokeHistory>(progress_it->second.second)->getLast();
247 
248  // ERASE FROM ORDINARY PROGRESS AND MIGRATE TO THE HISTORY
249  progress_list_.erase(progress_it);
250  history_list_.insert({ history->getUID(), history });
251 
252  // NOTIFY TO THE MANAGER, SYSTEM_ARRAY
253  ((base::ParallelSystemArrayBase*)system_array_)->_Complete_history(history);
254  };
255 
256  virtual void _Send_back_history(std::shared_ptr<protocol::Invoke> invoke, std::shared_ptr<slave::InvokeHistory> $history)
257  {
258  std::shared_ptr<PRInvokeHistory> history = std::dynamic_pointer_cast<PRInvokeHistory>($history);
259  if (history == nullptr)
260  return;
261 
262  // REMOVE UID AND FIRST, LAST INDEXES
263  for (size_t i = invoke->size(); i < invoke->size(); i--)
264  {
265  const std::string &name = invoke->at(i)->getName();
266 
267  if (name == "_History_uid" || name == "_Piece_first" || name == "_Piece_last")
268  invoke->erase(invoke->begin() + i);
269  }
270 
271  // RE-SEND (DISTRIBUTE) THE PIECE TO OTHER SLAVES
272  std::thread
273  (
274  &base::ParallelSystemArrayBase::sendPieceData, (base::ParallelSystemArrayBase*)system_array_,
275  invoke, history->getFirst(), history->getLast()
276  ).detach();
277 
278  // ERASE FROM THE PROGRESS LIST
279  progress_list_.erase(history->getUID());
280  };
281 
282  public:
283  /* ---------------------------------------------------------
284  EXPORTERS
285  --------------------------------------------------------- */
286  virtual auto toXML() const -> std::shared_ptr<library::XML> override
287  {
288  std::shared_ptr<library::XML> xml = super::toXML();
289  xml->setProperty("performance", performance_);
290  xml->setProperty("enforced", enforced_);
291 
292  return xml;
293  };
294 
295  /* ---------------------------------------------------------
296  INTERNAL ACCESSORS
297  --------------------------------------------------------- */
298  auto _Get_progress_list() -> HashMap<size_t, std::pair<std::shared_ptr<protocol::Invoke>, std::shared_ptr<slave::InvokeHistory>>>& { return progress_list_; };
299  auto _Get_progress_list() const -> const HashMap<size_t, std::pair<std::shared_ptr<protocol::Invoke>, std::shared_ptr<slave::InvokeHistory>>>& { return progress_list_; };
300 
301  auto _Get_history_list() -> HashMap<size_t, std::shared_ptr<slave::InvokeHistory>>& { return history_list_; };
302  auto _Get_history_list() const -> const HashMap<size_t, std::shared_ptr<slave::InvokeHistory>>& { return history_list_; };
303 
304  auto _Is_enforced() const -> bool
305  {
306  return enforced_;
307  };
308  auto _Is_excluded() const -> bool
309  {
310  return excluded_;
311  };
312  void _Set_excluded()
313  {
314  excluded_ = true;
315  };
316  };
317 };
318 };
319 };
virtual void construct(std::shared_ptr< library::XML > xml) override
Construct data of the Entity from an XML object.
virtual void construct(std::shared_ptr< library::XML > xml) override
Construct data of the Entity from an XML object.
virtual auto toXML() const -> std::shared_ptr< library::XML > override
Get an XML object represents the EntityGroup.
An Entity and a container of children Entity objects.
Definition: EntityGroup.hpp:40
Unique lock for writing.
virtual void replyData(std::shared_ptr< protocol::Invoke > invoke) override
virtual auto toXML() const -> std::shared_ptr< library::XML > override
Get an XML object represents the EntityGroup.
Customized std::unordered_map.
Definition: HashMap.hpp:103