10 #include <string_view>
13 #include <unordered_map>
18 #include "core/compiler/hints_branch_predictor.h"
19 #include "core/compiler/hints_hot_code.h"
20 #include "core/compiler/unused.h"
21 #include "core/os/vdso.h"
22 #include "core/os/thread_local_storage.h"
24 #include "core/utilities/configuration.h"
25 #include "core/utilities/logger.h"
26 #include "core/utilities/object_cache.h"
27 #include "core/utilities/std_string_utilities.h"
28 #include "core/utilities/userspace_spinlock.h"
30 #include "electronic_trading/common/message_persist_plugin.h"
31 #include "electronic_trading/managed_instance/modifying_admin_command.h"
32 #include "electronic_trading/managed_instance/managed_instance.h"
34 #include "fix_constants.h"
35 #include "fix_string_view.h"
36 #include "fix_string.h"
37 #include "fix_session_settings.h"
38 #include "fix_utilities.h"
39 #include "incoming_fix_message.h"
40 #include "outgoing_fix_message.h"
41 #include "fix_parser_error_codes.h"
42 #include "fix_session.h"
43 #include "fix_server_settings.h"
45 #include "core/utilities/tcp_reactor_options.h"
// Pairs an IncomingFixMessage with an ObjectCache of FixStringView objects.
// NOTE(review): the name suggests this is the parsing state used for
// connections that are not yet bound to a session - confirm at the usage site.
51 struct UnknownSessionContext
// Incoming message object owned by this context.
53 IncomingFixMessage m_incoming_fix_message;
// Cache of FixStringView objects owned by this context.
54 ObjectCache<FixStringView> m_fix_string_view_cache;
57 class FixServerConnectors
62 m_tables_lock.initialise();
63 m_peer_index_session_table.reserve(1024);
64 m_session_peer_index_table.reserve(1024);
67 bool has_peer(std::size_t peer_index)
const
69 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
71 if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
79 void update(std::size_t peer_index, FixSession* session)
81 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
84 if (m_session_peer_index_table.find(session) != m_session_peer_index_table.end())
86 auto existing_peer_index = m_session_peer_index_table[session];
88 if (existing_peer_index != peer_index)
90 m_peer_index_session_table.erase(existing_peer_index);
94 m_peer_index_session_table[peer_index] = session;
95 m_session_peer_index_table[session] = peer_index;
98 FixSession* get_session(std::size_t peer_index)
100 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
102 if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
107 return m_peer_index_session_table[peer_index];
110 std::size_t get_peer_index(FixSession* session)
112 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
114 if (m_session_peer_index_table.find(session) == m_session_peer_index_table.end())
116 return static_cast<std::size_t
>(-1);
119 return m_session_peer_index_table[session];
122 void remove(std::size_t peer_index, FixSession* session)
124 const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
126 m_peer_index_session_table.erase(peer_index);
127 m_session_peer_index_table.erase(session);
131 std::unordered_map<std::size_t, FixSession*> m_peer_index_session_table;
132 std::unordered_map<FixSession*, std::size_t> m_session_peer_index_table;
133 mutable UserspaceSpinlock<> m_tables_lock;
145 template<
typename Transport>
146 class FixServer :
public Transport,
public ManagedInstance
153 for (
auto& entry : m_sessions)
167 [[nodiscard]]
bool create(
const std::string& server_name,
const std::string server_config_file_path)
169 m_name = server_name;
171 if (m_settings.load_from_config_file(server_config_file_path, server_name) ==
false)
173 LLFIX_LOG_ERROR(
"Loading settings for server " + server_name +
" failed : " + m_settings.config_load_error);
177 if (m_settings.validate() ==
false)
179 LLFIX_LOG_ERROR(
"FixServerSettings for " + m_name +
" validation failed : " + m_settings.validation_error);
183 m_is_ha_primary = m_settings.starts_as_primary_instance;
184 m_unknown_session_context_lock.initialise();
186 TCPReactorOptions reactor_options;
188 reactor_options.m_accept_timeout_seconds = m_settings.accept_timeout_seconds;
189 reactor_options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
190 reactor_options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
191 reactor_options.m_cpu_core_id = m_settings.cpu_core_id;
192 reactor_options.m_worker_thread_count = m_settings.worker_thread_count;
193 reactor_options.m_send_try_count = m_settings.send_try_count;
194 reactor_options.m_disable_nagle = m_settings.disable_nagle;
195 reactor_options.m_enable_quick_ack = m_settings.quick_ack;
196 reactor_options.m_max_poll_events = m_settings.max_poll_events;
197 reactor_options.m_nic_interface_ip = m_settings.nic_address;
198 reactor_options.m_nic_interface_name = m_settings.nic_name;
199 reactor_options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
200 reactor_options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
201 reactor_options.m_pending_connection_queue_size = m_settings.pending_connection_queue_size;
202 reactor_options.m_port = m_settings.accept_port;
203 reactor_options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
204 reactor_options.m_receive_size = m_settings.receive_size;
205 reactor_options.m_socket_rx_size = m_settings.socket_rx_size;
206 reactor_options.m_socket_tx_size = m_settings.socket_tx_size;
207 reactor_options.m_spin_count = m_settings.spin_count;
208 #ifdef LLFIX_ENABLE_OPENSSL
209 reactor_options.m_use_ssl = m_settings.use_ssl;
210 reactor_options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
211 reactor_options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
212 reactor_options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
213 reactor_options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
214 reactor_options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
215 reactor_options.m_ssl_version = m_settings.ssl_version;
216 reactor_options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
217 reactor_options.m_ssl_crl_path = m_settings.ssl_crl_path;
220 this->set_params(reactor_options);
222 if constexpr (Transport::is_multithreaded())
224 this->register_acceptor_callback(
225 [
this](std::size_t peer_index)
227 LLFIX_UNUSED(peer_index);
228 this->process_admin_commands_of_all_non_live_sessions();
231 this->register_acceptor_termination_callback(
232 [
this](std::size_t peer_index)
234 LLFIX_UNUSED(peer_index);
235 this->destroy_thread_local_unknown_session_context();
238 this->register_worker_callback(
239 [
this](std::size_t peer_index)
241 this->process_session(peer_index);
244 this->register_worker_termination_callback(
245 [
this](std::size_t peer_index)
247 LLFIX_UNUSED(peer_index);
248 this->destroy_thread_local_unknown_session_context();
253 this->register_callback(std::bind(&FixServer::process,
this));
254 this->register_termination_callback(std::bind(&FixServer::destroy_thread_local_unknown_session_context,
this));
257 LLFIX_LOG_INFO(m_name +
" : Loaded server config =>\n" + m_settings.to_string());
258 LLFIX_LOG_INFO(
"FixServer " + m_name +
" creation success");
271 [[nodiscard]]
bool add_session(
const std::string& session_name, FixSessionSettings& session_settings)
273 return internal_add_session(session_name, session_settings);
284 [[nodiscard]]
bool add_session(
const std::string& session_config_file_path,
const std::string& session_name)
286 FixSessionSettings session_settings;
288 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
290 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
294 return internal_add_session(session_name, session_settings);
309 Configuration config_file;
310 std::string config_load_error;
312 if (config_file.load_from_file(session_config_file_path, config_load_error) ==
false)
317 std::vector<std::string> session_names;
318 config_file.get_group_names(session_names);
320 int added_session_count{0};
322 for (
const auto& session_name : session_names)
324 auto lowered_session_name = StringUtilities::to_lower(session_name);
326 if (StringUtilities::contains(lowered_session_name,
"session"))
328 FixSessionSettings session_settings;
330 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
332 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
336 if (internal_add_session(session_name, session_settings) ==
false)
341 added_session_count++;
345 if(added_session_count == 0)
347 LLFIX_LOG_ERROR(m_name +
" : Could not find any session in " + session_config_file_path +
". Make sure they have 'SESSION' in their names.");
361 return m_sessions.size();
364 bool is_instance_ha_primary()
const override
366 return m_is_ha_primary;
378 if (has_session(session_name) ==
false)
383 return m_sessions[session_name];
397 for (
const auto& session_entry : m_sessions)
399 if(session_entry.second == session)
401 return session_entry.first;
415 for (
const auto& iter : m_sessions)
417 target.push_back(iter.first);
421 std::string get_name()
const override
434 template<
typename... Args>
437 FixSession::get_repeating_group_specs().specify_repeating_group(args...);
440 #ifdef LLFIX_ENABLE_BINARY_FIELDS
450 void specify_binary_field(
const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
452 FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
465 return session->get_outgoing_fix_message();
478 std::size_t encoded_length = 0;
481 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
482 return send_bytes<true>(session, session->get_tx_encode_buffer(), encoded_length);
485 void push_admin_command(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
override
487 if(llfix_likely(session_name !=
"*"))
489 push_admin_command_internal(session_name, type, arg);
493 for (
auto& session : m_sessions)
495 push_admin_command_internal(session.first, type, arg);
500 #ifdef LLFIX_AUTOMATION
// Automation helper (sits under LLFIX_AUTOMATION). The visible statements encode
// `message`, advance the outgoing sequence number, optionally serialise the
// encoded bytes and refresh the last-sent timestamp; no send call is visible.
// NOTE(review): the return statement is not visible in this excerpt.
502 bool send_fake_outgoing_message(FixSession* session, OutgoingFixMessage* message)
504 std::size_t encoded_length = 0;
// Sequence number the message is encoded with : current outgoing number + 1.
505 auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
// Encode into the session's tx buffer; encoded_length receives the byte count.
507 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
509 auto sequence_store = session->get_sequence_store();
510 sequence_store->increment_outgoing_seq_no();
// Serialise the encoded bytes under the (now incremented) outgoing sequence number.
512 if(session->serialisation_enabled())
513 session->get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(session->get_tx_encode_buffer()), encoded_length,
true, sequence_store->get_outgoing_seq_no());
515 session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
520 void set_replay_messages_on_incoming_resend_request_for_all_sessions(
bool b)
522 for (
auto& entry : m_sessions)
524 entry.second->set_replay_messages_on_incoming_resend_request(b);
537 virtual void on_async_io_error(
int error_code,
int event_result)
override
539 LLFIX_UNUSED(error_code);
540 LLFIX_UNUSED(event_result);
543 virtual void on_socket_error(
int error_code,
int event_result)
override
545 LLFIX_UNUSED(error_code);
546 LLFIX_UNUSED(event_result);
641 if(session->settings()->throttle_limit == 0)
646 session->throttler()->update();
648 if(session->throttler()->reached_limit() )
650 session->increment_incoming_throttler_exceed_count();
652 if(session->settings()->throttle_action == IncomingThrottlerAction::WAIT)
654 session->throttler()->wait();
656 else if(session->settings()->throttle_action == IncomingThrottlerAction::DISCONNECT)
658 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->
get_name() +
" as the other end exceeded the throttle rate : " + std::to_string(session->settings()->throttle_limit));
659 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
662 else if(session->settings()->throttle_action == IncomingThrottlerAction::REJECT)
664 send_throttle_reject_message(session, incoming_fix_message);
681 if (session->is_now_valid_session_datetime() ==
false)
683 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->
get_name() +
" due to schedule settings");
684 this->process_connection_closure(m_fix_connectors.get_peer_index(session),
true);
705 if (parser_reject_code !=
static_cast<uint32_t
>(-1))
708 if (parser_reject_code > FixConstants::FIX_MAX_ERROR_CODE)
715 std::size_t length{ 0 };
716 FixUtilities::get_reject_reason_text(temp_buf, length, parser_reject_code);
720 process_invalid_logon(peer_index, message);
726 if (!(incoming_fix_message->
has_tag(FixConstants::TAG_BEGIN_STRING) && incoming_fix_message->
has_tag(FixConstants::TAG_MSG_SEQ_NUM) && incoming_fix_message->
has_tag(FixConstants::TAG_SENDER_COMP_ID) && incoming_fix_message->
has_tag(FixConstants::TAG_TARGET_COMP_ID) && incoming_fix_message->
has_tag(FixConstants::TAG_MSG_TYPE)))
733 auto message_type = incoming_fix_message->
get_tag_value_as<
char>(FixConstants::TAG_MSG_TYPE);
735 if (message_type == FixConstants::MSG_TYPE_LOGON)
737 auto incoming_begin_string = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_BEGIN_STRING);
738 auto incoming_comp_id = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID);
739 auto incoming_target_comp_id = incoming_fix_message->
get_tag_value_as<std::string>(FixConstants::TAG_TARGET_COMP_ID);
741 session = find_session(incoming_begin_string, incoming_target_comp_id, incoming_comp_id);
743 if (session !=
nullptr)
745 auto session_state = session->get_state();
747 if (is_state_live(session_state))
749 std::string message =
"Incoming logon message for server " + m_name +
" is invalid as the client is already logged on, session : " + session->get_name();
750 process_invalid_logon(peer_index, message);
754 if(session_state == SessionState::DISABLED)
756 std::string message =
"Cannot accept connection attempt as session " + session->get_name() +
" is disabled. Closing connection.";
757 process_invalid_logon(peer_index, message);
761 if (session->is_now_valid_session_datetime() ==
false)
763 std::string message =
"Cannot accept connection attempt for session " + session->get_name() +
" due to schedule settings. Closing connection.";
764 process_invalid_logon(peer_index, message);
769 m_fix_connectors.update(peer_index, session);
771 if constexpr(Transport::is_multithreaded())
773 this->mark_connector_as_ready_for_worker_thread_dispatch(peer_index);
776 session->reset_flags();
777 session->get_incoming_fix_message()->reset();
778 session->get_incoming_fix_message()->copy_non_dirty_tag_values_from(*incoming_fix_message);
780 if(incoming_fix_message->
has_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG))
782 if(incoming_fix_message->
get_tag_value_as<
char>(FixConstants::TAG_RESET_SEQ_NUM_FLAG) == FixConstants::FIX_BOOLEAN_TRUE)
784 session->get_sequence_store()->reset_numbers();
788 session->set_state(SessionState::PENDING_LOGON);
793 std::string message =
"Could not find a session for incoming message : 8=" + incoming_begin_string +
" 49=" + incoming_comp_id +
" 56=" + incoming_target_comp_id +
" , message : " +
FixUtilities::fix_to_human_readible(buffer, buffer_length);
794 process_invalid_logon(peer_index, message);
800 std::string message = std::string(
"Received a message with a msgtype different than A (") + message_type +
") while expecting a logon message : " +
FixUtilities::fix_to_human_readible(buffer, buffer_length);
801 process_invalid_logon(peer_index, message);
820 if (validate_logon_request(session, peer_index, message) ==
false)
822 process_invalid_logon(peer_index,
"");
828 std::string message =
"Authentication failed for incoming logon message for server " + m_name +
" for session " + session->
get_name();
829 process_invalid_logon(peer_index, message);
833 auto requested_heartbeat_interval = message->
get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT);
834 auto requested_heartbeat_interval_nanoseconds =
static_cast<uint64_t
>(requested_heartbeat_interval) *
static_cast<uint64_t
>(1
'000'000
'000);
836 session->set_heartbeart_interval_in_nanoseconds(requested_heartbeat_interval_nanoseconds);
837 session->set_outgoing_test_request_interval_in_nanoseconds(static_cast<uint64_t>(requested_heartbeat_interval_nanoseconds * session->settings()->outgoing_test_request_interval_multiplier));
839 send_logon_response(session, message);
840 session->set_state(SessionState::LOGGED_ON);
842 LLFIX_LOG_DEBUG(m_name + ", session logged on : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
844 do_post_logon_sequence_number_check(session);
846 on_logon_request(session, message);
// Virtual authentication hook for incoming logon requests.
// The visible part of this implementation ignores both arguments.
// NOTE(review): the return statement is not visible in this excerpt -
// confirm the default result before relying on it.
860 virtual bool authenticate_logon_request(FixSession* session, const IncomingFixMessage* message)
862 LLFIX_UNUSED(session);
863 LLFIX_UNUSED(message);
// Transport callback for readable data on peer_index : receives, hands the
// peer's rx buffer to process_rx_buffer() and signals receive_done().
868 void on_data_ready(std::size_t peer_index) override
870 auto read = this->receive(peer_index);
// Continue only for a positive read count that does not exceed the configured rx buffer capacity.
872 if (read > 0 && read <= static_cast<int>(this->m_options.m_rx_buffer_capacity))
874 auto buffer_size = this->get_rx_buffer_size(peer_index);
// A size of 0 or static_cast<std::size_t>(-1) is skipped.
// NOTE(review): (std::size_t)-1 looks like an "invalid peer" sentinel - confirm in Transport.
875 if(buffer_size >0 && buffer_size != static_cast<std::size_t>(-1))
876 process_rx_buffer(peer_index, this->get_rx_buffer(peer_index), buffer_size);
879 this->receive_done(peer_index);
882 void on_client_connected(std::size_t peer_index) override
884 LLFIX_LOG_DEBUG("FixServer " + m_name + " : new client connected , peer index : " + std::to_string(peer_index));
// Transport callback for a closed connection. Runs the base Transport
// handling first, then notifies and unregisters the session bound to the peer.
887 void on_client_disconnected(std::size_t peer_index) override
889 Transport::on_client_disconnected(peer_index);
// Peers without a registered session are tested for here.
// NOTE(review): the statement guarded by this condition is not visible in this excerpt.
891 if (m_fix_connectors.has_peer(peer_index) == false)
894 // A logged on client
895 auto session = m_fix_connectors.get_session(peer_index);
897 if(session != nullptr)
899 LLFIX_LOG_DEBUG("FixServer " + m_name + " : session " + session->get_name() + " disconnected , peer index : " + std::to_string(peer_index));
// Session-level overload : marks the session as DISCONNECTED.
900 on_client_disconnected(session);
// Drops both the peer -> session and the session -> peer mappings.
904 m_fix_connectors.remove(peer_index, session);
907 void on_client_disconnected(FixSession* session)
909 session->set_state(SessionState::DISCONNECTED);
912 FixServerSettings& get_settings() { return m_settings; }
914 std::string get_settings_as_string(const std::string& delimiter) override
916 return m_settings.to_string(delimiter);
// Stores the message persist plugin pointer used by this server.
// Only the raw pointer is stored here.
// NOTE(review): one line of the body is not visible in this excerpt.
924 void set_message_persist_plugin(MessagePersistPlugin* plugin)
927 m_message_persist_plugin = plugin;
941 virtual bool send_heartbeat(FixSession* session, FixString* test_request_id)
943 session->build_heartbeat_message(outgoing_message_instance(session), test_request_id);
944 return send_outgoing_message(session, outgoing_message_instance(session));
// Builds the logon (MsgType A) response for a session.
// `message`, the incoming logon, is unused by the visible part of this implementation.
// NOTE(review): the tail of this function, including the send and the
// return statement, is not visible in this excerpt.
955 virtual bool send_logon_response(FixSession* session, const IncomingFixMessage* message)
957 LLFIX_UNUSED(message);
958 auto response = outgoing_message_instance(session);
959 response->set_msg_type(FixConstants::MSG_TYPE_LOGON);
961 // TAG 98 ENCRYPTION METHOD
962 response->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
964 // TAG 108 HEARTBEAT INTERVAL
// The session stores the interval in nanoseconds; the tag carries it divided by 1'000'000'000.
965 response->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(session->get_heartbeart_interval_in_nanoseconds() / 1'000
'000'000));
968 auto default_app_ver_id = session->get_default_app_ver_id();
// The default application version id is only added when it is non-empty.
970 if (default_app_ver_id.length() > 0)
972 response->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
1033 session->build_sequence_reset_message(message, desired_sequence_no);
1036 std::size_t encoded_length = 0;
1037 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
1040 return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
1057 session->build_gap_fill_message(message);
1060 std::size_t encoded_length = 0;
1061 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, session->get_incoming_resend_request_begin_no(), encoded_length);
1064 return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
// Replays the stored outgoing messages with sequence numbers in
// [begining_seq_no, end_seq_no] (both inclusive) to the client.
// NOTE(review): the declaration of `message` is not visible in this excerpt.
// NOTE(review): with end_seq_no == UINT32_MAX the loop condition `i <= end_seq_no`
// can never become false - confirm callers never pass that value.
1067 virtual void resend_messages_to_client(
FixSession* session, uint32_t begining_seq_no, uint32_t end_seq_no)
1069 for (uint32_t i = begining_seq_no; i <= end_seq_no; i++)
1071 std::size_t message_length = 0;
// Read stored message number i into the session's tx encode buffer.
1072 session->get_outgoing_message_serialiser()->read_message(i, session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
1075 message->load_from_buffer(session->get_tx_encode_buffer(), message_length);
// Set the PossDupFlag header tag to true on the replayed message.
1077 message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
// Optionally also set the PossResend header tag to true.
1079 if (session->include_t97_during_resends())
1081 message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
1084 std::size_t encoded_length = 0;
// Re-encode under sequence number i and send the bytes; the send result is not checked.
1085 message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
1086 send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length);
// Logs an invalid incoming message and builds a session level reject for it.
// `incoming_message` is unused by the visible part of this implementation.
// `buffer_message` / `buffer_message_length` are the raw message bytes and are
// only used for logging in the visible code.
// NOTE(review): the condition selecting between the two log statements and
// the final send / return are not visible in this excerpt.
1090 virtual bool send_reject_message(FixSession* session,
const IncomingFixMessage* incoming_message, uint32_t reject_reason_code,
const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
1092 LLFIX_UNUSED(incoming_message);
1093 char reject_reason_text[256];
1095 std::size_t text_length{ 0 };
// Fills reject_reason_text / text_length for reject_reason_code.
1096 FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
// Log variant without the offending tag.
1100 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" received an invalid message : " + std::string(reject_reason_text) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
// Log variant that also reports error_tag.
1104 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" received an invalid message : " + std::string(reject_reason_text) +
" ,tag : " + std::to_string(error_tag) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
// Build the reject into the session's outgoing message instance.
1107 session->build_session_level_reject_message(
outgoing_message_instance(session), reject_reason_code, reject_reason_text, error_tag);
// Builds a business reject for a message that exceeded the throttle limit.
// NOTE(review): the declaration of `reject_message` and the final send /
// return are not visible in this excerpt.
1111 virtual bool send_throttle_reject_message(FixSession* session,
const IncomingFixMessage* incoming_message)
1114 reject_message->set_msg_type(FixConstants::MSG_TYPE_BUSINESS_REJECT);
// The reject text comes from the session's throttler_reject_message setting.
1116 reject_message->set_tag(FixConstants::TAG_TEXT, session->settings()->throttler_reject_message);
// Copy the incoming sequence number into the RefSeqNum tag when present.
1118 if (incoming_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM))
1120 reject_message->set_tag(FixConstants::TAG_REF_SEQ_NUM, incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM));
// Copy the incoming message type into the RefMsgType tag when present.
1123 if (incoming_message->has_tag(FixConstants::TAG_MSG_TYPE))
1125 reject_message->set_tag(FixConstants::TAG_REF_MSG_TYPE, incoming_message->get_tag_value_as<std::string>(FixConstants::TAG_MSG_TYPE));
// Registered sessions keyed by session name. Entries are raw pointers created
// with `new` in internal_add_session().
1131 std::unordered_map<std::string, FixSession*> m_sessions;
// NOTE(review): presumably the minimum number of buffered bytes needed before
// parsing starts - confirm at the usage site.
1134 static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4;
// Set from m_settings.starts_as_primary_instance in create(); defaults to true.
1136 bool m_is_ha_primary =
true;
1138 FixServerSettings m_settings;
// Two-way mapping between transport peer indices and sessions.
1140 FixServerConnectors m_fix_connectors;
// Assigned by set_message_persist_plugin(); null until then.
1142 MessagePersistPlugin* m_message_persist_plugin =
nullptr;
// Initialised in create().
1144 UserspaceSpinlock<> m_unknown_session_context_lock;
1146 bool has_session(
const std::string& session_name)
1148 if (m_sessions.find(session_name) == m_sessions.end())
1156 FixSession* find_session(
const std::string& begin_string,
const std::string& sender_comp_id,
const std::string& target_comp_id)
1158 FixSession* ret =
nullptr;
1160 for (
const auto& session_entry : m_sessions)
1162 if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id && session_entry.second->settings()->begin_string == begin_string)
1164 return session_entry.second;
1171 bool does_any_session_use_path(
const std::string& path)
1173 for (
const auto& session_entry : m_sessions)
1175 if (session_entry.second->settings()->sequence_store_file_path == path)
1180 if (session_entry.second->settings()->incoming_message_serialisation_path == path)
1185 if (session_entry.second->settings()->outgoing_message_serialisation_path == path)
1193 bool does_any_session_use_compids(
const std::string& sender_comp_id,
const std::string& target_comp_id)
1195 for (
const auto& session_entry : m_sessions)
1197 if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id)
// Validates, creates and registers a new session under session_name.
// Checks the name and the storage paths against already registered sessions,
// allocates a FixSession, initialises it and stores it in m_sessions.
// NOTE(review): the return statements are not visible in this excerpt.
1205 bool internal_add_session(
const std::string& session_name,
const FixSessionSettings& session_settings)
// Session names must be unique within the server.
1207 if (has_session(session_name))
1209 LLFIX_LOG_ERROR(m_name +
" : Already loaded a session with name " + session_name +
" . You have to specify unique names for sessions.");
// None of the three storage paths may already be used by another session.
1213 if (does_any_session_use_path(session_settings.sequence_store_file_path))
1215 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s sequence_store_file_path already being used in another session.");
1219 if (does_any_session_use_path(session_settings.incoming_message_serialisation_path))
1221 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s incoming_message_serialisation_path already being used in another session.");
1225 if (does_any_session_use_path(session_settings.outgoing_message_serialisation_path))
1227 LLFIX_LOG_ERROR(m_name +
" : session " + session_name +
"'s outgoing_message_serialisation_path already being used in another session.");
// A duplicated comp id pair is logged as a warning and echoed to stderr.
1231 if (does_any_session_use_compids(session_settings.sender_comp_id, session_settings.target_comp_id))
1233 std::string warning_message =
"WARNING (" + m_name +
"): session " + session_name +
"'s sender and target comp ids are already being used in another session.";
1234 LLFIX_LOG_WARNING(warning_message);
1235 fprintf(stderr,
"%s", warning_message.c_str());
// Non-throwing allocation : failure is reported through a null pointer.
1238 FixSession* current_session =
new (std::nothrow) FixSession;
1240 if (current_session ==
nullptr)
1242 LLFIX_LOG_ERROR(m_name +
" : Failed to allocate fix session for " + session_name);
// Server-side overrides applied to the settings before initialising the session.
// NOTE(review): session_settings is declared as a const reference, yet two of
// its fields are assigned here - confirm how these members are declared.
1246 session_settings.is_server =
true;
1247 session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity;
// A session that fails to initialise is destroyed instead of being registered.
1249 if (current_session->initialise(session_name, session_settings) ==
false)
1251 delete current_session;
1255 m_sessions[session_name] = current_session;
// When the server is not configured to start as primary, the session is disabled.
1257 if (m_settings.starts_as_primary_instance ==
false)
1259 disable_session(current_session);
// Per-peer entry point for multithreaded transports (create() registers it
// as the worker callback). Resolves the session bound to peer_index and
// delegates to the FixSession* overload.
// NOTE(review): the lines between the lookup and the delegation are not
// visible in this excerpt.
1265 void process_session(std::size_t peer_index)
// Compile-time guard : only valid for multithreaded transports.
1267 static_assert(Transport::is_multithreaded());
1269 FixSession* sess = m_fix_connectors.get_session(peer_index);
1274 process_session(sess);
// Periodic per-session processing for multithreaded transports : admin
// commands first, then the resend / test request / heartbeat steps for a
// live session.
// NOTE(review): the branch leading to the final on_client_disconnected() call
// is not visible in this excerpt.
1279 void process_session(FixSession* session)
// Compile-time guard : only valid for multithreaded transports.
1281 static_assert(Transport::is_multithreaded());
// Common case : the server is not stopping.
1283 if (llfix_likely(this->m_is_stopping.load() ==
false))
1285 process_admin_commands(session);
1287 auto current_state = session->get_state();
1289 if (is_state_live(current_state))
// The monotonic clock is sampled separately for each time-based step.
1291 process_outgoing_resend_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1292 process_outgoing_test_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1293 respond_to_resend_request_if_necessary(session);
1294 respond_to_test_request_if_necessary(session);
1295 send_heartbeat_if_necessary(session, VDSO::nanoseconds_monotonic());
1303 on_client_disconnected(session);
1307 void process_admin_commands_of_all_non_live_sessions()
1309 static_assert(Transport::is_multithreaded());
1311 for (
auto& iter : m_sessions)
1313 auto state = iter.second->get_state();
1315 if (is_state_live(state) ==
false)
1317 iter.second->lock();
1318 process_admin_commands(iter.second);
1319 iter.second->unlock();
1327 static_assert(!Transport::is_multithreaded());
1329 if (llfix_likely(this->m_is_stopping.load() ==
false))
1331 for (
const auto& session_entry : m_sessions)
1333 FixSession* current_session = session_entry.second;
1335 process_admin_commands(current_session);
1337 auto current_state = current_session->get_state();
1339 if (is_state_live(current_state))
1341 process_outgoing_resend_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1342 process_outgoing_test_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1343 respond_to_resend_request_if_necessary(current_session);
1344 respond_to_test_request_if_necessary(current_session);
1345 send_heartbeat_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1353 send_logout_messages_to_all_sessions();
1357 void push_admin_command_internal(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
1359 auto admin_command =
new (std::nothrow) ModifyingAdminCommand;
1361 if (llfix_likely(admin_command))
1363 admin_command->type = type;
1364 admin_command->arg = arg;
1365 m_sessions[session_name]->get_admin_commands()->push(admin_command);
1369 LLFIX_LOG_ERROR(
"Failed to process setter admin command for FixServer " + m_name);
// Pops at most one pending ModifyingAdminCommand from the session's admin
// command queue (try_pop) and applies it according to its type.
// The command object is deleted at the end; it is allocated with new in
// push_admin_command_internal().
1373 void process_admin_commands(FixSession* session)
1375 ModifyingAdminCommand* admin_command{
nullptr };
1377 if (session->get_admin_commands()->try_pop(&admin_command) ==
true)
1379 switch (admin_command->type)
// Overwrite the stored incoming sequence number with the command argument.
1381 case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
1383 session->get_sequence_store()->set_incoming_seq_no(admin_command->arg);
1384 LLFIX_LOG_DEBUG(m_name +
" : processed set incoming sequence number admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
// Overwrite the stored outgoing sequence number with the command argument.
1388 case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
1390 session->get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
1391 LLFIX_LOG_DEBUG(m_name +
" : processed set outgoing sequence number admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
// Sequence reset is only honoured while the session is LOGGED_ON; otherwise
// the command is dropped with an error log.
// NOTE(review): the statement that actually sends the sequence reset is not
// visible in this listing.
1395 case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
1397 if( session->get_state() == SessionState::LOGGED_ON )
1400 LLFIX_LOG_DEBUG(m_name +
" : processed send sequence reset admin command , session name : " + session->get_name() +
" , new seq no : " + std::to_string(admin_command->arg) );
1404 LLFIX_LOG_ERROR(m_name +
" : dropping send sequence reset admin command as not logged on , session name : " + session->get_name());
1410 case ModifyingAdminCommandType::DISABLE_SESSION:
1412 disable_session(session);
1413 LLFIX_LOG_DEBUG(m_name +
" : processed disable session admin command for session " + session->get_name() +
" , new session state : Disabled");
// Enabling is only allowed while this instance is the HA primary.
1417 case ModifyingAdminCommandType::ENABLE_SESSION:
1419 if(m_is_ha_primary ==
true)
1421 enable_session(session);
1422 LLFIX_LOG_DEBUG(m_name +
" : processed enable session admin command for " + session->get_name());
1426 LLFIX_LOG_ERROR(m_name +
" ignoring enable session admin command for " + session->get_name() +
" since not a primary instance" );
// arg == 1 marks this instance as HA primary and enables the session; any
// other value clears the flag and disables the session.
1431 case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
1433 m_is_ha_primary = admin_command->arg == 1 ? true :
false;
1435 if(admin_command->arg == 1)
1437 enable_session(session);
1438 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 1 admin command for " + session->get_name());
1442 disable_session(session);
1443 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 0 admin command for " + session->get_name());
// Release the command that was popped from the queue.
1453 delete admin_command;
1457 void disable_session(FixSession* session)
1459 if (is_state_live(session->get_state()))
1461 this->process_connection_closure(m_fix_connectors.get_peer_index(session),
true);
1463 session->set_state(SessionState::DISABLED);
// Re-enables a session: only a session currently in the DISABLED state is
// moved to DISCONNECTED; sessions in any other state are left untouched.
1466 void enable_session(FixSession* session)
1468 if(session->get_state() == SessionState::DISABLED)
1470 session->set_state(SessionState::DISCONNECTED);
// Optionally rebuild the outgoing serialiser, controlled by the
// refresh_resend_cache_during_promotion setting.
1471 if(m_settings.refresh_resend_cache_during_promotion)
1472 session->reinitialise_outgoing_serialiser();
// Drives the "resend request initiated by us" part of the session:
//  - while the session is in IN_RETRANSMISSION_INITIATED_BY_SELF, closes the
//    connection once outgoing_resend_request_expire_secs has elapsed since the
//    recorded resend request timestamp;
//  - when the session has a resend request queued, records the timestamp,
//    moves the session to IN_RETRANSMISSION_INITIATED_BY_SELF and clears the
//    pending flag.
// current_timestamp_nanoseconds : caller-supplied timestamp in nanoseconds
//   (callers in this file pass VDSO::nanoseconds_monotonic()).
1478 void process_outgoing_resend_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1480 if (session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1483 uint64_t delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_resend_request_timestamp_nanoseconds();
// Configured expiry is in seconds; convert to nanoseconds in 64-bit arithmetic.
1484 uint64_t required_delta_nanoseconds =
static_cast<uint64_t
>(session->outgoing_resend_request_expire_secs()) * 1
'000'000
'000;
1486 if (delta_nanoseconds >= required_delta_nanoseconds)
1488 // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1489 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(session->outgoing_resend_request_expire_secs()));
1490 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
// NOTE(review): the statement that actually transmits the resend request is
// not visible in this listing; only the bookkeeping around it is shown.
1496 if (session->needs_to_send_resend_request())
1498 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" peer " + std::to_string(m_fix_connectors.get_peer_index(session)) +
" : sending resend request , begin no : " + std::to_string(session->get_outgoing_resend_request_begin_no()));
1500 session->set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1501 session->set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1502 session->set_needs_to_send_resend_request(
false);
// Sends a test request when the peer has been silent for too long and
// terminates the session when an outstanding test request is not answered.
// current_timestamp_nanoseconds : caller-supplied timestamp in nanoseconds.
// NOTE(review): the else structure is not visible in this listing; the second
// half presumably runs only while a test request response is outstanding.
1507 void process_outgoing_test_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1509 if (session->expecting_response_for_outgoing_test_request() ==
false)
// Time since the last message was received from the peer.
1512 auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_received_message_timestamp_nanoseconds();
1514 if (delta_nanoseconds >= (session->get_outgoing_test_request_interval_in_nanoseconds()))
1516 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : sending test request");
1517 send_test_request(session);
1518 session->set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1519 session->set_expecting_response_for_outgoing_test_request(
true);
// Time since our test request was sent; the limit is twice the heartbeat
// interval.
1525 auto delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_test_request_timestamp_nanoseconds();
1527 if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() *
static_cast<uint64_t
>(2)))
1529 session->set_expecting_response_for_outgoing_test_request(
false);
1530 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the other end didn't respond to 35=1/test request");
1531 this->process_connection_closure(m_fix_connectors.get_peer_index(session));
// Answers a pending incoming resend request, either by replaying the stored
// messages or by sending a gap fill, then clears the pending flag and sets the
// session state to LOGGED_ON.
// NOTE(review): the guards/else/break around the replay-vs-gap-fill decision
// are not visible in this listing.
1536 void respond_to_resend_request_if_necessary(FixSession* session)
1538 if (session->needs_responding_to_incoming_resend_request())
1540 const auto last_outgoing_seq_no = session->get_sequence_store()->get_outgoing_seq_no();
// Replay requires: serialisation enabled, replay enabled in the session, the
// requested end number being 0 or equal to our last outgoing sequence number,
// and the requested begin number not being beyond our last outgoing number.
1543 bool will_replay = (session->serialisation_enabled()) && (session->replay_messages_on_incoming_resend_request() && (session->get_incoming_resend_request_end_no() == 0
1544 || session->get_incoming_resend_request_end_no() == last_outgoing_seq_no) && session->get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
// Refuse to replay ranges larger than the session's max_resend_range.
1546 if(last_outgoing_seq_no > session->get_incoming_resend_request_begin_no())
1547 if(last_outgoing_seq_no-session->get_incoming_resend_request_begin_no()+1 > session->max_resend_range())
1548 will_replay =
false;
// Every message in the requested range must still be held in memory by the
// outgoing serialiser, otherwise replay is abandoned.
1552 for (uint32_t i = session->get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1554 if (session->get_outgoing_message_serialiser()->has_message_in_memory(i) ==
false)
1556 will_replay =
false;
1564 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : replaying messages , begin seq no : " + std::to_string(session->get_incoming_resend_request_begin_no()) +
" , end seq no : " + std::to_string(last_outgoing_seq_no));
1565 resend_messages_to_client(session, session->get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1569 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : sending gap fill message");
1570 send_gap_fill_message(session);
1573 session->set_needs_responding_to_incoming_resend_request(
false);
1574 session->set_state(llfix::SessionState::LOGGED_ON);
// Answers a pending incoming test request with a heartbeat and clears the
// pending flag.
1578 void respond_to_test_request_if_necessary(FixSession* session)
1580 if (session->needs_responding_to_incoming_test_request())
// When a test request id was stored, pass it to send_heartbeat and then reset
// the stored id's length to 0.
1582 if (session->get_incoming_test_request_id()->length() > 0)
1584 send_heartbeat(session , session->get_incoming_test_request_id());
1585 session->get_incoming_test_request_id()->set_length(0);
// No stored id: send a heartbeat without one.
1590 send_heartbeat(session,
nullptr);
1593 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : responded to incoming test request");
1594 session->set_needs_responding_to_incoming_test_request(
false);
1598 void send_heartbeat_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1600 auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_sent_message_timestamp_nanoseconds();
1602 if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() *
static_cast<uint64_t
>(9) /
static_cast<uint64_t
>(10)))
1604 send_heartbeat(session,
nullptr);
1608 void send_logout_messages_to_all_sessions()
1610 for (
const auto& session_entry : m_sessions)
1612 FixSession* current_session = session_entry.second;
1614 if (current_session->get_state() != SessionState::DISCONNECTED)
1616 send_logout_message(current_session);
1617 on_client_disconnected(current_session);
// Sends an already encoded buffer to the peer currently bound to `session`.
//
// increment_outgoing_seq_no : compile-time switch; when true the outgoing
//   sequence number in the session's sequence store is incremented.
// buffer / buffer_size      : bytes to transmit.
//
// After the send, the message is handed to the outgoing serialiser (when
// serialisation is enabled) and to the persist plugin (when one is set), and
// the session's last-sent timestamp is refreshed.
// NOTE(review): the return statements are not visible in this listing;
// presumably the result of this->send() is returned — confirm.
1623 template <
bool increment_outgoing_seq_no>
1624 LLFIX_FORCE_INLINE
bool send_bytes(FixSession* session,
const char* buffer, std::size_t buffer_size)
1626 std::size_t peer_index = m_fix_connectors.get_peer_index(session);
// static_cast<std::size_t>(-1) is compared against as the "no peer" value.
1628 if (peer_index ==
static_cast<std::size_t
>(-1))
1631 auto sequence_store = session->get_sequence_store();
1633 auto ret = this->send(peer_index, buffer, buffer_size);
// NOTE(review): whether the increment is additionally guarded by `ret` cannot
// be seen from the visible lines.
1635 if constexpr (increment_outgoing_seq_no ==
true)
1638 sequence_store->increment_outgoing_seq_no();
// The send result and the current outgoing sequence number are recorded with
// the message.
1641 if(session->serialisation_enabled())
1642 session->get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1644 if (m_message_persist_plugin)
1645 m_message_persist_plugin->persist_outgoing_message(session->get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1647 session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
// Returns the UnknownSessionContext stored in ThreadLocalStorage, creating it
// on first use. The whole operation runs under m_unknown_session_context_lock.
// On creation the context's incoming FIX message is initialised and its
// FixStringView cache is created with 256 entries; failures are logged.
// NOTE(review): the nesting of the initialisation block and what is returned
// on an initialisation failure are not visible in this listing.
1654 UnknownSessionContext* get_thread_local_unknown_session_context()
1656 const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1658 UnknownSessionContext* unknown_session_context =
nullptr;
1659 unknown_session_context =
reinterpret_cast<UnknownSessionContext*
>(ThreadLocalStorage::get_instance().get());
// Nothing stored yet: allocate (non-throwing new) and store the pointer.
1661 if (unknown_session_context ==
nullptr)
1663 unknown_session_context =
new(std::nothrow) UnknownSessionContext;
1664 ThreadLocalStorage::get_instance().set(unknown_session_context);
// Allocation may have failed, hence the null check before initialising.
1667 if (unknown_session_context)
1669 if (unknown_session_context->m_incoming_fix_message.initialise() ==
false)
1671 LLFIX_LOG_ERROR(m_name +
" : Failed to initialise unknown session context's incoming_fix_message");
1675 if (unknown_session_context->m_fix_string_view_cache.create(256) ==
false)
1677 LLFIX_LOG_ERROR(m_name +
" : Failed to create unknown session context's fix string view cache");
1682 return unknown_session_context;
1685 void destroy_thread_local_unknown_session_context()
1687 const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1689 UnknownSessionContext* unknown_session_context{
nullptr };
1690 unknown_session_context =
reinterpret_cast<UnknownSessionContext*
>(ThreadLocalStorage::get_instance().get());
1692 if (unknown_session_context)
1694 delete unknown_session_context;
1695 ThreadLocalStorage::get_instance().set(
nullptr);
// Hot-path receive handler: splits the bytes received from `peer_index` into
// FIX messages, parses each message tag by tag into an IncomingFixMessage and
// dispatches it.
//
// peer_index  : index of the connection the bytes came from
// buffer      : received bytes (may hold several messages and a partial tail)
// buffer_size : number of valid bytes in `buffer`
//
// Bytes that do not yet form a complete message are handed to
// set_incomplete_buffer(). Parse problems are recorded in parser_reject_code
// and passed on to accept_session()/process_incoming_fix_message().
// NOTE(review): loop headers, continue/return statements and braces are not
// visible in this listing; comments below describe only the visible statements.
1701 LLFIX_HOT
void process_rx_buffer(std::size_t peer_index,
char* buffer, std::size_t buffer_size)
1703 std::size_t buffer_read_index = 0;
// Too few bytes to start parsing: keep them as an incomplete buffer.
1705 if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1707 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
// May be null when no session is bound to this peer yet.
1711 FixSession* session = m_fix_connectors.get_session(peer_index);
// Scan from the end of the buffer to locate the last complete tag 10
// (checksum); final_tag10_delimiter_index stays -1 when none is found.
1715 int final_tag10_delimiter_index{ -1 };
1716 int current_index =
static_cast<int>(buffer_size - 1);
1718 if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) ==
false))
1721 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1725 FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1727 if (final_tag10_delimiter_index == -1)
1730 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1735 LLFIX_ALIGN_CODE_32;
// Advance the read index to the begin string of the next message when one is
// found in the remaining bytes.
1740 int begin_string_offset{-1};
1741 FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1743 if(llfix_likely(begin_string_offset>=0))
1745 buffer_read_index += begin_string_offset;
// Pick the parse targets: the session's own message object and string view
// cache, or the thread-local UnknownSessionContext when no session is known.
1753 IncomingFixMessage* incoming_fix_message{
nullptr };
1754 ObjectCache<FixStringView>* fix_string_view_cache{
nullptr };
1756 if (llfix_likely(session !=
nullptr))
1758 incoming_fix_message = session->get_incoming_fix_message();
1759 fix_string_view_cache = session->get_fix_string_view_cache();
1763 UnknownSessionContext* unknown_session_context = get_thread_local_unknown_session_context();
1765 if (unknown_session_context)
1767 incoming_fix_message = &(unknown_session_context->m_incoming_fix_message);
1768 fix_string_view_cache = &(unknown_session_context->m_fix_string_view_cache);
1772 this->set_incomplete_buffer(peer_index, buffer, buffer_size);
// Per-message parser state.
1777 incoming_fix_message->reset();
1778 fix_string_view_cache->reset_pointer();
1780 current_index =
static_cast<int>(buffer_read_index);
1781 bool looking_for_equals =
true;
1783 int current_tag_start =
static_cast<int>(buffer_read_index);
1784 int current_tag_len{ 0 };
1786 int current_value_start =
static_cast<int>(buffer_read_index);
1787 int current_value_len{ 0 };
1789 int current_tag_10_delimiter_index = -1;
1790 int current_tag_35_tag_start_index = -1;
// static_cast<uint32_t>(-1) is used as the "not set" value for both the reject
// code and the encoded message type.
1792 uint32_t parser_reject_code =
static_cast<uint32_t
>(-1);
1793 uint32_t current_tag_index = 0;
1795 uint32_t encoded_current_message_type =
static_cast<uint32_t
>(-1);
1797 bool in_a_repeating_group{
false };
1798 uint32_t current_rg_count_tag{ 0 };
1800 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1801 int binary_field_length = 0;
1802 int binary_field_counter = 0;
1805 LLFIX_ALIGN_CODE_32;
1808 char current_char = buffer[current_index];
// Tag phase: an equals sign ends the tag; a delimiter seen before any equals
// sign is recorded as NO_EQUALS_SIGN.
1810 if (looking_for_equals)
1812 if (current_char == FixConstants::FIX_EQUALS)
1814 current_tag_len = current_index - current_tag_start;
1815 current_value_start = current_index + 1;
1816 looking_for_equals =
false;
1818 else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1820 current_tag_start = current_index + 1;
1821 parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
// With binary fields enabled, a delimiter only ends the value once the
// announced binary length has been consumed.
1824 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1827 if(llfix_unlikely(binary_field_length>0))
1828 if(binary_field_counter < binary_field_length)
1829 binary_field_counter++;
1832 if (looking_for_equals ==
false && current_char == FixConstants::FIX_DELIMITER)
1834 bool reached_value_end =
true;
1836 if(llfix_unlikely(binary_field_length>0))
1838 if(binary_field_length>binary_field_counter)
1840 reached_value_end =
false;
1842 else if(binary_field_length == binary_field_counter)
1844 binary_field_length=0;
1845 binary_field_counter=0;
1849 if(llfix_likely(reached_value_end))
1853 else if (current_char == FixConstants::FIX_DELIMITER)
// Value phase finished: validate and store the tag/value pair.
1856 current_value_len = current_index - current_value_start;
1858 if (llfix_unlikely(current_value_len == 0))
1860 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1863 bool is_current_tag_numeric{
true };
1864 FixSession::validate_tag_format(buffer + current_tag_start,
static_cast<std::size_t
>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1866 if (llfix_likely(is_current_tag_numeric))
1870 uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer + current_tag_start,
static_cast<std::size_t
>(current_tag_len));
1872 if(llfix_unlikely(tag == 0))
1874 parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
// The value is stored as a view into `buffer`; no bytes are copied here.
1877 FixStringView* value = fix_string_view_cache->allocate();
1878 value->set_buffer(
const_cast<char*
>(buffer + current_value_start),
static_cast<std::size_t
>(current_value_len));
1881 current_tag_index++;
1882 FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
// Repeating group tracking starts only after the message type (tag 35) has
// been seen.
1887 if(encoded_current_message_type !=
static_cast<uint32_t
>(-1))
1889 if(llfix_likely(!in_a_repeating_group))
1891 if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1893 current_rg_count_tag = tag;
1894 in_a_repeating_group =
true;
1899 if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) ==
false))
1901 in_a_repeating_group =
false;
1905 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1906 if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1908 binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
// Outside repeating groups a tag may appear only once.
1913 if (llfix_likely(in_a_repeating_group ==
false))
1915 if (llfix_likely(incoming_fix_message->has_tag(tag) ==
false))
1917 incoming_fix_message->set_tag(tag, value);
1921 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1926 incoming_fix_message->set_repeating_group_tag(tag, value);
// The checksum tag marks the end of the current message.
1929 if (tag == FixConstants::TAG_CHECKSUM)
1931 current_tag_10_delimiter_index = current_index;
1933 FixSession::validate_tag9_and_tag35((incoming_fix_message), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1937 else if (tag == FixConstants::TAG_MSG_TYPE)
1939 current_tag_35_tag_start_index = current_tag_start;
1940 encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1944 current_tag_start = current_index + 1;
1945 looking_for_equals =
true;
1946 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// Ran out of bytes before the message was complete: keep the unread tail.
1953 if (current_index >=
static_cast<int>(buffer_size) - 1)
1955 this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
1962 auto current_message_length = current_tag_10_delimiter_index + 1 - buffer_read_index;
// First message on a connection without a session: try to accept a session
// for this peer.
1964 if (llfix_unlikely(session ==
nullptr))
1966 session = accept_session(peer_index, incoming_fix_message, buffer + buffer_read_index, current_message_length, parser_reject_code);
1968 if (session ==
nullptr)
1975 process_incoming_fix_message(session, peer_index, buffer + buffer_read_index, current_message_length, parser_reject_code);
1978 #ifndef LLFIX_UNIT_TEST
1979 if(llfix_unlikely(session->get_state() == SessionState::DISCONNECTED))
// Move past the message that was just handled.
1985 buffer_read_index = current_tag_10_delimiter_index + 1;
1987 if (current_tag_10_delimiter_index ==
static_cast<int>(buffer_size) - 1)
1989 this->reset_incomplete_buffer(peer_index);
1992 else if (
static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)
1994 this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
// Handles a rejected logon attempt: logs `log_message` together with the
// peer's IP (when the message is non-empty) and then closes the connection via
// process_connection_closure(), forwarding `send_logout` and
// `logout_reason_text`.
// NOTE(review): the declaration of peer_port is not visible in this listing.
2000 void process_invalid_logon(std::size_t peer_index,
const std::string& log_message,
bool send_logout =
false,
const std::string& logout_reason_text =
"")
2002 if(!log_message.empty())
2005 std::string peer_ip;
2006 this->get_peer_details(peer_index, peer_ip, peer_port);
2008 LLFIX_LOG_ERROR(
"Invalid logon attempt from " + peer_ip +
" : " + log_message);
2011 process_connection_closure(peer_index, send_logout, logout_reason_text);
// Closes the connection identified by `peer_index`. When a session is bound to
// that peer, a logout message carrying `logout_reason_text` can be sent first.
// NOTE(review): the statement that consults `send_logout` is not visible in
// this listing; presumably the logout is only sent when it is true — confirm.
2014 void process_connection_closure(std::size_t peer_index,
bool send_logout =
false,
const std::string& logout_reason_text =
"")
2018 auto session = m_fix_connectors.get_session(peer_index);
2020 if (session !=
nullptr)
2022 send_logout_message(session, logout_reason_text);
2026 this->close_connection(peer_index);
// Session-level handling of one fully parsed incoming message: bookkeeping,
// validation, sequence number checks, throttling and finally dispatch by
// message type.
//
// buffer_message / buffer_message_length : raw bytes of this message
// parser_reject_code : error recorded by the parser, static_cast<uint32_t>(-1)
//                      when none
// NOTE(review): early returns and else branches are not visible in this
// listing; comments below describe only the visible statements.
2029 void process_incoming_fix_message(FixSession* session, std::size_t peer_index,
const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
2031 session->set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
2033 if(session->serialisation_enabled())
2034 session->get_incoming_message_serialiser()->write(buffer_message, buffer_message_length,
true);
// An outstanding test request is cleared by any incoming message.
2036 if (llfix_unlikely(session->expecting_response_for_outgoing_test_request()))
2039 session->set_expecting_response_for_outgoing_test_request(
false);
2040 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" session " + session->get_name() +
" : other end satisfied the test request");
// Optional validation; a reject message is sent only when validation produced
// a reject code.
2043 if (session->validations_enabled())
2045 uint32_t reject_message_code =
static_cast<uint32_t
>(-1);
2047 if (session->validate_fix_message(*session->get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) ==
false)
2049 if (reject_message_code !=
static_cast<uint32_t
>(-1))
2051 send_reject_message(session, session->get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, session->get_last_error_tag());
// Advance the expected incoming sequence number, then compare it with the
// MsgSeqNum carried by the message.
2059 auto sequence_store = session->get_sequence_store();
2060 sequence_store->increment_incoming_seq_no();
2061 auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
2063 if (m_message_persist_plugin)
2064 m_message_persist_plugin->persist_incoming_message(session->get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
2068 auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
// Higher than expected: queue a resend request, except for hard sequence
// resets and while the logon is still pending.
2070 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2072 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2074 if (session->get_state() != SessionState::PENDING_LOGON)
2076 session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
// Lower than expected (and not a hard sequence reset): undo the increment and
// close the connection.
2081 else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
2083 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2085 std::string logout_reason_text;
2086 bool send_logout =
false;
2088 if (session->get_state() == SessionState::PENDING_LOGON)
2091 logout_reason_text =
"MsgSeqNum too low, expecting " + std::to_string(sequence_store_incoming_seq_no) +
" but received " + std::to_string(incoming_seq_no);
2094 sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no - 1);
2095 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : terminating session " + session->get_name() +
" as the incoming sequence no (" + std::to_string(incoming_seq_no) +
") is lower than expected (" + std::to_string(sequence_store_incoming_seq_no) +
")");
2096 this->process_connection_closure(peer_index, send_logout, logout_reason_text);
// A resend request we issued is complete once the message carrying its end
// number arrives.
2101 if (llfix_unlikely(session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
2103 if (incoming_seq_no == session->get_outgoing_resend_request_end_no())
2105 LLFIX_LOG_DEBUG(
"FixServer " + m_name +
" : other end satisfied the resend request");
2106 session->set_state(llfix::SessionState::LOGGED_ON);
2110 if(process_incoming_throttling(session, session->get_incoming_fix_message()) ==
false)
// Dispatch: single-character message types go through the switch; the final
// on_custom_message call handles the remaining case.
2115 auto message_type = session->get_incoming_fix_message()->get_tag_value(FixConstants::TAG_MSG_TYPE);
2117 if (llfix_likely(message_type->length() == 1))
2119 switch (message_type->data()[0])
2121 case FixConstants::MSG_TYPE_NEW_ORDER: on_new_order(session, session->get_incoming_fix_message());
break;
2122 case FixConstants::MSG_TYPE_ORDER_CANCEL: on_cancel_order(session, session->get_incoming_fix_message());
break;
2123 case FixConstants::MSG_TYPE_ORDER_CANCEL_REPLACE: on_replace_order(session, session->get_incoming_fix_message());
break;
2124 case FixConstants::MSG_TYPE_HEARTBEAT: on_client_heartbeat(session);
break;
2125 case FixConstants::MSG_TYPE_TEST_REQUEST: session->process_test_request_message(*session->get_incoming_fix_message()); on_client_test_request(session, session->get_incoming_fix_message());
break;
2126 case FixConstants::MSG_TYPE_RESEND_REQUEST: session->process_resend_request(*session->get_incoming_fix_message()); on_client_resend_request(session, session->get_incoming_fix_message());
break;
2127 case FixConstants::MSG_TYPE_REJECT: on_session_level_reject(session, session->get_incoming_fix_message());
break;
2128 case FixConstants::MSG_TYPE_BUSINESS_REJECT: on_application_level_reject(session, session->get_incoming_fix_message());
break;
2129 case FixConstants::MSG_TYPE_LOGON: process_logon_request(session, peer_index, session->get_incoming_fix_message());
break;
2130 case FixConstants::MSG_TYPE_LOGOUT: process_logout_request(session, peer_index, session->get_incoming_fix_message());
break;
2131 case FixConstants::MSG_TYPE_SEQUENCE_RESET:
if (session->process_incoming_sequence_reset_message(*session->get_incoming_fix_message()) ==
false) { send_reject_message(session, session->get_incoming_fix_message(), FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); };
break;
2133 default: on_custom_message(session, session->get_incoming_fix_message());
break;
2138 on_custom_message(session, session->get_incoming_fix_message());
// Checks the mandatory fields of an incoming logon message and logs an error
// naming the peer IP and sender comp id for each failed check:
//  - encryption method tag must be present
//  - heartbeat interval tag must be present
//  - heartbeat interval must not contain '-' or '.', and must not be zero
// `session` is intentionally unused (LLFIX_UNUSED).
// NOTE(review): the return statements are not visible in this listing;
// presumably each failed check returns false and the function returns true
// otherwise — confirm. The declaration of peer_port is not visible either.
2142 bool validate_logon_request(FixSession* session, std::size_t peer_index,
const IncomingFixMessage* message)
2144 LLFIX_UNUSED(session);
2146 std::string peer_ip;
2147 this->get_peer_details(peer_index, peer_ip, peer_port);
2149 if (message->has_tag(FixConstants::TAG_ENCRYPT_METHOD) ==
false)
2151 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" does not have required t98(encryption method), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2155 if (message->has_tag(FixConstants::TAG_HEART_BT_INT) ==
false)
2157 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" does not have required t108(heartbeat interval), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
// Inspect the textual value first so that negative and fractional values are
// caught before the numeric conversion below.
2161 auto t108_string = message->get_tag_value_as<std::string>(FixConstants::TAG_HEART_BT_INT);
2163 if (t108_string.find(
'-') != std::string::npos)
2165 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(negative) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2169 if (t108_string.find(
'.') != std::string::npos)
2171 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(decimal points) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2175 if (message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT) == 0)
2177 LLFIX_LOG_ERROR(
"Incoming logon message for server " + m_name +
" from " + peer_ip +
" has invalid(zero) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
// Sequence number check performed once the session is LOGGED_ON: when the
// MsgSeqNum of the session's current incoming message is higher than the
// stored incoming sequence number, a resend request for the gap is queued —
// unless the message is a hard sequence reset.
2184 void do_post_logon_sequence_number_check(FixSession* session)
2186 if(session->get_state() == SessionState::LOGGED_ON)
2189 auto sequence_store_incoming_seq_no = session->get_sequence_store()->get_incoming_seq_no();
2190 auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2192 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2194 if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) ==
false)
2196 session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
2204 void process_logout_request(FixSession* session, std::size_t peer_index,
const IncomingFixMessage* message)
2206 send_logout_message(session);
2207 session->set_state(SessionState::LOGGED_OUT);
2208 LLFIX_LOG_DEBUG(m_name +
", session logged out : " + session->get_name() +
" , peer index : " + std::to_string(peer_index));
2209 on_logout_request(session, message);
// FixServer is neither copyable nor movable: copy/move construction and
// copy/move assignment are all explicitly deleted.
2212 FixServer(
const FixServer& other) =
delete;
2213 FixServer& operator= (
const FixServer& other) =
delete;
2214 FixServer(FixServer&& other) =
delete;
2215 FixServer& operator=(FixServer&& other) =
delete;