9 #include "../core/compiler/unused.h"
10 #include "../core/compiler/builtin_functions.h"
12 #include "../core/os/epoll.h"
14 #include "../electronic_trading/managed_instance/managed_instance.h"
16 #include "../core/utilities/tcp_reactor.h"
17 #include "../core/utilities/logger.h"
19 #include "management_context.h"
20 #include "management_server_settings.h"
22 #include "command_factory.h"
// Validates the supplied settings, applies them to the reactor options and records
// application metadata in the management context.
//
// Parameters:
//   settings                         - source of the NIC IP, port and CPU core id; checked with validate()
//   application_start_time_timestamp - stored as m_management_context.application_start_timestamp
//   engine_version                   - stored as m_management_context.engine_version
//   log_file_path                    - stored as m_management_context.log_file_path
//
// Returns a bool. NOTE(review): the return statements are not visible in this view;
// presumably false on a failed validation/start and true on success — confirm.
56 bool create(
const ManagementServerSettings& settings, uint64_t application_start_time_timestamp,
const std::string& engine_version,
const std::string& log_file_path)
// Reject settings that do not pass their own validation.
58 if(settings.validate() ==
false)
// Copy the network and CPU placement settings into the reactor options, then apply them.
63 m_options.m_nic_interface_ip = settings.management_server_nic_ip;
64 m_options.m_port = settings.management_server_port;
65 m_options.m_cpu_core_id = settings.management_server_cpu_core_id;
66 set_params(m_options);
// NOTE(review): the condition guarding this error is not visible in this view —
// presumably a failed start of the underlying reactor; confirm.
70 LLFIX_LOG_ERROR(
"Management server failed to start");
// Record application metadata in the management context.
74 m_management_context.application_start_timestamp = application_start_time_timestamp;
75 m_management_context.engine_version = engine_version;
76 m_management_context.log_file_path = log_file_path;
// Log the effective settings.
78 LLFIX_LOG_INFO(std::string(
"Management server settings => ") + settings.to_string());
// Mark creation as successful; the registration paths check this flag before accepting instances.
80 m_successfully_created =
true;
// Body of the client-instance registration path (the function signature is not visible
// in this view; the name is taken from the log message below).
// Registration is refused unless m_successfully_created has been set.
// NOTE(review): the return statements after each error are not visible — confirm.
105 if (!m_successfully_created)
107 LLFIX_LOG_ERROR(
"register_client called however management server did not initialise correctly. Please check the configs and logs");
// An instance whose name is already known to the management context is rejected.
111 if(m_management_context.has_instance(instance->get_name()))
113 LLFIX_LOG_ERROR(
"You need to use unique instance names");
// Append the instance to the context's list of client instances.
117 m_management_context.client_instances.push_back(instance);
// Body of the server-instance registration path (the function signature is not visible
// in this view; the name is taken from the log message below).
// Registration is refused unless m_successfully_created has been set.
// NOTE(review): the return statements after each error are not visible — confirm.
142 if (!m_successfully_created)
144 LLFIX_LOG_ERROR(
"register_server called however management server did not initialise correctly. Please check the configs and logs");
// An instance whose name is already known to the management context is rejected.
148 if(m_management_context.has_instance(instance->get_name()))
150 LLFIX_LOG_ERROR(
"You need to use unique instance names");
// Append the instance to the context's list of server instances.
154 m_management_context.server_instances.push_back(instance);
158 void on_data_ready(std::size_t peer_index)
override
160 auto read = receive(peer_index);
162 if (read > 0 && read <=
static_cast<int>(m_options.m_rx_buffer_capacity))
164 process_rx_buffer(peer_index, get_rx_buffer(peer_index), get_rx_buffer_size(peer_index));
167 receive_done(peer_index);
170 void on_client_connected(std::size_t peer_index)
override
172 LLFIX_UNUSED(peer_index);
173 m_no_connected_clients++;
176 void on_client_disconnected(std::size_t peer_index)
override
178 if(m_no_connected_clients>0) m_no_connected_clients--;
179 TcpReactor<Epoll>::on_client_disconnected(peer_index);
182 void on_async_io_error(
int error_code,
int event_result)
override
184 LLFIX_UNUSED(error_code);
185 LLFIX_UNUSED(event_result);
188 void on_socket_error(
int error_code,
int event_result)
override
190 LLFIX_UNUSED(error_code);
191 LLFIX_UNUSED(event_result);
194 int get_no_connected_clients()
const {
return m_no_connected_clients; }
// Connected-client counter: incremented in on_client_connected and decremented
// (not below zero) in on_client_disconnected.
197 int m_no_connected_clients = 0;
// Context passed to each command's process() call; also holds the registered
// client/server instances and the application metadata set in create().
198 ManagementContext m_management_context;
// Set to true at the end of create(); checked before instances are registered.
199 bool m_successfully_created =
false;
// Scans the received bytes for Commands::COMMAND_DELIMITER and hands each delimited
// command to on_command; bytes after the last delimiter are stored through
// set_incomplete_buffer.
//
// Parameters:
//   peer_index  - peer the data came from; forwarded to on_command and the buffer helpers
//   buffer      - start of the received bytes
//   buffer_size - number of bytes available in buffer
//
// NOTE(review): the loop header, the offset increment, the break and the else keyword
// are not visible in this view; the comments below assume a byte-by-byte scan with
// offset running from 0 up to buffer_size - 1 — confirm.
201 void process_rx_buffer(std::size_t peer_index,
char* buffer, std::size_t buffer_size)
// offset: scan position; current_command_start: first byte of the command being scanned;
// buffer_read_index: first byte not yet consumed by a complete command.
203 std::size_t offset = 0;
204 std::size_t current_command_start = 0;
205 char* current_command_buffer =
nullptr;
206 std::size_t buffer_read_index = 0;
// A delimiter closes the current command: dispatch it without the delimiter byte,
// then start the next command right after the delimiter.
210 if (buffer[offset] == Commands::COMMAND_DELIMITER)
212 current_command_buffer = buffer + current_command_start;
213 on_command(current_command_buffer, offset - current_command_start, peer_index);
214 current_command_start = offset + 1;
216 buffer_read_index = offset + 1;
// End-of-buffer check. NOTE(review): buffer_size is unsigned, so buffer_size - 1 wraps
// to a very large value when buffer_size is 0 — confirm callers never pass an empty buffer.
221 if (offset > buffer_size - 1)
// Bytes left after the last complete command are kept as an incomplete command.
227 if (buffer_size - buffer_read_index > 0)
229 set_incomplete_buffer(peer_index, buffer+buffer_read_index, buffer_size - buffer_read_index);
// NOTE(review): presumably the else branch (nothing left over) — confirm.
233 reset_incomplete_buffer(peer_index);
// Handles a single command: builds it with CommandFactory::create_command, runs it
// against the management context and sends any non-empty response to the peer.
//
// Parameters:
//   buffer     - start of the command bytes
//   buffer_len - number of command bytes
//   peer_index - peer that sent the command and receives the response
237 void on_command(
const char* buffer, std::size_t buffer_len, std::size_t peer_index)
239 auto command = CommandFactory::create_command(buffer, buffer_len);
// A null command is treated as an invalid command.
// NOTE(review): the return after this branch is not visible in this view — presumably
// the function stops here, since command is dereferenced below; confirm.
241 if (command ==
nullptr)
243 on_invalid_command(peer_index, buffer, buffer_len);
247 auto response = command->process(m_management_context);
// Empty responses are not sent; non-empty ones are terminated with the command delimiter.
249 if(response.length()>0)
251 response += Commands::COMMAND_DELIMITER;
252 send(peer_index, response.c_str(), response.length());
// Replies to the peer with a delimiter-terminated "Invalid command" message, then logs
// the offending command text as an error.
//
// Parameters:
//   peer_index - peer that receives the reply
//   buffer     - start of the invalid command bytes (not required to be NUL-terminated here)
//   buffer_len - number of command bytes
258 void on_invalid_command(std::size_t peer_index,
const char* buffer, std::size_t buffer_len)
260 std::string response =
"Invalid command";
261 response += Commands::COMMAND_DELIMITER;
263 send(peer_index, response.c_str(), response.length());
// The command bytes are copied into a fixed-size stack buffer and NUL-terminated so
// they can be appended to the log message as a C string.
265 static constexpr std::size_t TEMP_BUFFER_SIZE = 1024;
267 char temp[TEMP_BUFFER_SIZE];
// Command plus terminator fits: copy all of it.
269 if (TEMP_BUFFER_SIZE >= buffer_len + 1)
271 llfix_builtin_memcpy(temp, buffer, buffer_len);
272 temp[buffer_len] =
'\0';
// NOTE(review): presumably the else branch (the else keyword is not visible in this
// view): the command is truncated to TEMP_BUFFER_SIZE - 1 bytes — confirm.
276 llfix_builtin_memcpy(temp, buffer, TEMP_BUFFER_SIZE - 1);
277 temp[TEMP_BUFFER_SIZE - 1] =
'\0';
// Appending a char array stops at the first NUL byte in temp.
280 std::string error_message =
"Management server invalid command : ";
281 error_message += temp;
282 LLFIX_LOG_ERROR(error_message);