31 #include "../core/compiler/unused.h"
32 #include "../core/compiler/builtin_functions.h"
34 #include "../core/os/epoll.h"
36 #include "../electronic_trading/managed_instance/managed_instance.h"
38 #include "../core/utilities/tcp_reactor.h"
39 #include "../core/utilities/logger.h"
41 #include "management_context.h"
42 #include "management_server_settings.h"
44 #include "command_factory.h"
// Initialises the management server from the supplied settings.
//
// Visible steps: validate the settings, copy the NIC IP / port / CPU core id
// into m_options and hand them to set_params(), record the application start
// timestamp, engine version and log file path in the management context, log
// the effective settings, and finally mark the server as successfully created.
//
// NOTE(review): the return statements and the condition guarding the
// "failed to start" log are not part of this excerpt — presumably the function
// returns false on validation/start failure and true otherwise; confirm
// against the full file.
78 bool create(
const ManagementServerSettings& settings, uint64_t application_start_time_timestamp,
const std::string& engine_version,
const std::string& log_file_path)
// Reject settings that do not pass their own validation.
80 if(settings.validate() ==
false)
// Transfer the network/CPU related settings into the reactor options.
85 m_options.m_nic_interface_ip = settings.management_server_nic_ip;
86 m_options.m_port = settings.management_server_port;
87 m_options.m_cpu_core_id = settings.management_server_cpu_core_id;
88 set_params(m_options);
// Error path for a failed start (guarding condition not visible here).
92 LLFIX_LOG_ERROR(
"Management server failed to start");
// Values stored in the context that is later passed to command->process().
96 m_management_context.application_start_timestamp = application_start_time_timestamp;
97 m_management_context.engine_version = engine_version;
98 m_management_context.log_file_path = log_file_path;
100 LLFIX_LOG_INFO(std::string(
"Management server settings => ") + settings.to_string());
// From here on the register_* paths no longer take their "did not initialise" error branch.
102 m_successfully_created =
true;
// Body of the client-instance registration routine.
// NOTE(review): the signature is not part of this excerpt; the log text
// indicates this is register_client and `instance` is presumably its pointer
// parameter — confirm against the full file.
//
// Registration is refused (with an error log) when create() has not completed
// successfully or when an instance with the same name is already known to the
// management context; otherwise the instance is appended to client_instances.
127 if (!m_successfully_created)
129 LLFIX_LOG_ERROR(
"register_client called however management server did not initialise correctly. Please check the configs and logs");
// Instance names are checked for uniqueness through the management context.
133 if(m_management_context.has_instance(instance->get_name()))
135 LLFIX_LOG_ERROR(
"You need to use unique instance names");
139 m_management_context.client_instances.push_back(instance);
// Body of the server-instance registration routine.
// NOTE(review): the signature is not part of this excerpt; the log text
// indicates this is register_server and `instance` is presumably its pointer
// parameter — confirm against the full file.
//
// Registration is refused (with an error log) when create() has not completed
// successfully or when an instance with the same name is already known to the
// management context; otherwise the instance is appended to server_instances.
164 if (!m_successfully_created)
166 LLFIX_LOG_ERROR(
"register_server called however management server did not initialise correctly. Please check the configs and logs");
// Instance names are checked for uniqueness through the management context.
170 if(m_management_context.has_instance(instance->get_name()))
172 LLFIX_LOG_ERROR(
"You need to use unique instance names");
176 m_management_context.server_instances.push_back(instance);
180 void on_data_ready(std::size_t peer_index)
override
182 auto read = receive(peer_index);
184 if (read > 0 && read <=
static_cast<int>(m_options.m_rx_buffer_capacity))
186 process_rx_buffer(peer_index, get_rx_buffer(peer_index), get_rx_buffer_size(peer_index));
189 receive_done(peer_index);
192 void on_client_connected(std::size_t peer_index)
override
194 LLFIX_UNUSED(peer_index);
195 m_no_connected_clients++;
198 void on_client_disconnected(std::size_t peer_index)
override
200 if(m_no_connected_clients>0) m_no_connected_clients--;
201 TcpReactor<Epoll>::on_client_disconnected(peer_index);
204 void on_async_io_error(
int error_code,
int event_result)
override
206 LLFIX_UNUSED(error_code);
207 LLFIX_UNUSED(event_result);
210 void on_socket_error(
int error_code,
int event_result)
override
212 LLFIX_UNUSED(error_code);
213 LLFIX_UNUSED(event_result);
216 int get_no_connected_clients()
const {
return m_no_connected_clients; }
// Number of currently connected clients; incremented on connect and
// decremented (not below zero) on disconnect.
219 int m_no_connected_clients = 0;
// Context passed to every command's process() call; also holds the registered
// client/server instances and the values recorded by create().
220 ManagementContext m_management_context;
// Set to true at the end of create(); checked before registering instances.
221 bool m_successfully_created =
false;
// Splits the received bytes into commands separated by
// Commands::COMMAND_DELIMITER and dispatches each complete command to
// on_command(). Bytes after the last delimiter are handed to
// set_incomplete_buffer(); the other visible call is reset_incomplete_buffer().
//
// NOTE(review): the loop header, the offset increment and the break/else lines
// are not part of this excerpt — the description above assumes a scan over
// [0, buffer_size) with an else between the last two calls; confirm against
// the full file.
// NOTE(review): buffer_size is unsigned, so `buffer_size - 1` wraps around when
// buffer_size is 0; the visible caller only guards on the receive() result, not
// on get_rx_buffer_size() — verify a zero size cannot reach this function.
223 void process_rx_buffer(std::size_t peer_index,
char* buffer, std::size_t buffer_size)
// Scan position within the buffer.
225 std::size_t offset = 0;
// Start index of the command currently being scanned.
226 std::size_t current_command_start = 0;
227 char* current_command_buffer =
nullptr;
// Index just past the last delimiter seen, i.e. first byte not yet consumed.
228 std::size_t buffer_read_index = 0;
232 if (buffer[offset] == Commands::COMMAND_DELIMITER)
// The command spans [current_command_start, offset); the delimiter is excluded.
234 current_command_buffer = buffer + current_command_start;
235 on_command(current_command_buffer, offset - current_command_start, peer_index);
236 current_command_start = offset + 1;
238 buffer_read_index = offset + 1;
// End-of-buffer check for the scan.
243 if (offset > buffer_size - 1)
// Trailing bytes without a delimiter form a partial command.
249 if (buffer_size - buffer_read_index > 0)
251 set_incomplete_buffer(peer_index, buffer+buffer_read_index, buffer_size - buffer_read_index);
255 reset_incomplete_buffer(peer_index);
// Handles a single delimiter-free command: builds a command object through
// CommandFactory, reports an invalid command when the factory yields nullptr,
// otherwise runs the command against the management context and sends any
// non-empty response back to the peer, terminated by
// Commands::COMMAND_DELIMITER.
//
// NOTE(review): no early return after on_invalid_command() is visible in this
// excerpt — confirm that process() is never reached with a null command.
259 void on_command(
const char* buffer, std::size_t buffer_len, std::size_t peer_index)
261 auto command = CommandFactory::create_command(buffer, buffer_len);
// A null command means the factory did not produce a command for this text.
263 if (command ==
nullptr)
265 on_invalid_command(peer_index, buffer, buffer_len);
269 auto response = command->process(m_management_context);
// Empty responses are not sent.
271 if(response.length()>0)
273 response += Commands::COMMAND_DELIMITER;
274 send(peer_index, response.c_str(), response.length());
280 void on_invalid_command(std::size_t peer_index,
const char* buffer, std::size_t buffer_len)
282 std::string response =
"Invalid command";
283 response += Commands::COMMAND_DELIMITER;
285 send(peer_index, response.c_str(), response.length());
287 static constexpr std::size_t TEMP_BUFFER_SIZE = 1024;
289 char temp[TEMP_BUFFER_SIZE];
291 if (TEMP_BUFFER_SIZE >= buffer_len + 1)
293 llfix_builtin_memcpy(temp, buffer, buffer_len);
294 temp[buffer_len] =
'\0';
298 llfix_builtin_memcpy(temp, buffer, TEMP_BUFFER_SIZE - 1);
299 temp[TEMP_BUFFER_SIZE - 1] =
'\0';
302 std::string error_message =
"Management server invalid command : ";
303 error_message += temp;
304 LLFIX_LOG_ERROR(error_message);