32 #include <string_view>
35 #include <unordered_map>
40 #include "core/compiler/hints_branch_predictor.h"
41 #include "core/compiler/hints_hot_code.h"
42 #include "core/compiler/unused.h"
43 #include "core/os/vdso.h"
44 #include "core/os/thread_local_storage.h"
46 #include "core/utilities/configuration.h"
47 #include "core/utilities/logger.h"
48 #include "core/utilities/object_cache.h"
49 #include "core/utilities/std_string_utilities.h"
50 #include "core/utilities/userspace_spinlock.h"
52 #include "electronic_trading/common/message_persist_plugin.h"
53 #include "electronic_trading/managed_instance/modifying_admin_command.h"
54 #include "electronic_trading/managed_instance/managed_instance.h"
56 #include "fix_constants.h"
57 #include "fix_string_view.h"
58 #include "fix_string.h"
59 #include "fix_session_settings.h"
60 #include "fix_utilities.h"
61 #include "incoming_fix_message.h"
62 #include "outgoing_fix_message.h"
63 #include "fix_parser_error_codes.h"
64 #include "fix_session.h"
65 #include "fix_server_settings.h"
67 #include "core/utilities/tcp_reactor_options.h"
73 struct UnknownSessionContext
75 IncomingFixMessage m_incoming_fix_message;
76 ObjectCache<FixStringView> m_fix_string_view_cache;
79 class FixServerConnectors
84 m_tables_lock.initialise();
85 m_peer_index_session_table.reserve(1024);
86 m_session_peer_index_table.reserve(1024);
89 bool has_peer(std::size_t peer_index)
const
91 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
93 if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
101 void update(std::size_t peer_index, FixSession* session)
103 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
106 if (m_session_peer_index_table.find(session) != m_session_peer_index_table.end())
108 auto existing_peer_index = m_session_peer_index_table[session];
110 if (existing_peer_index != peer_index)
112 m_peer_index_session_table.erase(existing_peer_index);
116 m_peer_index_session_table[peer_index] = session;
117 m_session_peer_index_table[session] = peer_index;
120 FixSession* get_session(std::size_t peer_index)
122 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
124 if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
129 return m_peer_index_session_table[peer_index];
132 std::size_t get_peer_index(FixSession* session)
134 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
136 if (m_session_peer_index_table.find(session) == m_session_peer_index_table.end())
138 return static_cast<std::size_t
>(-1);
141 return m_session_peer_index_table[session];
144 void remove(std::size_t peer_index, FixSession* session)
146 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
148 m_peer_index_session_table.erase(peer_index);
149 m_session_peer_index_table.erase(session);
153 std::unordered_map<std::size_t, FixSession*> m_peer_index_session_table;
154 std::unordered_map<FixSession*, std::size_t> m_session_peer_index_table;
155 mutable UserspaceSpinlock<> m_tables_lock;
167 template<
typename Transport>
168 class FixServer :
public Transport,
public ManagedInstance
175 for (
auto& entry : m_sessions)
189 [[nodiscard]]
bool create(
const std::string& server_name,
const std::string server_config_file_path)
191 m_name = server_name;
193 if (m_settings.load_from_config_file(server_config_file_path, server_name) ==
false)
195 LLFIX_LOG_ERROR(
"Loading settings for server " + server_name +
" failed : " + m_settings.config_load_error);
199 if (m_settings.validate() ==
false)
201 LLFIX_LOG_ERROR(
"FixServerSettings for " + m_name +
" validation failed : " + m_settings.validation_error);
205 m_is_ha_primary = m_settings.starts_as_primary_instance;
206 m_unknown_session_context_lock.initialise();
208 TCPReactorOptions reactor_options;
210 reactor_options.m_accept_timeout_seconds = m_settings.accept_timeout_seconds;
211 reactor_options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
212 reactor_options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
213 reactor_options.m_cpu_core_id = m_settings.cpu_core_id;
214 reactor_options.m_worker_thread_count = m_settings.worker_thread_count;
215 reactor_options.m_send_try_count = m_settings.send_try_count;
216 reactor_options.m_disable_nagle = m_settings.disable_nagle;
217 reactor_options.m_enable_quick_ack = m_settings.quick_ack;
218 reactor_options.m_max_poll_events = m_settings.max_poll_events;
219 reactor_options.m_nic_interface_ip = m_settings.nic_address;
220 reactor_options.m_nic_interface_name = m_settings.nic_name;
221 reactor_options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
222 reactor_options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
223 reactor_options.m_pending_connection_queue_size = m_settings.pending_connection_queue_size;
224 reactor_options.m_port = m_settings.accept_port;
225 reactor_options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
226 reactor_options.m_receive_size = m_settings.receive_size;
227 reactor_options.m_socket_rx_size = m_settings.socket_rx_size;
228 reactor_options.m_socket_tx_size = m_settings.socket_tx_size;
229 reactor_options.m_spin_count = m_settings.spin_count;
230 #ifdef LLFIX_ENABLE_OPENSSL
231 reactor_options.m_use_ssl = m_settings.use_ssl;
232 reactor_options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
233 reactor_options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
234 reactor_options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
235 reactor_options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
236 reactor_options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
237 reactor_options.m_ssl_version = m_settings.ssl_version;
238 reactor_options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
239 reactor_options.m_ssl_crl_path = m_settings.ssl_crl_path;
242 this->set_params(reactor_options);
244 if constexpr (Transport::is_multithreaded())
246 this->register_acceptor_callback(
247 [
this](std::size_t peer_index)
249 LLFIX_UNUSED(peer_index);
250 this->process_admin_commands_of_all_non_live_sessions();
253 this->register_acceptor_termination_callback(
254 [
this](std::size_t peer_index)
256 LLFIX_UNUSED(peer_index);
257 this->destroy_thread_local_unknown_session_context();
260 this->register_worker_callback(
261 [
this](std::size_t peer_index)
263 this->process_session(peer_index);
266 this->register_worker_termination_callback(
267 [
this](std::size_t peer_index)
269 LLFIX_UNUSED(peer_index);
270 this->destroy_thread_local_unknown_session_context();
275 this->register_callback(std::bind(&FixServer::process,
this));
276 this->register_termination_callback(std::bind(&FixServer::destroy_thread_local_unknown_session_context,
this));
279 LLFIX_LOG_INFO(m_name +
" : Loaded server config =>\n" + m_settings.to_string());
280 LLFIX_LOG_INFO(
"FixServer " + m_name +
" creation success");
293 [[nodiscard]]
bool add_session(
const std::string& session_name, FixSessionSettings& session_settings)
295 return internal_add_session(session_name, session_settings);
306 [[nodiscard]]
bool add_session(
const std::string& session_config_file_path,
const std::string& session_name)
308 FixSessionSettings session_settings;
310 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
312 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
316 return internal_add_session(session_name, session_settings);
331 Configuration config_file;
332 std::string config_load_error;
334 if (config_file.load_from_file(session_config_file_path, config_load_error) ==
false)
339 std::vector<std::string> session_names;
340 config_file.get_group_names(session_names);
342 int added_session_count{0};
344 for (
const auto& session_name : session_names)
346 auto lowered_session_name = StringUtilities::to_lower(session_name);
348 if (StringUtilities::contains(lowered_session_name,
"session"))
350 FixSessionSettings session_settings;
352 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
354 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
358 if (internal_add_session(session_name, session_settings) ==
false)
363 added_session_count++;
367 if(added_session_count == 0)
369 LLFIX_LOG_ERROR(m_name +
" : Could not find any session in " + session_config_file_path +
". Make sure they have 'SESSION' in their names.");
383 return m_sessions.size();
386 bool is_instance_ha_primary()
const override
388 return m_is_ha_primary;
400 if (has_session(session_name) ==
false)
405 return m_sessions[session_name];
419 for (
const auto& session_entry : m_sessions)
421 if(session_entry.second == session)
423 return session_entry.first;
437 for (
const auto& iter : m_sessions)
439 target.push_back(iter.first);
443 std::string get_name()
const override
456 template<
typename... Args>
459 FixSession::get_repeating_group_specs().specify_repeating_group(args...);
462 #ifdef LLFIX_ENABLE_BINARY_FIELDS
472 void specify_binary_field(
const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
474 FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
487 return session->get_outgoing_fix_message();
500 std::size_t encoded_length = 0;
503 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
504 return send_bytes<true>(session, session->get_tx_encode_buffer(), encoded_length);
507 void push_admin_command(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
override
509 if(llfix_likely(session_name !=
"*"))
511 push_admin_command_internal(session_name, type, arg);
515 for (
auto& session : m_sessions)
517 push_admin_command_internal(session.first, type, arg);
522 #ifdef LLFIX_AUTOMATION
524 bool send_fake_outgoing_message(FixSession* session, OutgoingFixMessage* message)
526 std::size_t encoded_length = 0;
527 auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
529 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
531 auto sequence_store = session->get_sequence_store();
532 sequence_store->increment_outgoing_seq_no();
534 if(session->serialisation_enabled())
535 session->get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(session->get_tx_encode_buffer()), encoded_length,
true, sequence_store->get_outgoing_seq_no());
537 session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
542 void set_replay_messages_on_incoming_resend_request_for_all_sessions(
bool b)
544 for (
auto& entry : m_sessions)
546 entry.second->set_replay_messages_on_incoming_resend_request(b);
559 virtual void on_async_io_error(
int error_code,
int event_result)
override
561 LLFIX_UNUSED(error_code);
562 LLFIX_UNUSED(event_result);
565 virtual void on_socket_error(
int error_code,
int event_result)
override
567 LLFIX_UNUSED(error_code);
568 LLFIX_UNUSED(event_result);
663 if(session->settings()->throttle_limit == 0)
668 session->throttler()->update();
670 if(session->throttler()->reached_limit() )
672 session->increment_incoming_throttler_exceed_count();
674 if(session->settings()->throttle_action == IncomingThrottlerAction::WAIT)
676 session->throttler()->wait();
678 else if(session->settings()->throttle_action == IncomingThrottlerAction::DISCONNECT)
680 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->
get_name() +
" as the other end exceeded the throttle rate : " + std::to_string(session->settings()->throttle_limit));
681 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
684 else if(session->settings()->throttle_action == IncomingThrottlerAction::REJECT)
686 send_throttle_reject_message(session, incoming_fix_message);
703 if (session->is_now_valid_session_datetime() ==
false)
705 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->
get_name() +
" due to schedule settings");
706 this->process_connection_closure(m_fix_connectors.get_peer_index(session),
true);
727 if (parser_reject_code !=
static_cast<uint32_t
>(-1))
730 if (parser_reject_code > FixConstants::FIX_MAX_ERROR_CODE)
737 std::size_t length{ 0 };
738 FixUtilities::get_reject_reason_text(temp_buf, length, parser_reject_code);
742 process_invalid_logon(peer_index, message);
748 if (!(incoming_fix_message->
has_tag(FixConstants::TAG_BEGIN_STRING) && incoming_fix_message->
has_tag(FixConstants::TAG_MSG_SEQ_NUM) && incoming_fix_message->
has_tag(FixConstants::TAG_SENDER_COMP_ID) && incoming_fix_message->
has_tag(FixConstants::TAG_TARGET_COMP_ID) && incoming_fix_message->
has_tag(FixConstants::TAG_MSG_TYPE)))
755 auto message_type = incoming_fix_message->
get_tag_value_as<
char>(FixConstants::TAG_MSG_TYPE);
757 if (message_type == FixConstants::MSG_TYPE_LOGON)
759 auto incoming_begin_string = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_BEGIN_STRING);
760 auto incoming_comp_id = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID);
761 auto incoming_target_comp_id = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_TARGET_COMP_ID);
763 session = find_session(incoming_begin_string, incoming_target_comp_id, incoming_comp_id);
765 if (session !=
nullptr)
767 auto session_state = session->get_state();
769 if (is_state_live(session_state))
771 std::string message =
"Incoming logon message for server " + m_name +
" is invalid as the client is already logged on, session : " + session->get_name();
772 process_invalid_logon(peer_index, message);
776 if(session_state == SessionState::DISABLED)
778 std::string message =
"Cannot accept connection attempt as session " + session->get_name() +
" is disabled. Closing connection.";
779 process_invalid_logon(peer_index, message);
783 if (session->is_now_valid_session_datetime() ==
false)
785 std::string message =
"Cannot accept connection attempt for session " + session->get_name() +
" due to schedule settings. Closing connection.";
786 process_invalid_logon(peer_index, message);
791 m_fix_connectors.update(peer_index, session);
793 if constexpr(Transport::is_multithreaded())
795 this->mark_connector_as_ready_for_worker_thread_dispatch(peer_index);
798 session->reset_flags();
799 session->get_incoming_fix_message()->reset();
800 session->get_incoming_fix_message()->copy_non_dirty_tag_values_from(*incoming_fix_message);
802 if(incoming_fix_message->
has_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG))
804 if(incoming_fix_message->
get_tag_value_as<
char>(FixConstants::TAG_RESET_SEQ_NUM_FLAG) == FixConstants::FIX_BOOLEAN_TRUE)
806 session->get_sequence_store()->reset_numbers();
810 session->set_state(SessionState::PENDING_LOGON);
815 std::string message =
"Could not find a session for incoming message : 8=" + incoming_begin_string +
" 49=" + incoming_comp_id +
" 56=" + incoming_target_comp_id +
" , message : " +
FixUtilities::fix_to_human_readible(buffer, buffer_length);
816 process_invalid_logon(peer_index, message);
822 std::string message = std::string(
"Received a message with a msgtype different than A (") + message_type +
") while expecting a logon message : " +
FixUtilities::fix_to_human_readible(buffer, buffer_length);
823 process_invalid_logon(peer_index, message);
842 if (validate_logon_request(session, peer_index, message) ==
false)
844 process_invalid_logon(peer_index,
"");
850 std::string message =
"Authentication failed for incoming logon message for server " + m_name +
" for session " + session->
get_name();
851 process_invalid_logon(peer_index, message);
855 auto requested_heartbeat_interval = message->
get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT);
856 auto requested_heartbeat_interval_nanoseconds =
static_cast<uint64_t
>(requested_heartbeat_interval) *
static_cast<uint64_t
>(1
'000'000
'000);
858 session->set_heartbeart_interval_in_nanoseconds(requested_heartbeat_interval_nanoseconds);
859 session->set_outgoing_test_request_interval_in_nanoseconds(static_cast<uint64_t>(requested_heartbeat_interval_nanoseconds * session->settings()->outgoing_test_request_interval_multiplier));
861 send_logon_response(session, message);
862 session->set_state(SessionState::LOGGED_ON);
864 LLFIX_LOG_DEBUG(m_name + ", session logged on : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
866 do_post_logon_sequence_number_check(session);
868 on_logon_request(session, message);
882 virtual bool authenticate_logon_request(FixSession* session, const IncomingFixMessage* message)
884 LLFIX_UNUSED(session);
885 LLFIX_UNUSED(message);
890 void on_data_ready(std::size_t peer_index) override
892 auto read = this->receive(peer_index);
894 if (read > 0 && read <= static_cast<int>(this->m_options.m_rx_buffer_capacity))
896 auto buffer_size = this->get_rx_buffer_size(peer_index);
897 if(buffer_size >0 && buffer_size != static_cast<std::size_t>(-1))
898 process_rx_buffer(peer_index, this->get_rx_buffer(peer_index), buffer_size);
901 this->receive_done(peer_index);
904 void on_client_connected(std::size_t peer_index) override
906 LLFIX_LOG_DEBUG("FixServer " + m_name + " : new client connected , peer index : " + std::to_string(peer_index));
909 void on_client_disconnected(std::size_t peer_index) override
911 Transport::on_client_disconnected(peer_index);
913 if (m_fix_connectors.has_peer(peer_index) == false)
916 // A logged on client
917 auto session = m_fix_connectors.get_session(peer_index);
919 if(session != nullptr)
921 LLFIX_LOG_DEBUG("FixServer " + m_name + " : session " + session->get_name() + " disconnected , peer index : " + std::to_string(peer_index));
922 on_client_disconnected(session);
926 m_fix_connectors.remove(peer_index, session);
929 void on_client_disconnected(FixSession* session)
931 session->set_state(SessionState::DISCONNECTED);
934 FixServerSettings& get_settings() { return m_settings; }
936 std::string get_settings_as_string(const std::string& delimiter) override
938 return m_settings.to_string(delimiter);
946 void set_message_persist_plugin(MessagePersistPlugin* plugin)
949 m_message_persist_plugin = plugin;
963 virtual bool send_heartbeat(FixSession* session, FixString* test_request_id)
965 session->build_heartbeat_message(outgoing_message_instance(session), test_request_id);
966 return send_outgoing_message(session, outgoing_message_instance(session));
977 virtual bool send_logon_response(FixSession* session, const IncomingFixMessage* message)
979 LLFIX_UNUSED(message);
980 auto response = outgoing_message_instance(session);
981 response->set_msg_type(FixConstants::MSG_TYPE_LOGON);
983 // TAG 98 ENCRYPTION METHOD
984 response->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
986 // TAG 108 HEARTBEAT INTERVAL
987 response->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(session->get_heartbeart_interval_in_nanoseconds() / 1'000
'000'000));
990 auto default_app_ver_id = session->get_default_app_ver_id();
992 if (default_app_ver_id.length() > 0)
994 response->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
1055 session->build_sequence_reset_message(message, desired_sequence_no);
1058 std::size_t encoded_length = 0;
1059 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
1062 return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
1079 session->build_gap_fill_message(message);
1082 std::size_t encoded_length = 0;
1083 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, session->get_incoming_resend_request_begin_no(), encoded_length);
1086 return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
1089 virtual void resend_messages_to_client(
FixSession* session, uint32_t begining_seq_no, uint32_t end_seq_no)
1091 for (uint32_t i = begining_seq_no; i <= end_seq_no; i++)
1093 std::size_t message_length = 0;
1094 session->get_outgoing_message_serialiser()->read_message(i, session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
1097 message->load_from_buffer(session->get_tx_encode_buffer(), message_length);
1099 message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
1101 if (session->include_t97_during_resends())
1103 message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
1106 std::size_t encoded_length = 0;
1107 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
1108 send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
1112 virtual bool send_reject_message(FixSession* session,
const IncomingFixMessage* incoming_message, uint32_t reject_reason_code,
const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
1114 LLFIX_UNUSED(incoming_message);
1115 char reject_reason_text[256];
1117 std::size_t text_length{ 0 };
1118 FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
1122 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" received an invalid message : " + std::string(reject_reason_text) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1126 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" received an invalid message : " + std::string(reject_reason_text) +
" ,tag : " + std::to_string(error_tag) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1129 session->build_session_level_reject_message(
outgoing_message_instance(session), reject_reason_code, reject_reason_text, error_tag);
1133 virtual bool send_throttle_reject_message(FixSession* session,
const IncomingFixMessage* incoming_message)
1136 reject_message->set_msg_type(FixConstants::MSG_TYPE_BUSINESS_REJECT);
1138 reject_message->set_tag(FixConstants::TAG_TEXT, session->settings()->throttler_reject_message);
1140 if (incoming_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM))
1142 reject_message->set_tag(FixConstants::TAG_REF_SEQ_NUM, incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM));
1145 if (incoming_message->has_tag(FixConstants::TAG_MSG_TYPE))
1147 reject_message->set_tag(FixConstants::TAG_REF_MSG_TYPE, incoming_message->get_tag_value_as<std::string>(FixConstants::TAG_MSG_TYPE));
1153 std::unordered_map<std::string, FixSession*> m_sessions;
1156 static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4;
1158 bool m_is_ha_primary =
true;
1160 FixServerSettings m_settings;
1162 FixServerConnectors m_fix_connectors;
1164 MessagePersistPlugin* m_message_persist_plugin =
nullptr;
1166 UserspaceSpinlock<> m_unknown_session_context_lock;
1168 bool has_session(
const std::string& session_name)
1170 if (m_sessions.find(session_name) == m_sessions.end())
1178 FixSession* find_session(
const std::string& begin_string,
const std::string& sender_comp_id,
const std::string& target_comp_id)
1180 FixSession* ret =
nullptr;
1182 for (
const auto& session_entry : m_sessions)
1184 if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id && session_entry.second->settings()->begin_string == begin_string)
1186 return session_entry.second;
1193 bool does_any_session_use_path(
const std::string& path)
1195 for (
const auto& session_entry : m_sessions)
1197 if (session_entry.second->settings()->sequence_store_file_path == path)
1202 if (session_entry.second->settings()->incoming_message_serialisation_path == path)
1207 if (session_entry.second->settings()->outgoing_message_serialisation_path == path)
1215 bool does_any_session_use_compids(
const std::string& sender_comp_id,
const std::string& target_comp_id)
1217 for (
const auto& session_entry : m_sessions)
1219 if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id)
1227 bool internal_add_session(
const std::string& session_name,
const FixSessionSettings& session_settings)
1229 if (has_session(session_name))
1231 LLFIX_LOG_ERROR(m_name +
" : Already loaded a session with name " + session_name +
" . You have to specify unique names for sessions.");
1235 if (does_any_session_use_path(session_settings.sequence_store_file_path))
1237 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s sequence_store_file_path already being used in another session.");
1241 if (does_any_session_use_path(session_settings.incoming_message_serialisation_path))
1243 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s incoming_message_serialisation_path already being used in another session.");
1247 if (does_any_session_use_path(session_settings.outgoing_message_serialisation_path))
1249 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s outgoing_message_serialisation_path already being used in another session.");
1253 if (does_any_session_use_compids(session_settings.sender_comp_id, session_settings.target_comp_id))
1255 std::string warning_message =
"WARNING (" + m_name +
"): session " + session_name +
"'s sender and target comp ids are already being used in another session.";
1256 LLFIX_LOG_WARNING(warning_message);
1257 fprintf(stderr,
"%s", warning_message.c_str());
1260 FixSession* current_session =
new (std::nothrow) FixSession;
1262 if (current_session ==
nullptr)
1264 LLFIX_LOG_ERROR(m_name +
" : Failed to allocate fix session for " + session_name);
1268 session_settings.is_server =
true;
1269 session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity;
1271 if (current_session->initialise(session_name, session_settings) ==
false)
1273 delete current_session;
1277 m_sessions[session_name] = current_session;
1279 if (m_settings.starts_as_primary_instance ==
false)
1281 disable_session(current_session);
1287 void process_session(std::size_t peer_index)
1289 static_assert(Transport::is_multithreaded());
1291 FixSession* sess = m_fix_connectors.get_session(peer_index);
1296 process_session(sess);
1301 void process_session(FixSession* session)
1303 static_assert(Transport::is_multithreaded());
1305 if (llfix_likely(this->m_is_stopping.load() ==
false))
1307 process_admin_commands(session);
1309 auto current_state = session->get_state();
1311 if (is_state_live(current_state))
1313 process_outgoing_resend_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1314 process_outgoing_test_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1315 respond_to_resend_request_if_necessary(session);
1316 respond_to_test_request_if_necessary(session);
1317 send_heartbeat_if_necessary(session, VDSO::nanoseconds_monotonic());
1325 on_client_disconnected(session);
1329 void process_admin_commands_of_all_non_live_sessions()
1331 static_assert(Transport::is_multithreaded());
1333 for (
auto& iter : m_sessions)
1335 auto state = iter.second->get_state();
1337 if (is_state_live(state) ==
false)
1339 iter.second->lock();
1340 process_admin_commands(iter.second);
1341 iter.second->unlock();
1349 static_assert(!Transport::is_multithreaded());
1351 if (llfix_likely(this->m_is_stopping.load() ==
false))
1353 for (
const auto& session_entry : m_sessions)
1355 FixSession* current_session = session_entry.second;
1357 process_admin_commands(current_session);
1359 auto current_state = current_session->get_state();
1361 if (is_state_live(current_state))
1363 process_outgoing_resend_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1364 process_outgoing_test_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1365 respond_to_resend_request_if_necessary(current_session);
1366 respond_to_test_request_if_necessary(current_session);
1367 send_heartbeat_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1375 send_logout_messages_to_all_sessions();
// Allocates a ModifyingAdminCommand and pushes it onto the admin command queue of the
// session registered under session_name.
//   session_name : key into m_sessions
//   type         : which admin action to perform
//   arg          : optional numeric argument of the command (defaults to 0)
// The command is allocated with nothrow new; the consumer side (process_admin_commands)
// deletes it after handling. An allocation failure is only logged.
// NOTE(review): m_sessions is indexed with operator[] and the result is dereferenced without
// a check; if m_sessions is a standard map this would insert and dereference a null entry
// for an unknown session_name - confirm that callers validate the name first.
1379 void push_admin_command_internal(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
1381 auto admin_command =
new (std::nothrow) ModifyingAdminCommand;
1383 if (llfix_likely(admin_command))
1385 admin_command->type = type;
1386 admin_command->arg = arg;
1387 m_sessions[session_name]->get_admin_commands()->push(admin_command);
1391 LLFIX_LOG_ERROR(
"Failed to process setter admin command for FixServer " + m_name);
// Consumes a queued admin command of the given session and applies it.
// try_pop is checked with a single if, so at most one command is handled per call.
// The popped command object is deleted at the end of the handling.
// Supported command types, as visible below: set incoming/outgoing sequence number, send
// sequence reset, disable/enable session, and set the HA primary flag.
1395 void process_admin_commands(FixSession* session)
1397 ModifyingAdminCommand* admin_command{
nullptr };
1399 if (session->get_admin_commands()->try_pop(&admin_command) ==
true)
1401 switch (admin_command->type)
// Overwrites the stored incoming sequence number with the command argument.
1403 case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
1405 session->get_sequence_store()->set_incoming_seq_no(admin_command->arg);
1406 LLFIX_LOG_DEBUG(m_name +
" : processed set incoming sequence number admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
// Overwrites the stored outgoing sequence number with the command argument.
1410 case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
1412 session->get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
1413 LLFIX_LOG_DEBUG(m_name +
" : processed set outgoing sequence number admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
// Only honoured while the session is LOGGED_ON; otherwise the command is dropped with an error log.
// NOTE(review): the statement that actually sends the sequence reset is not visible in this view.
1417 case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
1419 if( session->get_state() == SessionState::LOGGED_ON )
1422 LLFIX_LOG_DEBUG(m_name +
" : processed send sequence reset admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
1426 LLFIX_LOG_ERROR(m_name +
" : dropping send sequence reset admin command as not logged on , session name : " + session->get_name());
1432 case ModifyingAdminCommandType::DISABLE_SESSION:
1434 disable_session(session);
1435 LLFIX_LOG_DEBUG(m_name +
" : processed disable session admin command for session " + session->get_name() +
" , new session state : Disabled");
// Enabling is only allowed on the HA primary instance; otherwise the command is ignored with an error log.
1439 case ModifyingAdminCommandType::ENABLE_SESSION:
1441 if(m_is_ha_primary ==
true)
1443 enable_session(session);
1444 LLFIX_LOG_DEBUG(m_name +
" : processed enable session admin command for " + session->get_name());
1448 LLFIX_LOG_ERROR(m_name +
" ignoring enable session admin command for " + session->get_name() +
" since not a primary instance" );
// Updates the server-wide m_is_ha_primary flag (arg 1 means primary) and then enables or
// disables the session this command was queued on accordingly.
1453 case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
1455 m_is_ha_primary = admin_command->arg == 1 ? true :
false;
1457 if(admin_command->arg == 1)
1459 enable_session(session);
1460 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 1 admin command for " + session->get_name());
1464 disable_session(session);
1465 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 0 admin command for " + session->get_name());
// Releases the command object that was allocated by the producer.
1475 delete admin_command;
// Puts a session into the DISABLED state. If the session is currently live, its connection
// is closed first; the second argument (true) is the send_logout parameter of
// process_connection_closure.
1479 void disable_session(FixSession* session)
1481 if (is_state_live(session->get_state()))
1483 this->process_connection_closure(m_fix_connectors.get_peer_index(session),
true);
1485 session->set_state(SessionState::DISABLED);
// Re-enables a session: only acts when the session is currently DISABLED, in which case the
// state becomes DISCONNECTED. When the refresh_resend_cache_during_promotion setting is on,
// the outgoing serialiser of the session is reinitialised as well.
1488 void enable_session(FixSession* session)
1490 if(session->get_state() == SessionState::DISABLED)
1492 session->set_state(SessionState::DISCONNECTED);
1493 if(m_settings.refresh_resend_cache_during_promotion)
1494 session->reinitialise_outgoing_serialiser();
// Drives the resend request (35=2) that this side issues.
//   session                       : session to service
//   current_timestamp_nanoseconds : caller-supplied monotonic timestamp
// Part 1: while in IN_RETRANSMISSION_INITIATED_BY_SELF, closes the connection once the peer
//         has not satisfied the request within outgoing_resend_request_expire_secs.
// Part 2: when the session flags that a resend request is needed, records the timestamp,
//         moves the session to IN_RETRANSMISSION_INITIATED_BY_SELF and clears the flag.
// NOTE(review): the statement that actually sends the resend request, and any early return
// after the closure in part 1, are not visible in this view - confirm.
1500 void process_outgoing_resend_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1502 if (session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
// Time elapsed since the resend request timestamp stored on the session.
1505 uint64_t delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_resend_request_timestamp_nanoseconds();
// Configured timeout converted from seconds to nanoseconds.
1506 uint64_t required_delta_nanoseconds =
static_cast<uint64_t
>(session->outgoing_resend_request_expire_secs()) * 1
'000'000
'000;
1508 if (delta_nanoseconds >= required_delta_nanoseconds)
1510 // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1511 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(session->outgoing_resend_request_expire_secs()));
1512 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
1518 if (session->needs_to_send_resend_request())
1520 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" peer " + std::to_string(m_fix_connectors.get_peer_index(session)) +
" : sending resend request , begin no : " + std::to_string(session->get_outgoing_resend_request_begin_no()));
// Remember when the request went out so part 1 can time it out later.
1522 session->set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1523 session->set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1524 session->set_needs_to_send_resend_request(
false);
// Drives the test request (35=1) that this side issues.
//   session                       : session to service
//   current_timestamp_nanoseconds : caller-supplied monotonic timestamp
// When no response is pending: if nothing has been received for at least the configured
// outgoing test request interval, a test request is sent and the session is marked as
// expecting a response.
// When a response is pending (presumably the else branch; that line is not visible here):
// if twice the heartbeat interval has passed since the test request, the pending flag is
// cleared and the connection is closed.
1529 void process_outgoing_test_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1531 if (session->expecting_response_for_outgoing_test_request() ==
false)
// Silence period measured from the last received message.
1534 auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_received_message_timestamp_nanoseconds();
1536 if (delta_nanoseconds >= (session->get_outgoing_test_request_interval_in_nanoseconds()))
1538 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : sending test request");
1539 send_test_request(session);
1540 session->set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1541 session->set_expecting_response_for_outgoing_test_request(
true);
// Waiting period measured from the moment the test request was sent.
1547 auto delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_test_request_timestamp_nanoseconds();
1549 if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() *
static_cast<uint64_t
>(2)))
1551 session->set_expecting_response_for_outgoing_test_request(
false);
1552 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the other end didn't respond to 35=1/test request");
1553 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
// Answers a resend request received from the peer, either by replaying stored messages or
// by sending a gap fill, then clears the pending flag and sets the session to LOGGED_ON.
// Replay is chosen only when all of the following hold:
//   - serialisation is enabled and replay on incoming resend request is enabled,
//   - the requested end number is 0 or equals the last outgoing sequence number,
//   - the requested begin number is not beyond the last outgoing sequence number,
//   - the requested range does not exceed max_resend_range,
//   - every message of the range is reported as in memory by the outgoing serialiser.
// NOTE(review): the if/else lines selecting between replay and gap fill, and the guard
// around the in-memory scan, are not visible in this view - confirm.
1558 void respond_to_resend_request_if_necessary(FixSession* session)
1560 if (session->needs_responding_to_incoming_resend_request())
1562 const auto last_outgoing_seq_no = session->get_sequence_store()->get_outgoing_seq_no();
1565 bool will_replay = (session->serialisation_enabled()) && (session->replay_messages_on_incoming_resend_request() && (session->get_incoming_resend_request_end_no() == 0
1566 || session->get_incoming_resend_request_end_no() == last_outgoing_seq_no) && session->get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
// Range size check; the outer comparison keeps the unsigned subtraction from wrapping.
1568 if(last_outgoing_seq_no > session->get_incoming_resend_request_begin_no())
1569 if(last_outgoing_seq_no-session->get_incoming_resend_request_begin_no()+1 > session->max_resend_range())
1570 will_replay =
false;
// Every sequence number of the range must still be available in memory for a replay.
1574 for (uint32_t i = session->get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1576 if (session->get_outgoing_message_serialiser()->has_message_in_memory(i) ==
false)
1578 will_replay =
false;
1586 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : replaying messages , begin seq no : " + std::to_string(session->get_incoming_resend_request_begin_no()) +
" , end seq no : " + std::to_string(last_outgoing_seq_no));
1587 resend_messages_to_client(session, session->get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1591 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : sending gap fill message");
1592 send_gap_fill_message(session);
1595 session->set_needs_responding_to_incoming_resend_request(
false);
1596 session->set_state(llfix::SessionState::LOGGED_ON);
// Answers a test request received from the peer with a heartbeat and clears the pending flag.
// If a test request id was stored on the session it is passed to send_heartbeat and then
// emptied (length set to 0); otherwise the heartbeat is sent with a null id.
1600 void respond_to_test_request_if_necessary(FixSession* session)
1602 if (session->needs_responding_to_incoming_test_request())
1604 if (session->get_incoming_test_request_id()->length() > 0)
1606 send_heartbeat(session , session->get_incoming_test_request_id());
// Consume the stored id so it is not reused for a later heartbeat.
1607 session->get_incoming_test_request_id()->set_length(0);
1612 send_heartbeat(session,
nullptr);
1615 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : responded to incoming test request");
1616 session->set_needs_responding_to_incoming_test_request(
false);
// Sends a heartbeat (with a null test request id) once the time since the last sent message
// reaches 9/10 of the heartbeat interval of the session.
//   session                       : session to service
//   current_timestamp_nanoseconds : caller-supplied monotonic timestamp
1620 void send_heartbeat_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1622 auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_sent_message_timestamp_nanoseconds();
// Threshold is interval * 9 / 10, i.e. slightly before the full interval elapses.
1624 if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() *
static_cast<uint64_t
>(9) /
static_cast<uint64_t
>(10)))
1626 send_heartbeat(session,
nullptr);
// Walks every entry of m_sessions and, for each session whose state is not DISCONNECTED,
// sends a logout message and then invokes the on_client_disconnected callback.
1630 void send_logout_messages_to_all_sessions()
1632 for (
const auto& session_entry : m_sessions)
1634 FixSession* current_session = session_entry.second;
1636 if (current_session->get_state() != SessionState::DISCONNECTED)
1638 send_logout_message(current_session);
1639 on_client_disconnected(current_session);
// Low-level send of an already encoded message for a session.
//   increment_outgoing_seq_no : compile-time switch; when true the outgoing sequence number
//                               is incremented and the message is recorded (see note below)
//   session                   : session whose peer receives the bytes
//   buffer / buffer_size      : encoded message
// Steps: resolve the peer index, send the bytes, optionally advance the sequence number,
// write the message to the outgoing serialiser (when serialisation is enabled) and to the
// persist plugin (when one is set), then refresh the last-sent timestamp.
// NOTE(review): the return statements and closing braces are not visible in this view, so the
// returned value and the exact extent of the if constexpr block cannot be stated here.
1645 template <
bool increment_outgoing_seq_no>
1646 LLFIX_FORCE_INLINE
bool send_bytes(FixSession* session,
const char* buffer, std::size_t buffer_size)
1648 std::size_t peer_index = m_fix_connectors.get_peer_index(session);
// size_t(-1) is compared against as a sentinel; presumably it means no peer is attached - confirm.
1650 if (peer_index ==
static_cast<std::size_t
>(-1))
1653 auto sequence_store = session->get_sequence_store();
// ret is the result of the transport send; it is forwarded to the serialiser and the plugin.
1655 auto ret = this->send(peer_index, buffer, buffer_size);
1657 if constexpr (increment_outgoing_seq_no ==
true)
1660 sequence_store->increment_outgoing_seq_no();
1663 if(session->serialisation_enabled())
1664 session->get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1666 if (m_message_persist_plugin)
1667 m_message_persist_plugin->persist_outgoing_message(session->get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1669 session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
// Returns the UnknownSessionContext stored in thread local storage, creating it when the
// slot is still empty. Creation allocates with nothrow new, stores the pointer in thread
// local storage, initialises the incoming message and creates a string view cache with 256
// entries; failures of those two steps are logged.
// The whole function runs under m_unknown_session_context_lock.
// NOTE(review): what is returned after an initialise/create failure is not visible in this
// view. The pointer is stored in thread local storage before those steps run, so a failed
// context may be handed out by a later call - confirm against the full file.
1676 UnknownSessionContext* get_thread_local_unknown_session_context()
1678 const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1680 UnknownSessionContext* unknown_session_context =
nullptr;
1681 unknown_session_context =
reinterpret_cast<UnknownSessionContext*
>(ThreadLocalStorage::get_instance().get());
// Empty slot: create the context on first use.
1683 if (unknown_session_context ==
nullptr)
1685 unknown_session_context =
new(std::nothrow) UnknownSessionContext;
1686 ThreadLocalStorage::get_instance().set(unknown_session_context);
// Guards against a failed nothrow allocation.
1689 if (unknown_session_context)
1691 if (unknown_session_context->m_incoming_fix_message.initialise() ==
false)
1693 LLFIX_LOG_ERROR(m_name +
" : Failed to initialise unknown session context's incoming_fix_message");
1697 if (unknown_session_context->m_fix_string_view_cache.create(256) ==
false)
1699 LLFIX_LOG_ERROR(m_name +
" : Failed to create unknown session context's fix string view cache");
1704 return unknown_session_context;
// Deletes the UnknownSessionContext held in thread local storage, if any, and resets the
// slot to null. Runs under m_unknown_session_context_lock.
1707 void destroy_thread_local_unknown_session_context()
1709 const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1711 UnknownSessionContext* unknown_session_context{
nullptr };
1712 unknown_session_context =
reinterpret_cast<UnknownSessionContext*
>(ThreadLocalStorage::get_instance().get());
1714 if (unknown_session_context)
1716 delete unknown_session_context;
// Clear the slot so a later get does not return the deleted pointer.
1717 ThreadLocalStorage::get_instance().set(
nullptr);
// Receive-path parser: splits the received bytes of a peer into FIX messages, parses each
// message tag by tag and dispatches it.
//   peer_index  : index of the peer the bytes came from
//   buffer      : received bytes (may hold several messages and a trailing partial one)
//   buffer_size : number of bytes in buffer
// Bytes that cannot be parsed yet are handed to set_incomplete_buffer.
// Messages from a peer without a session go through accept_session first; all messages are
// then passed to process_incoming_fix_message.
// NOTE(review): loop headers, returns, breaks and preprocessor else/endif lines are not
// visible in this view; comments below describe only what the visible statements do.
1723 LLFIX_HOT
void process_rx_buffer(std::size_t peer_index,
char* buffer, std::size_t buffer_size)
// Offset of the message currently being parsed inside buffer.
1725 std::size_t buffer_read_index = 0;
// Too few bytes to start parsing: keep them as incomplete data.
1727 if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1729 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
// May be null; the null case is handled further down via the unknown session context.
1733 FixSession* session = m_fix_connectors.get_session(peer_index);
// Scan from the end of the buffer for the last delimiter and the last tag 10 (checksum)
// field; final_tag10_delimiter_index stays -1 when none is found.
1737 int final_tag10_delimiter_index{ -1 };
1738 int current_index =
static_cast<int>(buffer_size - 1);
1740 if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) ==
false))
1743 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1747 FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
// No complete checksum field in the buffer: keep everything as incomplete data.
1749 if (final_tag10_delimiter_index == -1)
1752 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1757 LLFIX_ALIGN_CODE_32;
// Locate the begin string of the next message and skip any bytes in front of it.
1762 int begin_string_offset{-1};
1763 FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1765 if(llfix_likely(begin_string_offset>=0))
1767 buffer_read_index += begin_string_offset;
// Pick the message object and string view cache to parse into: those of the session when
// one exists, otherwise those of the thread local unknown session context.
1775 IncomingFixMessage* incoming_fix_message{
nullptr };
1776 ObjectCache<FixStringView>* fix_string_view_cache{
nullptr };
1778 if (llfix_likely(session !=
nullptr))
1780 incoming_fix_message = session->get_incoming_fix_message();
1781 fix_string_view_cache = session->get_fix_string_view_cache();
1785 UnknownSessionContext* unknown_session_context = get_thread_local_unknown_session_context();
1787 if (unknown_session_context)
1789 incoming_fix_message = &(unknown_session_context->m_incoming_fix_message);
1790 fix_string_view_cache = &(unknown_session_context->m_fix_string_view_cache);
// No context available: the bytes are kept as incomplete data.
1794 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
// Start every message with a clean message object and cache.
1799 incoming_fix_message->reset();
1800 fix_string_view_cache->reset_pointer();
// Per-message parser state. The parser alternates between reading a tag (up to the equals
// sign) and reading a value (up to the delimiter).
1802 current_index =
static_cast<int>(buffer_read_index);
1803 bool looking_for_equals =
true;
1805 int current_tag_start =
static_cast<int>(buffer_read_index);
1806 int current_tag_len{ 0 };
1808 int current_value_start =
static_cast<int>(buffer_read_index);
1809 int current_value_len{ 0 };
1811 int current_tag_10_delimiter_index = -1;
1812 int current_tag_35_tag_start_index = -1;
// uint32_t(-1) is used as the no-value marker for the reject code and the message type.
1814 uint32_t parser_reject_code =
static_cast<uint32_t
>(-1);
1815 uint32_t current_tag_index = 0;
1817 uint32_t encoded_current_message_type =
static_cast<uint32_t
>(-1);
1819 bool in_a_repeating_group{
false };
1820 uint32_t current_rg_count_tag{ 0 };
// Binary field support: length announced by a length tag and bytes consumed so far.
1822 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1823 int binary_field_length = 0;
1824 int binary_field_counter = 0;
1827 LLFIX_ALIGN_CODE_32;
1830 char current_char = buffer[current_index];
1832 if (looking_for_equals)
// End of the tag: remember its length and where the value starts.
1834 if (current_char == FixConstants::FIX_EQUALS)
1836 current_tag_len = current_index - current_tag_start;
1837 current_value_start = current_index + 1;
1838 looking_for_equals =
false;
// A delimiter before any equals sign: record the error and restart the tag after it.
1840 else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1842 current_tag_start = current_index + 1;
1843 parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1846 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// While inside a binary field, count its bytes; delimiters inside it do not end the value.
1849 if(llfix_unlikely(binary_field_length>0))
1850 if(binary_field_counter < binary_field_length)
1851 binary_field_counter++;
1854 if (looking_for_equals ==
false && current_char == FixConstants::FIX_DELIMITER)
1856 bool reached_value_end =
true;
1858 if(llfix_unlikely(binary_field_length>0))
1860 if(binary_field_length>binary_field_counter)
1862 reached_value_end =
false;
1864 else if(binary_field_length == binary_field_counter)
1866 binary_field_length=0;
1867 binary_field_counter=0;
1871 if(llfix_likely(reached_value_end))
// Variant without binary field support (presumably the preprocessor else branch).
1875 else if (current_char == FixConstants::FIX_DELIMITER)
// A complete tag=value pair has been read.
1878 current_value_len = current_index - current_value_start;
1880 if (llfix_unlikely(current_value_len == 0))
1882 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1885 bool is_current_tag_numeric{
true };
1886 FixSession::validate_tag_format(buffer + current_tag_start,
static_cast<std::size_t
>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1888 if (llfix_likely(is_current_tag_numeric))
1892 uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer + current_tag_start,
static_cast<std::size_t
>(current_tag_len));
1894 if(llfix_unlikely(tag == 0))
1896 parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
// The value is a view into the receive buffer (pointer and length), not a copy.
1899 FixStringView* value = fix_string_view_cache->allocate();
1900 value->set_buffer(
const_cast<char*
>(buffer + current_value_start),
static_cast<std::size_t
>(current_value_len));
1903 current_tag_index++;
1904 FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
// Repeating group tracking only starts once the message type (tag 35) is known.
1909 if(encoded_current_message_type !=
static_cast<uint32_t
>(-1))
1911 if(llfix_likely(!in_a_repeating_group))
1913 if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1915 current_rg_count_tag = tag;
1916 in_a_repeating_group =
true;
// Leave the group as soon as a tag is met that does not belong to it.
1921 if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) ==
false))
1923 in_a_repeating_group =
false;
1927 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// A binary length tag announces how many bytes the following data field holds.
1928 if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1930 binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
// Outside repeating groups a tag may appear only once; inside them tags are stored separately.
1935 if (llfix_likely(in_a_repeating_group ==
false))
1937 if (llfix_likely(incoming_fix_message->has_tag(tag) ==
false))
1939 incoming_fix_message->set_tag(tag, value);
1943 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1948 incoming_fix_message->set_repeating_group_tag(tag, value);
// Checksum tag: remember where it ends and validate tags 9 and 35.
1951 if (tag == FixConstants::TAG_CHECKSUM)
1953 current_tag_10_delimiter_index = current_index;
1955 FixSession::validate_tag9_and_tag35((incoming_fix_message), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
// Message type tag: remember its position and the packed message type.
1959 else if (tag == FixConstants::TAG_MSG_TYPE)
1961 current_tag_35_tag_start_index = current_tag_start;
1962 encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
// Prepare for the next tag.
1966 current_tag_start = current_index + 1;
1967 looking_for_equals =
true;
1968 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// Reached the last byte of the buffer: the unparsed remainder is kept as incomplete data.
1975 if (current_index >=
static_cast<int>(buffer_size) - 1)
1977 this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
// Length of the message from its start up to and including the checksum delimiter.
1984 auto current_message_length = current_tag_10_delimiter_index + 1 - buffer_read_index;
// No session is bound to this peer yet: try to accept one based on this message.
1986 if (llfix_unlikely(session ==
nullptr))
1988 session = accept_session(peer_index, incoming_fix_message, buffer + buffer_read_index, current_message_length, parser_reject_code);
1990 if (session ==
nullptr)
1997 process_incoming_fix_message(session, peer_index, buffer + buffer_read_index, current_message_length, parser_reject_code);
2000 #ifndef LLFIX_UNIT_TEST
2001 if(llfix_unlikely(session->get_state() == SessionState::DISCONNECTED))
// Advance to the byte after the checksum delimiter of the processed message.
2007 buffer_read_index = current_tag_10_delimiter_index + 1;
// The message ended exactly at the end of the buffer: nothing is left over.
2009 if (current_tag_10_delimiter_index ==
static_cast<int>(buffer_size) - 1)
2011 this->reset_incomplete_buffer(peer_index);
// Past the last complete message: the rest of the buffer is a partial message.
2014 else if (
static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)
2016 this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
// Handles a rejected logon attempt: optionally logs it together with the peer address and
// then closes the connection through process_connection_closure.
//   peer_index         : peer that attempted the logon
//   log_message        : reason to log; nothing is logged when it is empty
//   send_logout        : forwarded to process_connection_closure (default false)
//   logout_reason_text : forwarded to process_connection_closure (default empty)
// NOTE(review): the declaration of peer_port is not visible in this view.
2022 void process_invalid_logon(std::size_t peer_index,
const std::string& log_message,
bool send_logout =
false,
const std::string& logout_reason_text =
"")
2024 if(!log_message.empty())
2027 std::string peer_ip;
2028 this->get_peer_details(peer_index, peer_ip, peer_port);
2030 LLFIX_LOG_ERROR(
"Invalid logon attempt from " + peer_ip +
" : " + log_message);
2033 process_connection_closure(peer_index, send_logout, logout_reason_text);
// Closes the connection of a peer, sending a logout message with the given reason text to
// the session bound to that peer when there is one.
//   peer_index         : peer whose connection is closed
//   send_logout        : whether a logout should be sent first (default false)
//   logout_reason_text : text passed to send_logout_message (default empty)
// NOTE(review): no use of send_logout is visible in this view; presumably a hidden condition
// guards the logout part - confirm against the full file.
2036 void process_connection_closure(std::size_t peer_index,
bool send_logout =
false,
const std::string& logout_reason_text =
"")
2040 auto session = m_fix_connectors.get_session(peer_index);
// A peer without a bound session cannot be sent a logout.
2042 if (session !=
nullptr)
2044 send_logout_message(session, logout_reason_text);
2048 this->close_connection(peer_index);
// Session-level handling of one fully parsed incoming message, followed by dispatch to the
// message type specific handler.
//   session               : session the message belongs to
//   peer_index            : peer the message came from
//   buffer_message        : raw bytes of the message
//   buffer_message_length : number of raw bytes
//   parser_reject_code    : error found while parsing, or uint32_t(-1) when none
// Visible steps, in order: refresh the last-received timestamp, serialise the raw message,
// clear a pending test request expectation, validate (sending a reject when a reject code is
// produced), advance and persist the incoming sequence number, compare the received sequence
// number with the expected one, finish a self-initiated retransmission, apply throttling and
// finally dispatch on the message type.
// NOTE(review): returns, else branches and braces are not visible in this view, so early
// exits after a failed validation, closure or throttling cannot be confirmed from here.
2051 void process_incoming_fix_message(FixSession* session, std::size_t peer_index,
const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
2053 session->set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
2055 if(session->serialisation_enabled())
2056 session->get_incoming_message_serialiser()->write(buffer_message, buffer_message_length,
true);
// Any incoming message counts as the answer to a pending outgoing test request.
2058 if (llfix_unlikely(session->expecting_response_for_outgoing_test_request()))
2061 session->set_expecting_response_for_outgoing_test_request(
false);
2062 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : other end satisfied the test request");
2065 if (session->validations_enabled())
2067 uint32_t reject_message_code =
static_cast<uint32_t
>(-1);
2069 if (session->validate_fix_message(*session->get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) ==
false)
// A reject is only sent when the validation produced a reject code.
2071 if (reject_message_code !=
static_cast<uint32_t
>(-1))
2073 send_reject_message(session, session->get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, session->get_last_error_tag());
// The stored incoming sequence number is advanced first; the value after the increment is
// the sequence number this message is expected to carry.
2081 auto sequence_store = session->get_sequence_store();
2082 sequence_store->increment_incoming_seq_no();
2083 auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
2085 if (m_message_persist_plugin)
2086 m_message_persist_plugin->persist_incoming_message(session->get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
2090 auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
// Received number higher than expected: messages were missed, so queue a resend request
// (not for hard sequence resets and not while the logon is still pending).
2092 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2094 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2096 if (session->get_state() != SessionState::PENDING_LOGON)
2098 session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
// Received number lower than expected: the session is terminated unless the message is a
// hard sequence reset.
2103 else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
2105 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2107 std::string logout_reason_text;
2108 bool send_logout =
false;
// NOTE(review): where send_logout is set to true is not visible in this view.
2110 if (session->get_state() == SessionState::PENDING_LOGON)
2113 logout_reason_text =
"MsgSeqNum too low, expecting " + std::to_string(sequence_store_incoming_seq_no) +
" but received " + std::to_string(incoming_seq_no);
// Undo the increment made above so the rejected message does not consume a sequence number.
2116 sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no - 1);
2117 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the incoming sequence no (" + std::to_string(incoming_seq_no) +
") is lower than expected (" + std::to_string(sequence_store_incoming_seq_no) +
")");
2118 this->process_connection_closure(peer_index, send_logout, logout_reason_text);
// A self-initiated retransmission ends when the message carrying the requested end number arrives.
2123 if (llfix_unlikely(session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
2125 if (incoming_seq_no == session->get_outgoing_resend_request_end_no())
2127 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : other end satisfied the resend request");
2128 session->set_state(llfix::SessionState::LOGGED_ON);
2132 if(process_incoming_throttling(session, session->get_incoming_fix_message()) ==
false)
// Dispatch on the message type (tag 35). Single character types are handled by the switch;
// unknown single character types go to on_custom_message.
2137 auto message_type = session->get_incoming_fix_message()->get_tag_value(FixConstants::TAG_MSG_TYPE);
2139 if (llfix_likely(message_type->length() == 1))
2141 switch (message_type->data()[0])
2143 case FixConstants::MSG_TYPE_NEW_ORDER: on_new_order(session, session->get_incoming_fix_message());
break;
2144 case FixConstants::MSG_TYPE_ORDER_CANCEL: on_cancel_order(session, session->get_incoming_fix_message());
break;
2145 case FixConstants::MSG_TYPE_ORDER_CANCEL_REPLACE: on_replace_order(session, session->get_incoming_fix_message());
break;
2146 case FixConstants::MSG_TYPE_HEARTBEAT: on_client_heartbeat(session);
break;
2147 case FixConstants::MSG_TYPE_TEST_REQUEST: session->process_test_request_message(*session->get_incoming_fix_message()); on_client_test_request(session, session->get_incoming_fix_message());
break;
2148 case FixConstants::MSG_TYPE_RESEND_REQUEST: session->process_resend_request(*session->get_incoming_fix_message()); on_client_resend_request(session, session->get_incoming_fix_message());
break;
2149 case FixConstants::MSG_TYPE_REJECT: on_session_level_reject(session, session->get_incoming_fix_message());
break;
2150 case FixConstants::MSG_TYPE_BUSINESS_REJECT: on_application_level_reject(session, session->get_incoming_fix_message());
break;
2151 case FixConstants::MSG_TYPE_LOGON: process_logon_request(session, peer_index, session->get_incoming_fix_message());
break;
2152 case FixConstants::MSG_TYPE_LOGOUT: process_logout_request(session, peer_index, session->get_incoming_fix_message());
break;
// A sequence reset that the session refuses is answered with a reject on the new sequence number tag.
2153 case FixConstants::MSG_TYPE_SEQUENCE_RESET:
if (session->process_incoming_sequence_reset_message(*session->get_incoming_fix_message()) ==
false) { send_reject_message(session, session->get_incoming_fix_message(), FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); };
break;
2155 default: on_custom_message(session, session->get_incoming_fix_message());
break;
// Message types longer than one character (presumably the else branch of the length check).
2160 on_custom_message(session, session->get_incoming_fix_message());
// Checks the mandatory fields of an incoming logon message and logs the first problem found.
//   session    : unused in this function (marked with LLFIX_UNUSED)
//   peer_index : peer the logon came from; used to obtain the peer address for the logs
//   message    : parsed logon message
// Checks, in order: tag 98 (encryption method) present, tag 108 (heartbeat interval)
// present, tag 108 without a minus sign, tag 108 without a decimal point, tag 108 not zero.
// NOTE(review): the return statements and the declaration of peer_port are not visible in
// this view; presumably each failed check returns false and the end returns true - confirm.
2164 bool validate_logon_request(FixSession* session, std::size_t peer_index,
const IncomingFixMessage* message)
2166 LLFIX_UNUSED(session);
2168 std::string peer_ip;
2169 this->get_peer_details(peer_index, peer_ip, peer_port);
2171 if (message->has_tag(FixConstants::TAG_ENCRYPT_METHOD) ==
false)
2173 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" does not have required t98(encryption method), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2177 if (message->has_tag(FixConstants::TAG_HEART_BT_INT) ==
false)
2179 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" does not have required t108(heartbeat interval), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
// The heartbeat interval is inspected as text first so that a sign or a decimal point is
// caught before the numeric conversion below.
2183 auto t108_string = message->get_tag_value_as<std::string>(FixConstants::TAG_HEART_BT_INT);
2185 if (t108_string.find(
'-') != std::string::npos)
2187 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(negative) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2191 if (t108_string.find(
'.') != std::string::npos)
2193 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(decimal points) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2197 if (message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT) == 0)
2199 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(zero) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
// Sequence number gap check for a session that is LOGGED_ON: when the sequence number of the
// current incoming message is higher than the stored incoming sequence number, and the
// message is not a hard sequence reset, a resend request for the gap is queued.
2206 void do_post_logon_sequence_number_check(FixSession* session)
2208 if(session->get_state() == SessionState::LOGGED_ON)
2211 auto sequence_store_incoming_seq_no = session->get_sequence_store()->get_incoming_seq_no();
2212 auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2214 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2216 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2218 session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
// Handles a logout message from the peer: replies with a logout message, sets the session
// state to LOGGED_OUT, logs the event and then invokes the on_logout_request callback.
//   session    : session that received the logout
//   peer_index : peer index, used for logging only
//   message    : the received logout message, forwarded to the callback
2226 void process_logout_request(FixSession* session, std::size_t peer_index,
const IncomingFixMessage* message)
2228 send_logout_message(session);
2229 session->set_state(SessionState::LOGGED_OUT);
2230 LLFIX_LOG_DEBUG(m_name +
", session logged out : " + session->get_name() +
" , peer index : " + std::to_string(peer_index));
2231 on_logout_request(session, message);
// FixServer is neither copyable nor movable: copy construction, copy assignment, move
// construction and move assignment are all deleted.
2234 FixServer(
const FixServer& other) =
delete;
2235 FixServer& operator= (
const FixServer& other) =
delete;
2236 FixServer(FixServer&& other) =
delete;
2237 FixServer& operator=(FixServer&& other) =
delete;