llfix
Low-latency FIX engine
fix_client.h
1 // DISCLAIMER_PLACEHOLDER
2 #pragma once
3 
4 #include "common.h"
5 
6 #include <atomic>
7 #include <cassert>
8 #include <cstdint>
9 #include <cstddef>
10 #include <string>
11 #include <string_view>
12 #include <memory>
13 #include <vector>
14 #include <thread>
15 #include <chrono>
16 #include <new>
17 
18 #include "core/compiler/hints_hot_code.h"
19 #include "core/compiler/hints_branch_predictor.h"
20 #include "core/compiler/unused.h"
21 
22 #include "core/cpu/alignment_constants.h"
23 
24 #include "core/os/vdso.h"
25 #include "core/os/thread_utilities.h"
26 #include "core/os/process_utilities.h"
27 
28 #include "core/utilities/converters.h"
29 #include "core/utilities/logger.h"
30 #include "core/utilities/std_string_utilities.h"
31 
32 #include "electronic_trading/common/message_persist_plugin.h"
33 
34 #include "electronic_trading/managed_instance/modifying_admin_command.h"
35 #include "electronic_trading/managed_instance/managed_instance.h"
36 
37 #include "fix_constants.h"
38 #include "incoming_fix_message.h"
39 #include "outgoing_fix_message.h"
40 #include "fix_session.h"
41 #include "fix_session_settings.h"
42 #include "fix_client_settings.h"
43 #include "fix_string.h"
44 #include "fix_string_view.h"
45 #include "fix_utilities.h"
46 #include "fix_parser_error_codes.h"
47 
48 #include "core/utilities/tcp_connector_options.h"
49 
50 #ifdef LLFIX_UNIT_TEST // VOLTRON_EXCLUDE
51 #include <cstdlib>
52 #endif // VOLTRON_EXCLUDE
53 
54 namespace llfix
55 {
56 
72 template<typename Transport>
73 class FixClient : public Transport, public ManagedInstance
74 {
75  public:
76 
77  FixClient() = default;
78 
79  virtual ~FixClient()
80  {
81  if (is_threaded())
82  {
83  m_is_exiting.store(true);
84  m_thread->join();
85  }
86  }
87 
98  [[nodiscard]] bool create(const std::string& client_name, const FixClientSettings& settings, const std::string& session_name, const FixSessionSettings& session_settings)
99  {
100  m_settings = settings;
101  return create(client_name, session_name, session_settings);
102  }
103 
114  [[nodiscard]] bool create(const std::string& client_config_file_path, const std::string& client_name, const std::string& session_config_file_path, const std::string& session_name)
115  {
116  if (m_settings.load_from_config_file(client_config_file_path, client_name) == false)
117  {
118  LLFIX_LOG_ERROR("Loading settings for client " + client_name + " failed : " + m_settings.config_load_error);
119  return false;
120  }
121 
122  FixSessionSettings session_settings;
123  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
124  {
125  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
126  return false;
127  }
128 
129  return create(client_name, session_name, session_settings);
130  }
131 
132  std::string get_name() const override
133  {
134  return m_name;
135  }
136 
137  bool is_instance_ha_primary() const override
138  {
139  return m_is_ha_primary;
140  }
141 
147  FixSession* get_session(const std::string& session_name="") override
148  {
149  LLFIX_UNUSED(session_name);
150  return &m_session;
151  }
152 
153  void get_session_names(std::vector<std::string>& target) override
154  {
155  target.push_back(m_session.get_name());
156  }
157 
158  void initialise_thread()
159  {
160  if (m_settings.cpu_core_id >= 0)
161  {
162  if (ThreadUtilities::pin_calling_thread_to_cpu_core(m_settings.cpu_core_id) == 0)
163  {
164  LLFIX_LOG_INFO(m_name + " thread pinned to CPU core " + std::to_string(m_settings.cpu_core_id));
165  }
166  else
167  {
168  LLFIX_LOG_ERROR(m_name + " thread pinning to CPU core " + std::to_string(m_settings.cpu_core_id) + " failed");
169  }
170  }
171  }
172 
181  template<typename... Args>
182  void specify_repeating_group(Args... args)
183  {
184  FixSession::get_repeating_group_specs().specify_repeating_group(args...);
185  }
186 
187  #ifdef LLFIX_ENABLE_BINARY_FIELDS
188 
197  void specify_binary_field(const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
198  {
199  FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
200  }
201  #endif
202 
211  [[nodiscard]] bool start()
212  {
213  try
214  {
215  m_thread.reset(new std::thread(&FixClient::thread_function, this));
216  return true;
217  }
218  catch(...)
219  {}
220 
221  LLFIX_LOG_ERROR(m_name + " creation failed during thread creation");
222  return false;
223  }
224 
233  [[nodiscard]] bool connect()
234  {
235  if(m_session.get_state() == SessionState::DISABLED)
236  {
237  LLFIX_LOG_DEBUG("Cannot allow connection attempt as session " + m_session.get_name() + " is disabled.");
238  return false;
239  }
240 
241  if (m_session.is_now_valid_session_datetime() == false)
242  {
243  LLFIX_LOG_DEBUG("Cannot allow connection attempt for session " + m_session.get_name() + " due to schedule settings.");
244  return false;
245  }
246 
247  m_session.set_state(SessionState::PENDING_CONNECTION);
248 
249  m_session.reset_flags();
250 
251  TCPConnectorOptions options;
252 
253  options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
254  options.m_receive_size = m_settings.receive_size;
255  options.m_send_try_count = m_settings.send_try_count;
256 
257  options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
258  options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
259  options.m_spin_count = m_settings.spin_count;
260 
261  options.m_nic_interface_name = m_settings.nic_name.c_str();
262  options.m_nic_interface_ip = m_settings.nic_address.c_str();
263 
264  options.m_socket_tx_size = m_settings.socket_tx_size;
265  options.m_socket_rx_size = m_settings.socket_rx_size;
266  options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
267  options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
268 
269  options.m_disable_nagle = m_settings.disable_nagle;
270  options.m_enable_quick_ack = m_settings.quick_ack;
271 
272  options.m_stack = m_settings.stack;
273  options.m_connect_timeout_seconds = m_settings.connect_timeout_seconds;
274 
275  #ifdef LLFIX_ENABLE_OPENSSL
276  options.m_use_ssl = m_settings.use_ssl;
277  options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
278  options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
279  options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
280  options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
281  options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
282  options.m_ssl_version = m_settings.ssl_version;
283  options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
284  #endif
285 
286  Transport::set_params(options);
287 
288  bool success = Transport::connect(m_settings.primary_address.c_str(), m_settings.primary_port);
289  m_connected_to_primary = success;
290 
291  if (success == false)
292  {
293  // TRY SECONDARY CONNNECTION IF SET
294  if (m_settings.secondary_address.length() > 0 && m_settings.secondary_port > 0)
295  {
296  LLFIX_LOG_DEBUG(m_name + " : Primary connection failed. Trying secondary connection...");
297  success = Transport::connect(m_settings.secondary_address.c_str(), m_settings.secondary_port);
298  m_connected_to_secondary = success;
299  }
300  }
301 
302  if (success == true)
303  {
304  m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic()); // This is to avoid sending unnecessary test request
305 
306  LLFIX_LOG_INFO(m_name + " : TCP connection established on " + (m_connected_to_primary?"primary":"secondary") );
307  m_session.set_state(SessionState::PENDING_LOGON);
308  m_last_logon_attempt_timestamp = VDSO::nanoseconds_monotonic();
309 
310  on_connection();
311 
312  if (send_logon_request() == false)
313  {
314  success = false;
315  }
316  }
317  else
318  {
319  m_session.set_state(SessionState::DISCONNECTED);
320  }
321 
322  return success;
323  }
324 
336  void process()
337  {
338  auto session_state = m_session.get_state();
339 
340  if(session_state > SessionState::DISCONNECTED) // In case of BSD sockets, calls on select may hang on some Linux distros if not connected
341  this->process_incoming_messages();
342 
343  process_admin_commands();
344 
345  if (is_state_live(session_state))
346  {
347  process_outgoing_resend_request_if_necessary(VDSO::nanoseconds_monotonic());
348  process_outgoing_test_request_if_necessary(VDSO::nanoseconds_monotonic());
349 
350  respond_to_resend_request_if_necessary();
351  respond_to_test_request_if_necessary();
352 
353  send_client_heartbeat_if_necessary(VDSO::nanoseconds_monotonic());
354 
355  process_schedule_validator();
356  }
357  else if(session_state == SessionState::PENDING_LOGON)
358  {
359  process_outgoing_logon_request(VDSO::nanoseconds_monotonic());
360  }
361  }
362 
363  void push_admin_command(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0) override
364  {
365  LLFIX_UNUSED(session_name);
366  auto admin_command = new (std::nothrow) ModifyingAdminCommand;
367 
368  if (llfix_likely(admin_command))
369  {
370  admin_command->type = type;
371  admin_command->arg = arg;
372 
373  m_session.get_admin_commands()->push(admin_command);
374  }
375  else
376  {
377  LLFIX_LOG_ERROR("Failed to process setter admin command for FixClient " + m_name);
378  }
379  }
380 
389  void shutdown(bool graceful_shutdown = true)
390  {
391  if(is_threaded())
392  {
393  m_thread_grace_full_exit.store(graceful_shutdown);
394  m_is_exiting.store(true);
395  return;
396  }
397 
398  do_shutdown(graceful_shutdown);
399  }
400 
409  {
410  return m_session.get_outgoing_fix_message();
411  }
412 
421  {
422  std::size_t encoded_length = 0;
423  auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
424 
425  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
426 
427  return send_bytes<true>(m_session.get_tx_encode_buffer(), encoded_length);
428  }
429 
430  std::string get_settings_as_string(const std::string& delimiter) override
431  {
432  return m_settings.to_string(delimiter);
433  }
434 
435  FixClientSettings& get_settings() { return m_settings; }
436 
442  bool connected_to_primary() const { return m_connected_to_primary; }
443 
449  bool connected_to_secondary() const { return m_connected_to_secondary; }
451 
452  // Run method in case this class is maintaining the client thread
453  virtual void run() {};
454 
455  // TCP connection
459  virtual void on_connection() {};
460 
464  virtual void on_disconnection() {};
465 
466  virtual void on_async_io_error(int error_code, int event_result) override
467  {
468  LLFIX_UNUSED(error_code);
469  LLFIX_UNUSED(event_result);
470  }
471 
472  virtual void on_socket_error(int error_code) override
473  {
474  LLFIX_UNUSED(error_code);
475  }
476  // Logon messages
482  virtual void on_logon_response(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
488  virtual void on_logout_response(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
494  virtual void on_logon_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
495  // Session/admin messages
499  virtual void on_server_heartbeat() {};
505  virtual void on_server_test_request(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
511  virtual void on_server_resend_request(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
512  // Application messages
518  virtual void on_execution_report(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
524  virtual void on_order_cancel_replace_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
530  virtual void on_session_level_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
536  virtual void on_application_level_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
542  virtual void on_custom_message_type(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
544 
545  public:
546 
547  void on_connection_lost() override
548  {
549  m_session.set_state(SessionState::DISCONNECTED);
550  m_connected_to_primary = false;
551  m_connected_to_secondary = false;
553  }
554 
555  void on_data_ready() override
556  {
557  std::size_t read = this->receive();
558 
559  if (read > 0 && read <= m_settings.rx_buffer_capacity)
560  {
561  process_rx_buffer(this->get_rx_buffer(), this->get_rx_buffer_size());
562  }
563 
564  this->receive_done();
565  }
566 
573  {
574  assert(plugin);
575  m_message_persist_plugin = plugin;
576  }
577 
578  #ifdef LLFIX_AUTOMATION
579  // Used for testing outgoing msg resends and gap fills
580  bool send_fake_outgoing_message(OutgoingFixMessage* message)
581  {
582  auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
583  std::size_t encoded_length = 0;
584 
585  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
586 
587  auto sequence_store = m_session.get_sequence_store();
588  sequence_store->increment_outgoing_seq_no();
589 
590  if(m_session.serialisation_enabled())
591  m_session.get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(m_session.get_tx_encode_buffer()), encoded_length, true, sequence_store->get_outgoing_seq_no());
592 
593  m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
594 
595  return true;
596  }
597  #endif
598 
599  protected:
601  // ADMIN MESSAGES TO SERVER
607  virtual bool send_logon_request()
608  {
609  auto logon_request = outgoing_message_instance();
610  logon_request->set_msg_type(FixConstants::MSG_TYPE_LOGON);
611 
612  // TAG 98 ENCRYPTION METHOD
613  logon_request->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
614 
615  // TAG 108 HEARTBEAT INTERVAL
616  logon_request->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(m_session.get_heartbeart_interval_in_nanoseconds() / 1'000'000'000));
617 
618  // TAG 1137 DEFAULT APP VER ID
619  auto default_app_ver_id = m_session.get_default_app_ver_id();
620 
621  if (default_app_ver_id.length() > 0)
622  {
623  logon_request->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
624  }
625 
626  // TAG 141 RESET SEQ NOS
627  if(m_session.logon_reset_sequence_numbers_flag())
628  {
629  m_session.get_sequence_store()->reset_numbers();
630  logon_request->set_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
631  }
632 
633  // TAG 789 NEXT EXPECTED SEQ NO
634  if(m_session.logon_include_next_expected_seq_no())
635  {
636  const auto next_expected_seq_no = m_session.get_sequence_store()->get_incoming_seq_no()+1;
637  logon_request->set_tag(FixConstants::TAG_NEXT_EXPECTED_SEQ_NUM, next_expected_seq_no);
638  }
639 
640  // TAG 553 SESSION USER NAME
641  auto session_username = m_session.get_username();
642 
643  if (session_username.length() > 0)
644  {
645  logon_request->set_tag(FixConstants::TAG_USERNAME, session_username);
646  }
647 
648  // TAG 554 SESSION PASSWORD
649  auto session_password = m_session.get_password();
650 
651  if (session_password.length() > 0)
652  {
653  logon_request->set_tag(FixConstants::TAG_PASSWORD, session_password);
654  }
655 
656  // TAG 925 NEW PASSWORD
657  if (m_session.logon_message_new_password().length() > 0)
658  {
659  logon_request->set_tag(FixConstants::TAG_NEW_PASSWORD, m_session.logon_message_new_password());
660  }
661 
662  return send_outgoing_message(logon_request);
663  }
664 
670  virtual bool send_logout_request()
671  {
672  m_session.build_logout_message(outgoing_message_instance());
673  return send_outgoing_message(outgoing_message_instance());
674  }
675 
682  virtual bool send_client_heartbeat(FixString* test_request_id)
683  {
684  m_session.build_heartbeat_message(outgoing_message_instance(), test_request_id);
685  return send_outgoing_message(outgoing_message_instance());
686  }
687 
693  virtual bool send_test_request()
694  {
695  m_session.build_test_request_message(outgoing_message_instance());
696  return send_outgoing_message(outgoing_message_instance());
697  }
698 
704  virtual bool send_resend_request()
705  {
706  m_session.build_resend_request_message(outgoing_message_instance(), "0");
707  return send_outgoing_message(outgoing_message_instance());
708  }
709 
716  virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
717  {
718  /*
719  This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an internal admin request
720  Therefore we can't set 123=Y
721  */
722  auto message = outgoing_message_instance();
723  m_session.build_sequence_reset_message(message, desired_sequence_no);
724 
725  // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
726  std::size_t encoded_length = 0;
727  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
728 
729  // SEND
730  return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
731  }
732 
738  virtual bool send_gap_fill_message()
739  {
740  /*
741  This message is a gap fill that responds to an incoming 35=2/resend request.
742  For "hard" gap fills that don't set 123=Y, see send_sequence_reset_message
743  */
744  auto message = outgoing_message_instance();
745  m_session.build_gap_fill_message(message);
746 
747  // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
748  std::size_t encoded_length = 0;
749  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, m_session.get_incoming_resend_request_begin_no(), encoded_length); // We need to encode with seq no that the peer expects hence using m_incoming_resend_request_begin_no
750 
751  // SEND
752  return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
753  }
754 
755  virtual void resend_messages_to_server(uint32_t begin_seq_no, uint32_t end_seq_no)
756  {
757  for (uint32_t i = begin_seq_no; i <= end_seq_no; i++)
758  {
759  std::size_t message_length = 0;
760  m_session.get_outgoing_message_serialiser()->read_message(i, m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
761 
762  auto message = outgoing_message_instance();
763  message->load_from_buffer(m_session.get_tx_encode_buffer(), message_length); // t122(orig sending time) will be handled by load_from_buffer
764 
765  message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
766 
767  if (m_session.include_t97_during_resends())
768  {
769  message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
770  }
771 
772  std::size_t encoded_length = 0;
773  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
774  send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
775  }
776  }
777 
778  virtual bool send_reject_message(const IncomingFixMessage& incoming_message, uint32_t reject_reason_code, const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
779  {
780  LLFIX_UNUSED(incoming_message);
781  char reject_reason_text[256];
782 
783  std::size_t text_length{ 0 };
784  FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
785 
786  if (error_tag == 0)
787  {
788  LLFIX_LOG_DEBUG("FixClient " + m_name + " received an invalid message : " + std::string(reject_reason_text) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
789  }
790  else
791  {
792  LLFIX_LOG_DEBUG("FixClient " + m_name + " received an invalid message : " + std::string(reject_reason_text) + " ,tag : " + std::to_string(error_tag) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
793  }
794 
795  m_session.build_session_level_reject_message(outgoing_message_instance(), reject_reason_code, reject_reason_text, error_tag);
797  }
798 
799  std::atomic<bool> m_is_exiting = false;
800 
801  private:
802  LLFIX_ALIGN_DATA(AlignmentConstants::CPU_CACHE_LINE_SIZE) FixSession m_session;
803 
804  std::string m_name;
805  bool m_is_ha_primary = true;
806 
807  static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4; // 10=<DELIMITER>
808 
809  uint64_t m_last_logon_attempt_timestamp = 0;
810 
811  bool m_connected_to_primary = false;
812  bool m_connected_to_secondary = false;
813 
814  FixClientSettings m_settings;
815 
816  MessagePersistPlugin* m_message_persist_plugin = nullptr;
817  std::atomic<bool> m_thread_grace_full_exit = true;
818  std::unique_ptr<std::thread> m_thread;
819 
820  bool is_threaded() const { return m_thread.get() != nullptr;}
821 
822  void thread_function()
823  {
824  initialise_thread();
825 
826  while (true)
827  {
828  if (m_is_exiting.load() == true)
829  {
830  do_shutdown(m_thread_grace_full_exit.load());
831  break;
832  }
833 
834  run();
835  }
836  }
837 
838  bool create(const std::string& name, const std::string& session_name, const FixSessionSettings& session_settings)
839  {
840  m_name = name;
841  m_is_ha_primary = m_settings.starts_as_primary_instance;
842 
843  if (m_settings.validate() == false)
844  {
845  LLFIX_LOG_ERROR("FixClientSettings for " + m_name + " validation failed : " + m_settings.validation_error);
846  return false;
847  }
848 
849  session_settings.is_server = false;
850  session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity; // Propagate
851 
852  if (m_session.initialise(session_name, session_settings) == false)
853  {
854  LLFIX_LOG_ERROR(m_name + " creation failed during session initialisation");
855  return false;
856  }
857 
858  if (m_settings.starts_as_primary_instance == false)
859  {
860  disable_session();
861  }
862 
863  LLFIX_LOG_INFO(m_name + " : Loaded client config =>\n" + m_settings.to_string());
864 
865  LLFIX_LOG_INFO("FixClient " + m_name + " creation success");
866 
867  return true;
868  }
869 
870  void process_admin_commands()
871  {
872  ModifyingAdminCommand* admin_command{ nullptr };
873 
874  if (m_session.get_admin_commands()->try_pop(&admin_command) == true)
875  {
876  switch (admin_command->type)
877  {
878  case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
879  {
880  m_session.get_sequence_store()->set_incoming_seq_no(admin_command->arg);
881  LLFIX_LOG_DEBUG(m_name + " : processed set incoming sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
882  break;
883  }
884 
885  case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
886  {
887  m_session.get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
888  LLFIX_LOG_DEBUG(m_name + " : processed set outgoing sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
889  break;
890  }
891 
892  case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
893  {
894  if( m_session.get_state() == SessionState::LOGGED_ON )
895  {
896  send_sequence_reset_message(admin_command->arg);
897  LLFIX_LOG_DEBUG(m_name + " : processed send sequence reset admin command , new seq no : " + std::to_string(admin_command->arg));
898  }
899  else
900  {
901  LLFIX_LOG_ERROR(m_name + " : dropping send sequence reset admin command as not logged on");
902  }
903 
904  break;
905  }
906 
907  case ModifyingAdminCommandType::DISABLE_SESSION:
908  {
909  disable_session();
910  LLFIX_LOG_DEBUG(m_name + " : processed disable session admin command , new session state : Disabled");
911  break;
912  }
913 
914  case ModifyingAdminCommandType::ENABLE_SESSION:
915  {
916  if(m_is_ha_primary)
917  {
918  enable_session();
919  LLFIX_LOG_DEBUG(m_name + " : processed enable session admin command");
920  }
921  else
922  {
923  LLFIX_LOG_ERROR(m_name + " ignoring enable session admin command since not a primary instance");
924  }
925  break;
926  }
927 
928  case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
929  {
930  m_is_ha_primary = admin_command->arg == 1 ? true : false;
931 
932  if(admin_command->arg == 1)
933  {
934  enable_session();
935  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 1 admin command");
936  }
937  else
938  {
939  disable_session();
940  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 0 admin command");
941  }
942 
943  break;
944  }
945 
946  default: break;
947  }
948 
949  delete admin_command;
950  }
951  }
952 
953  void disable_session()
954  {
955  if(is_state_live(m_session.get_state()))
956  {
957  do_shutdown(true);
958  }
959  m_session.set_state(SessionState::DISABLED);
960  }
961 
962  void enable_session()
963  {
964  if(m_session.get_state() == SessionState::DISABLED)
965  {
966  m_session.set_state(SessionState::DISCONNECTED);
967  if(m_settings.refresh_resend_cache_during_promotion)
968  m_session.reinitialise_outgoing_serialiser();
969  }
970  }
971 
972  void process_schedule_validator()
973  {
974  if (m_session.is_now_valid_session_datetime() == false)
975  {
976  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session due to schedule settings");
977  do_shutdown(true);
978  }
979  }
980 
981  void do_shutdown(bool graceful_shutdown = true)
982  {
983  auto call_time_state = m_session.get_state();
984  LLFIX_LOG_DEBUG(m_name + " : shutdown called. Graceful shutdown param : " + (graceful_shutdown?"true":"false") );
985 
986  on_connection_lost();
987 
988  if (graceful_shutdown)
989  {
990  if (call_time_state == SessionState::LOGGED_ON)
991  {
992  m_session.set_received_logout_response(false);
993 
995  m_session.set_state(SessionState::PENDING_LOGOUT);
996 
997  auto start = std::chrono::steady_clock::now();
998  const auto timeout = std::chrono::milliseconds(m_session.settings()->logout_timeout_seconds*1000);
999 
1000  while(true)
1001  {
1002  this->process_incoming_messages();
1003 
1004  if(m_session.received_logout_response() == true)
1005  {
1006  break;
1007  }
1008 
1009  if (m_session.get_state() == SessionState::DISCONNECTED)
1010  {
1011  break;
1012  }
1013 
1014  auto now = std::chrono::steady_clock::now();
1015  if (now - start >= timeout)
1016  {
1017  LLFIX_LOG_DEBUG("FixClient " + m_name + " shutdown timed out before receiving logout response");
1018  break;
1019  }
1020  }
1021  }
1022  }
1023 
1024  this->close();
1025  m_session.set_state(SessionState::DISCONNECTED);
1026  }
1027 
1029  // TO SERVER
1030  template <bool increment_outgoing_seq_no>
1031  LLFIX_FORCE_INLINE bool send_bytes(const char* buffer, std::size_t buffer_size)
1032  {
1033  auto sequence_store = m_session.get_sequence_store();
1034 
1035  if(llfix_likely(m_session.settings()->throttle_limit != 0))
1036  {
1037  m_session.throttler()->update();
1038  m_session.throttler()->wait();
1039  }
1040 
1041  auto ret = this->send(buffer, buffer_size);
1042 
1043  if constexpr (increment_outgoing_seq_no == true)
1044  {
1045  if(ret == true)
1046  sequence_store->increment_outgoing_seq_no();
1047  }
1048 
1049  if(m_session.serialisation_enabled())
1050  m_session.get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1051 
1052  if (m_message_persist_plugin)
1053  m_message_persist_plugin->persist_outgoing_message(m_session.get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1054 
1055  m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1056 
1057  return ret;
1058  }
1059 
1061  // ADMIN UTILITY TO SERVER
1062  void process_outgoing_logon_request(uint64_t current_timestamp_nanoseconds)
1063  {
1064  auto delta_nanoseconds = current_timestamp_nanoseconds - m_last_logon_attempt_timestamp;
1065 
1066  if (delta_nanoseconds >= (static_cast<uint64_t>(m_session.settings()->logon_timeout_seconds) * 1'000'000'000))
1067  {
1068  m_session.set_state(SessionState::DISCONNECTED);
1069  }
1070  }
1071 
1072  void send_client_heartbeat_if_necessary(uint64_t current_timestamp_nanoseconds)
1073  {
1074  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_sent_message_timestamp_nanoseconds();
1075 
1076  if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1077  {
1078  send_client_heartbeat(nullptr);
1079  }
1080  }
1081 
1082  void process_outgoing_test_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1083  {
1084  if (m_session.expecting_response_for_outgoing_test_request() == false)
1085  {
1086  // SEND IF NECESSARY
1087  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_received_message_timestamp_nanoseconds();
1088 
1089  if (delta_nanoseconds >= (m_session.get_outgoing_test_request_interval_in_nanoseconds()))
1090  {
1091  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending test request");
1092  send_test_request();
1093  m_session.set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1094  m_session.set_expecting_response_for_outgoing_test_request(true);
1095  }
1096  }
1097  else
1098  {
1099  // TERMINATE SESSION IF NECESSARY
1100  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_test_request_timestamp_nanoseconds();
1101 
1102  if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1103  {
1104  m_session.set_expecting_response_for_outgoing_test_request(false);
1105  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=1/test request");
1106  do_shutdown(false);
1107  }
1108  }
1109  }
1110 
1111  void process_outgoing_resend_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1112  {
1113  if (m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1114  {
1115  // TERMINATE SESSION IF NECESSARY
1116  uint64_t delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_resend_request_timestamp_nanoseconds();
1117  uint64_t required_delta_nanoseconds = static_cast<uint64_t>(m_session.outgoing_resend_request_expire_secs()) * 1'000'000'000;
1118 
1119  if (delta_nanoseconds >= required_delta_nanoseconds)
1120  {
1121  // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1122  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(m_session.outgoing_resend_request_expire_secs()));
1123  do_shutdown(false);
1124  }
1125  }
1126  else
1127  {
1128  // SEND IF NECESSARY
1129  if (m_session.needs_to_send_resend_request())
1130  {
1131  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending resend request , begin no : " + std::to_string(m_session.get_outgoing_resend_request_begin_no()));
1132  send_resend_request();
1133  m_session.set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1134  m_session.set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1135  m_session.set_needs_to_send_resend_request(false);
1136  }
1137  }
1138  }
1139 
1140  void respond_to_test_request_if_necessary()
1141  {
1142  if (m_session.needs_responding_to_incoming_test_request())
1143  {
1144  if (m_session.get_incoming_test_request_id()->length() > 0)
1145  {
1146  send_client_heartbeat(m_session.get_incoming_test_request_id());
1147  m_session.get_incoming_test_request_id()->set_length(0);
1148  }
1149  else
1150  {
1151  // We should not hit here , but better than risking disconnection
1152  send_client_heartbeat(nullptr);
1153  }
1154 
1155  LLFIX_LOG_DEBUG("FixClient " + m_name + " : responded to incoming test request");
1156  m_session.set_needs_responding_to_incoming_test_request(false);
1157  }
1158  }
1159 
1160  void respond_to_resend_request_if_necessary()
1161  {
1162  if (m_session.needs_responding_to_incoming_resend_request())
1163  {
1164  const auto last_outgoing_seq_no = m_session.get_sequence_store()->get_outgoing_seq_no();
1165 
1166  // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1167  bool will_replay = (m_session.serialisation_enabled()) && (m_session.replay_messages_on_incoming_resend_request() && (m_session.get_incoming_resend_request_end_no() == 0
1168  || m_session.get_incoming_resend_request_end_no() == last_outgoing_seq_no) && m_session.get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
1169 
1170  if(last_outgoing_seq_no > m_session.get_incoming_resend_request_begin_no())
1171  if(last_outgoing_seq_no-m_session.get_incoming_resend_request_begin_no()+1 > m_session.max_resend_range())
1172  will_replay = false;
1173 
1174  if(will_replay)
1175  {
1176  for (uint32_t i = m_session.get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1177  {
1178  if (m_session.get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1179  {
1180  will_replay = false; // We don't have all the messages
1181  break;
1182  }
1183  }
1184  }
1185 
1186  if(will_replay)
1187  {
1188  LLFIX_LOG_DEBUG("FixClient " + m_name + " : replaying messages , begin seq no : " + std::to_string(m_session.get_incoming_resend_request_begin_no()) + " , end seq no : " + std::to_string(last_outgoing_seq_no));
1189  resend_messages_to_server(m_session.get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1190  }
1191  else
1192  {
1193  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending gap fill message");
1194  send_gap_fill_message();
1195  }
1196 
1197  m_session.set_needs_responding_to_incoming_resend_request(false);
1198  m_session.set_state(llfix::SessionState::LOGGED_ON);
1199  }
1200  }
1201 
1203  // FROM SERVER
1204  #if defined(LLFIX_BENCHMARK) || defined(LLFIX_UNIT_TEST)
1205  public:
1206  #else
1207  private:
1208  #endif
// Parses a receive buffer that may hold zero, one or several FIX messages and
// hands each complete message to process_incoming_fix_message().
//
// buffer      : start of the received bytes; FixStringView values created here
//               are set to point into this buffer (see set_buffer below).
// buffer_size : number of bytes available in buffer.
//
// Bytes that do not yet form a complete message are handed to
// this->set_incomplete_buffer() (presumably provided by Transport - TODO confirm)
// so that they can be completed by a later read.
// Positions inside the buffer are tracked as int, so buffer_size is assumed to
// fit in an int - NOTE(review): confirm against the transport's rx buffer size.
1209  LLFIX_HOT void process_rx_buffer(char* buffer, std::size_t buffer_size)
1210  {
1211  std::size_t buffer_read_index = 0;
1212 
// Too few bytes to even start parsing : keep them for the next read
1213  if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1214  {
1215  this->set_incomplete_buffer(buffer, buffer_size);
1216  return;
1217  }
1219  // FIND OUT THE END
// final_tag10_delimiter_index stays -1 unless find_tag10_start_from_end locates a tag 10 ;
// it is later compared against buffer_read_index to detect a trailing partial message
1220  int final_tag10_delimiter_index{ -1 };
1221  int current_index = static_cast<int>(buffer_size - 1);
1222 
1223  if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) == false))
1224  {
1225  // We don't have the entire message
1226  this->set_incomplete_buffer(buffer, buffer_size);
1227  return;
1228  }
1229 
1230  FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1231 
1232  if (final_tag10_delimiter_index == -1)
1233  {
1234  // We don't have the entire message
1235  this->set_incomplete_buffer(buffer, buffer_size);
1236  return;
1237  }
1238 
1240  // AT THIS POINT WE HAVE AT LEAST ONE COMPLETE MESSAGE
// Outer loop : one iteration per FIX message found in the buffer
1241  LLFIX_ALIGN_CODE_32;
1242  while (true)
1243  {
1245  // FIND OUT THE BEGIN STRING
1246  int begin_string_offset{-1};
1247  FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1248 
1249  if(llfix_likely(begin_string_offset>=0))
1250  {
// Skip any bytes preceding the begin string
1251  buffer_read_index += begin_string_offset;
1252  }
1253  else
1254  {
// No begin string in the remaining bytes : log and stop processing this buffer
1255  LLFIX_LOG_ERROR(m_name + " received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1256  return;
1257  }
// Reset the per-message parsing state held by the session
1259  m_session.get_incoming_fix_message()->reset();
1260  m_session.get_fix_string_view_cache()->reset_pointer();
1261 
1262  current_index = static_cast<int>(buffer_read_index);
// true while scanning a tag (up to '='), false while scanning its value (up to the delimiter)
1263  bool looking_for_equals = true;
1264 
1265  int current_tag_start = static_cast<int>(buffer_read_index);
1266  int current_tag_len{ 0 };
1267 
1268  int current_value_start = static_cast<int>(buffer_read_index);
1269  int current_value_len{ 0 };
1270 
1271  int current_tag_10_delimiter_index = -1;
1272  int current_tag_35_tag_start_index = -1;
1273 
// static_cast<uint32_t>(-1) is used as the "not set" value for the reject code and the message type
1274  uint32_t parser_reject_code = static_cast<uint32_t>(-1);
1275  uint32_t current_tag_index = 0;
1276 
1277  uint32_t encoded_current_message_type = static_cast<uint32_t>(-1);
1278 
1279  bool in_a_repeating_group{ false };
1280  uint32_t current_rg_count_tag{ 0 };
1281 
1282  #ifdef LLFIX_ENABLE_BINARY_FIELDS
// binary_field_length : length announced by a binary data length tag (0 when none is pending)
// binary_field_counter : number of value characters counted so far while a length is pending
1283  int binary_field_length = 0;
1284  int binary_field_counter = 0;
1285  #endif
1286 
// Inner loop : character by character scan of the current message
1287  LLFIX_ALIGN_CODE_32;
1288  while (true)
1289  {
1290  char current_char = buffer[current_index];
1291 
1292  if (looking_for_equals)
1293  {
1294  if (current_char == FixConstants::FIX_EQUALS)
1295  {
// End of the tag : the value starts right after '='
1296  current_tag_len = current_index - current_tag_start;
1297  current_value_start = current_index + 1;
1298  looking_for_equals = false;
1299  }
1300  else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1301  {
// A delimiter was hit before any '=' : flag it and restart the tag after the delimiter
1302  current_tag_start = current_index + 1;
1303  parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1304  }
1305  }
1306  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1307  else
1308  {
// While a binary length is pending, count the value characters seen so far
1309  if(llfix_unlikely(binary_field_length>0))
1310  if(binary_field_counter < binary_field_length)
1311  binary_field_counter++;
1312  }
1313 
1314  if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1315  {
1316  bool reached_value_end = true;
1317 
1318  if(llfix_unlikely(binary_field_length>0))
1319  {
// A delimiter seen before the counter reaches the announced length does not end the value
1320  if(binary_field_length>binary_field_counter)
1321  {
1322  reached_value_end = false;
1323  }
1324  else if(binary_field_length == binary_field_counter)
1325  {
1326  binary_field_length=0;
1327  binary_field_counter=0;
1328  }
1329  }
1330 
1331  if(llfix_likely(reached_value_end))
1332  {
1333 
1334  #else
1335  else if (current_char == FixConstants::FIX_DELIMITER)
1336  {
1337  #endif
// End of a value : a full tag=value pair is now available
1338  current_value_len = current_index - current_value_start;
1339 
1340  if(llfix_unlikely(current_value_len==0))
1341  {
1342  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1343  }
1344 
1345  bool is_current_tag_numeric{true};
1346  FixSession::validate_tag_format(buffer+current_tag_start, static_cast<std::size_t>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1347 
// Pairs whose tag is not numeric are not recorded ; parsing continues with the next pair
1348  if(llfix_likely(is_current_tag_numeric))
1349  {
1351  // RECORD THE CURRENT TAG VALUE PAIR
1352  uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer+ current_tag_start, static_cast<std::size_t>(current_tag_len));
1353 
1354  if(llfix_unlikely(tag == 0))
1355  {
1356  parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
1357  }
1358 
// The value object is taken from the session's cache and pointed at the bytes inside buffer
1359  FixStringView* value = m_session.get_fix_string_view_cache()->allocate();
1360  value->set_buffer(const_cast<char*>(buffer + current_value_start), static_cast<std::size_t>(current_value_len));
1361 
// current_tag_index is the 1-based position of this tag within the message
1363  current_tag_index++;
1364  FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
1366 
1367  // Some tags may be used as a group member but also as an individual tag so we can't fully rely on is_a_repeating_group_tag
1368  // and need to detect start and end of a group
// Group and binary length detection only run once tag 35 has been seen for this message
1369  if (encoded_current_message_type != static_cast<uint32_t>(-1))
1370  {
1371  if (llfix_likely(!in_a_repeating_group))
1372  {
1373  if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1374  {
1375  current_rg_count_tag = tag;
1376  in_a_repeating_group = true;
1377  }
1378  }
1379  else
1380  {
1381  if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) == false))
1382  {
1383  in_a_repeating_group = false;
1384  }
1385  }
1386 
1387  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1388  if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1389  {
1390  binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1391  }
1392  #endif
1393  }
1394 
1395  if (llfix_likely(in_a_repeating_group == false))
1396  {
// Outside a repeating group a tag may appear only once
1397  if(llfix_likely(m_session.get_incoming_fix_message()->has_tag(tag) == false))
1398  {
1399  m_session.get_incoming_fix_message()->set_tag(tag, value);
1400  }
1401  else
1402  {
1403  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1404  }
1405  }
1406  else
1407  {
1408  m_session.get_incoming_fix_message()->set_repeating_group_tag(tag, value);
1409  }
1410 
1411  if (tag == FixConstants::TAG_CHECKSUM)
1412  {
// Tag 10 ends the message : remember where it ends and leave the inner loop
1413  current_tag_10_delimiter_index = current_index;
1415  FixSession::validate_tag9_and_tag35(m_session.get_incoming_fix_message(), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1417  break;
1418  }
1419  else if (tag == FixConstants::TAG_MSG_TYPE)
1420  {
1421  current_tag_35_tag_start_index = current_tag_start;
1422  encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1423  }
1424  } // if(llfix_likely(is_current_tag_numeric))
1425 
// Get ready for the next tag
1426  current_tag_start = current_index + 1;
1427  looking_for_equals = true;
1428 
1429  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1430  } // if(llfix_likely(reached_value_end))
1431  #endif
1432  } // else if (current_char == FixConstants::FIX_DELIMITER) or if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1433 
1434  // Apart from the check below (else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)),
1435  // we also need to check here if found last tag 10 delimiter is before the found tag8
// Last byte reached without completing the message : keep the remainder for the next read
1436  if (current_index >= static_cast<int>(buffer_size) - 1)
1437  {
1438  this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1439  return;
1440  }
1441 
1442  current_index++;
1443  }// while true
// The message spans from buffer_read_index up to and including the delimiter that ends tag 10
1445  auto current_message_length = current_tag_10_delimiter_index+1-buffer_read_index;
1446  process_incoming_fix_message(m_session.get_incoming_fix_message(), buffer+ buffer_read_index, current_message_length, parser_reject_code);
1447 
1448  #ifndef LLFIX_UNIT_TEST
1449  if(llfix_unlikely(m_session.get_state() == SessionState::DISCONNECTED)) // process_incoming_fix_message may terminate connection
1450  {
1451  return;
1452  }
1453  #endif
1454 
// Continue right after the message that was just processed
1455  buffer_read_index = current_tag_10_delimiter_index + 1;
1456 
1457  if (current_tag_10_delimiter_index == static_cast<int>(buffer_size) - 1)
1458  {
// The whole buffer was consumed : nothing is left over
1459  this->reset_incomplete_buffer();
1460  return;
1461  }
1462  else if (static_cast<int>(buffer_read_index)> final_tag10_delimiter_index)
1463  {
// All complete messages were processed ; the trailing bytes belong to a partial message
// Unit test builds allocate a 65536 byte incomplete buffer here before storing the remainder
1464  #ifdef LLFIX_UNIT_TEST
1465  this->m_incomplete_buffer = static_cast<char*>(malloc(65536));
1466  #endif
1467  this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1468  return;
1469  }
1470  }
1471  }
1472 
1473  void process_incoming_fix_message(IncomingFixMessage* incoming_message, const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
1474  {
1475  m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1476 
1477  if(m_session.serialisation_enabled())
1478  m_session.get_incoming_message_serialiser()->write(buffer_message, buffer_message_length, true);
1479 
1480  if(llfix_unlikely(m_session.expecting_response_for_outgoing_test_request()))
1481  {
1482  // We are permissive , any message not just 35=0 with expected t112, satisfies our outgoing test request
1483  m_session.set_expecting_response_for_outgoing_test_request(false);
1484  LLFIX_LOG_DEBUG(m_name + " : other end satisfied the test request");
1485  }
1487  if(m_session.validations_enabled())
1488  {
1489  uint32_t reject_message_code = static_cast<uint32_t>(-1);
1490 
1491  if (m_session.validate_fix_message(*m_session.get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) == false)
1492  {
1493  if (reject_message_code != static_cast<uint32_t>(-1))
1494  {
1495  send_reject_message(*m_session.get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, m_session.get_last_error_tag());
1496  }
1497 
1498  return;
1499  }
1500  }
1502  // SEQUENCE NO & MESSAGE PERSIST PLUGIN
1503  auto sequence_store = m_session.get_sequence_store();
1504  sequence_store->increment_incoming_seq_no();
1505  auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
1506 
1507  if (m_message_persist_plugin)
1508  m_message_persist_plugin->persist_incoming_message(m_session.get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
1509 
1511  // SEQUENCE NO CHECKS
1512  auto incoming_seq_no = incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
1513 
1514  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
1515  {
1516  if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) == false)
1517  {
1518  m_session.queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
1519  return;
1520  }
1521  }
1522  else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
1523  {
1524  if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) == false)
1525  {
1526  sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no-1);
1527  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the incoming sequence no (" + std::to_string(incoming_seq_no) + ") is lower than expected ("+ std::to_string(sequence_store_incoming_seq_no) + ")");
1528  do_shutdown(false);
1529  return;
1530  }
1531  }
1532 
1533  if(llfix_unlikely(m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
1534  {
1535  if(incoming_seq_no == m_session.get_outgoing_resend_request_end_no())
1536  {
1537  m_session.set_state(llfix::SessionState::LOGGED_ON);
1538  LLFIX_LOG_DEBUG(m_name + " : other end satisfied the resend request");
1539  }
1540  }
1542  auto message_type = incoming_message->get_tag_value(FixConstants::TAG_MSG_TYPE);
1543 
1544  if (llfix_likely(message_type->length() == 1))
1545  {
1546  switch (message_type->data()[0])
1547  {
1548  case FixConstants::MSG_TYPE_EXECUTION_REPORT: on_execution_report(incoming_message); break;
1549  case FixConstants::MSG_TYPE_HEARTBEAT: on_server_heartbeat(); break;
1550  case FixConstants::MSG_TYPE_TEST_REQUEST: m_session.process_test_request_message(*incoming_message); on_server_test_request(incoming_message); break;
1551  case FixConstants::MSG_TYPE_RESEND_REQUEST: m_session.process_resend_request(*incoming_message); on_server_resend_request(incoming_message); break;
1552  case FixConstants::MSG_TYPE_ORDER_CANCEL_REJECT: on_order_cancel_replace_reject(incoming_message); break;
1553  case FixConstants::MSG_TYPE_REJECT: process_session_level_reject(incoming_message); break;
1554  case FixConstants::MSG_TYPE_BUSINESS_REJECT: process_application_level_reject(incoming_message); break;
1555  case FixConstants::MSG_TYPE_LOGON: process_logon_response(incoming_message); break;
1556  case FixConstants::MSG_TYPE_LOGOUT: process_logout_response(incoming_message); break;
1557  case FixConstants::MSG_TYPE_SEQUENCE_RESET: if (m_session.process_incoming_sequence_reset_message(*incoming_message) == false) { send_reject_message(*incoming_message, FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); }; break;
1558  // Anything else
1559  default: on_custom_message_type(incoming_message); break;
1560  }
1561  }
1562  else
1563  {
1564  on_custom_message_type(incoming_message);
1565  }
1566  }
1567 
1568  void process_session_level_reject(const IncomingFixMessage* message)
1569  {
1570  if (m_session.get_state() == SessionState::PENDING_LOGON)
1571  {
1572  process_logon_reject(message);
1573  }
1574  else
1575  {
1576  on_session_level_reject(message);
1577  }
1578  }
1579 
1580  void process_application_level_reject(const IncomingFixMessage* message)
1581  {
1582  if (m_session.get_state() == SessionState::PENDING_LOGON)
1583  {
1584  process_logon_reject(message);
1585  }
1586  else
1587  {
1588  on_application_level_reject(message);
1589  }
1590  }
1591 
1592  void process_logon_response(const IncomingFixMessage* logon_response)
1593  {
1594  m_session.set_state(SessionState::LOGGED_ON);
1595  this->on_logon_response(logon_response);
1596  }
1597 
1598  void process_logout_response(const IncomingFixMessage* logout_response)
1599  {
1600  auto state = m_session.get_state();
1601 
1602  if (state == SessionState::PENDING_LOGON)
1603  {
1604  process_logon_reject(logout_response);
1605  }
1606  else
1607  {
1608  m_session.set_received_logout_response(true);
1609  m_session.set_state(SessionState::LOGGED_OUT);
1610  this->on_logout_response(logout_response);
1611  }
1612  }
1613 
1614  void process_logon_reject(const IncomingFixMessage* logon_reject)
1615  {
1616  m_session.set_state(SessionState::LOGON_REJECTED);
1617 
1618  if (logon_reject->has_tag(FixConstants::TAG_TEXT))
1619  {
1620  [[maybe_unused]] auto reject_message = logon_reject->get_tag_value(FixConstants::TAG_TEXT);
1621  LLFIX_LOG_DEBUG("FixClient " + m_name + " logon message rejected : " + reject_message->to_string());
1622  }
1623  else
1624  {
1625  LLFIX_LOG_DEBUG("FixClient " + m_name + " logon message rejected with no reason text/tag58.");
1626  }
1627 
1628  on_logon_reject(logon_reject);
1629  }
1630 
1631  FixClient(const FixClient& other) = delete;
1632  FixClient& operator= (const FixClient& other) = delete;
1633  FixClient(FixClient&& other) = delete;
1634  FixClient& operator=(FixClient&& other) = delete;
1635 };
1636 
1637 } // namespace
llfix::OutgoingFixMessage
FIX message builder and encoder for outbound messages.
Definition: outgoing_fix_message.h:77
llfix::FixClient::on_server_test_request
virtual void on_server_test_request(const IncomingFixMessage *message)
Called upon receiving a FIX Test Request.
Definition: fix_client.h:505
llfix::FixClient::shutdown
void shutdown(bool graceful_shutdown=true)
Shutdown the FIX client.
Definition: fix_client.h:389
llfix::FixClient::send_logout_request
virtual bool send_logout_request()
Send FIX Logout request.
Definition: fix_client.h:670
llfix::FixClient::connected_to_primary
bool connected_to_primary() const
Check if the client is connected to the primary endpoint.
Definition: fix_client.h:442
llfix::FixClient::send_logon_request
virtual bool send_logon_request()
Send FIX Logon request.
Definition: fix_client.h:607
llfix::FixClient
FIX client implementation.
Definition: fix_client.h:73
llfix::FixClient::on_logon_reject
virtual void on_logon_reject(const IncomingFixMessage *message)
Called when a FIX Logon is rejected.
Definition: fix_client.h:494
llfix::FixClient::set_message_persist_plugin
void set_message_persist_plugin(MessagePersistPlugin *plugin)
Set the message persistence plugin.
Definition: fix_client.h:572
llfix::FixClient::process
void process()
Process incoming and outgoing FIX protocol activity.
Definition: fix_client.h:336
llfix::FixClient::on_logon_response
virtual void on_logon_response(const IncomingFixMessage *message)
Called upon receiving a successful FIX Logon response.
Definition: fix_client.h:482
llfix::FixClient::on_connection
virtual void on_connection()
Called when a TCP connection is successfully established.
Definition: fix_client.h:459
llfix::FixClient::create
bool create(const std::string &client_name, const FixClientSettings &settings, const std::string &session_name, const FixSessionSettings &session_settings)
Create and initialise a FIX client instance.
Definition: fix_client.h:98
llfix::FixSession
Represents a single FIX protocol session.
Definition: fix_session.h:73
llfix::FixClient::on_server_heartbeat
virtual void on_server_heartbeat()
Called upon receiving a FIX Heartbeat from the server.
Definition: fix_client.h:499
llfix::FixClient::connect
bool connect()
Establish a FIX network connection and initiate logon.
Definition: fix_client.h:233
llfix::FixClient::on_custom_message_type
virtual void on_custom_message_type(const IncomingFixMessage *message)
Called for other FIX message types.
Definition: fix_client.h:542
llfix::FixClient::get_session
FixSession * get_session(const std::string &session_name="") override
Retrieve the FIX session managed by this client.
Definition: fix_client.h:147
llfix::FixClient::outgoing_message_instance
OutgoingFixMessage * outgoing_message_instance()
Obtain a reusable outgoing FIX message instance.
Definition: fix_client.h:408
llfix::FixClient::on_logout_response
virtual void on_logout_response(const IncomingFixMessage *message)
Called upon receiving a FIX Logout response.
Definition: fix_client.h:488
llfix::FixClient::on_order_cancel_replace_reject
virtual void on_order_cancel_replace_reject(const IncomingFixMessage *message)
Called when an Order Cancel/Replace Reject (35=9) is received.
Definition: fix_client.h:524
llfix::FixClient::on_session_level_reject
virtual void on_session_level_reject(const IncomingFixMessage *message)
Called on session-level FIX rejects.
Definition: fix_client.h:530
llfix::FixClient::connected_to_secondary
bool connected_to_secondary() const
Check if the client is connected to the secondary endpoint.
Definition: fix_client.h:449
llfix::IncomingFixMessage
Represents a parsed incoming FIX message.
Definition: incoming_fix_message.h:44
llfix::FixClient::create
bool create(const std::string &client_config_file_path, const std::string &client_name, const std::string &session_config_file_path, const std::string &session_name)
Create and initialise a FIX client using configuration files.
Definition: fix_client.h:114
llfix::FixUtilities::fix_to_human_readible
static std::string fix_to_human_readible(const char *buffer, std::size_t buffer_length)
Converts a FIX message buffer to a human-readable string.
Definition: fix_utilities.h:86
llfix::MessagePersistPlugin
Interface for custom FIX message persistence plugins.
Definition: message_persist_plugin.h:27
llfix::FixClient::on_execution_report
virtual void on_execution_report(const IncomingFixMessage *message)
Called when an Execution Report (35=8) is received.
Definition: fix_client.h:518
llfix::FixClient::send_outgoing_message
virtual bool send_outgoing_message(OutgoingFixMessage *message)
Encode and send an outgoing FIX message.
Definition: fix_client.h:420
llfix::FixClient::send_sequence_reset_message
virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
Send FIX Sequence Reset message.
Definition: fix_client.h:716
llfix::FixClient::on_server_resend_request
virtual void on_server_resend_request(const IncomingFixMessage *message)
Called upon receiving a FIX Resend Request.
Definition: fix_client.h:511
llfix::FixClient::on_application_level_reject
virtual void on_application_level_reject(const IncomingFixMessage *message)
Called on application-level FIX rejects.
Definition: fix_client.h:536
llfix::FixClient::specify_repeating_group
void specify_repeating_group(Args... args)
Specify repeating group definitions for incoming FIX messages.
Definition: fix_client.h:182
llfix::FixClient::on_disconnection
virtual void on_disconnection()
Called when the TCP connection is lost or closed.
Definition: fix_client.h:464
llfix::FixClient::start
bool start()
Start the client execution thread.
Definition: fix_client.h:211