llfix
Low-latency FIX engine
management_server.h
1 // DISCLAIMER_PLACEHOLDER
2 #pragma once
3 
4 #include <cassert>
5 #include <cstdint>
6 #include <cstddef>
7 #include <string>
8 
9 #include "../core/compiler/unused.h"
10 #include "../core/compiler/builtin_functions.h"
11 
12 #include "../core/os/epoll.h"
13 
14 #include "../electronic_trading/managed_instance/managed_instance.h"
15 
16 #include "../core/utilities/tcp_reactor.h"
17 #include "../core/utilities/logger.h"
18 
19 #include "management_context.h"
20 #include "management_server_settings.h"
21 #include "commands.h"
22 #include "command_factory.h"
23 
24 namespace llfix
25 {
26 
49 class ManagementServer : public TcpReactor<Epoll>
50 {
51  public:
52 
53  ManagementServer() = default;
54  virtual ~ManagementServer() = default;
55 
56  bool create(const ManagementServerSettings& settings, uint64_t application_start_time_timestamp, const std::string& engine_version, const std::string& log_file_path)
57  {
58  if(settings.validate() == false)
59  {
60  return false;
61  }
62 
63  m_options.m_nic_interface_ip = settings.management_server_nic_ip;
64  m_options.m_port = settings.management_server_port;
65  m_options.m_cpu_core_id = settings.management_server_cpu_core_id;
66  set_params(m_options);
67 
68  if (start() == false)
69  {
70  LLFIX_LOG_ERROR("Management server failed to start");
71  return false;
72  }
73 
74  m_management_context.application_start_timestamp = application_start_time_timestamp;
75  m_management_context.engine_version = engine_version;
76  m_management_context.log_file_path = log_file_path;
77 
78  LLFIX_LOG_INFO(std::string("Management server settings => ") + settings.to_string());
79 
80  m_successfully_created = true;
81  return true;
82  }
83 
101  bool register_client(ManagedInstance* instance)
102  {
103  assert(instance);
104 
105  if (!m_successfully_created)
106  {
107  LLFIX_LOG_ERROR("register_client called however management server did not initialise correctly. Please check the configs and logs");
108  return false;
109  }
110 
111  if(m_management_context.has_instance(instance->get_name()))
112  {
113  LLFIX_LOG_ERROR("You need to use unique instance names");
114  return false;
115  }
116 
117  m_management_context.client_instances.push_back(instance);
118  return true;
119  }
120 
138  bool register_server(ManagedInstance* instance)
139  {
140  assert(instance);
141 
142  if (!m_successfully_created)
143  {
144  LLFIX_LOG_ERROR("register_server called however management server did not initialise correctly. Please check the configs and logs");
145  return false;
146  }
147 
148  if(m_management_context.has_instance(instance->get_name()))
149  {
150  LLFIX_LOG_ERROR("You need to use unique instance names");
151  return false;
152  }
153 
154  m_management_context.server_instances.push_back(instance);
155  return true;
156  }
157 
158  void on_data_ready(std::size_t peer_index) override
159  {
160  auto read = receive(peer_index);
161 
162  if (read > 0 && read <= static_cast<int>(m_options.m_rx_buffer_capacity))
163  {
164  process_rx_buffer(peer_index, get_rx_buffer(peer_index), get_rx_buffer_size(peer_index));
165  }
166 
167  receive_done(peer_index);
168  }
169 
170  void on_client_connected(std::size_t peer_index) override
171  {
172  LLFIX_UNUSED(peer_index);
173  m_no_connected_clients++;
174  }
175 
176  void on_client_disconnected(std::size_t peer_index) override
177  {
178  if(m_no_connected_clients>0) m_no_connected_clients--;
179  TcpReactor<Epoll>::on_client_disconnected(peer_index);
180  }
181 
182  void on_async_io_error(int error_code, int event_result) override
183  {
184  LLFIX_UNUSED(error_code);
185  LLFIX_UNUSED(event_result);
186  }
187 
188  void on_socket_error(int error_code, int event_result) override
189  {
190  LLFIX_UNUSED(error_code);
191  LLFIX_UNUSED(event_result);
192  }
193 
194  int get_no_connected_clients() const { return m_no_connected_clients; }
195 
196  private:
197  int m_no_connected_clients = 0;
198  ManagementContext m_management_context;
199  bool m_successfully_created = false;
200 
201  void process_rx_buffer(std::size_t peer_index, char* buffer, std::size_t buffer_size)
202  {
203  std::size_t offset = 0;
204  std::size_t current_command_start = 0;
205  char* current_command_buffer = nullptr;
206  std::size_t buffer_read_index = 0;
207 
208  while (true)
209  {
210  if (buffer[offset] == Commands::COMMAND_DELIMITER)
211  {
212  current_command_buffer = buffer + current_command_start;
213  on_command(current_command_buffer, offset - current_command_start, peer_index);
214  current_command_start = offset + 1;
215 
216  buffer_read_index = offset + 1;
217  }
218 
219  offset++;
220 
221  if (offset > buffer_size - 1)
222  {
223  break;
224  }
225  }
226 
227  if (buffer_size - buffer_read_index > 0)
228  {
229  set_incomplete_buffer(peer_index, buffer+buffer_read_index, buffer_size - buffer_read_index);
230  }
231  else
232  {
233  reset_incomplete_buffer(peer_index);
234  }
235  }
236 
237  void on_command(const char* buffer, std::size_t buffer_len, std::size_t peer_index)
238  {
239  auto command = CommandFactory::create_command(buffer, buffer_len);
240 
241  if (command == nullptr)
242  {
243  on_invalid_command(peer_index, buffer, buffer_len);
244  return;
245  }
246 
247  auto response = command->process(m_management_context);
248 
249  if(response.length()>0)
250  {
251  response += Commands::COMMAND_DELIMITER;
252  send(peer_index, response.c_str(), response.length());
253  }
254 
255  delete command;
256  }
257 
258  void on_invalid_command(std::size_t peer_index, const char* buffer, std::size_t buffer_len)
259  {
260  std::string response = "Invalid command";
261  response += Commands::COMMAND_DELIMITER;
262 
263  send(peer_index, response.c_str(), response.length());
264 
265  static constexpr std::size_t TEMP_BUFFER_SIZE = 1024;
266 
267  char temp[TEMP_BUFFER_SIZE];
268 
269  if (TEMP_BUFFER_SIZE >= buffer_len + 1)
270  {
271  llfix_builtin_memcpy(temp, buffer, buffer_len);
272  temp[buffer_len] = '\0';
273  }
274  else
275  {
276  llfix_builtin_memcpy(temp, buffer, TEMP_BUFFER_SIZE - 1);
277  temp[TEMP_BUFFER_SIZE - 1] = '\0';
278  }
279 
280  std::string error_message = "Management server invalid command : ";
281  error_message += temp;
282  LLFIX_LOG_ERROR(error_message);
283  }
284 };
285 
286 } // namespace
llfix::ManagementServer::register_client
bool register_client(ManagedInstance *instance)
Registers a managed FIX client instance with the management server.
Definition: management_server.h:101
llfix::ManagementServer::register_server
bool register_server(ManagedInstance *instance)
Registers a managed FIX server instance with the management server.
Definition: management_server.h:138
llfix::ManagementServer
Management TCP server for FIX engine runtime control and inspection.
Definition: management_server.h:49