2 #include <samchon/API.hpp> 4 #include <samchon/templates/distributed/base/DistributedProcessBase.hpp> 5 #include <samchon/protocol/Entity.hpp> 7 #include <samchon/protocol/Invoke.hpp> 8 #include <samchon/templates/distributed/DSInvokeHistory.hpp> 10 #include <samchon/templates/distributed/DistributedSystem.hpp> 11 #include <samchon/templates/distributed/base/DistributedSystemArrayBase.hpp> 12 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp> 47 public base::DistributedProcessBase
52 base::DistributedSystemArrayBase *system_array_;
76 this->system_array_ = systemArray;
83 virtual void construct(std::shared_ptr<library::XML> xml)
override 85 name = xml->getProperty<std::string>(
"name");
86 resource = xml->getProperty<
double>(
"resource");
95 virtual auto key() const ->
std::
string override 105 template <
class SystemArray>
108 return (SystemArray*)system_array_;
224 auto sendData(std::shared_ptr<protocol::Invoke> invoke) -> std::shared_ptr<DistributedSystem>
245 virtual auto sendData(std::shared_ptr<protocol::Invoke> invoke,
double weight) -> std::shared_ptr<DistributedSystem>
override 253 if (invoke->has(
"_History_uid") ==
false)
256 uid = ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Fetch_history_sequence();
264 uid = invoke->get(
"_History_uid")->getValue<
size_t>();
267 if (uid > ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Get_history_sequence())
268 ((parallel::base::ParallelSystemArrayBase*)system_array_)->_Set_history_sequence(uid);
271 progress_list_.erase(uid);
275 if (invoke->has(
"_Process_name") ==
false)
277 if (invoke->has(
"_Process_weight") ==
false)
280 weight = invoke->get(
"_Process_weight")->getValue<
double>();
283 std::vector<std::shared_ptr<external::ExternalSystem>> children;
284 std::shared_ptr<DistributedSystem> idle_system;
286 for (
size_t i = 0; i < children.size(); i++)
288 std::shared_ptr<DistributedSystem> system = std::dynamic_pointer_cast<
DistributedSystem>(children.at(i));
289 if (system->_Is_excluded() ==
true)
292 if (idle_system ==
nullptr 293 || system->_Get_progress_list().size() < idle_system->_Get_progress_list().size()
294 || system->getPerformance() < idle_system->getPerformance())
295 idle_system = system;
299 std::shared_ptr<DSInvokeHistory> history(
new DSInvokeHistory(idle_system.get(),
this, invoke, weight));
301 progress_list_.emplace(uid, history);
302 idle_system->_Get_progress_list().emplace(uid, make_pair(invoke, history));
307 idle_system->sendData(invoke);
316 virtual void replyData(std::shared_ptr<protocol::Invoke>) = 0;
322 virtual auto TAG()
const -> std::string
override 327 virtual auto toXML()
const -> std::shared_ptr<library::XML>
override 329 std::shared_ptr<library::XML> &xml = super::toXML();
330 xml->setProperty(
"name", name);
331 xml->setProperty(
"resource", resource);
339 auto _Compute_average_elapsed_time()
const ->
double 343 for (
auto it = history_list_.begin(); it != history_list_.end(); it++)
345 std::shared_ptr<DSInvokeHistory> history = it->second;
346 double elapsed_time = history->computeElapsedTime() / history->getWeight();
351 sum += elapsed_time * history->getSystem()->getPerformance();
353 return sum / history_list_.size();
362 auto _Is_enforced()
const ->
bool auto getResource() const -> double
An entity, a standard data class.
void setResource(double val)
void enforceResource(double val)
auto sendData(std::shared_ptr< protocol::Invoke > invoke) -> std::shared_ptr< DistributedSystem >
DistributedProcess(base::DistributedSystemArrayBase *systemArray)
virtual auto key() const -> std::string override
void unlock() const
Unlock of read.
An Entity and a container of children Entity objects.
virtual void replyData(std::shared_ptr< protocol::Invoke >)=0
auto getSystemArray() const -> SystemArray *
auto getName() const -> std::string
virtual auto sendData(std::shared_ptr< protocol::Invoke > invoke, double weight) -> std::shared_ptr< DistributedSystem > override
A parameter of an Invoke.
Customized std::unordered_map.