Samchon Framework for CPP  1.0.0
ParallelSystemArray.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/external/ExternalSystemArray.hpp>
5 # include <samchon/templates/parallel/ParallelSystem.hpp>
6 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp>
7 
8 namespace samchon
9 {
10 namespace templates
11 {
12 
19 namespace parallel
20 {
76  template <class System = ParallelSystem>
78  : public virtual external::ExternalSystemArray<System>,
79  public base::ParallelSystemArrayBase
80  {
81  private:
83 
84  public:
85  /* ---------------------------------------------------------
86  CONSTRUCTORS
87  --------------------------------------------------------- */
92  : super(),
93  base::ParallelSystemArrayBase()
94  {
95  };
	// Virtual defaulted destructor: the class is inherited from polymorphically,
	// so deletion through a base pointer must invoke the derived destructor.
	virtual ~ParallelSystemArray() = default;
97 
98  /* =========================================================
99  INVOKE MESSAGE CHAIN
100  - SEND DATA
101  - PERFORMANCE ESTIMATION
102  ============================================================
103  SEND & REPLY DATA
104  --------------------------------------------------------- */
131  auto sendSegmentData(std::shared_ptr<protocol::Invoke> invoke, size_t size) -> size_t
132  {
133  return sendPieceData(invoke, 0, size);
134  };
135 
166  virtual auto sendPieceData(std::shared_ptr<protocol::Invoke> invoke, size_t first, size_t last) -> size_t
167  {
168  library::UniqueWriteLock uk(getMutex());
169 
170  if (invoke->has("_History_uid") == false)
171  invoke->emplace_back(new protocol::InvokeParameter("_History_uid", _Fetch_history_sequence()));
172  else
173  {
174  // INVOKE MESSAGE ALREADY HAS ITS OWN UNIQUE ID
175  // - THIS IS A TYPE OF ParallelSystemArrayMediator. THE MESSAGE HAS COME FROM ITS MASTER
176  // - A ParallelSystem HAS DISCONNECTED. THE SYSTEM SHIFTED ITS CHAIN TO OTHER SLAVES.
177  size_t uid = invoke->get("_History_uid")->getValue<size_t>();
178 
179  // FOR CASE 1. UPDATE HISTORY_SEQUENCE TO MAXIMUM
180  if (uid > _Get_history_sequence())
181  _Set_history_sequence(uid);
182  }
183 
184  // TOTAL NUMBER OF PIECES TO DIVIDE
185  size_t segment_size = last - first;
186 
187  // SYSTEMS TO BE GET DIVIDED PROCESSES AND
188  std::vector<std::shared_ptr<ParallelSystem>> system_array;
189  std::vector<std::thread> threads;
190 
191  system_array.reserve(size());
192  threads.reserve(size());
193 
194  // POP EXCLUDEDS
195  for (size_t i = 0; i < size(); i++)
196  {
197  std::shared_ptr<ParallelSystem> system = at(i);
198 
199  if (system->_Is_excluded() == false)
200  system_array.push_back(system);
201  }
202 
203  // ORDERS
204  for (size_t i = 0; i < system_array.size(); i++)
205  {
206  std::shared_ptr<ParallelSystem> system = system_array[i];
207 
208  // COMPUTE FIRST AND LAST INDEX TO ALLOCATE
209  size_t piece_size = (i == system_array.size() - 1)
210  ? segment_size - first
211  : (size_t)(segment_size / system_array.size() * system->getPerformance());
212  if (piece_size == 0)
213  continue;
214 
215  std::shared_ptr<protocol::Invoke> my_invoke(new protocol::Invoke(invoke->getListener()));
216  {
217  // DUPLICATE INVOKE AND ATTACH PIECE INFO
218  my_invoke->assign(invoke->begin(), invoke->end());
219  my_invoke->emplace_back(new protocol::InvokeParameter("_Piece_first", first));
220  my_invoke->emplace_back(new protocol::InvokeParameter("_Piece_last", last));
221  };
222 
223  // ENROLL TO PROGRESS LIST
224  std::shared_ptr<slave::InvokeHistory> history(new PRInvokeHistory(my_invoke));
225  system->_Get_progress_list().emplace(history->getUID(), std::make_pair(invoke, history));
226 
227  // ENROLL THE SEND DATA INTO THREADS
228  threads.emplace_back(&ParallelSystem::sendData, system.get(), my_invoke);
229  first += piece_size; // FOR THE NEXT STEP
230  }
231  uk.unlock();
232 
233  // JOIN THREADS
234  for (auto it = threads.begin(); it != threads.end(); it++)
235  it->join();
236 
237  return threads.size();
238  };
239 
240  /* ---------------------------------------------------------
241  PERFORMANCE ESTIMATION - INTERNAL METHODS
242  --------------------------------------------------------- */
	/**
	 * Called when a slave reports a finished InvokeHistory. Returns false
	 * unless the history is a PRInvokeHistory and every sub-piece of the
	 * same uid has completed; once all pieces are done, re-estimates the
	 * performance index of each participating (non-enforced) slave from
	 * its measured throughput, then normalizes all indexes.
	 *
	 * @param history A completed history record reported by a slave.
	 * @return true when the whole parallel process has completed and the
	 *         performance indexes were re-evaluated; false otherwise.
	 */
	virtual auto _Complete_history(std::shared_ptr<slave::InvokeHistory> history) -> bool
	{
		// WRONG TYPE: only PRInvokeHistory records belong to this chain
		if (std::dynamic_pointer_cast<PRInvokeHistory>(history) == nullptr)
			return false;

		//========
		// WRITE LOCK (performance indexes may be mutated below)
		//========
		library::UniqueWriteLock uk(getMutex());
		size_t uid = history->getUID();

		// ALL THE SUB-TASKS ARE DONE?
		for (size_t i = 0; i < size(); i++)
			if (at(i)->_Get_progress_list().has(uid) == true)
				return false; // IT'S ON A PROCESS IN SOME SYSTEM.

		//--------
		// RE-CALCULATE PERFORMANCE INDEX
		//--------
		// CONSTRUCT BASIC DATA: (system, raw throughput) pairs
		std::vector<std::pair<std::shared_ptr<ParallelSystem>, double>> system_pairs;
		double performance_index_average = 0.0;

		system_pairs.reserve(size());

		for (size_t i = 0; i < size(); i++)
		{
			std::shared_ptr<ParallelSystem> system = at(i);
			if (system->_Get_history_list().has(uid) == false)
				continue; // NO HISTORY (HAVE NOT PARTICIPATED IN THE PARALLEL PROCESS)

			// COMPUTE PERFORMANCE INDEX BASIS ON EXECUTION TIME OF THIS PARALLEL PROCESS
			// (throughput = pieces processed per unit of elapsed time)
			std::shared_ptr<PRInvokeHistory> my_history = std::dynamic_pointer_cast<PRInvokeHistory>(system->_Get_history_list().get(uid));
			double performance_index = my_history->computeSize() / (double)my_history->computeElapsedTime();

			// PUSH TO SYSTEM PAIRS AND ADD TO AVERAGE
			system_pairs.emplace_back(system, performance_index);
			performance_index_average += performance_index;
		}
		// NOTE(review): if no system participated, system_pairs is empty and this
		// divides by zero (NaN); the loop below then runs zero times, so it is benign.
		performance_index_average /= system_pairs.size();

		// RE-CALCULATE PERFORMANCE INDEX
		for (size_t i = 0; i < system_pairs.size(); i++)
		{
			// SYSTEM AND NEW PERFORMANCE INDEX BASIS ON THE EXECUTION TIME
			auto system = system_pairs[i].first;
			if (system->_Is_enforced() == true)
				continue; // PERFORMANCE INDEX IS ENFORCED. DOES NOT PERMIT REVALUATION

			// new index = this system's throughput relative to the group average
			double new_performance = system_pairs[i].second / performance_index_average;

			// DEDUCT RATIO TO REFLECT THE NEW PERFORMANCE INDEX
			// (exponential smoothing: more history => trust the old index more,
			//  bounded to keep at least 30% weight on the new measurement)
			double ordinary_ratio;
			if (system->_Get_history_list().size() < 2)
				ordinary_ratio = .3;
			else
				ordinary_ratio = std::min(0.7, 1.0 / (system->_Get_history_list().size() - 1.0));

			system->setPerformance((system->getPerformance() * ordinary_ratio) + (new_performance * (1 - ordinary_ratio)));
		}

		// AT LAST, NORMALIZE PERFORMANCE INDEXES OF ALL SLAVE SYSTEMS
		_Normalize_performance();
		return true;
	};
309 
310  protected:
311  virtual void _Normalize_performance()
312  {
313  // COMPUTE AVERAGE
314  double average = 0.0;
315  size_t denominator = 0;
316 
317  for (size_t i = 0; i < size(); i++)
318  {
319  std::shared_ptr<ParallelSystem> system = at(i);
320  if (system->_Is_enforced() == true)
321  continue; // PERFORMANCE INDEX IS ENFORCED. DOES NOT PERMIT REVALUATION
322 
323  average += system->getPerformance();
324  denominator++;
325  }
326  average /= (double)denominator;
327 
328  // DIVIDE FROM THE AVERAGE
329  for (size_t i = 0; i < size(); i++)
330  {
331  std::shared_ptr<ParallelSystem> system = at(i);
332  if (system->_Is_enforced() == true)
333  continue; // PERFORMANCE INDEX IS ENFORCED. DOES NOT PERMIT REVALUATION
334 
335  system->setPerformance(system->getPerformance() / average);
336  }
337  };
338  };
339 };
340 };
341 };
An Entity and a container of children Entity objects.
Definition: EntityGroup.hpp:40
Unique lock for writing.
auto sendSegmentData(std::shared_ptr< protocol::Invoke > invoke, size_t size) -> size_t
Standard message of network I/O.
Definition: Invoke.hpp:35
void unlock()
Unlock on writing.
A parameter of an Invoke.
virtual auto sendPieceData(std::shared_ptr< protocol::Invoke > invoke, size_t first, size_t last) -> size_t
virtual void sendData(std::shared_ptr< protocol::Invoke > invoke) override