2 #include <samchon/API.hpp> 4 #include <samchon/templates/external/ExternalSystemArray.hpp> 5 # include <samchon/templates/parallel/ParallelSystem.hpp> 6 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp> 76 template <
class System = ParallelSystem>
79 public base::ParallelSystemArrayBase
93 base::ParallelSystemArrayBase()
// Splits the index range [first, last) of an Invoke message into pieces for the
// non-excluded systems of this array and returns threads.size().
//
// @param invoke Message to distribute; it is duplicated per system below.
// @param first  Start index of the range; used to compute segment_size.
// @param last   End index of the range; used to compute segment_size.
// @return threads.size() at the end of the function.
//
// NOTE(review): this text is a flattened listing. The numbers that begin most
// statements are the original source line numbers, and they skip values, so
// braces and several statements are not visible here. Comments below describe
// only what is visible and mark the gaps.
166 virtual auto sendPieceData(std::shared_ptr<protocol::Invoke> invoke,
size_t first,
size_t last) ->
// Tests whether the message lacks a "_History_uid" parameter.
// NOTE(review): embedded lines 171-176 are missing, so the statement governed by
// this condition (and any else-branch) is not visible.
size_t 170 if (invoke->has(
"_History_uid") ==
false)
// Reads the "_History_uid" parameter as a size_t.
177 size_t uid = invoke->get(
"_History_uid")->getValue<
size_t>();
// Raises the history sequence to uid when uid is larger than the current value.
180 if (uid > _Get_history_sequence())
181 _Set_history_sequence(uid);
// Width of the requested range.
185 size_t segment_size = last - first;
188 std::vector<std::shared_ptr<ParallelSystem>> system_array;
189 std::vector<std::thread> threads;
// Both vectors are pre-sized for the case where every system participates.
191 system_array.reserve(size());
192 threads.reserve(size());
// Collects the systems for which _Is_excluded() returns false.
195 for (
size_t i = 0; i < size(); i++)
197 std::shared_ptr<ParallelSystem> system = at(i);
199 if (system->_Is_excluded() ==
false)
200 system_array.push_back(system);
// Per-system allocation over the collected systems.
204 for (
size_t i = 0; i < system_array.size(); i++)
206 std::shared_ptr<ParallelSystem> system = system_array[i];
// The last system receives `segment_size - first`; every other system receives
// segment_size / system_array.size() scaled by getPerformance() and cast to
// size_t. Both operands of the division are size_t, so the quotient is truncated
// before the scaling is applied.
// NOTE(review): segment_size is already `last - first`, so `segment_size - first`
// subtracts `first` a second time and wraps around (unsigned) whenever
// first > segment_size. Confirm whether `last - first` was intended; any update
// of `first` inside the loop is not visible in this listing.
209 size_t piece_size = (i == system_array.size() - 1)
210 ? segment_size - first
211 : (
size_t)(segment_size / system_array.size() * system->getPerformance());
// Builds a separate Invoke from the original's listener ...
215 std::shared_ptr<protocol::Invoke> my_invoke(
new protocol::Invoke(invoke->getListener()));
// ... and fills it with the range [invoke->begin(), invoke->end()).
// NOTE(review): embedded lines 219-223 are missing; anything appended to
// my_invoke there is not visible.
218 my_invoke->assign(invoke->begin(), invoke->end());
// Wraps the duplicated message in a PRInvokeHistory and registers it in the
// system's progress list under the history's UID. The stored pair holds the
// ORIGINAL invoke (not my_invoke) together with the history.
224 std::shared_ptr<slave::InvokeHistory> history(
new PRInvokeHistory(my_invoke));
225 system->_Get_progress_list().emplace(history->getUID(), std::make_pair(invoke, history));
// NOTE(review): embedded lines 226-233 are missing; no statement that adds an
// element to `threads` is visible in this listing.
// Iterates over every element of `threads`; the loop body (embedded lines
// 235-236) is not visible.
234 for (
auto it = threads.begin(); it != threads.end(); it++)
237 return threads.size();
// Handles a finished history: inspects the progress lists of all systems,
// derives a performance index per system from that system's own history with
// the same UID, blends it into each system's performance, and finally calls
// _Normalize_performance().
//
// @param history The history reported as complete.
// @return bool; none of the return statements are visible in this listing.
//
// NOTE(review): this text is a flattened listing. The numbers that begin most
// statements are the original source line numbers, and they skip values, so
// braces and several statements (including every `return`) are not visible.
243 virtual auto _Complete_history(std::shared_ptr<slave::InvokeHistory> history) ->
// Tests whether `history` is NOT a PRInvokeHistory.
// NOTE(review): the statement taken in that case (embedded lines 247-252) is not
// visible.
bool 246 if (std::dynamic_pointer_cast<PRInvokeHistory>(history) ==
nullptr)
253 size_t uid = history->getUID();
// Checks every system's progress list for an entry with this uid.
// NOTE(review): the statement executed on a match (embedded lines 258-263) is
// not visible.
256 for (
size_t i = 0; i < size(); i++)
257 if (at(i)->_Get_progress_list().has(uid) ==
true)
// Pairs of (system, performance index) plus the running sum of the indices.
264 std::vector<std::pair<std::shared_ptr<ParallelSystem>,
double>> system_pairs;
265 double performance_index_average = 0.0;
267 system_pairs.reserve(size());
269 for (
size_t i = 0; i < size(); i++)
271 std::shared_ptr<ParallelSystem> system = at(i);
// Tests whether this system has no history entry for uid.
// NOTE(review): the statement taken in that case (embedded lines 273-275) is not
// visible.
272 if (system->_Get_history_list().has(uid) ==
false)
// NOTE(review): the result of dynamic_pointer_cast is dereferenced on the next
// statement without a visible null check.
276 std::shared_ptr<PRInvokeHistory> my_history = std::dynamic_pointer_cast<
PRInvokeHistory>(system->_Get_history_list().get(uid));
// Performance index = computeSize() divided by computeElapsedTime() (as double).
// NOTE(review): no visible guard against computeElapsedTime() returning zero.
277 double performance_index = my_history->
computeSize() / (double)my_history->computeElapsedTime();
280 system_pairs.emplace_back(system, performance_index);
281 performance_index_average += performance_index;
// Converts the accumulated sum into a mean.
// NOTE(review): no visible guard for system_pairs being empty, in which case
// this is 0.0 divided by 0.
283 performance_index_average /= system_pairs.size();
// Re-rates each collected system.
286 for (
size_t i = 0; i < system_pairs.size(); i++)
289 auto system = system_pairs[i].first;
// Tests _Is_enforced(); the statement taken when it is true (embedded lines
// 291-292) is not visible.
290 if (system->_Is_enforced() ==
true)
// This system's index relative to the mean of all collected systems.
293 double new_performance = system_pairs[i].second / performance_index_average;
// Weight kept for the system's previous performance value; declared without an
// initializer.
296 double ordinary_ratio;
// NOTE(review): the assignment for the `size() < 2` case (embedded lines
// 298-299) is not visible; only the assignment on line 300 is.
297 if (system->_Get_history_list().size() < 2)
300 ordinary_ratio = std::min(0.7, 1.0 / (system->_Get_history_list().size() - 1.0));
// Weighted blend: old performance * ordinary_ratio + new * (1 - ordinary_ratio).
302 system->setPerformance((system->getPerformance() * ordinary_ratio) + (new_performance * (1 - ordinary_ratio)));
306 _Normalize_performance();
// Rescales system performances: accumulates getPerformance() into `average`,
// divides that by `denominator`, then divides performances by the result.
//
// NOTE(review): this text is a flattened listing. The numbers that begin most
// statements are the original source line numbers, and they skip values, so
// braces and several statements are not visible. In particular, which systems
// are skipped by the _Is_enforced() checks cannot be determined from here.
311 virtual void _Normalize_performance()
314 double average = 0.0;
315 size_t denominator = 0;
// First pass: accumulate performance values.
317 for (
size_t i = 0; i < size(); i++)
319 std::shared_ptr<ParallelSystem> system = at(i);
// Tests _Is_enforced(); the statement taken when it is true (embedded lines
// 321-322) is not visible.
320 if (system->_Is_enforced() ==
true)
323 average += system->getPerformance();
// NOTE(review): no increment of `denominator` is visible (embedded lines 324-325
// are missing). If it is still 0 here the division yields a non-finite value;
// confirm against the full source.
326 average /= (double)denominator;
// Second pass: divide performance values by the computed average.
329 for (
size_t i = 0; i < size(); i++)
331 std::shared_ptr<ParallelSystem> system = at(i);
// Tests _Is_enforced(); the statement taken when it is true (embedded lines
// 333-334) is not visible.
332 if (system->_Is_enforced() ==
true)
335 system->setPerformance(system->getPerformance() / average);
auto computeSize() const -> size_t
An Entity and a container of children Entity objects.
auto sendSegmentData(std::shared_ptr< protocol::Invoke > invoke, size_t size) -> size_t
Standard message of network I/O.
void unlock()
Unlock on writing.
A parameter of an Invoke.
virtual auto sendPieceData(std::shared_ptr< protocol::Invoke > invoke, size_t first, size_t last) -> size_t
virtual void sendData(std::shared_ptr< protocol::Invoke > invoke) override
ParallelSystemArray()
Default Constructor.