33 #include <string_view>
40 #include "core/compiler/hints_hot_code.h"
41 #include "core/compiler/hints_branch_predictor.h"
42 #include "core/compiler/unused.h"
44 #include "core/cpu/alignment_constants.h"
46 #include "core/os/vdso.h"
47 #include "core/os/thread_utilities.h"
48 #include "core/os/process_utilities.h"
50 #include "core/utilities/converters.h"
51 #include "core/utilities/logger.h"
52 #include "core/utilities/std_string_utilities.h"
54 #include "electronic_trading/common/message_persist_plugin.h"
56 #include "electronic_trading/managed_instance/modifying_admin_command.h"
57 #include "electronic_trading/managed_instance/managed_instance.h"
59 #include "fix_constants.h"
60 #include "incoming_fix_message.h"
61 #include "outgoing_fix_message.h"
62 #include "fix_session.h"
63 #include "fix_session_settings.h"
64 #include "fix_client_settings.h"
65 #include "fix_string.h"
66 #include "fix_string_view.h"
67 #include "fix_utilities.h"
68 #include "fix_parser_error_codes.h"
70 #include "core/utilities/tcp_connector_options.h"
72 #ifdef LLFIX_UNIT_TEST // VOLTRON_EXCLUDE
74 #endif // VOLTRON_EXCLUDE
94 template<
typename Transport>
95 class FixClient :
public Transport,
public ManagedInstance
105 m_is_exiting.store(
true);
120 [[nodiscard]]
bool create(
const std::string& client_name,
const FixClientSettings& settings,
const std::string& session_name,
const FixSessionSettings& session_settings)
122 m_settings = settings;
123 return create(client_name, session_name, session_settings);
136 [[nodiscard]]
bool create(
const std::string& client_config_file_path,
const std::string& client_name,
const std::string& session_config_file_path,
const std::string& session_name)
138 if (m_settings.load_from_config_file(client_config_file_path, client_name) ==
false)
140 LLFIX_LOG_ERROR(
"Loading settings for client " + client_name +
" failed : " + m_settings.config_load_error);
144 FixSessionSettings session_settings;
145 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
147 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
151 return create(client_name, session_name, session_settings);
154 std::string get_name()
const override
159 bool is_instance_ha_primary()
const override
161 return m_is_ha_primary;
171 LLFIX_UNUSED(session_name);
175 void get_session_names(std::vector<std::string>& target)
override
177 target.push_back(m_session.get_name());
180 void initialise_thread()
182 if (m_settings.cpu_core_id >= 0)
184 if (ThreadUtilities::pin_calling_thread_to_cpu_core(m_settings.cpu_core_id) == 0)
186 LLFIX_LOG_INFO(m_name +
" thread pinned to CPU core " + std::to_string(m_settings.cpu_core_id));
190 LLFIX_LOG_ERROR(m_name +
" thread pinning to CPU core " + std::to_string(m_settings.cpu_core_id) +
" failed");
203 template<
typename... Args>
206 FixSession::get_repeating_group_specs().specify_repeating_group(args...);
209 #ifdef LLFIX_ENABLE_BINARY_FIELDS
219 void specify_binary_field(
const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
221 FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
237 m_thread.reset(
new std::thread(&FixClient::thread_function,
this));
243 LLFIX_LOG_ERROR(m_name +
" creation failed during thread creation");
257 if(m_session.get_state() == SessionState::DISABLED)
259 LLFIX_LOG_DEBUG(
"Cannot allow connection attempt as session " + m_session.get_name() +
" is disabled.");
263 if (m_session.is_now_valid_session_datetime() ==
false)
265 LLFIX_LOG_DEBUG(
"Cannot allow connection attempt for session " + m_session.get_name() +
" due to schedule settings.");
269 m_session.set_state(SessionState::PENDING_CONNECTION);
271 m_session.reset_flags();
273 TCPConnectorOptions options;
275 options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
276 options.m_receive_size = m_settings.receive_size;
277 options.m_send_try_count = m_settings.send_try_count;
279 options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
280 options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
281 options.m_spin_count = m_settings.spin_count;
283 options.m_nic_interface_name = m_settings.nic_name.c_str();
284 options.m_nic_interface_ip = m_settings.nic_address.c_str();
286 options.m_socket_tx_size = m_settings.socket_tx_size;
287 options.m_socket_rx_size = m_settings.socket_rx_size;
288 options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
289 options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
291 options.m_disable_nagle = m_settings.disable_nagle;
292 options.m_enable_quick_ack = m_settings.quick_ack;
294 options.m_stack = m_settings.stack;
295 options.m_connect_timeout_seconds = m_settings.connect_timeout_seconds;
297 #ifdef LLFIX_ENABLE_OPENSSL
298 options.m_use_ssl = m_settings.use_ssl;
299 options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
300 options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
301 options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
302 options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
303 options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
304 options.m_ssl_version = m_settings.ssl_version;
305 options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
308 Transport::set_params(options);
310 bool success = Transport::connect(m_settings.primary_address.c_str(), m_settings.primary_port);
311 m_connected_to_primary = success;
313 if (success ==
false)
316 if (m_settings.secondary_address.length() > 0 && m_settings.secondary_port > 0)
318 LLFIX_LOG_DEBUG(m_name +
" : Primary connection failed. Trying secondary connection...");
319 success = Transport::connect(m_settings.secondary_address.c_str(), m_settings.secondary_port);
320 m_connected_to_secondary = success;
326 m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
328 LLFIX_LOG_INFO(m_name +
" : TCP connection established on " + (m_connected_to_primary?
"primary":
"secondary") );
329 m_session.set_state(SessionState::PENDING_LOGON);
330 m_last_logon_attempt_timestamp = VDSO::nanoseconds_monotonic();
341 m_session.set_state(SessionState::DISCONNECTED);
360 auto session_state = m_session.get_state();
362 if(session_state > SessionState::DISCONNECTED)
363 this->process_incoming_messages();
365 process_admin_commands();
367 if (is_state_live(session_state))
369 process_outgoing_resend_request_if_necessary(VDSO::nanoseconds_monotonic());
370 process_outgoing_test_request_if_necessary(VDSO::nanoseconds_monotonic());
372 respond_to_resend_request_if_necessary();
373 respond_to_test_request_if_necessary();
375 send_client_heartbeat_if_necessary(VDSO::nanoseconds_monotonic());
377 process_schedule_validator();
379 else if(session_state == SessionState::PENDING_LOGON)
381 process_outgoing_logon_request(VDSO::nanoseconds_monotonic());
385 void push_admin_command(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
override
387 LLFIX_UNUSED(session_name);
388 auto admin_command =
new (std::nothrow) ModifyingAdminCommand;
390 if (llfix_likely(admin_command))
392 admin_command->type = type;
393 admin_command->arg = arg;
395 m_session.get_admin_commands()->push(admin_command);
399 LLFIX_LOG_ERROR(
"Failed to process setter admin command for FixClient " + m_name);
415 m_thread_grace_full_exit.store(graceful_shutdown);
416 m_is_exiting.store(
true);
420 do_shutdown(graceful_shutdown);
432 return m_session.get_outgoing_fix_message();
444 std::size_t encoded_length = 0;
445 auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
447 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
449 return send_bytes<true>(m_session.get_tx_encode_buffer(), encoded_length);
452 std::string get_settings_as_string(
const std::string& delimiter)
override
454 return m_settings.to_string(delimiter);
457 FixClientSettings& get_settings() {
return m_settings; }
475 virtual void run() {};
488 virtual void on_async_io_error(
int error_code,
int event_result)
override
490 LLFIX_UNUSED(error_code);
491 LLFIX_UNUSED(event_result);
494 virtual void on_socket_error(
int error_code)
override
496 LLFIX_UNUSED(error_code);
569 void on_connection_lost()
override
571 m_session.set_state(SessionState::DISCONNECTED);
572 m_connected_to_primary =
false;
573 m_connected_to_secondary =
false;
577 void on_data_ready()
override
579 std::size_t read = this->receive();
581 if (read > 0 && read <= m_settings.rx_buffer_capacity)
583 process_rx_buffer(this->get_rx_buffer(), this->get_rx_buffer_size());
586 this->receive_done();
597 m_message_persist_plugin = plugin;
600 #ifdef LLFIX_AUTOMATION
604 auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
605 std::size_t encoded_length = 0;
607 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
609 auto sequence_store = m_session.get_sequence_store();
610 sequence_store->increment_outgoing_seq_no();
612 if(m_session.serialisation_enabled())
613 m_session.get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(m_session.get_tx_encode_buffer()), encoded_length,
true, sequence_store->get_outgoing_seq_no());
615 m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
632 logon_request->set_msg_type(FixConstants::MSG_TYPE_LOGON);
635 logon_request->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
638 logon_request->set_tag(FixConstants::TAG_HEART_BT_INT,
static_cast<uint32_t
>(m_session.get_heartbeart_interval_in_nanoseconds() / 1
'000'000
'000));
640 // TAG 1137 DEFAULT APP VER ID
641 auto default_app_ver_id = m_session.get_default_app_ver_id();
643 if (default_app_ver_id.length() > 0)
645 logon_request->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
648 // TAG 141 RESET SEQ NOS
649 if(m_session.logon_reset_sequence_numbers_flag())
651 m_session.get_sequence_store()->reset_numbers();
652 logon_request->set_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
655 // TAG 789 NEXT EXPECTED SEQ NO
656 if(m_session.logon_include_next_expected_seq_no())
658 const auto next_expected_seq_no = m_session.get_sequence_store()->get_incoming_seq_no()+1;
659 logon_request->set_tag(FixConstants::TAG_NEXT_EXPECTED_SEQ_NUM, next_expected_seq_no);
662 // TAG 553 SESSION USER NAME
663 auto session_username = m_session.get_username();
665 if (session_username.length() > 0)
667 logon_request->set_tag(FixConstants::TAG_USERNAME, session_username);
670 // TAG 554 SESSION PASSWORD
671 auto session_password = m_session.get_password();
673 if (session_password.length() > 0)
675 logon_request->set_tag(FixConstants::TAG_PASSWORD, session_password);
678 // TAG 925 NEW PASSWORD
679 if (m_session.logon_message_new_password().length() > 0)
681 logon_request->set_tag(FixConstants::TAG_NEW_PASSWORD, m_session.logon_message_new_password());
684 return send_outgoing_message(logon_request);
692 virtual bool send_logout_request()
694 m_session.build_logout_message(outgoing_message_instance());
695 return send_outgoing_message(outgoing_message_instance());
704 virtual bool send_client_heartbeat(FixString* test_request_id)
706 m_session.build_heartbeat_message(outgoing_message_instance(), test_request_id);
707 return send_outgoing_message(outgoing_message_instance());
715 virtual bool send_test_request()
717 m_session.build_test_request_message(outgoing_message_instance());
718 return send_outgoing_message(outgoing_message_instance());
726 virtual bool send_resend_request()
728 m_session.build_resend_request_message(outgoing_message_instance(), "0");
729 return send_outgoing_message(outgoing_message_instance());
738 virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
741 This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an
internal admin request
742 Therefore we can
't set 123=Y
744 auto message = outgoing_message_instance();
745 m_session.build_sequence_reset_message(message, desired_sequence_no);
747 // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
748 std::size_t encoded_length = 0;
749 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
752 return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
760 virtual bool send_gap_fill_message()
763 This message is a gap fill that responds to an incoming 35=2/resend request.
767 m_session.build_gap_fill_message(message);
770 std::size_t encoded_length = 0;
771 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, m_session.get_incoming_resend_request_begin_no(), encoded_length);
774 return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length);
777 virtual void resend_messages_to_server(uint32_t begin_seq_no, uint32_t end_seq_no)
779 for (uint32_t i = begin_seq_no; i <= end_seq_no; i++)
781 std::size_t message_length = 0;
782 m_session.get_outgoing_message_serialiser()->read_message(i, m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
785 message->load_from_buffer(m_session.get_tx_encode_buffer(), message_length);
787 message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
789 if (m_session.include_t97_during_resends())
791 message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
794 std::size_t encoded_length = 0;
795 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
796 send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length);
800 virtual bool send_reject_message(
const IncomingFixMessage& incoming_message, uint32_t reject_reason_code,
const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
802 LLFIX_UNUSED(incoming_message);
803 char reject_reason_text[256];
805 std::size_t text_length{ 0 };
806 FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
810 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" received an invalid message : " + std::string(reject_reason_text) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
814 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" received an invalid message : " + std::string(reject_reason_text) +
" ,tag : " + std::to_string(error_tag) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
817 m_session.build_session_level_reject_message(
outgoing_message_instance(), reject_reason_code, reject_reason_text, error_tag);
821 std::atomic<bool> m_is_exiting =
false;
824 LLFIX_ALIGN_DATA(AlignmentConstants::CPU_CACHE_LINE_SIZE) FixSession m_session;
827 bool m_is_ha_primary =
true;
829 static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4;
831 uint64_t m_last_logon_attempt_timestamp = 0;
833 bool m_connected_to_primary =
false;
834 bool m_connected_to_secondary =
false;
836 FixClientSettings m_settings;
838 MessagePersistPlugin* m_message_persist_plugin =
nullptr;
839 std::atomic<bool> m_thread_grace_full_exit =
true;
840 std::unique_ptr<std::thread> m_thread;
842 bool is_threaded()
const {
return m_thread.get() !=
nullptr;}
844 void thread_function()
850 if (m_is_exiting.load() ==
true)
852 do_shutdown(m_thread_grace_full_exit.load());
860 bool create(
const std::string& name,
const std::string& session_name,
const FixSessionSettings& session_settings)
863 m_is_ha_primary = m_settings.starts_as_primary_instance;
865 if (m_settings.validate() ==
false)
867 LLFIX_LOG_ERROR(
"FixClientSettings for " + m_name +
" validation failed : " + m_settings.validation_error);
871 session_settings.is_server =
false;
872 session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity;
874 if (m_session.initialise(session_name, session_settings) ==
false)
876 LLFIX_LOG_ERROR(m_name +
" creation failed during session initialisation");
880 if (m_settings.starts_as_primary_instance ==
false)
885 LLFIX_LOG_INFO(m_name +
" : Loaded client config =>\n" + m_settings.to_string());
887 LLFIX_LOG_INFO(
"FixClient " + m_name +
" creation success");
892 void process_admin_commands()
894 ModifyingAdminCommand* admin_command{
nullptr };
896 if (m_session.get_admin_commands()->try_pop(&admin_command) ==
true)
898 switch (admin_command->type)
900 case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
902 m_session.get_sequence_store()->set_incoming_seq_no(admin_command->arg);
903 LLFIX_LOG_DEBUG(m_name +
" : processed set incoming sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
907 case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
909 m_session.get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
910 LLFIX_LOG_DEBUG(m_name +
" : processed set outgoing sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
914 case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
916 if( m_session.get_state() == SessionState::LOGGED_ON )
919 LLFIX_LOG_DEBUG(m_name +
" : processed send sequence reset admin command , new seq no : " + std::to_string(admin_command->arg));
923 LLFIX_LOG_ERROR(m_name +
" : dropping send sequence reset admin command as not logged on");
929 case ModifyingAdminCommandType::DISABLE_SESSION:
932 LLFIX_LOG_DEBUG(m_name +
" : processed disable session admin command , new session state : Disabled");
936 case ModifyingAdminCommandType::ENABLE_SESSION:
941 LLFIX_LOG_DEBUG(m_name +
" : processed enable session admin command");
945 LLFIX_LOG_ERROR(m_name +
" ignoring enable session admin command since not a primary instance");
950 case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
952 m_is_ha_primary = admin_command->arg == 1 ? true :
false;
954 if(admin_command->arg == 1)
957 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 1 admin command");
962 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 0 admin command");
971 delete admin_command;
975 void disable_session()
977 if(is_state_live(m_session.get_state()))
981 m_session.set_state(SessionState::DISABLED);
984 void enable_session()
986 if(m_session.get_state() == SessionState::DISABLED)
988 m_session.set_state(SessionState::DISCONNECTED);
989 if(m_settings.refresh_resend_cache_during_promotion)
990 m_session.reinitialise_outgoing_serialiser();
994 void process_schedule_validator()
996 if (m_session.is_now_valid_session_datetime() ==
false)
998 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : terminating session due to schedule settings");
1003 void do_shutdown(
bool graceful_shutdown =
true)
1005 auto call_time_state = m_session.get_state();
1006 LLFIX_LOG_DEBUG(m_name +
" : shutdown called. Graceful shutdown param : " + (graceful_shutdown?
"true":
"false") );
1008 on_connection_lost();
1010 if (graceful_shutdown)
1012 if (call_time_state == SessionState::LOGGED_ON)
1014 m_session.set_received_logout_response(
false);
1017 m_session.set_state(SessionState::PENDING_LOGOUT);
1019 auto start = std::chrono::steady_clock::now();
1020 const auto timeout = std::chrono::milliseconds(m_session.settings()->logout_timeout_seconds*1000);
1024 this->process_incoming_messages();
1026 if(m_session.received_logout_response() ==
true)
1031 if (m_session.get_state() == SessionState::DISCONNECTED)
1036 auto now = std::chrono::steady_clock::now();
1037 if (now -
start >= timeout)
1039 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" shutdown timed out before receiving logout response");
1047 m_session.set_state(SessionState::DISCONNECTED);
1052 template <
bool increment_outgoing_seq_no>
1053 LLFIX_FORCE_INLINE
bool send_bytes(
const char* buffer, std::size_t buffer_size)
1055 auto sequence_store = m_session.get_sequence_store();
1057 if(llfix_likely(m_session.settings()->throttle_limit != 0))
1059 m_session.throttler()->update();
1060 m_session.throttler()->wait();
1063 auto ret = this->send(buffer, buffer_size);
1065 if constexpr (increment_outgoing_seq_no ==
true)
1068 sequence_store->increment_outgoing_seq_no();
1071 if(m_session.serialisation_enabled())
1072 m_session.get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1074 if (m_message_persist_plugin)
1075 m_message_persist_plugin->persist_outgoing_message(m_session.get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1077 m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1084 void process_outgoing_logon_request(uint64_t current_timestamp_nanoseconds)
1086 auto delta_nanoseconds = current_timestamp_nanoseconds - m_last_logon_attempt_timestamp;
1088 if (delta_nanoseconds >= (
static_cast<uint64_t
>(m_session.settings()->logon_timeout_seconds) * 1
'000'000
'000))
1090 m_session.set_state(SessionState::DISCONNECTED);
1094 void send_client_heartbeat_if_necessary(uint64_t current_timestamp_nanoseconds)
1096 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_sent_message_timestamp_nanoseconds();
1098 if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1100 send_client_heartbeat(nullptr);
1104 void process_outgoing_test_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1106 if (m_session.expecting_response_for_outgoing_test_request() == false)
1108 // SEND IF NECESSARY
1109 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_received_message_timestamp_nanoseconds();
1111 if (delta_nanoseconds >= (m_session.get_outgoing_test_request_interval_in_nanoseconds()))
1113 LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending test request");
1114 send_test_request();
1115 m_session.set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1116 m_session.set_expecting_response_for_outgoing_test_request(true);
1121 // TERMINATE SESSION IF NECESSARY
1122 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_test_request_timestamp_nanoseconds();
1124 if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1126 m_session.set_expecting_response_for_outgoing_test_request(false);
1127 LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=1/test request
");
1133 void process_outgoing_resend_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1135 if (m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1137 // TERMINATE SESSION IF NECESSARY
1138 uint64_t delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_resend_request_timestamp_nanoseconds();
1139 uint64_t required_delta_nanoseconds = static_cast<uint64_t>(m_session.outgoing_resend_request_expire_secs()) * 1'000'000'000;
1141 if (delta_nanoseconds >= required_delta_nanoseconds)
1143 // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1144 LLFIX_LOG_DEBUG("FixClient
" + m_name + " : terminating session as the other end didn
't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(m_session.outgoing_resend_request_expire_secs()));
1150 // SEND IF NECESSARY
1151 if (m_session.needs_to_send_resend_request())
1153 LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending resend request , begin no : " + std::to_string(m_session.get_outgoing_resend_request_begin_no()));
1154 send_resend_request();
1155 m_session.set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1156 m_session.set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1157 m_session.set_needs_to_send_resend_request(false);
1162 void respond_to_test_request_if_necessary()
1164 if (m_session.needs_responding_to_incoming_test_request())
1166 if (m_session.get_incoming_test_request_id()->length() > 0)
1168 send_client_heartbeat(m_session.get_incoming_test_request_id());
1169 m_session.get_incoming_test_request_id()->set_length(0);
1173 // We should not hit here , but better than risking disconnection
1174 send_client_heartbeat(nullptr);
1177 LLFIX_LOG_DEBUG("FixClient " + m_name + " : responded to incoming test request");
1178 m_session.set_needs_responding_to_incoming_test_request(false);
1182 void respond_to_resend_request_if_necessary()
1184 if (m_session.needs_responding_to_incoming_resend_request())
1186 const auto last_outgoing_seq_no = m_session.get_sequence_store()->get_outgoing_seq_no();
1188 // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1189 bool will_replay = (m_session.serialisation_enabled()) && (m_session.replay_messages_on_incoming_resend_request() && (m_session.get_incoming_resend_request_end_no() == 0
1190 || m_session.get_incoming_resend_request_end_no() == last_outgoing_seq_no) && m_session.get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
1192 if(last_outgoing_seq_no > m_session.get_incoming_resend_request_begin_no())
1193 if(last_outgoing_seq_no-m_session.get_incoming_resend_request_begin_no()+1 > m_session.max_resend_range())
1194 will_replay = false;
1198 for (uint32_t i = m_session.get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1200 if (m_session.get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1202 will_replay = false; // We don't have all the messages
1210 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : replaying messages , begin seq no : " + std::to_string(m_session.get_incoming_resend_request_begin_no()) +
" , end seq no : " + std::to_string(last_outgoing_seq_no));
1211 resend_messages_to_server(m_session.get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1215 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : sending gap fill message");
1216 send_gap_fill_message();
1219 m_session.set_needs_responding_to_incoming_resend_request(
false);
1220 m_session.set_state(llfix::SessionState::LOGGED_ON);
1226 #if defined(LLFIX_BENCHMARK) || defined(LLFIX_UNIT_TEST)
1231 LLFIX_HOT
void process_rx_buffer(
char* buffer, std::size_t buffer_size)
1233 std::size_t buffer_read_index = 0;
1235 if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1237 this->set_incomplete_buffer(buffer, buffer_size);
1242 int final_tag10_delimiter_index{ -1 };
1243 int current_index =
static_cast<int>(buffer_size - 1);
1245 if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) ==
false))
1248 this->set_incomplete_buffer(buffer, buffer_size);
1252 FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1254 if (final_tag10_delimiter_index == -1)
1257 this->set_incomplete_buffer(buffer, buffer_size);
1263 LLFIX_ALIGN_CODE_32;
1268 int begin_string_offset{-1};
1269 FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1271 if(llfix_likely(begin_string_offset>=0))
1273 buffer_read_index += begin_string_offset;
1277 LLFIX_LOG_ERROR(m_name +
" received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1281 m_session.get_incoming_fix_message()->reset();
1282 m_session.get_fix_string_view_cache()->reset_pointer();
1284 current_index =
static_cast<int>(buffer_read_index);
1285 bool looking_for_equals =
true;
1287 int current_tag_start =
static_cast<int>(buffer_read_index);
1288 int current_tag_len{ 0 };
1290 int current_value_start =
static_cast<int>(buffer_read_index);
1291 int current_value_len{ 0 };
1293 int current_tag_10_delimiter_index = -1;
1294 int current_tag_35_tag_start_index = -1;
1296 uint32_t parser_reject_code =
static_cast<uint32_t
>(-1);
1297 uint32_t current_tag_index = 0;
1299 uint32_t encoded_current_message_type =
static_cast<uint32_t
>(-1);
1301 bool in_a_repeating_group{
false };
1302 uint32_t current_rg_count_tag{ 0 };
1304 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1305 int binary_field_length = 0;
1306 int binary_field_counter = 0;
1309 LLFIX_ALIGN_CODE_32;
1312 char current_char = buffer[current_index];
1314 if (looking_for_equals)
1316 if (current_char == FixConstants::FIX_EQUALS)
1318 current_tag_len = current_index - current_tag_start;
1319 current_value_start = current_index + 1;
1320 looking_for_equals =
false;
1322 else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1324 current_tag_start = current_index + 1;
1325 parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1328 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1331 if(llfix_unlikely(binary_field_length>0))
1332 if(binary_field_counter < binary_field_length)
1333 binary_field_counter++;
1336 if (looking_for_equals ==
false && current_char == FixConstants::FIX_DELIMITER)
1338 bool reached_value_end =
true;
1340 if(llfix_unlikely(binary_field_length>0))
1342 if(binary_field_length>binary_field_counter)
1344 reached_value_end =
false;
1346 else if(binary_field_length == binary_field_counter)
1348 binary_field_length=0;
1349 binary_field_counter=0;
1353 if(llfix_likely(reached_value_end))
1357 else if (current_char == FixConstants::FIX_DELIMITER)
1360 current_value_len = current_index - current_value_start;
1362 if(llfix_unlikely(current_value_len==0))
1364 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1367 bool is_current_tag_numeric{
true};
1368 FixSession::validate_tag_format(buffer+current_tag_start,
static_cast<std::size_t
>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1370 if(llfix_likely(is_current_tag_numeric))
1374 uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer+ current_tag_start,
static_cast<std::size_t
>(current_tag_len));
1376 if(llfix_unlikely(tag == 0))
1378 parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
1381 FixStringView* value = m_session.get_fix_string_view_cache()->allocate();
1382 value->set_buffer(
const_cast<char*
>(buffer + current_value_start),
static_cast<std::size_t
>(current_value_len));
1385 current_tag_index++;
1386 FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
1391 if (encoded_current_message_type !=
static_cast<uint32_t
>(-1))
1393 if (llfix_likely(!in_a_repeating_group))
1395 if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1397 current_rg_count_tag = tag;
1398 in_a_repeating_group =
true;
1403 if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) ==
false))
1405 in_a_repeating_group =
false;
1409 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1410 if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1412 binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1417 if (llfix_likely(in_a_repeating_group ==
false))
1419 if(llfix_likely(m_session.get_incoming_fix_message()->has_tag(tag) ==
false))
1421 m_session.get_incoming_fix_message()->set_tag(tag, value);
1425 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1430 m_session.get_incoming_fix_message()->set_repeating_group_tag(tag, value);
1433 if (tag == FixConstants::TAG_CHECKSUM)
1435 current_tag_10_delimiter_index = current_index;
1437 FixSession::validate_tag9_and_tag35(m_session.get_incoming_fix_message(), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1441 else if (tag == FixConstants::TAG_MSG_TYPE)
1443 current_tag_35_tag_start_index = current_tag_start;
1444 encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1448 current_tag_start = current_index + 1;
1449 looking_for_equals =
true;
1451 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1458 if (current_index >=
static_cast<int>(buffer_size) - 1)
1460 this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1467 auto current_message_length = current_tag_10_delimiter_index+1-buffer_read_index;
1468 process_incoming_fix_message(m_session.get_incoming_fix_message(), buffer+ buffer_read_index, current_message_length, parser_reject_code);
1470 #ifndef LLFIX_UNIT_TEST
1471 if(llfix_unlikely(m_session.get_state() == SessionState::DISCONNECTED))
1477 buffer_read_index = current_tag_10_delimiter_index + 1;
1479 if (current_tag_10_delimiter_index ==
static_cast<int>(buffer_size) - 1)
1481 this->reset_incomplete_buffer();
1484 else if (
static_cast<int>(buffer_read_index)> final_tag10_delimiter_index)
1486 #ifdef LLFIX_UNIT_TEST
1487 this->m_incomplete_buffer =
static_cast<char*
>(malloc(65536));
1489 this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1495 void process_incoming_fix_message(IncomingFixMessage* incoming_message,
const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
1497 m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1499 if(m_session.serialisation_enabled())
1500 m_session.get_incoming_message_serialiser()->write(buffer_message, buffer_message_length,
true);
1502 if(llfix_unlikely(m_session.expecting_response_for_outgoing_test_request()))
1505 m_session.set_expecting_response_for_outgoing_test_request(
false);
1506 LLFIX_LOG_DEBUG(m_name +
" : other end satisfied the test request");
1509 if(m_session.validations_enabled())
1511 uint32_t reject_message_code =
static_cast<uint32_t
>(-1);
1513 if (m_session.validate_fix_message(*m_session.get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) ==
false)
1515 if (reject_message_code !=
static_cast<uint32_t
>(-1))
1517 send_reject_message(*m_session.get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, m_session.get_last_error_tag());
1525 auto sequence_store = m_session.get_sequence_store();
1526 sequence_store->increment_incoming_seq_no();
1527 auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
1529 if (m_message_persist_plugin)
1530 m_message_persist_plugin->persist_incoming_message(m_session.get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
1534 auto incoming_seq_no = incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
1536 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
1538 if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) ==
false)
1540 m_session.queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
1544 else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
1546 if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) ==
false)
1548 sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no-1);
1549 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : terminating session as the incoming sequence no (" + std::to_string(incoming_seq_no) +
") is lower than expected ("+ std::to_string(sequence_store_incoming_seq_no) +
")");
1555 if(llfix_unlikely(m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
1557 if(incoming_seq_no == m_session.get_outgoing_resend_request_end_no())
1559 m_session.set_state(llfix::SessionState::LOGGED_ON);
1560 LLFIX_LOG_DEBUG(m_name +
" : other end satisfied the resend request");
1564 auto message_type = incoming_message->get_tag_value(FixConstants::TAG_MSG_TYPE);
1566 if (llfix_likely(message_type->length() == 1))
1568 switch (message_type->data()[0])
1570 case FixConstants::MSG_TYPE_EXECUTION_REPORT: on_execution_report(incoming_message);
break;
1571 case FixConstants::MSG_TYPE_HEARTBEAT: on_server_heartbeat();
break;
1572 case FixConstants::MSG_TYPE_TEST_REQUEST: m_session.process_test_request_message(*incoming_message); on_server_test_request(incoming_message);
break;
1573 case FixConstants::MSG_TYPE_RESEND_REQUEST: m_session.process_resend_request(*incoming_message); on_server_resend_request(incoming_message);
break;
1574 case FixConstants::MSG_TYPE_ORDER_CANCEL_REJECT: on_order_cancel_replace_reject(incoming_message);
break;
1575 case FixConstants::MSG_TYPE_REJECT: process_session_level_reject(incoming_message);
break;
1576 case FixConstants::MSG_TYPE_BUSINESS_REJECT: process_application_level_reject(incoming_message);
break;
1577 case FixConstants::MSG_TYPE_LOGON: process_logon_response(incoming_message);
break;
1578 case FixConstants::MSG_TYPE_LOGOUT: process_logout_response(incoming_message);
break;
1579 case FixConstants::MSG_TYPE_SEQUENCE_RESET:
if (m_session.process_incoming_sequence_reset_message(*incoming_message) ==
false) { send_reject_message(*incoming_message, FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); };
break;
1581 default: on_custom_message_type(incoming_message);
break;
1586 on_custom_message_type(incoming_message);
1590 void process_session_level_reject(
const IncomingFixMessage* message)
1592 if (m_session.get_state() == SessionState::PENDING_LOGON)
1594 process_logon_reject(message);
1598 on_session_level_reject(message);
1602 void process_application_level_reject(
const IncomingFixMessage* message)
1604 if (m_session.get_state() == SessionState::PENDING_LOGON)
1606 process_logon_reject(message);
1610 on_application_level_reject(message);
1614 void process_logon_response(
const IncomingFixMessage* logon_response)
1616 m_session.set_state(SessionState::LOGGED_ON);
1617 this->on_logon_response(logon_response);
1620 void process_logout_response(
const IncomingFixMessage* logout_response)
1622 auto state = m_session.get_state();
1624 if (state == SessionState::PENDING_LOGON)
1626 process_logon_reject(logout_response);
1630 m_session.set_received_logout_response(
true);
1631 m_session.set_state(SessionState::LOGGED_OUT);
1632 this->on_logout_response(logout_response);
1636 void process_logon_reject(
const IncomingFixMessage* logon_reject)
1638 m_session.set_state(SessionState::LOGON_REJECTED);
1640 if (logon_reject->has_tag(FixConstants::TAG_TEXT))
1642 [[maybe_unused]]
auto reject_message = logon_reject->get_tag_value(FixConstants::TAG_TEXT);
1643 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" logon message rejected : " + reject_message->to_string());
1647 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" logon message rejected with no reason text/tag58.");
1650 on_logon_reject(logon_reject);
1653 FixClient(
const FixClient& other) =
delete;
1654 FixClient& operator= (
const FixClient& other) =
delete;
1655 FixClient(FixClient&& other) =
delete;
1656 FixClient& operator=(FixClient&& other) =
delete;