Samchon Framework for CPP  1.0.0
MediatorSystem.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/templates/slave/SlaveSystem.hpp>
5 #include <samchon/protocol/IListener.hpp>
6 
7 #include <samchon/templates/parallel/base/ParallelSystemArrayBase.hpp>
8 #include <samchon/templates/distributed/base/DistributedSystemArrayBase.hpp>
9 #include <samchon/templates/distributed/base/DistributedProcessBase.hpp>
10 
11 #include <mutex>
12 #include <samchon/HashMap.hpp>
13 
14 namespace samchon
15 {
16 namespace templates
17 {
18 namespace parallel
19 {
50  : public virtual slave::SlaveSystem,
51  public virtual protocol::IListener
52  {
53  private:
54  typedef slave::SlaveSystem super;
55 
56  external::base::ExternalSystemArrayBase *system_array_;
58 
59  std::mutex mtx_;
60 
61  public:
62  /* ---------------------------------------------------------
63  CONSTRUCTORS
64  --------------------------------------------------------- */
70  MediatorSystem(external::base::ExternalSystemArrayBase* systemArray)
71  : super()
72  {
73  this->system_array_ = systemArray;
74  };
75 
/// Virtual destructor with the compiler-generated (defaulted) body.
79  virtual ~MediatorSystem() = default;
80 
/// Pure virtual: every concrete mediator must provide its own start().
/// NOTE(review): presumably this opens the connection toward the master system — confirm
/// against the concrete mediator classes.
88  virtual void start() = 0;
89 
90  /* ---------------------------------------------------------
91  ACCESSORS
92  --------------------------------------------------------- */
96  template <class SystemArray>
97  auto getSystemArray() const -> SystemArray*
98  {
99  return (SystemArray*)system_array_;
100  };
101 
102  /* ---------------------------------------------------------
103  INVOKE MESSAGE CHAIN
104  --------------------------------------------------------- */
105  void _Complete_history(size_t uid)
106  {
107  std::unique_lock<std::mutex> uk(mtx_);
108 
109  //--------
110  // NEED TO REDEFINE START AND END TIME
111  //--------
112  // NO SUCH HISTORY; THE PROCESS HAD DONE ONLY IN THIS MEDIATOR LEVEL.
113  if (progress_list_.has(uid) == false)
114  return;
115 
116  // COMPLETE THE HISTORY
117  std::shared_ptr<slave::InvokeHistory> history = progress_list_.get(uid);
118  history->complete();
119 
120  // ERASE THE HISTORY ON PROGRESS LIST
121  progress_list_.erase(uid);
122 
123  // REPORT THE HISTORY TO MASTER
124  std::thread(&MediatorSystem::sendData, this, history->toInvoke()).detach();
125  };
126 
127  private:
128  virtual void _Reply_data(std::shared_ptr<protocol::Invoke> invoke) override final
129  {
130  if (invoke->has("_History_uid") == true)
131  {
132  // REGISTER THIS PROCESS ON HISTORY LIST
133  std::shared_ptr<slave::InvokeHistory> history(new slave::InvokeHistory(invoke));
134  progress_list_.insert({ history->getUID(), history });
135 
136  if (invoke->has("_Piece_first") == true)
137  {
138  // PARALLEL PROCESS
139  size_t first = invoke->get("_Piece_first")->getValue<size_t>();
140  size_t last = invoke->get("_Piece_last")->getValue<size_t>();
141 
142  invoke->erase(invoke->end() - 2, invoke->end());
143  ((base::ParallelSystemArrayBase*)system_array_)->sendPieceData(invoke, first, last);
144  }
145  else if (invoke->has("_Process_name") == true)
146  {
147  // DISTRIBUTED PROCESS
148  auto ds_system_array = (distributed::base::DistributedSystemArrayBase*)system_array_;
149 
150  // FIND THE MATCHED ROLE
151  const std::string &process_name = invoke->get("_Process_name")->getValue<std::string>();
152  if (ds_system_array->hasProcess(process_name) == false)
153  return;
154 
155  // SEND DATA VIA THE ROLE
156  auto process = ds_system_array->getProcess(process_name);
157  ((distributed::base::DistributedProcessBase*)(process.get()))->sendData(invoke, 1.0);
158  }
159  }
160  else
161  replyData(invoke);
162  };
163 
164  public:
165  virtual void replyData(std::shared_ptr<protocol::Invoke> invoke) override
166  {
167  ((protocol::IProtocol*)system_array_)->sendData(invoke);
168  };
169  };
170 };
171 };
172 };
173 
174 #include <samchon/templates/parallel/MediatorClient.hpp>
175 #include <samchon/templates/parallel/MediatorWebClient.hpp>
176 #include <samchon/templates/parallel/MediatorServer.hpp>
177 #include <samchon/templates/parallel/MediatorWebServer.hpp>
auto has(const Key &key) const -> bool
Whether the container has the item or not.
Definition: HashMap.hpp:125
MediatorSystem(external::base::ExternalSystemArrayBase *systemArray)
auto getSystemArray() const -> SystemArray *
auto get(const Key &key) -> T &
Get element.
Definition: HashMap.hpp:144
Customized std::unordered_map.
Definition: HashMap.hpp:103