Samchon Framework for CPP  1.0.0
DistributedProcess.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/distributed/base/DistributedProcessBase.hpp>
5 #include <samchon/protocol/Entity.hpp>
6 
7 #include <samchon/protocol/Invoke.hpp>
8 #include <samchon/templates/distributed/DSInvokeHistory.hpp>
9 
10 #include <samchon/templates/distributed/DistributedSystem.hpp>
11 #include <samchon/templates/distributed/base/DistributedSystemArrayBase.hpp>
12 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp>
13 
14 namespace samchon
15 {
16 namespace templates
17 {
18 namespace distributed
19 {
46  : public virtual protocol::Entity<std::string>,
47  public base::DistributedProcessBase
48  {
49  private:
51 
52  base::DistributedSystemArrayBase *system_array_;
53 
54  protected:
62  std::string name;
63 
64  public:
65  /* ---------------------------------------------------------
66  CONSTRUCTORS
67  --------------------------------------------------------- */
73  DistributedProcess(base::DistributedSystemArrayBase *systemArray)
74  : super()
75  {
76  this->system_array_ = systemArray;
77 
78  resource = 1.0;
79  enforced_ = false;
80  };
81  virtual ~DistributedProcess() = default;
82 
83  virtual void construct(std::shared_ptr<library::XML> xml) override
84  {
85  name = xml->getProperty<std::string>("name");
86  resource = xml->getProperty<double>("resource");
87  };
88 
89  /* ---------------------------------------------------------
90  ACCESSORS
91  --------------------------------------------------------- */
95  virtual auto key() const -> std::string override
96  {
97  return name;
98  };
99 
105  template <class SystemArray>
106  auto getSystemArray() const -> SystemArray*
107  {
108  return (SystemArray*)system_array_;
109  };
110 
114  auto getName() const -> std::string
115  {
116  return name;
117  };
118 
141  auto getResource() const -> double
142  {
143  return resource;
144  };
145 
169  void setResource(double val)
170  {
171  resource = val;
172  enforced_ = false;
173  };
174 
200  void enforceResource(double val)
201  {
202  resource = val;
203  enforced_ = true;
204  };
205 
206  public:
207  /* ---------------------------------------------------------
208  INVOKE MESSAGE CHAIN
209  --------------------------------------------------------- */
224  auto sendData(std::shared_ptr<protocol::Invoke> invoke) -> std::shared_ptr<DistributedSystem>
225  {
226  return sendData(invoke, 1.0);
227  };
228 
245  virtual auto sendData(std::shared_ptr<protocol::Invoke> invoke, double weight) -> std::shared_ptr<DistributedSystem> override
246  {
247  library::UniqueReadLock uk(((external::base::ExternalSystemArrayBase*)system_array_)->getMutex());
248  if (((protocol::SharedEntityDeque<external::ExternalSystem>*)system_array_)->empty() == true)
249  return nullptr;
250 
251  // ADD UID FOR ARCHIVING HISTORY
252  size_t uid;
253  if (invoke->has("_History_uid") == false)
254  {
255  // ISSUE UID AND ATTACH IT TO INVOKE'S LAST PARAMETER
256  uid = ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Fetch_history_sequence();
257  invoke->emplace_back(new protocol::InvokeParameter("_History_uid", uid));
258  }
259  else
260  {
261  // INVOKE MESSAGE ALREADY HAS ITS OWN UNIQUE ID
262  // - system_array_ IS A TYPE OF DistributedSystemArrayMediator. THE MESSAGE HAS COME FROM ITS MASTER
263  // - A Distributed HAS DISCONNECTED. THE SYSTEM SHIFTED ITS CHAIN TO ANOTHER SLAVE.
264  uid = invoke->get("_History_uid")->getValue<size_t>();
265 
266  // FOR CASE 1. UPDATE HISTORY_SEQUENCE TO MAXIMUM
267  if (uid > ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Get_history_sequence())
268  ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Set_history_sequence(uid);
269 
270  // FOR CASE 2. ERASE ORDINARY PROGRESSIVE HISTORY FROM THE DISCONNECTED
271  progress_list_.erase(uid);
272  }
273 
274  // ADD ROLE NAME FOR MEDIATOR
275  if (invoke->has("_Process_name") == false)
276  invoke->emplace_back(new protocol::InvokeParameter("_Process_name", name));
277  if (invoke->has("_Process_weight") == false)
278  invoke->emplace_back(new protocol::InvokeParameter("_Process_weight", weight));
279  else
280  weight = invoke->get("_Process_weight")->getValue<double>();
281 
282  // FIND THE MOST IDLE SYSTEM
283  std::vector<std::shared_ptr<external::ExternalSystem>> children;
284  std::shared_ptr<DistributedSystem> idle_system;
285 
286  for (size_t i = 0; i < children.size(); i++)
287  {
288  std::shared_ptr<DistributedSystem> system = std::dynamic_pointer_cast<DistributedSystem>(children.at(i));
289  if (system->_Is_excluded() == true)
290  continue; // BEING REMOVED SYSTEM
291 
292  if (idle_system == nullptr
293  || system->_Get_progress_list().size() < idle_system->_Get_progress_list().size()
294  || system->getPerformance() < idle_system->getPerformance())
295  idle_system = system;
296  }
297 
298  // ARCHIVE HISTORY ON PROGRESS_LIST (IN SYSTEM AND ROLE AT THE SAME TIME)
299  std::shared_ptr<DSInvokeHistory> history(new DSInvokeHistory(idle_system.get(), this, invoke, weight));
300 
301  progress_list_.emplace(uid, history);
302  idle_system->_Get_progress_list().emplace(uid, make_pair(invoke, history));
303 
304  uk.unlock(); // SELECTING IDLE AND ARCHIVING HISTORY ENTITY ARE COMPLETED.
305 
306  // SEND DATA
307  idle_system->sendData(invoke);
308 
309  // RETURNS THE IDLE
310  return idle_system;
311  };
312 
316  virtual void replyData(std::shared_ptr<protocol::Invoke>) = 0;
317 
318  public:
319  /* ---------------------------------------------------------
320  EXPORTERS
321  --------------------------------------------------------- */
322  virtual auto TAG() const -> std::string override
323  {
324  return "process";
325  };
326 
327  virtual auto toXML() const -> std::shared_ptr<library::XML> override
328  {
329  std::shared_ptr<library::XML> &xml = super::toXML();
330  xml->setProperty("name", name);
331  xml->setProperty("resource", resource);
332 
333  return xml;
334  };
335 
336  /* ---------------------------------------------------------
337  INTERNAL METHODS
338  --------------------------------------------------------- */
339  auto _Compute_average_elapsed_time() const -> double
340  {
341  double sum = 0.0;
342 
343  for (auto it = history_list_.begin(); it != history_list_.end(); it++)
344  {
345  std::shared_ptr<DSInvokeHistory> history = it->second;
346  double elapsed_time = history->computeElapsedTime() / history->getWeight();
347 
348  // THE SYSTEM'S PERFORMANCE IS 5. THE SYSTEM CAN HANDLE A PROCESS VERY QUICKLY
349  // AND ELAPSED TIME OF THE PROCESS IS 3 SECONDS
350  // THEN I CONSIDER THE ELAPSED TIME AS 15 SECONDS.
351  sum += elapsed_time * history->getSystem()->getPerformance();
352  }
353  return sum / history_list_.size();
354  };
355 
356  auto _Get_progress_list() -> HashMap<size_t, std::shared_ptr<DSInvokeHistory>>& { return progress_list_; };
357  auto _Get_progress_list() const -> const HashMap<size_t, std::shared_ptr<DSInvokeHistory>>& { return progress_list_; };
358 
359  auto _Get_history_list() -> HashMap<size_t, std::shared_ptr<DSInvokeHistory>>& { return history_list_; };
360  auto _Get_history_list() const -> const HashMap<size_t, std::shared_ptr<DSInvokeHistory>>& { return history_list_; };
361 
362  auto _Is_enforced() const -> bool
363  {
364  return enforced_;
365  };
366  };
367 };
368 };
369 };
An entity, a standard data class.
Definition: Entity.hpp:115
auto sendData(std::shared_ptr< protocol::Invoke > invoke) -> std::shared_ptr< DistributedSystem >
DistributedProcess(base::DistributedSystemArrayBase *systemArray)
virtual auto key() const -> std::string override
void unlock() const
Unlock of read.
An Entity and a container of children Entity objects.
Definition: EntityGroup.hpp:40
Unique lock for reading.
virtual void replyData(std::shared_ptr< protocol::Invoke >)=0
virtual auto sendData(std::shared_ptr< protocol::Invoke > invoke, double weight) -> std::shared_ptr< DistributedSystem > override
A parameter of an Invoke.
Customized std::unordered_map.
Definition: HashMap.hpp:103