Samchon Framework for CPP  1.0.0
ParallelSystemArray.cpp
1 #include <samchon/protocol/master/ParallelSystemArray.hpp>
2 
3 #include <samchon/protocol/master/ParallelSystem.hpp>
4 #include <samchon/protocol/master/PRMasterHistoryArray.hpp>
5 #include <samchon/protocol/master/PRMasterHistory.hpp>
6 
7 #include <thread>
8 #include <samchon/protocol/Invoke.hpp>
9 
10 using namespace std;
11 using namespace samchon::library;
12 using namespace samchon::protocol;
13 using namespace samchon::protocol::master;
14 
15 /* ------------------------------------------------------------------
16  CONSTRUCTORS
17 ------------------------------------------------------------------ */
18 ParallelSystemArray::ParallelSystemArray()
19 {
20  historyArray = new PRMasterHistoryArray(this);
21  progressArray = new PRMasterHistoryArray(this);
22 }
23 ParallelSystemArray::~ParallelSystemArray()
24 {
25  delete historyArray;
26  delete progressArray;
27 }
28 
29 /* ------------------------------------------------------------------
30  GETTERS
31 ------------------------------------------------------------------ */
// Expands SHARED_ENTITY_ARRAY_ELEMENT_ACCESSOR_BODY for the pair
// (ParallelSystemArray, ParallelSystem). The macro is defined outside this file;
// presumably it generates the element accessors (e.g. at()) that the methods
// below use to obtain ParallelSystem smart pointers -- TODO confirm against the
// macro definition.
32 SHARED_ENTITY_ARRAY_ELEMENT_ACCESSOR_BODY(ParallelSystemArray, ParallelSystem)
33 
34 /* ------------------------------------------------------------------
35  CHAIN OF INVOKE MESSAGE
36 ------------------------------------------------------------------ */
37 void ParallelSystemArray::sendSegmentData(shared_ptr<Invoke> invoke, size_t totalSize)
38 {
39  sendPieceData(invoke, 0, totalSize);
40 }
41 
42 void ParallelSystemArray::sendPieceData(shared_ptr<Invoke> invoke, size_t index, size_t totalSize)
43 {
44  if (invoke->has("invoke_history_uid") == false)
45  invoke->emplace_back(new InvokeParameter("invoke_history_uid", ++uid));
46 
47  vector<thread> threadArray(size());
48 
49  PRMasterHistory *history = new PRMasterHistory(historyArray, invoke, index, totalSize);
50  historyArray->emplace_back(history);
51 
52  size_t pieceIndex = index;
53 
54  for (size_t i = 0; i < size(); i++)
55  {
56  //DETERMINE PIECE SIZE
57  size_t pieceSize = (size_t)(totalSize / (double)size() * at(i)->performance);
58 
59  if (i == size() - 1 && pieceIndex + pieceSize < totalSize)
60  pieceSize = totalSize - pieceIndex;
61 
62  //LINKAGE
63  if (at(i)->systemArray == nullptr)
64  at(i)->systemArray = this;
65 
66  //CALL ASYNCHRONOUSLY
67  threadArray[i] =
68  thread
69  (
70  &ParallelSystem::sendPieceData, at(i).get(),
71  history, invoke, pieceIndex, pieceSize
72  );
73 
74  pieceIndex += pieceSize;
75  }
76  for (size_t i = 0; i < threadArray.size(); i++)
77  threadArray[i].join();
78 }
79 
80 void ParallelSystemArray::notifyEnd(PRMasterHistory *masterHistory)
81 {
82  // ESTIMATE LOCAL PERFORMANCE INDEX
83  auto &historyArray = masterHistory->historyArray;
84  double avgElapsedTime = 0.0;
85 
86  for (size_t i = 0; i < historyArray.size(); i++)
87  avgElapsedTime += historyArray[i]->calcAverageElapsedTime();
88 
89  for (size_t i = 0; i < historyArray.size(); i++)
90  {
91  PRInvokeHistory *history = historyArray[i];
92  ParallelSystem *system = history->system;
93 
94  double historyPerformance = avgElapsedTime / history->calcAverageElapsedTime();
95  system->performance = system->performance * .7 + historyPerformance * .3;
96  }
97 
98  //NORMALIZE ALL PERFORMANCE INDEX
99  double avgPerformance = 0.0;
100 
101  for (size_t i = 0; i < size(); i++)
102  avgPerformance += at(i)->performance / (double)size();
103 
104  for (size_t i = 0; i < size(); i++)
105  at(i)->performance /= avgPerformance;
106 }
107 
108 /*void ParallelSystemArray::notifyEnd(size_t uid)
109 {
110  //DE-COUNT
111  if (--progressMap[uid] == 0)
112  progressMap.erase(uid);
113 
114  //ESTIMATE PERFORMANCE
115  vector<pair<ParallelSystem*, PRInvokeHistory*>> systemPairArray;
116  size_t i;
117 
118  //FIND SYSTEMS PARTICIPATED IN PARALLEL PROCESS
119  for (i = 0; i < size(); i++)
120  {
121  if (at(i)->historyArray->has(to_string(uid)) == false)
122  continue;
123 
124  systemPairArray.push_back({at(i).get(), at(i)->historyArray->get(to_string(uid)).get()});
125  }
126 
127  //CALCULATE PERFORMANCE INDEX
128  double avgElapsedTime = 0.0; //AVERAGE ELAPSED TIME
129 
130  for (i = 0; i < systemPairArray.size(); i++)
131  {
132  PRInvokeHistory *history = systemPairArray[i].second;
133 
134  avgElapsedTime += history->calcAverageElapsedTime() / (double)systemPairArray.size();
135  }
136  for (i = 0; i < systemPairArray.size(); i++)
137  {
138  ParallelSystem *system = systemPairArray[i].first;
139  PRInvokeHistory *history = systemPairArray[i].second;
140 
141  double historyPerformance = avgElapsedTime / history->calcAverageElapsedTime();
142  system->performance = system->performance * .7 + historyPerformance * .3;
143  }
144 
145  //NORMALIZE ALL PERFORMANCE INDEX
146  double avgPerformance = 0.0;
147 
148  for (i = 0; i < size(); i++)
149  avgPerformance += at(i)->performance / (double)size();
150 
151  for (i = 0; i < size(); i++)
152  at(i)->performance /= avgPerformance;
153 }*/
A history log of an Invoke message on a master.
Package for external system, within the framework of master.
Definition: RWMutex.hpp:4
Package of libraries.
Definition: library.hpp:84
An array of parallel system drivers.
A network driver for a parallel system.
auto calcAverageElapsedTime() const -> double
Calculate average of elapsed time for each segmentation.
An array of invoke histories of master.
ParallelSystem * system
The system this history belongs to.
Package of network protocol and libraries.
Definition: protocol.hpp:185
A reported history of an Invoke message.
Standard message of network I/O.
Definition: Invoke.hpp:47
library::CriticalVector< PRInvokeHistory * > historyArray
An array of histories which are generated in each system.
A parameter of an Invoke.
double performance
A performance index.