llfix
Low-latency FIX engine
management_server.h
1 /*
2 MIT License
3 
4 Copyright (c) 2026 Coreware Limited
5 
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12 
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 SOFTWARE.
23 */
24 #pragma once
25 
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>

#include "../core/compiler/unused.h"
#include "../core/compiler/builtin_functions.h"

#include "../core/os/epoll.h"

#include "../electronic_trading/managed_instance/managed_instance.h"

#include "../core/utilities/tcp_reactor.h"
#include "../core/utilities/logger.h"

#include "management_context.h"
#include "management_server_settings.h"
#include "commands.h"
#include "command_factory.h"
45 
46 namespace llfix
47 {
48 
71 class ManagementServer : public TcpReactor<Epoll>
72 {
73  public:
74 
75  ManagementServer() = default;
76  virtual ~ManagementServer() = default;
77 
78  bool create(const ManagementServerSettings& settings, uint64_t application_start_time_timestamp, const std::string& engine_version, const std::string& log_file_path)
79  {
80  if(settings.validate() == false)
81  {
82  return false;
83  }
84 
85  m_options.m_nic_interface_ip = settings.management_server_nic_ip;
86  m_options.m_port = settings.management_server_port;
87  m_options.m_cpu_core_id = settings.management_server_cpu_core_id;
88  set_params(m_options);
89 
90  if (start() == false)
91  {
92  LLFIX_LOG_ERROR("Management server failed to start");
93  return false;
94  }
95 
96  m_management_context.application_start_timestamp = application_start_time_timestamp;
97  m_management_context.engine_version = engine_version;
98  m_management_context.log_file_path = log_file_path;
99 
100  LLFIX_LOG_INFO(std::string("Management server settings => ") + settings.to_string());
101 
102  m_successfully_created = true;
103  return true;
104  }
105 
123  bool register_client(ManagedInstance* instance)
124  {
125  assert(instance);
126 
127  if (!m_successfully_created)
128  {
129  LLFIX_LOG_ERROR("register_client called however management server did not initialise correctly. Please check the configs and logs");
130  return false;
131  }
132 
133  if(m_management_context.has_instance(instance->get_name()))
134  {
135  LLFIX_LOG_ERROR("You need to use unique instance names");
136  return false;
137  }
138 
139  m_management_context.client_instances.push_back(instance);
140  return true;
141  }
142 
160  bool register_server(ManagedInstance* instance)
161  {
162  assert(instance);
163 
164  if (!m_successfully_created)
165  {
166  LLFIX_LOG_ERROR("register_server called however management server did not initialise correctly. Please check the configs and logs");
167  return false;
168  }
169 
170  if(m_management_context.has_instance(instance->get_name()))
171  {
172  LLFIX_LOG_ERROR("You need to use unique instance names");
173  return false;
174  }
175 
176  m_management_context.server_instances.push_back(instance);
177  return true;
178  }
179 
180  void on_data_ready(std::size_t peer_index) override
181  {
182  auto read = receive(peer_index);
183 
184  if (read > 0 && read <= static_cast<int>(m_options.m_rx_buffer_capacity))
185  {
186  process_rx_buffer(peer_index, get_rx_buffer(peer_index), get_rx_buffer_size(peer_index));
187  }
188 
189  receive_done(peer_index);
190  }
191 
192  void on_client_connected(std::size_t peer_index) override
193  {
194  LLFIX_UNUSED(peer_index);
195  m_no_connected_clients++;
196  }
197 
198  void on_client_disconnected(std::size_t peer_index) override
199  {
200  if(m_no_connected_clients>0) m_no_connected_clients--;
201  TcpReactor<Epoll>::on_client_disconnected(peer_index);
202  }
203 
204  void on_async_io_error(int error_code, int event_result) override
205  {
206  LLFIX_UNUSED(error_code);
207  LLFIX_UNUSED(event_result);
208  }
209 
210  void on_socket_error(int error_code, int event_result) override
211  {
212  LLFIX_UNUSED(error_code);
213  LLFIX_UNUSED(event_result);
214  }
215 
216  int get_no_connected_clients() const { return m_no_connected_clients; }
217 
218  private:
219  int m_no_connected_clients = 0;
220  ManagementContext m_management_context;
221  bool m_successfully_created = false;
222 
223  void process_rx_buffer(std::size_t peer_index, char* buffer, std::size_t buffer_size)
224  {
225  std::size_t offset = 0;
226  std::size_t current_command_start = 0;
227  char* current_command_buffer = nullptr;
228  std::size_t buffer_read_index = 0;
229 
230  while (true)
231  {
232  if (buffer[offset] == Commands::COMMAND_DELIMITER)
233  {
234  current_command_buffer = buffer + current_command_start;
235  on_command(current_command_buffer, offset - current_command_start, peer_index);
236  current_command_start = offset + 1;
237 
238  buffer_read_index = offset + 1;
239  }
240 
241  offset++;
242 
243  if (offset > buffer_size - 1)
244  {
245  break;
246  }
247  }
248 
249  if (buffer_size - buffer_read_index > 0)
250  {
251  set_incomplete_buffer(peer_index, buffer+buffer_read_index, buffer_size - buffer_read_index);
252  }
253  else
254  {
255  reset_incomplete_buffer(peer_index);
256  }
257  }
258 
259  void on_command(const char* buffer, std::size_t buffer_len, std::size_t peer_index)
260  {
261  auto command = CommandFactory::create_command(buffer, buffer_len);
262 
263  if (command == nullptr)
264  {
265  on_invalid_command(peer_index, buffer, buffer_len);
266  return;
267  }
268 
269  auto response = command->process(m_management_context);
270 
271  if(response.length()>0)
272  {
273  response += Commands::COMMAND_DELIMITER;
274  send(peer_index, response.c_str(), response.length());
275  }
276 
277  delete command;
278  }
279 
280  void on_invalid_command(std::size_t peer_index, const char* buffer, std::size_t buffer_len)
281  {
282  std::string response = "Invalid command";
283  response += Commands::COMMAND_DELIMITER;
284 
285  send(peer_index, response.c_str(), response.length());
286 
287  static constexpr std::size_t TEMP_BUFFER_SIZE = 1024;
288 
289  char temp[TEMP_BUFFER_SIZE];
290 
291  if (TEMP_BUFFER_SIZE >= buffer_len + 1)
292  {
293  llfix_builtin_memcpy(temp, buffer, buffer_len);
294  temp[buffer_len] = '\0';
295  }
296  else
297  {
298  llfix_builtin_memcpy(temp, buffer, TEMP_BUFFER_SIZE - 1);
299  temp[TEMP_BUFFER_SIZE - 1] = '\0';
300  }
301 
302  std::string error_message = "Management server invalid command : ";
303  error_message += temp;
304  LLFIX_LOG_ERROR(error_message);
305  }
306 };
307 
308 } // namespace
llfix::ManagementServer::register_client
bool register_client(ManagedInstance *instance)
Registers a managed FIX client instance with the management server.
Definition: management_server.h:123
llfix::ManagementServer::register_server
bool register_server(ManagedInstance *instance)
Registers a managed FIX server instance with the management server.
Definition: management_server.h:160
llfix::ManagementServer
Management TCP server for FIX engine runtime control and inspection.
Definition: management_server.h:71