Samchon Framework for CPP  1.0.0
Communicator.hpp
1 #pragma once
2 #include <samchon/API.hpp>
3 
4 #include <samchon/protocol/IProtocol.hpp>
5 #include <samchon/protocol/IListener.hpp>
6 
7 #include <iostream>
8 #include <array>
9 #include <exception>
10 #include <mutex>
11 #include <boost/asio.hpp>
12 #include <samchon/ByteArray.hpp>
13 
14 namespace samchon
15 {
16 namespace protocol
17 {
40  class Communicator
41  : public virtual IProtocol
42  {
43  protected:
44  std::shared_ptr<boost::asio::ip::tcp::socket> socket;
45  IProtocol *listener;
46 
47  std::mutex send_mtx;
48 
49  public:
50  Communicator()
51  {
52  listener = nullptr;
53  };
54  virtual ~Communicator()
55  {
56  close();
57  };
58 
62  virtual void close()
63  {
64  if (socket != nullptr && socket->is_open())
65  socket->close();
66  };
67 
76  virtual void replyData(std::shared_ptr<Invoke> invoke)
77  {
78  IListener *i_listener = dynamic_cast<IListener*>(listener);
79  if (i_listener != nullptr)
80  i_listener->_Reply_data(invoke);
81  else
82  listener->replyData(invoke);
83  };
84 
92  virtual void sendData(std::shared_ptr<Invoke> invoke)
93  {
94  std::unique_lock<std::mutex> uk(send_mtx);
95 
96  // SEND INVOKE
97  send_data(invoke->toXML()->toString());
98 
99  // SEND BINARY
100  for (size_t i = 0; i < invoke->size(); i++)
101  if (invoke->at(i)->getType() == "ByteArray")
102  send_data(invoke->at(i)->referValue<ByteArray>());
103  };
104 
105  protected:
106  /* =========================================================
107  SOCKET I/O
108  - READ
109  - WRITE
110  ============================================================
111  READ
112  --------------------------------------------------------- */
113  virtual void listen_message()
114  {
115  std::shared_ptr<Invoke> binary_invoke = nullptr;
116  std::queue<std::shared_ptr<InvokeParameter>> binary_parameters;
117 
118  while (true)
119  {
120  try
121  {
122  // READ CONTENT SIZE
123  size_t content_size = listen_size();
124 
125  // READ CONTENT
126  if (binary_invoke == nullptr)
127  {
128  std::shared_ptr<Invoke> invoke = listen_string(content_size);
129 
130  for (size_t i = 0; i < invoke->size(); i++)
131  {
132  std::shared_ptr<InvokeParameter> &parameter = invoke->at(i);
133  if (parameter->getType() != "ByteArray")
134  continue;
135 
136  if (binary_invoke == nullptr)
137  binary_invoke = invoke;
138  binary_parameters.push(parameter);
139  }
140 
141  // NO BINARY, THEN REPLY DIRECTLY
142  if (binary_invoke == nullptr)
143  this->replyData(invoke);
144  }
145  else
146  {
147  std::shared_ptr<InvokeParameter> parameter = binary_parameters.front();
148  listen_binary(content_size, parameter);
149  binary_parameters.pop();
150 
151  if (binary_parameters.empty() == true)
152  {
153  // NO BINARY PARAMETER LEFT,
154  std::shared_ptr<Invoke> invoke = binary_invoke;
155  binary_invoke = nullptr;
156 
157  // THEN REPLY
158  this->replyData(invoke);
159  }
160  }
161  }
162  catch (std::exception &e)
163  {
164  std::cout << "Reason of disconnection: " << e.what() << std::endl;
165  break;
166  }
167  catch (...)
168  {
169  break;
170  }
171  }
172  };
173 
174  private:
175  auto listen_size() -> size_t
176  {
177  std::array<unsigned char, 8> size_header;
178  listen_data(size_header);
179 
180  size_t size = 0;
181  for (size_t c = 0; c < size_header.size(); c++)
182  size += size_header[c] << (8 * (size_header.size() - 1 - c));
183 
184  return size;
185  };
186 
187  auto listen_string(size_t size) -> std::shared_ptr<Invoke>
188  {
189  // READ CONTENT
190  std::string data(size, (char)NULL);
191  listen_data(data);
192 
193  // CONSTRUCT INVOKE OBJECT
194  std::shared_ptr<Invoke> invoke(new Invoke());
195  invoke->construct(std::make_shared<library::XML>(data));
196 
197  return invoke;
198  };
199 
200  void listen_binary(size_t size, std::shared_ptr<InvokeParameter> parameter)
201  {
202  // FETCH BYTE_ARRAY
203  ByteArray &data = (ByteArray&)parameter->referValue<ByteArray>();
204  data.assign(size, NULL);
205 
206  // READ CONTENT
207  listen_data(data);
208  };
209 
210  template <class Container>
211  void listen_data(Container &data)
212  {
213  size_t completed = 0;
214 
215  while (completed < data.size())
216  {
217  size_t piece_size = socket->read_some(boost::asio::buffer((unsigned char*)data.data() + completed, data.size() - completed));
218  completed += piece_size;
219  }
220  };
221 
222  /* ---------------------------------------------------------
223  SEND
224  --------------------------------------------------------- */
225  template <class Container>
226  void send_data(const Container &data)
227  {
228  ByteArray header;
229  header.writeReversely((unsigned long long)data.size());
230 
231  socket->write_some(boost::asio::buffer(header));
232  socket->write_some(boost::asio::buffer(data));
233  };
234  };
235 };
236 };
virtual void replyData(std::shared_ptr< Invoke > invoke)
virtual void replyData(std::shared_ptr< Invoke >)=0
void writeReversely(const T &val)
Write a data.
Definition: ByteArray.hpp:222
Standard message of network I/O.
Definition: Invoke.hpp:35
Binary data class.
Definition: ByteArray.hpp:29
virtual void sendData(std::shared_ptr< Invoke > invoke)