Samchon Framework for CPP  1.0.0
DistributedSystemArray.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/parallel/ParallelSystemArray.hpp>
5 # include <samchon/templates/distributed/DistributedSystem.hpp>
6 # include <samchon/templates/distributed/DistributedProcess.hpp>
7 #include <samchon/templates/distributed/base/DistributedSystemArrayBase.hpp>
8 
9 namespace samchon
10 {
11 namespace templates
12 {
13 
20 namespace distributed
21 {
118  template <class System = DistributedSystem>
120  : public virtual parallel::ParallelSystemArray<System>,
121  public base::DistributedSystemArrayBase
122  {
123  private:
125 
126  public:
127  /* ---------------------------------------------------------
128  CONSTRUCTORS
129  --------------------------------------------------------- */
134  : super()
135  {
136  };
137  virtual ~DistributedSystemArray() = default;
138 
139  virtual void construct(std::shared_ptr<library::XML> xml) override
140  {
141  //--------
142  // CONSTRUCT ROLES
143  //--------
144  // CLEAR ORDINARY ROLES
145  process_map_.clear();
146 
147  // CREATE ROLES
148  if (xml->has("processes") == true && xml->get("processes")->front()->has("process") == true)
149  {
150  std::shared_ptr<library::XMLList> &role_xml_list = xml->get("processes")->front()->get("process");
151  for (size_t i = 0; i < role_xml_list->size(); i++)
152  {
153  std::shared_ptr<library::XML> &role_xml = role_xml_list->at(i);
154 
155  // CONSTRUCT ROLE FROM XML
156  std::shared_ptr<DistributedProcess> role(createProcess(role_xml));
157  role->construct(role_xml);
158 
159  // AND INSERT TO ROLE_MAP
160  insertProcess(role);
161  }
162  }
163 
164  //--------
165  // CONSTRUCT SYSTEMS
166  //--------
167  super::construct(xml);
168  };
169 
170  protected:
177  virtual auto createProcess(std::shared_ptr<library::XML>) -> DistributedProcess* = 0;
178 
179  /* ---------------------------------------------------------
180  HISTORY HANDLER - PERFORMANCE ESTIMATION, INTERNAL
181  --------------------------------------------------------- */
182  virtual auto _Complete_history(std::shared_ptr<slave::InvokeHistory> $history) -> bool override
183  {
184  std::shared_ptr<DSInvokeHistory> history = std::dynamic_pointer_cast<DSInvokeHistory>($history);
185 
186  // ParallelSystem's history -> PRInvokeHistory
187  if (history == nullptr)
188  return super::_Complete_history($history);
189 
190  library::UniqueWriteLock uk(getMutex());
191 
192  //--------
193  // DistributedProcess's history -> DSInvokeHistory
194  //--------
195  // NO ROLE, THEN FAILED TO COMPLETE
196  if (history->getProcess() == nullptr)
197  return false;
198 
199  // ESTIMATE PERFORMANCE INDEXES
200  estimate_system_performance(history); // ESTIMATE SYSTEMS' INDEX
201  estimate_process_resource(history); // ESTIMATE PROCESS' PERFORMANCE
202 
203  // AT LAST, NORMALIZE PERFORMANCE INDEXES OF ALL SYSTEMS AND ROLES
204  _Normalize_performance();
205  return true;
206  };
207 
208  virtual void _Normalize_performance() override
209  {
210  // NORMALIZE SYSTEMS' PERFORMANCE INDEXES
211  super::_Normalize_performance();
212 
213  // NORMALIZE ROLES' PERFORMANCE INDEXES
214  double average = 0;
215  size_t denominator = 0;
216 
217  for (auto it = process_map_.begin(); it != process_map_.end(); it++)
218  {
219  auto process = it->second;
220  if (process->_Is_enforced() == true)
221  continue; // THE RESOURCE INDEX IS ENFORCED. DO NOT PERMIT REVALUATION
222 
223  average += process->getResource();
224  denominator++;
225  }
226  average /= (double)denominator;
227 
228  // DIVIDE FROM THE AVERAGE
229  for (auto it = process_map_.begin(); it != process_map_.end(); it++)
230  {
231  auto process = it->second;
232  if (process->_Is_enforced() == true)
233  continue; // THE RESOURCE INDEX IS ENFORCED. DO NOT PERMIT REVALUATION
234 
235  process->setResource(process->getResource() / average);
236  }
237  };
238 
239  private:
240  void estimate_process_resource(std::shared_ptr<DSInvokeHistory> history)
241  {
242  DistributedProcess *process = history->getProcess();
243  if (process->_Is_enforced() == true)
244  return; // THE RESOURCE INDEX IS ENFORCED. DO NOT PERMIT REVALUATION
245 
246  double average_elapsed_time_of_others = 0;
247  size_t denominator = 0;
248 
249  // COMPUTE AVERAGE ELAPSED TIME
250  for (auto it = process_map_.begin(); it != process_map_.end(); it++)
251  {
252  DistributedProcess *my_process = it->second.get();
253  if (my_process == process || my_process->_Get_history_list().empty() == true)
254  continue;
255 
256  average_elapsed_time_of_others += my_process->_Compute_average_elapsed_time() * my_process->getResource();
257  denominator++;
258  }
259 
260  // COMPARE WITH THIS HISTORY'S ELAPSED TIME
261  if (denominator != 0)
262  {
263  // DIVE WITH DENOMINATOR
264  average_elapsed_time_of_others /= (double)denominator;
265 
266  // DEDUCT NEW PERFORMANCE INDEX BASED ON THE EXECUTION TIME
267  // - ROLE'S PERFORMANCE MEANS; HOW MUCH TIME THE ROLE NEEDS
268  // - ELAPSED TIME IS LONGER, THEN PERFORMANCE IS HIGHER
269  double elapsed_time = history->computeElapsedTime() / history->getWeight(); // CONSIDER WEIGHT
270  double new_performance = elapsed_time / average_elapsed_time_of_others; // THE NEW RESOURCE
271 
272  // DEDUCT RATIO TO REFLECT THE NEW PERFORMANCE INDEX -> MAXIMUM: 15%
273  double ordinary_ratio;
274  if (process->_Get_history_list().size() < 2)
275  ordinary_ratio = .15;
276  else
277  ordinary_ratio = min(.85, 1.0 / (process->_Get_history_list().size() - 1.0));
278 
279  // DEFINE NEW PERFORMANCE
280  process->setResource
281  (
282  (process->getResource() * ordinary_ratio)
283  + (new_performance * (1 - ordinary_ratio))
284  );
285  }
286  };
287 
288  void estimate_system_performance(std::shared_ptr<DSInvokeHistory> history)
289  {
290  DistributedSystem *system = history->getSystem();
291  if (system->_Is_enforced() == true)
292  return; // THE PERFORMANCE INDEX IS ENFORCED. IT DOESN'T PERMIT REVALUATION
293 
294  double average_elapsed_time_of_others = 0;
295  size_t denominator = 0;
296 
297  // COMPUTE AVERAGE ELAPSED TIME
298  for (size_t i = 0; i < this->size(); i++)
299  {
300  shared_ptr<DistributedSystem> system = this->at(i);
301 
302  double avg = system->_Compute_average_elapsed_time();
303  if (avg == -1)
304  continue;
305 
306  average_elapsed_time_of_others += avg;
307  denominator++;
308  }
309 
310  // COMPARE WITH THIS HISTORY'S ELAPSED TIME
311  if (denominator != 0)
312  {
313  // DIVE WITH DENOMINATOR
314  average_elapsed_time_of_others /= denominator;
315 
316  // DEDUCT NEW PERFORMANCE INDEX BASED ON THE EXECUTION TIME
317  // - SYSTEM'S PERFORMANCE MEANS; HOW FAST THE SYSTEM IS
318  // - ELAPSED TIME IS LOWER, THEN PERFORMANCE IS HIGHER
319  double elapsed_time = history->computeElapsedTime() / history->getWeight();
320  double new_performance = average_elapsed_time_of_others / elapsed_time;
321 
322  // DEDUCT RATIO TO REFLECT THE NEW PERFORMANCE INDEX -> MAXIMUM: 30%
323  double ordinary_ratio;
324  if (system->_Get_history_list().size() < 2)
325  ordinary_ratio = .3;
326  else
327  ordinary_ratio = min(0.7, 1.0 / (system->_Get_history_list().size() - 1.0));
328 
329  // DEFINE NEW PERFORMANCE
330  system->setPerformance
331  (
332  (system->getPerformance() * ordinary_ratio)
333  + (new_performance * (1 - ordinary_ratio))
334  );
335  }
336  };
337 
338  public:
339  /* ---------------------------------------------------------
340  EXPORTERS
341  --------------------------------------------------------- */
342  virtual auto toXML() const -> std::shared_ptr<library::XML> override
343  {
344  std::shared_ptr<library::XML> &xml = super::toXML();
345  if (process_map_.empty() == true)
346  return xml;
347 
348  std::shared_ptr<library::XML> processes_xml(new library::XML());
349  {
350  processes_xml->setTag("processes");
351  for (auto it = process_map_.begin(); it != process_map_.end(); it++)
352  processes_xml->push_back(it->second->toXML());
353  }
354  xml->push_back(processes_xml);
355  return xml;
356  };
357  };
358 };
359 };
360 };
virtual auto createProcess(std::shared_ptr< library::XML >) -> DistributedProcess *=0
virtual void construct(std::shared_ptr< library::XML > xml)
Construct data of the Entity from an XML object.
Definition: EntityGroup.hpp:71
virtual void construct(std::shared_ptr< library::XML > xml) override
Construct data of the Entity from an XML object.
Unique lock for writing.
virtual auto toXML() const -> std::shared_ptr< library::XML > override
Get an XML object represents the EntityGroup.
virtual auto toXML() const -> std::shared_ptr< library::XML >
Get an XML object represents the EntityGroup.