11 #include <string_view>
18 #include "core/compiler/hints_hot_code.h"
19 #include "core/compiler/hints_branch_predictor.h"
20 #include "core/compiler/unused.h"
22 #include "core/cpu/alignment_constants.h"
24 #include "core/os/vdso.h"
25 #include "core/os/thread_utilities.h"
26 #include "core/os/process_utilities.h"
28 #include "core/utilities/converters.h"
29 #include "core/utilities/logger.h"
30 #include "core/utilities/std_string_utilities.h"
32 #include "electronic_trading/common/message_persist_plugin.h"
34 #include "electronic_trading/managed_instance/modifying_admin_command.h"
35 #include "electronic_trading/managed_instance/managed_instance.h"
37 #include "fix_constants.h"
38 #include "incoming_fix_message.h"
39 #include "outgoing_fix_message.h"
40 #include "fix_session.h"
41 #include "fix_session_settings.h"
42 #include "fix_client_settings.h"
43 #include "fix_string.h"
44 #include "fix_string_view.h"
45 #include "fix_utilities.h"
46 #include "fix_parser_error_codes.h"
48 #include "core/utilities/tcp_connector_options.h"
50 #ifdef LLFIX_UNIT_TEST // VOLTRON_EXCLUDE
52 #endif // VOLTRON_EXCLUDE
72 template<
typename Transport>
73 class FixClient :
public Transport,
public ManagedInstance
83 m_is_exiting.store(
true);
// Creates the client from an already-populated FixClientSettings object.
// Copies `settings` into m_settings and then forwards client_name,
// session_name and session_settings to the three-argument create() overload.
// Returns whatever that overload returns; the result is [[nodiscard]].
98 [[nodiscard]]
bool create(
const std::string& client_name,
const FixClientSettings& settings,
const std::string& session_name,
const FixSessionSettings& session_settings)
100 m_settings = settings;
101 return create(client_name, session_name, session_settings);
// Creates the client from configuration files.
//   client_config_file_path / client_name   : passed to m_settings.load_from_config_file()
//   session_config_file_path / session_name : passed to a local FixSessionSettings' load_from_config_file()
// A load that returns false is reported through LLFIX_LOG_ERROR together with
// the corresponding config_load_error text.
// NOTE(review): the statements following each error log are not visible in
// this extract - presumably an early `return false`; confirm.
// On the visible success path the call is forwarded to the three-argument create().
114 [[nodiscard]]
bool create(
const std::string& client_config_file_path,
const std::string& client_name,
const std::string& session_config_file_path,
const std::string& session_name)
116 if (m_settings.load_from_config_file(client_config_file_path, client_name) ==
false)
118 LLFIX_LOG_ERROR(
"Loading settings for client " + client_name +
" failed : " + m_settings.config_load_error);
// Session settings are loaded into a local object and handed to the common create().
122 FixSessionSettings session_settings;
123 if (session_settings.load_from_config_file(session_config_file_path, session_name) ==
false)
125 LLFIX_LOG_ERROR(
"Loading settings for session " + session_name +
" failed : " + session_settings.config_load_error);
129 return create(client_name, session_name, session_settings);
// Const override returning a name as std::string.
// NOTE(review): the body is not visible in this extract - presumably it returns m_name; confirm.
132 std::string get_name()
const override
// Const override reporting the current value of m_is_ha_primary.
137 bool is_instance_ha_primary()
const override
139 return m_is_ha_primary;
149 LLFIX_UNUSED(session_name);
// Appends the name of this client's session (m_session.get_name()) to `target`.
// `target` is not cleared first; exactly one element is pushed.
153 void get_session_names(std::vector<std::string>& target)
override
155 target.push_back(m_session.get_name());
// Optionally pins the calling thread to the CPU core configured in
// m_settings.cpu_core_id. A negative cpu_core_id skips pinning altogether.
// A return value of 0 from ThreadUtilities::pin_calling_thread_to_cpu_core()
// is treated as success and logged at INFO level; the ERROR log below covers
// the failing case (the `else` itself is not visible in this extract).
158 void initialise_thread()
160 if (m_settings.cpu_core_id >= 0)
162 if (ThreadUtilities::pin_calling_thread_to_cpu_core(m_settings.cpu_core_id) == 0)
164 LLFIX_LOG_INFO(m_name +
" thread pinned to CPU core " + std::to_string(m_settings.cpu_core_id));
168 LLFIX_LOG_ERROR(m_name +
" thread pinning to CPU core " + std::to_string(m_settings.cpu_core_id) +
" failed");
181 template<
typename... Args>
184 FixSession::get_repeating_group_specs().specify_repeating_group(args...);
187 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// Registers a binary field specification by forwarding all three arguments to
// FixSession::get_binary_field_specs().specify_binary_field().
// Only compiled when LLFIX_ENABLE_BINARY_FIELDS is defined.
// NOTE(review): presumably tag_length is the tag carrying the payload length and
// tag_data the tag carrying the raw payload for the given message type - confirm.
197 void specify_binary_field(
const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
199 FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
215 m_thread.reset(
new std::thread(&FixClient::thread_function,
this));
221 LLFIX_LOG_ERROR(m_name +
" creation failed during thread creation");
235 if(m_session.get_state() == SessionState::DISABLED)
237 LLFIX_LOG_DEBUG(
"Cannot allow connection attempt as session " + m_session.get_name() +
" is disabled.");
241 if (m_session.is_now_valid_session_datetime() ==
false)
243 LLFIX_LOG_DEBUG(
"Cannot allow connection attempt for session " + m_session.get_name() +
" due to schedule settings.");
247 m_session.set_state(SessionState::PENDING_CONNECTION);
249 m_session.reset_flags();
251 TCPConnectorOptions options;
253 options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
254 options.m_receive_size = m_settings.receive_size;
255 options.m_send_try_count = m_settings.send_try_count;
257 options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
258 options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
259 options.m_spin_count = m_settings.spin_count;
261 options.m_nic_interface_name = m_settings.nic_name.c_str();
262 options.m_nic_interface_ip = m_settings.nic_address.c_str();
264 options.m_socket_tx_size = m_settings.socket_tx_size;
265 options.m_socket_rx_size = m_settings.socket_rx_size;
266 options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
267 options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
269 options.m_disable_nagle = m_settings.disable_nagle;
270 options.m_enable_quick_ack = m_settings.quick_ack;
272 options.m_stack = m_settings.stack;
273 options.m_connect_timeout_seconds = m_settings.connect_timeout_seconds;
275 #ifdef LLFIX_ENABLE_OPENSSL
276 options.m_use_ssl = m_settings.use_ssl;
277 options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
278 options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
279 options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
280 options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
281 options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
282 options.m_ssl_version = m_settings.ssl_version;
283 options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
286 Transport::set_params(options);
288 bool success = Transport::connect(m_settings.primary_address.c_str(), m_settings.primary_port);
289 m_connected_to_primary = success;
291 if (success ==
false)
294 if (m_settings.secondary_address.length() > 0 && m_settings.secondary_port > 0)
296 LLFIX_LOG_DEBUG(m_name +
" : Primary connection failed. Trying secondary connection...");
297 success = Transport::connect(m_settings.secondary_address.c_str(), m_settings.secondary_port);
298 m_connected_to_secondary = success;
304 m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
306 LLFIX_LOG_INFO(m_name +
" : TCP connection established on " + (m_connected_to_primary?
"primary":
"secondary") );
307 m_session.set_state(SessionState::PENDING_LOGON);
308 m_last_logon_attempt_timestamp = VDSO::nanoseconds_monotonic();
319 m_session.set_state(SessionState::DISCONNECTED);
338 auto session_state = m_session.get_state();
340 if(session_state > SessionState::DISCONNECTED)
341 this->process_incoming_messages();
343 process_admin_commands();
345 if (is_state_live(session_state))
347 process_outgoing_resend_request_if_necessary(VDSO::nanoseconds_monotonic());
348 process_outgoing_test_request_if_necessary(VDSO::nanoseconds_monotonic());
350 respond_to_resend_request_if_necessary();
351 respond_to_test_request_if_necessary();
353 send_client_heartbeat_if_necessary(VDSO::nanoseconds_monotonic());
355 process_schedule_validator();
357 else if(session_state == SessionState::PENDING_LOGON)
359 process_outgoing_logon_request(VDSO::nanoseconds_monotonic());
// Queues a modifying admin command on the session's admin command queue.
//   session_name : ignored (marked LLFIX_UNUSED)
//   type         : command type stored in the command object
//   arg          : optional numeric argument, defaults to 0
// The command is heap-allocated with the non-throwing form of new, so an
// allocation failure yields a null pointer instead of an exception; that case
// is reported by the ERROR log below (the `else` is not visible in this extract).
// Commands popped by process_admin_commands() are deleted there.
363 void push_admin_command(
const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
override
365 LLFIX_UNUSED(session_name);
366 auto admin_command =
new (std::nothrow) ModifyingAdminCommand;
368 if (llfix_likely(admin_command))
370 admin_command->type = type;
371 admin_command->arg = arg;
373 m_session.get_admin_commands()->push(admin_command);
377 LLFIX_LOG_ERROR(
"Failed to process setter admin command for FixClient " + m_name);
393 m_thread_grace_full_exit.store(graceful_shutdown);
394 m_is_exiting.store(
true);
398 do_shutdown(graceful_shutdown);
410 return m_session.get_outgoing_fix_message();
422 std::size_t encoded_length = 0;
423 auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
425 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
427 return send_bytes<true>(m_session.get_tx_encode_buffer(), encoded_length);
// Override returning m_settings.to_string(delimiter), i.e. the client settings
// rendered as text using the supplied delimiter.
430 std::string get_settings_as_string(
const std::string& delimiter)
override
432 return m_settings.to_string(delimiter);
// Returns a mutable reference to the client settings member (m_settings).
435 FixClientSettings& get_settings() {
return m_settings; }
// Virtual hook with an empty default body.
// NOTE(review): presumably meant to be overridden by derived clients - confirm.
453 virtual void run() {};
// Override of the async I/O error callback. The visible body only marks both
// arguments as unused, so the default does nothing with them.
466 virtual void on_async_io_error(
int error_code,
int event_result)
override
468 LLFIX_UNUSED(error_code);
469 LLFIX_UNUSED(event_result);
// Override of the socket error callback. The visible body only marks
// error_code as unused, so the default does nothing with it.
472 virtual void on_socket_error(
int error_code)
override
474 LLFIX_UNUSED(error_code);
// Override invoked when the connection is lost: moves the session to
// SessionState::DISCONNECTED and clears both "connected to primary/secondary" flags.
547 void on_connection_lost()
override
549 m_session.set_state(SessionState::DISCONNECTED);
550 m_connected_to_primary =
false;
551 m_connected_to_secondary =
false;
// Override invoked when data is available: calls this->receive() and, when the
// returned byte count is non-zero and does not exceed
// m_settings.rx_buffer_capacity, parses the receive buffer via
// process_rx_buffer(). this->receive_done() is called afterwards.
// NOTE(review): brace lines are missing from this extract; receive_done()
// appears to run regardless of the size check - confirm.
555 void on_data_ready()
override
557 std::size_t read = this->receive();
559 if (read > 0 && read <= m_settings.rx_buffer_capacity)
561 process_rx_buffer(this->get_rx_buffer(), this->get_rx_buffer_size());
564 this->receive_done();
575 m_message_persist_plugin = plugin;
578 #ifdef LLFIX_AUTOMATION
582 auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
583 std::size_t encoded_length = 0;
585 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
587 auto sequence_store = m_session.get_sequence_store();
588 sequence_store->increment_outgoing_seq_no();
590 if(m_session.serialisation_enabled())
591 m_session.get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(m_session.get_tx_encode_buffer()), encoded_length,
true, sequence_store->get_outgoing_seq_no());
593 m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
610 logon_request->set_msg_type(FixConstants::MSG_TYPE_LOGON);
613 logon_request->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
616 logon_request->set_tag(FixConstants::TAG_HEART_BT_INT,
static_cast<uint32_t
>(m_session.get_heartbeart_interval_in_nanoseconds() / 1
'000'000
'000));
618 // TAG 1137 DEFAULT APP VER ID
619 auto default_app_ver_id = m_session.get_default_app_ver_id();
621 if (default_app_ver_id.length() > 0)
623 logon_request->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
626 // TAG 141 RESET SEQ NOS
627 if(m_session.logon_reset_sequence_numbers_flag())
629 m_session.get_sequence_store()->reset_numbers();
630 logon_request->set_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
633 // TAG 789 NEXT EXPECTED SEQ NO
634 if(m_session.logon_include_next_expected_seq_no())
636 const auto next_expected_seq_no = m_session.get_sequence_store()->get_incoming_seq_no()+1;
637 logon_request->set_tag(FixConstants::TAG_NEXT_EXPECTED_SEQ_NUM, next_expected_seq_no);
640 // TAG 553 SESSION USER NAME
641 auto session_username = m_session.get_username();
643 if (session_username.length() > 0)
645 logon_request->set_tag(FixConstants::TAG_USERNAME, session_username);
648 // TAG 554 SESSION PASSWORD
649 auto session_password = m_session.get_password();
651 if (session_password.length() > 0)
653 logon_request->set_tag(FixConstants::TAG_PASSWORD, session_password);
656 // TAG 925 NEW PASSWORD
657 if (m_session.logon_message_new_password().length() > 0)
659 logon_request->set_tag(FixConstants::TAG_NEW_PASSWORD, m_session.logon_message_new_password());
662 return send_outgoing_message(logon_request);
// Builds a logout message into the shared outgoing message instance via
// m_session.build_logout_message() and sends it with send_outgoing_message().
// Returns the result of send_outgoing_message().
670 virtual bool send_logout_request()
672 m_session.build_logout_message(outgoing_message_instance());
673 return send_outgoing_message(outgoing_message_instance());
// Builds a heartbeat into the shared outgoing message instance and sends it.
//   test_request_id : forwarded to m_session.build_heartbeat_message(); callers in
//                     this file pass nullptr for an unsolicited heartbeat and the
//                     incoming test request id when answering a test request.
// Returns the result of send_outgoing_message().
682 virtual bool send_client_heartbeat(FixString* test_request_id)
684 m_session.build_heartbeat_message(outgoing_message_instance(), test_request_id);
685 return send_outgoing_message(outgoing_message_instance());
// Builds a test request into the shared outgoing message instance via
// m_session.build_test_request_message() and sends it.
// Returns the result of send_outgoing_message().
693 virtual bool send_test_request()
695 m_session.build_test_request_message(outgoing_message_instance());
696 return send_outgoing_message(outgoing_message_instance());
// Builds a resend request into the shared outgoing message instance and sends it.
// The literal "0" is passed as the second argument of build_resend_request_message().
// NOTE(review): presumably this is the end sequence number, where 0 conventionally
// means "all subsequent messages" in FIX - confirm against FixSession.
// Returns the result of send_outgoing_message().
704 virtual bool send_resend_request()
706 m_session.build_resend_request_message(outgoing_message_instance(), "0");
707 return send_outgoing_message(outgoing_message_instance());
// Builds a sequence reset message for `desired_sequence_no`, encodes it into the
// session's tx encode buffer using desired_sequence_no as the sequence number,
// and sends it through send_bytes<false>, i.e. without incrementing the
// outgoing sequence number. Returns the result of send_bytes.
// NOTE(review): the unprefixed text lines below are the body of a block comment
// whose delimiters are not visible in this extract.
716 virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
719 This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an
internal admin request
720 Therefore we can
't set 123=Y
722 auto message = outgoing_message_instance();
723 m_session.build_sequence_reset_message(message, desired_sequence_no);
725 // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
726 std::size_t encoded_length = 0;
727 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
730 return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
// Builds a gap fill message, encodes it with the begin sequence number of the
// incoming resend request as its sequence number, and sends it through
// send_bytes<false> (outgoing sequence number is not incremented).
// Returns the result of send_bytes.
// NOTE(review): the declaration of `message` and the delimiters of the block
// comment below are not visible in this extract.
738 virtual bool send_gap_fill_message()
741 This message is a gap fill that responds to an incoming 35=2/resend request.
745 m_session.build_gap_fill_message(message);
748 std::size_t encoded_length = 0;
749 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, m_session.get_incoming_resend_request_begin_no(), encoded_length);
752 return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length);
// Replays previously sent messages with sequence numbers in the inclusive range
// [begin_seq_no, end_seq_no]. For each sequence number the stored message is
// read from the outgoing message serialiser into the tx encode buffer, loaded
// into a message object, flagged with TAG_POSS_DUP_FLAG = true in the header
// (and TAG_POSS_RESEND = true when m_session.include_t97_during_resends() is
// set), re-encoded under its original sequence number and sent with
// send_bytes<false> so the outgoing sequence number is left untouched.
// The return value of each send_bytes call is ignored.
// NOTE(review): the declaration of `message` is not visible in this extract.
// NOTE(review): with end_seq_no == UINT32_MAX the `i <= end_seq_no` condition can
// never become false - confirm callers never pass that value.
755 virtual void resend_messages_to_server(uint32_t begin_seq_no, uint32_t end_seq_no)
757 for (uint32_t i = begin_seq_no; i <= end_seq_no; i++)
759 std::size_t message_length = 0;
760 m_session.get_outgoing_message_serialiser()->read_message(i, m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
763 message->load_from_buffer(m_session.get_tx_encode_buffer(), message_length);
765 message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
767 if (m_session.include_t97_during_resends())
769 message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
// Re-encode in place: the tx encode buffer is both the source of the stored
// message above and the destination of the encoded replay.
772 std::size_t encoded_length = 0;
773 message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
774 send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length);
// Logs an invalid incoming message and builds a session-level reject for it.
//   incoming_message      : unused (marked LLFIX_UNUSED)
//   reject_reason_code    : reject reason; translated to text through
//                           FixUtilities::get_reject_reason_text()
//   buffer_message/length : raw bytes of the offending message, used for logging only
//   error_tag             : offending tag, defaults to 0
// Two DEBUG log variants are visible, one of which also prints error_tag.
// NOTE(review): the condition choosing between them, and the statements after
// build_session_level_reject_message() (presumably the actual send and the
// return), are not visible in this extract.
// NOTE(review): std::string(reject_reason_text) relies on get_reject_reason_text()
// null-terminating the 256-byte buffer - confirm.
778 virtual bool send_reject_message(
const IncomingFixMessage& incoming_message, uint32_t reject_reason_code,
const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
780 LLFIX_UNUSED(incoming_message);
781 char reject_reason_text[256];
783 std::size_t text_length{ 0 };
784 FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
788 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" received an invalid message : " + std::string(reject_reason_text) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
792 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" received an invalid message : " + std::string(reject_reason_text) +
" ,tag : " + std::to_string(error_tag) +
" ,message : " +
FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
795 m_session.build_session_level_reject_message(
outgoing_message_instance(), reject_reason_code, reject_reason_text, error_tag);
// Data members of FixClient.

// Atomic exit flag: stored as true on shutdown paths and polled by thread_function().
799 std::atomic<bool> m_is_exiting =
false;
// The single FIX session owned by this client, aligned to the CPU cache line size.
802 LLFIX_ALIGN_DATA(AlignmentConstants::CPU_CACHE_LINE_SIZE) FixSession m_session;
// High-availability role; initialised from settings in create() and changed by
// the SET_IS_HA_PRIMARY_INSTANCE admin command.
805 bool m_is_ha_primary =
true;
// process_rx_buffer() stores buffers smaller than this as incomplete instead of parsing them.
807 static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4;
// Monotonic timestamp (nanoseconds) compared against logon_timeout_seconds in
// process_outgoing_logon_request().
809 uint64_t m_last_logon_attempt_timestamp = 0;
// Which endpoint the current connection was made to; both are cleared in on_connection_lost().
811 bool m_connected_to_primary =
false;
812 bool m_connected_to_secondary =
false;
// Client-level configuration.
814 FixClientSettings m_settings;
// Optional, non-owning as far as visible here; used by send_bytes() when non-null.
816 MessagePersistPlugin* m_message_persist_plugin =
nullptr;
// Graceful-shutdown preference handed to do_shutdown() by thread_function().
817 std::atomic<bool> m_thread_grace_full_exit =
true;
// Worker thread; a null pointer means the client is not running threaded (see is_threaded()).
818 std::unique_ptr<std::thread> m_thread;
// Returns true when a worker thread object is held in m_thread.
820 bool is_threaded()
const {
return m_thread.get() !=
nullptr;}
// Entry point of the worker thread started through m_thread.
// The visible part checks m_is_exiting and, once it is true, calls
// do_shutdown() with the stored graceful-shutdown preference.
// NOTE(review): the surrounding loop and the per-iteration work are not visible
// in this extract.
822 void thread_function()
828 if (m_is_exiting.load() ==
true)
830 do_shutdown(m_thread_grace_full_exit.load());
// Common creation routine used by both public create() overloads.
// Visible steps, in order:
//   1. m_is_ha_primary is taken from m_settings.starts_as_primary_instance.
//   2. m_settings.validate() is checked; a failure is logged with validation_error.
//   3. The session settings are forced to client mode (is_server = false) and
//      inherit the client's tx_encode_buffer_capacity.
//   4. m_session.initialise() is called; a failure is logged.
//   5. Extra handling when the instance does not start as primary (statements not visible).
//   6. The loaded configuration and a success message are logged at INFO level.
// NOTE(review): return statements and the use of `name` are not visible in this extract.
// NOTE(review): session_settings is declared const& but two of its members are
// assigned below; this compiles only if those members are mutable - confirm,
// and note the caller's object is modified as a side effect.
838 bool create(
const std::string& name,
const std::string& session_name,
const FixSessionSettings& session_settings)
841 m_is_ha_primary = m_settings.starts_as_primary_instance;
843 if (m_settings.validate() ==
false)
845 LLFIX_LOG_ERROR(
"FixClientSettings for " + m_name +
" validation failed : " + m_settings.validation_error);
849 session_settings.is_server =
false;
850 session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity;
852 if (m_session.initialise(session_name, session_settings) ==
false)
854 LLFIX_LOG_ERROR(m_name +
" creation failed during session initialisation");
858 if (m_settings.starts_as_primary_instance ==
false)
863 LLFIX_LOG_INFO(m_name +
" : Loaded client config =>\n" + m_settings.to_string());
865 LLFIX_LOG_INFO(
"FixClient " + m_name +
" creation success");
// Pops at most one pending admin command from the session's admin command
// queue (a single try_pop per call) and dispatches on its type:
//   SET_INCOMING_SEQUENCE_NUMBER / SET_OUTGOING_SEQUENCE_NUMBER : written to the sequence store
//   SEND_SEQUENCE_RESET        : honoured only while LOGGED_ON, otherwise dropped with an error log
//   DISABLE_SESSION            : logged as processed
//   ENABLE_SESSION             : logged as processed, or ignored when not a primary instance
//   SET_IS_HA_PRIMARY_INSTANCE : m_is_ha_primary becomes true only for arg == 1
// The command object allocated by push_admin_command() is deleted at the end.
// NOTE(review): several statements (the actual send / enable / disable calls,
// `break`s and `else`s) are not visible in this extract.
870 void process_admin_commands()
872 ModifyingAdminCommand* admin_command{
nullptr };
874 if (m_session.get_admin_commands()->try_pop(&admin_command) ==
true)
876 switch (admin_command->type)
878 case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
880 m_session.get_sequence_store()->set_incoming_seq_no(admin_command->arg);
881 LLFIX_LOG_DEBUG(m_name +
" : processed set incoming sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
885 case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
887 m_session.get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
888 LLFIX_LOG_DEBUG(m_name +
" : processed set outgoing sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
892 case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
894 if( m_session.get_state() == SessionState::LOGGED_ON )
897 LLFIX_LOG_DEBUG(m_name +
" : processed send sequence reset admin command , new seq no : " + std::to_string(admin_command->arg));
901 LLFIX_LOG_ERROR(m_name +
" : dropping send sequence reset admin command as not logged on");
907 case ModifyingAdminCommandType::DISABLE_SESSION:
910 LLFIX_LOG_DEBUG(m_name +
" : processed disable session admin command , new session state : Disabled");
914 case ModifyingAdminCommandType::ENABLE_SESSION:
919 LLFIX_LOG_DEBUG(m_name +
" : processed enable session admin command");
923 LLFIX_LOG_ERROR(m_name +
" ignoring enable session admin command since not a primary instance");
928 case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
930 m_is_ha_primary = admin_command->arg == 1 ? true :
false;
932 if(admin_command->arg == 1)
935 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 1 admin command");
940 LLFIX_LOG_DEBUG(m_name +
" : processed set is ha primary instance 0 admin command");
// Ownership of the command ends here.
949 delete admin_command;
// Puts the session into SessionState::DISABLED.
// NOTE(review): the body of the is_state_live() branch is not visible in this
// extract - presumably a live session is shut down first; confirm.
953 void disable_session()
955 if(is_state_live(m_session.get_state()))
959 m_session.set_state(SessionState::DISABLED);
// Re-enables a session that is currently DISABLED by moving it to
// DISCONNECTED. When m_settings.refresh_resend_cache_during_promotion is set,
// the outgoing serialiser is reinitialised as well.
// NOTE(review): brace lines are missing from this extract; the serialiser
// refresh appears to sit inside the DISABLED check - confirm.
962 void enable_session()
964 if(m_session.get_state() == SessionState::DISABLED)
966 m_session.set_state(SessionState::DISCONNECTED);
967 if(m_settings.refresh_resend_cache_during_promotion)
968 m_session.reinitialise_outgoing_serialiser();
// Checks the session schedule; when the current time is outside the valid
// session window (is_now_valid_session_datetime() returns false) a DEBUG
// message announcing termination is logged.
// NOTE(review): the statement that actually terminates the session is not
// visible in this extract.
972 void process_schedule_validator()
974 if (m_session.is_now_valid_session_datetime() ==
false)
976 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : terminating session due to schedule settings");
// Shuts the session down.
//   graceful_shutdown : when true and the session was LOGGED_ON at call time, a
//                       logout handshake is attempted before disconnecting.
// Visible sequence: the state at call time is captured, on_connection_lost() is
// invoked, and - in the graceful case - the "logout response received" flag is
// cleared, the state becomes PENDING_LOGOUT and incoming messages are processed
// until a logout response arrives, the session reports DISCONNECTED, or
// logout_timeout_seconds elapses (measured with std::chrono::steady_clock).
// The state is set to DISCONNECTED at the end.
// NOTE(review): the loop construct, its exits and the statement between
// set_received_logout_response() and set_state() (presumably the logout send)
// are not visible in this extract.
// NOTE(review): on_connection_lost() runs before the logout handshake, and the
// override in this class sets the state to DISCONNECTED - confirm the ordering is intended.
981 void do_shutdown(
bool graceful_shutdown =
true)
983 auto call_time_state = m_session.get_state();
984 LLFIX_LOG_DEBUG(m_name +
" : shutdown called. Graceful shutdown param : " + (graceful_shutdown?
"true":
"false") );
986 on_connection_lost();
988 if (graceful_shutdown)
990 if (call_time_state == SessionState::LOGGED_ON)
992 m_session.set_received_logout_response(
false);
995 m_session.set_state(SessionState::PENDING_LOGOUT);
// Timeout bookkeeping for the wait on the counterparty's logout response.
997 auto start = std::chrono::steady_clock::now();
998 const auto timeout = std::chrono::milliseconds(m_session.settings()->logout_timeout_seconds*1000);
1002 this->process_incoming_messages();
1004 if(m_session.received_logout_response() ==
true)
1009 if (m_session.get_state() == SessionState::DISCONNECTED)
1014 auto now = std::chrono::steady_clock::now();
1015 if (now -
start >= timeout)
1017 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" shutdown timed out before receiving logout response");
1025 m_session.set_state(SessionState::DISCONNECTED);
// Lowest-level send path for an already encoded message.
//   increment_outgoing_seq_no (template) : when true, the outgoing sequence number
//                                          in the sequence store is incremented
//   buffer / buffer_size                 : encoded bytes to transmit
// Steps visible here:
//   1. If a throttle limit is configured (non-zero), the throttler is updated and waited on.
//   2. The bytes are handed to this->send(); its result is kept in `ret`.
//   3. The outgoing sequence number is optionally incremented (see template parameter).
//   4. If serialisation is enabled, the message is written to the outgoing
//      serialiser together with `ret` and the current outgoing sequence number.
//   5. If a persist plugin is installed, it is given the same information.
//   6. The session's last-sent timestamp is refreshed from the monotonic clock.
// NOTE(review): the return statement and any extra condition around the
// increment are not visible in this extract.
1030 template <
bool increment_outgoing_seq_no>
1031 LLFIX_FORCE_INLINE
bool send_bytes(
const char* buffer, std::size_t buffer_size)
1033 auto sequence_store = m_session.get_sequence_store();
1035 if(llfix_likely(m_session.settings()->throttle_limit != 0))
1037 m_session.throttler()->update();
1038 m_session.throttler()->wait();
1041 auto ret = this->send(buffer, buffer_size);
1043 if constexpr (increment_outgoing_seq_no ==
true)
1046 sequence_store->increment_outgoing_seq_no();
1049 if(m_session.serialisation_enabled())
1050 m_session.get_outgoing_message_serialiser()->write(
reinterpret_cast<const void*
>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1052 if (m_message_persist_plugin)
1053 m_message_persist_plugin->persist_outgoing_message(m_session.get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1055 m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
// Logon timeout check, called while the session is PENDING_LOGON.
//   current_timestamp_nanoseconds : current monotonic time in nanoseconds
// When the time since m_last_logon_attempt_timestamp reaches
// logon_timeout_seconds (converted to nanoseconds in 64-bit arithmetic), the
// session is moved to DISCONNECTED.
// NOTE(review): further statements of this function may be missing from this extract.
1062 void process_outgoing_logon_request(uint64_t current_timestamp_nanoseconds)
1064 auto delta_nanoseconds = current_timestamp_nanoseconds - m_last_logon_attempt_timestamp;
1066 if (delta_nanoseconds >= (
static_cast<uint64_t
>(m_session.settings()->logon_timeout_seconds) * 1
'000'000
'000))
1068 m_session.set_state(SessionState::DISCONNECTED);
// Sends an unsolicited heartbeat (nullptr test request id) once the time since
// the last sent message reaches 90% of the heartbeat interval. The 9/10 factor
// is computed in integer arithmetic and leaves a safety margin before the full
// interval elapses.
//   current_timestamp_nanoseconds : current monotonic time in nanoseconds
1072 void send_client_heartbeat_if_necessary(uint64_t current_timestamp_nanoseconds)
1074 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_sent_message_timestamp_nanoseconds();
1076 if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1078 send_client_heartbeat(nullptr);
// Drives the outgoing test request logic.
//   current_timestamp_nanoseconds : current monotonic time in nanoseconds
// While no test request is outstanding: if nothing has been received for at
// least the outgoing test request interval, a test request is sent, its
// timestamp recorded and the "expecting response" flag raised.
// While one is outstanding: if two heartbeat intervals pass since it was sent,
// the flag is cleared and a termination message is logged.
// NOTE(review): the `else` separating the two parts and the statement that
// actually terminates the session are not visible in this extract; the final
// string literal is also split across two lines by the extraction.
1082 void process_outgoing_test_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1084 if (m_session.expecting_response_for_outgoing_test_request() == false)
1086 // SEND IF NECESSARY
1087 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_received_message_timestamp_nanoseconds();
1089 if (delta_nanoseconds >= (m_session.get_outgoing_test_request_interval_in_nanoseconds()))
1091 LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending test request");
1092 send_test_request();
1093 m_session.set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1094 m_session.set_expecting_response_for_outgoing_test_request(true);
1099 // TERMINATE SESSION IF NECESSARY
1100 auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_test_request_timestamp_nanoseconds();
1102 if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1104 m_session.set_expecting_response_for_outgoing_test_request(false);
1105 LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=1/test request
");
// Drives the outgoing resend request logic.
//   current_timestamp_nanoseconds : current monotonic time in nanoseconds
// While the session is in IN_RETRANSMISSION_INITIATED_BY_SELF: if the
// counterparty has not satisfied our resend request within
// outgoing_resend_request_expire_secs, a termination message is logged.
// Otherwise, when the session reports that a resend request is needed, it is
// sent, its timestamp recorded, the state switched to
// IN_RETRANSMISSION_INITIATED_BY_SELF and the "needs to send" flag cleared.
// NOTE(review): the statement that actually terminates the session and the
// branch structure between the two parts are not visible in this extract; the
// termination log literal is also split across lines by the extraction.
1111 void process_outgoing_resend_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1113 if (m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1115 // TERMINATE SESSION IF NECESSARY
1116 uint64_t delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_resend_request_timestamp_nanoseconds();
1117 uint64_t required_delta_nanoseconds = static_cast<uint64_t>(m_session.outgoing_resend_request_expire_secs()) * 1'000'000'000;
1119 if (delta_nanoseconds >= required_delta_nanoseconds)
1121 // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1122 LLFIX_LOG_DEBUG("FixClient
" + m_name + " : terminating session as the other end didn
't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(m_session.outgoing_resend_request_expire_secs()));
1128 // SEND IF NECESSARY
1129 if (m_session.needs_to_send_resend_request())
1131 LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending resend request , begin no : " + std::to_string(m_session.get_outgoing_resend_request_begin_no()));
1132 send_resend_request();
1133 m_session.set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1134 m_session.set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1135 m_session.set_needs_to_send_resend_request(false);
// Answers a pending incoming test request with a heartbeat.
// When a non-empty test request id was captured, it is echoed in the heartbeat
// and then reset to length 0; otherwise a heartbeat without an id is sent as a
// fallback. Afterwards the "needs responding" flag is cleared.
// NOTE(review): the `else` between the two sends is not visible in this extract.
1140 void respond_to_test_request_if_necessary()
1142 if (m_session.needs_responding_to_incoming_test_request())
1144 if (m_session.get_incoming_test_request_id()->length() > 0)
1146 send_client_heartbeat(m_session.get_incoming_test_request_id());
1147 m_session.get_incoming_test_request_id()->set_length(0);
1151 // We should not hit here , but better than risking disconnection
1152 send_client_heartbeat(nullptr);
1155 LLFIX_LOG_DEBUG("FixClient " + m_name + " : responded to incoming test request");
1156 m_session.set_needs_responding_to_incoming_test_request(false);
// Answers a pending incoming resend request either by replaying stored
// messages or by sending a gap fill.
// Replay is chosen only when all of the following hold:
//   - serialisation is enabled and replay on resend request is configured,
//   - the requested end number is 0 or equals the last outgoing sequence number
//     (partial replays are not supported),
//   - the requested begin number does not exceed the last outgoing sequence number,
//   - the requested range does not exceed m_session.max_resend_range(),
//   - every message in the range is still available in the serialiser's memory.
// Afterwards the "needs responding" flag is cleared and the state returns to LOGGED_ON.
// NOTE(review): the conditions guarding the availability loop and choosing
// between replay and gap fill are not visible in this extract.
1160 void respond_to_resend_request_if_necessary()
1162 if (m_session.needs_responding_to_incoming_resend_request())
1164 const auto last_outgoing_seq_no = m_session.get_sequence_store()->get_outgoing_seq_no();
1166 // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1167 bool will_replay = (m_session.serialisation_enabled()) && (m_session.replay_messages_on_incoming_resend_request() && (m_session.get_incoming_resend_request_end_no() == 0
1168 || m_session.get_incoming_resend_request_end_no() == last_outgoing_seq_no) && m_session.get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
// Range size check; the outer comparison keeps the unsigned subtraction from wrapping.
1170 if(last_outgoing_seq_no > m_session.get_incoming_resend_request_begin_no())
1171 if(last_outgoing_seq_no-m_session.get_incoming_resend_request_begin_no()+1 > m_session.max_resend_range())
1172 will_replay = false;
1176 for (uint32_t i = m_session.get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1178 if (m_session.get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1180 will_replay = false; // We don't have all the messages
1188 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : replaying messages , begin seq no : " + std::to_string(m_session.get_incoming_resend_request_begin_no()) +
" , end seq no : " + std::to_string(last_outgoing_seq_no));
1189 resend_messages_to_server(m_session.get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1193 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : sending gap fill message");
1194 send_gap_fill_message();
1197 m_session.set_needs_responding_to_incoming_resend_request(
false);
1198 m_session.set_state(llfix::SessionState::LOGGED_ON);
1204 #if defined(LLFIX_BENCHMARK) || defined(LLFIX_UNIT_TEST)
// Parses the bytes in `buffer` (length `buffer_size`) into zero or more FIX
// messages and dispatches each one to process_incoming_fix_message().
// Outline of the visible logic:
//   1. Buffers shorter than MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING, buffers
//      without a trailing delimiter and buffers without any tag 10 (checksum)
//      are stored as an incomplete buffer.
//   2. For each message, the begin string is located, the incoming message
//      object and the string view cache are reset, and the bytes are scanned
//      one character at a time, alternating between "looking for '='" (tag)
//      and "looking for the delimiter" (value).
//   3. Each completed tag/value pair is validated, wrapped in a FixStringView
//      pointing into `buffer` (no copy), and stored on the incoming message as
//      either a plain tag or a repeating group tag. Parse problems are recorded
//      in parser_reject_code rather than aborting.
//   4. Tag 35 fixes the packed message type used for repeating group / binary
//      field lookups; tag 10 marks the end of the message.
//   5. After dispatching a message the read index advances past its tag 10
//      delimiter; trailing bytes after the last complete message are stored as
//      an incomplete buffer.
// NOTE(review): loop constructs, `break`/`return`/`continue` statements, brace
// lines and `#endif` lines are not visible in this extract, so the exact control
// flow cannot be confirmed from here.
1209 LLFIX_HOT
void process_rx_buffer(
char* buffer, std::size_t buffer_size)
1211 std::size_t buffer_read_index = 0;
1213 if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1215 this->set_incomplete_buffer(buffer, buffer_size);
// Locate the last complete message by searching backwards from the end of the buffer.
1220 int final_tag10_delimiter_index{ -1 };
1221 int current_index =
static_cast<int>(buffer_size - 1);
1223 if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) ==
false))
1226 this->set_incomplete_buffer(buffer, buffer_size);
1230 FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1232 if (final_tag10_delimiter_index == -1)
1235 this->set_incomplete_buffer(buffer, buffer_size);
1241 LLFIX_ALIGN_CODE_32;
// Per-message setup: skip to the begin string and reset the parsing state.
1246 int begin_string_offset{-1};
1247 FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1249 if(llfix_likely(begin_string_offset>=0))
1251 buffer_read_index += begin_string_offset;
1255 LLFIX_LOG_ERROR(m_name +
" received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1259 m_session.get_incoming_fix_message()->reset();
1260 m_session.get_fix_string_view_cache()->reset_pointer();
1262 current_index =
static_cast<int>(buffer_read_index);
1263 bool looking_for_equals =
true;
1265 int current_tag_start =
static_cast<int>(buffer_read_index);
1266 int current_tag_len{ 0 };
1268 int current_value_start =
static_cast<int>(buffer_read_index);
1269 int current_value_len{ 0 };
1271 int current_tag_10_delimiter_index = -1;
1272 int current_tag_35_tag_start_index = -1;
// static_cast<uint32_t>(-1) is used as the "not set" sentinel for both values below.
1274 uint32_t parser_reject_code =
static_cast<uint32_t
>(-1);
1275 uint32_t current_tag_index = 0;
1277 uint32_t encoded_current_message_type =
static_cast<uint32_t
>(-1);
1279 bool in_a_repeating_group{
false };
1280 uint32_t current_rg_count_tag{ 0 };
1282 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1283 int binary_field_length = 0;
1284 int binary_field_counter = 0;
1287 LLFIX_ALIGN_CODE_32;
// Character scan: tag part first, then value part.
1290 char current_char = buffer[current_index];
1292 if (looking_for_equals)
1294 if (current_char == FixConstants::FIX_EQUALS)
1296 current_tag_len = current_index - current_tag_start;
1297 current_value_start = current_index + 1;
1298 looking_for_equals =
false;
1300 else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1302 current_tag_start = current_index + 1;
1303 parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1306 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// Binary payloads may contain the delimiter byte, so a delimiter only ends the
// value once the announced number of payload bytes has been counted.
1309 if(llfix_unlikely(binary_field_length>0))
1310 if(binary_field_counter < binary_field_length)
1311 binary_field_counter++;
1314 if (looking_for_equals ==
false && current_char == FixConstants::FIX_DELIMITER)
1316 bool reached_value_end =
true;
1318 if(llfix_unlikely(binary_field_length>0))
1320 if(binary_field_length>binary_field_counter)
1322 reached_value_end =
false;
1324 else if(binary_field_length == binary_field_counter)
1326 binary_field_length=0;
1327 binary_field_counter=0;
1331 if(llfix_likely(reached_value_end))
1335 else if (current_char == FixConstants::FIX_DELIMITER)
// A complete tag=value pair has been delimited: validate and store it.
1338 current_value_len = current_index - current_value_start;
1340 if(llfix_unlikely(current_value_len==0))
1342 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1345 bool is_current_tag_numeric{
true};
1346 FixSession::validate_tag_format(buffer+current_tag_start,
static_cast<std::size_t
>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1348 if(llfix_likely(is_current_tag_numeric))
1352 uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer+ current_tag_start,
static_cast<std::size_t
>(current_tag_len));
1354 if(llfix_unlikely(tag == 0))
1356 parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
// The value is referenced in place inside `buffer`; nothing is copied here.
1359 FixStringView* value = m_session.get_fix_string_view_cache()->allocate();
1360 value->set_buffer(
const_cast<char*
>(buffer + current_value_start),
static_cast<std::size_t
>(current_value_len));
1363 current_tag_index++;
1364 FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
// Repeating group / binary field lookups need the message type, so they only
// run once tag 35 has been seen.
1369 if (encoded_current_message_type !=
static_cast<uint32_t
>(-1))
1371 if (llfix_likely(!in_a_repeating_group))
1373 if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1375 current_rg_count_tag = tag;
1376 in_a_repeating_group =
true;
1381 if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) ==
false))
1383 in_a_repeating_group =
false;
1387 #ifdef LLFIX_ENABLE_BINARY_FIELDS
1388 if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1390 binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1395 if (llfix_likely(in_a_repeating_group ==
false))
1397 if(llfix_likely(m_session.get_incoming_fix_message()->has_tag(tag) ==
false))
1399 m_session.get_incoming_fix_message()->set_tag(tag, value);
1403 parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1408 m_session.get_incoming_fix_message()->set_repeating_group_tag(tag, value);
1411 if (tag == FixConstants::TAG_CHECKSUM)
1413 current_tag_10_delimiter_index = current_index;
1415 FixSession::validate_tag9_and_tag35(m_session.get_incoming_fix_message(), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1419 else if (tag == FixConstants::TAG_MSG_TYPE)
1421 current_tag_35_tag_start_index = current_tag_start;
1422 encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1426 current_tag_start = current_index + 1;
1427 looking_for_equals =
true;
1429 #ifdef LLFIX_ENABLE_BINARY_FIELDS
// End of buffer reached before the current message completed: keep the tail for later.
1436 if (current_index >=
static_cast<int>(buffer_size) - 1)
1438 this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
// A full message [buffer_read_index, current_tag_10_delimiter_index] is ready for dispatch.
1445 auto current_message_length = current_tag_10_delimiter_index+1-buffer_read_index;
1446 process_incoming_fix_message(m_session.get_incoming_fix_message(), buffer+ buffer_read_index, current_message_length, parser_reject_code);
1448 #ifndef LLFIX_UNIT_TEST
1449 if(llfix_unlikely(m_session.get_state() == SessionState::DISCONNECTED))
1455 buffer_read_index = current_tag_10_delimiter_index + 1;
1457 if (current_tag_10_delimiter_index ==
static_cast<int>(buffer_size) - 1)
1459 this->reset_incomplete_buffer();
1462 else if (
static_cast<int>(buffer_read_index)> final_tag10_delimiter_index)
1464 #ifdef LLFIX_UNIT_TEST
// Unit-test builds allocate the incomplete buffer here.
// NOTE(review): no matching free is visible in this extract - confirm ownership.
1465 this->m_incomplete_buffer =
static_cast<char*
>(malloc(65536));
1467 this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
// Handles one complete inbound FIX message: timestamps it, optionally serialises and
// validates it, advances and checks the incoming sequence number, and finally dispatches
// on the message type.
//
//   incoming_message       parsed form of the message
//   buffer_message         raw bytes of the same message
//   buffer_message_length  number of raw bytes
//   parser_reject_code     code produced by the caller while parsing; forwarded to
//                          validate_fix_message
//
// NOTE(review): this listing does not show brace-only / else / return lines, so the exact
// nesting and any early exits between the statements below must be confirmed against the
// original file.
1473 void process_incoming_fix_message(IncomingFixMessage* incoming_message,
const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
// Record when this message was received, using the monotonic nanosecond clock.
1475 m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
// When serialisation is enabled, hand the raw bytes to the incoming-message serialiser.
1477 if(m_session.serialisation_enabled())
1478 m_session.get_incoming_message_serialiser()->write(buffer_message, buffer_message_length,
true);
// A pending "expecting response for our test request" flag is cleared here, before the
// message type has been looked at.
1480 if(llfix_unlikely(m_session.expecting_response_for_outgoing_test_request()))
1483 m_session.set_expecting_response_for_outgoing_test_request(
false);
1484 LLFIX_LOG_DEBUG(m_name +
" : other end satisfied the test request");
// Optional validation. reject_message_code starts at the sentinel static_cast<uint32_t>(-1);
// a reject is sent only when it no longer holds that sentinel after validate_fix_message
// (presumably filled in by that call — confirm).
// NOTE(review): this branch reads the message through m_session.get_incoming_fix_message()
// rather than the incoming_message parameter — presumably the same object; confirm.
1487 if(m_session.validations_enabled())
1489 uint32_t reject_message_code =
static_cast<uint32_t
>(-1);
1491 if (m_session.validate_fix_message(*m_session.get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) ==
false)
1493 if (reject_message_code !=
static_cast<uint32_t
>(-1))
1495 send_reject_message(*m_session.get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, m_session.get_last_error_tag());
// NOTE(review): whether processing stops after a failed validation is not visible in this
// listing — confirm.
// Advance the stored incoming sequence number, then read the new value back.
1503 auto sequence_store = m_session.get_sequence_store();
1504 sequence_store->increment_incoming_seq_no();
1505 auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
// If a persist plugin is set, give it the session name, the store's sequence number
// (not the number carried by the message) and the raw bytes.
1507 if (m_message_persist_plugin)
1508 m_message_persist_plugin->persist_incoming_message(m_session.get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
// Sequence number carried by the message itself.
1512 auto incoming_seq_no = incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
// Message number is higher than the store's: queue a resend request for the range,
// unless is_a_hard_sequence_reset_message() says this is a hard sequence reset.
1514 if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
1516 if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) ==
false)
1518 m_session.queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
// Message number is lower than the store's: unless it is a hard sequence reset, undo the
// increment made above and log that the session is being terminated.
// NOTE(review): the termination step itself is not visible in this listing — confirm.
1522 else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
1524 if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) ==
false)
1526 sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no-1);
1527 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" : terminating session as the incoming sequence no (" + std::to_string(incoming_seq_no) +
") is lower than expected ("+ std::to_string(sequence_store_incoming_seq_no) +
")");
// While in a retransmission that this side initiated: once the message whose number equals
// get_outgoing_resend_request_end_no() arrives, switch the session back to LOGGED_ON.
1533 if(llfix_unlikely(m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
1535 if(incoming_seq_no == m_session.get_outgoing_resend_request_end_no())
1537 m_session.set_state(llfix::SessionState::LOGGED_ON);
1538 LLFIX_LOG_DEBUG(m_name +
" : other end satisfied the resend request");
// Dispatch on the message type. One-character types are switched on their single
// character; types with no matching case go to on_custom_message_type.
1542 auto message_type = incoming_message->get_tag_value(FixConstants::TAG_MSG_TYPE);
1544 if (llfix_likely(message_type->length() == 1))
1546 switch (message_type->data()[0])
1548 case FixConstants::MSG_TYPE_EXECUTION_REPORT: on_execution_report(incoming_message);
break;
1549 case FixConstants::MSG_TYPE_HEARTBEAT: on_server_heartbeat();
break;
// Test requests and resend requests are processed by the session first, then reported
// through the corresponding on_server_* callback.
1550 case FixConstants::MSG_TYPE_TEST_REQUEST: m_session.process_test_request_message(*incoming_message); on_server_test_request(incoming_message);
break;
1551 case FixConstants::MSG_TYPE_RESEND_REQUEST: m_session.process_resend_request(*incoming_message); on_server_resend_request(incoming_message);
break;
1552 case FixConstants::MSG_TYPE_ORDER_CANCEL_REJECT: on_order_cancel_replace_reject(incoming_message);
break;
1553 case FixConstants::MSG_TYPE_REJECT: process_session_level_reject(incoming_message);
break;
1554 case FixConstants::MSG_TYPE_BUSINESS_REJECT: process_application_level_reject(incoming_message);
break;
1555 case FixConstants::MSG_TYPE_LOGON: process_logon_response(incoming_message);
break;
1556 case FixConstants::MSG_TYPE_LOGOUT: process_logout_response(incoming_message);
break;
// A sequence reset the session refuses is answered with a reject that names
// TAG_NEW_SEQ_NO and uses FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG.
1557 case FixConstants::MSG_TYPE_SEQUENCE_RESET:
if (m_session.process_incoming_sequence_reset_message(*incoming_message) ==
false) { send_reject_message(*incoming_message, FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); };
break;
1559 default: on_custom_message_type(incoming_message);
break;
// presumably the branch taken when the message type is not exactly one character long — confirm
1564 on_custom_message_type(incoming_message);
1568 void process_session_level_reject(
const IncomingFixMessage* message)
1570 if (m_session.get_state() == SessionState::PENDING_LOGON)
1572 process_logon_reject(message);
1576 on_session_level_reject(message);
1580 void process_application_level_reject(
const IncomingFixMessage* message)
1582 if (m_session.get_state() == SessionState::PENDING_LOGON)
1584 process_logon_reject(message);
1588 on_application_level_reject(message);
1592 void process_logon_response(
const IncomingFixMessage* logon_response)
1594 m_session.set_state(SessionState::LOGGED_ON);
1595 this->on_logon_response(logon_response);
1598 void process_logout_response(
const IncomingFixMessage* logout_response)
1600 auto state = m_session.get_state();
1602 if (state == SessionState::PENDING_LOGON)
1604 process_logon_reject(logout_response);
1608 m_session.set_received_logout_response(
true);
1609 m_session.set_state(SessionState::LOGGED_OUT);
1610 this->on_logout_response(logout_response);
1614 void process_logon_reject(
const IncomingFixMessage* logon_reject)
1616 m_session.set_state(SessionState::LOGON_REJECTED);
1618 if (logon_reject->has_tag(FixConstants::TAG_TEXT))
1620 [[maybe_unused]]
auto reject_message = logon_reject->get_tag_value(FixConstants::TAG_TEXT);
1621 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" logon message rejected : " + reject_message->to_string());
1625 LLFIX_LOG_DEBUG(
"FixClient " + m_name +
" logon message rejected with no reason text/tag58.");
1628 on_logon_reject(logon_reject);
1631 FixClient(
const FixClient& other) =
delete;
1632 FixClient& operator= (
const FixClient& other) =
delete;
1633 FixClient(FixClient&& other) =
delete;
1634 FixClient& operator=(FixClient&& other) =
delete;