llfix
Low-latency FIX engine
fix_client.h
1 /*
2 MIT License
3 
4 Copyright (c) 2026 Coreware Limited
5 
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12 
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 SOFTWARE.
23 */
24 #pragma once
25 
26 #include "common.h"
27 
28 #include <atomic>
29 #include <cassert>
30 #include <cstdint>
31 #include <cstddef>
32 #include <string>
33 #include <string_view>
34 #include <memory>
35 #include <vector>
36 #include <thread>
37 #include <chrono>
38 #include <new>
39 
40 #include "core/compiler/hints_hot_code.h"
41 #include "core/compiler/hints_branch_predictor.h"
42 #include "core/compiler/unused.h"
43 
44 #include "core/cpu/alignment_constants.h"
45 
46 #include "core/os/vdso.h"
47 #include "core/os/thread_utilities.h"
48 #include "core/os/process_utilities.h"
49 
50 #include "core/utilities/converters.h"
51 #include "core/utilities/logger.h"
52 #include "core/utilities/std_string_utilities.h"
53 
54 #include "electronic_trading/common/message_persist_plugin.h"
55 
56 #include "electronic_trading/managed_instance/modifying_admin_command.h"
57 #include "electronic_trading/managed_instance/managed_instance.h"
58 
59 #include "fix_constants.h"
60 #include "incoming_fix_message.h"
61 #include "outgoing_fix_message.h"
62 #include "fix_session.h"
63 #include "fix_session_settings.h"
64 #include "fix_client_settings.h"
65 #include "fix_string.h"
66 #include "fix_string_view.h"
67 #include "fix_utilities.h"
68 #include "fix_parser_error_codes.h"
69 
70 #include "core/utilities/tcp_connector_options.h"
71 
72 #ifdef LLFIX_UNIT_TEST // VOLTRON_EXCLUDE
73 #include <cstdlib>
74 #endif // VOLTRON_EXCLUDE
75 
76 namespace llfix
77 {
78 
94 template<typename Transport>
95 class FixClient : public Transport, public ManagedInstance
96 {
97  public:
98 
99  FixClient() = default;
100 
101  virtual ~FixClient()
102  {
103  if (is_threaded())
104  {
105  m_is_exiting.store(true);
106  m_thread->join();
107  }
108  }
109 
120  [[nodiscard]] bool create(const std::string& client_name, const FixClientSettings& settings, const std::string& session_name, const FixSessionSettings& session_settings)
121  {
122  m_settings = settings;
123  return create(client_name, session_name, session_settings);
124  }
125 
136  [[nodiscard]] bool create(const std::string& client_config_file_path, const std::string& client_name, const std::string& session_config_file_path, const std::string& session_name)
137  {
138  if (m_settings.load_from_config_file(client_config_file_path, client_name) == false)
139  {
140  LLFIX_LOG_ERROR("Loading settings for client " + client_name + " failed : " + m_settings.config_load_error);
141  return false;
142  }
143 
144  FixSessionSettings session_settings;
145  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
146  {
147  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
148  return false;
149  }
150 
151  return create(client_name, session_name, session_settings);
152  }
153 
154  std::string get_name() const override
155  {
156  return m_name;
157  }
158 
159  bool is_instance_ha_primary() const override
160  {
161  return m_is_ha_primary;
162  }
163 
169  FixSession* get_session(const std::string& session_name="") override
170  {
171  LLFIX_UNUSED(session_name);
172  return &m_session;
173  }
174 
175  void get_session_names(std::vector<std::string>& target) override
176  {
177  target.push_back(m_session.get_name());
178  }
179 
180  void initialise_thread()
181  {
182  if (m_settings.cpu_core_id >= 0)
183  {
184  if (ThreadUtilities::pin_calling_thread_to_cpu_core(m_settings.cpu_core_id) == 0)
185  {
186  LLFIX_LOG_INFO(m_name + " thread pinned to CPU core " + std::to_string(m_settings.cpu_core_id));
187  }
188  else
189  {
190  LLFIX_LOG_ERROR(m_name + " thread pinning to CPU core " + std::to_string(m_settings.cpu_core_id) + " failed");
191  }
192  }
193  }
194 
203  template<typename... Args>
204  void specify_repeating_group(Args... args)
205  {
206  FixSession::get_repeating_group_specs().specify_repeating_group(args...);
207  }
208 
209  #ifdef LLFIX_ENABLE_BINARY_FIELDS
210 
219  void specify_binary_field(const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
220  {
221  FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
222  }
223  #endif
224 
233  [[nodiscard]] bool start()
234  {
235  try
236  {
237  m_thread.reset(new std::thread(&FixClient::thread_function, this));
238  return true;
239  }
240  catch(...)
241  {}
242 
243  LLFIX_LOG_ERROR(m_name + " creation failed during thread creation");
244  return false;
245  }
246 
255  [[nodiscard]] bool connect()
256  {
257  if(m_session.get_state() == SessionState::DISABLED)
258  {
259  LLFIX_LOG_DEBUG("Cannot allow connection attempt as session " + m_session.get_name() + " is disabled.");
260  return false;
261  }
262 
263  if (m_session.is_now_valid_session_datetime() == false)
264  {
265  LLFIX_LOG_DEBUG("Cannot allow connection attempt for session " + m_session.get_name() + " due to schedule settings.");
266  return false;
267  }
268 
269  m_session.set_state(SessionState::PENDING_CONNECTION);
270 
271  m_session.reset_flags();
272 
273  TCPConnectorOptions options;
274 
275  options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
276  options.m_receive_size = m_settings.receive_size;
277  options.m_send_try_count = m_settings.send_try_count;
278 
279  options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
280  options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
281  options.m_spin_count = m_settings.spin_count;
282 
283  options.m_nic_interface_name = m_settings.nic_name.c_str();
284  options.m_nic_interface_ip = m_settings.nic_address.c_str();
285 
286  options.m_socket_tx_size = m_settings.socket_tx_size;
287  options.m_socket_rx_size = m_settings.socket_rx_size;
288  options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
289  options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
290 
291  options.m_disable_nagle = m_settings.disable_nagle;
292  options.m_enable_quick_ack = m_settings.quick_ack;
293 
294  options.m_stack = m_settings.stack;
295  options.m_connect_timeout_seconds = m_settings.connect_timeout_seconds;
296 
297  #ifdef LLFIX_ENABLE_OPENSSL
298  options.m_use_ssl = m_settings.use_ssl;
299  options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
300  options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
301  options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
302  options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
303  options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
304  options.m_ssl_version = m_settings.ssl_version;
305  options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
306  #endif
307 
308  Transport::set_params(options);
309 
310  bool success = Transport::connect(m_settings.primary_address.c_str(), m_settings.primary_port);
311  m_connected_to_primary = success;
312 
313  if (success == false)
314  {
315  // TRY SECONDARY CONNNECTION IF SET
316  if (m_settings.secondary_address.length() > 0 && m_settings.secondary_port > 0)
317  {
318  LLFIX_LOG_DEBUG(m_name + " : Primary connection failed. Trying secondary connection...");
319  success = Transport::connect(m_settings.secondary_address.c_str(), m_settings.secondary_port);
320  m_connected_to_secondary = success;
321  }
322  }
323 
324  if (success == true)
325  {
326  m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic()); // This is to avoid sending unnecessary test request
327 
328  LLFIX_LOG_INFO(m_name + " : TCP connection established on " + (m_connected_to_primary?"primary":"secondary") );
329  m_session.set_state(SessionState::PENDING_LOGON);
330  m_last_logon_attempt_timestamp = VDSO::nanoseconds_monotonic();
331 
332  on_connection();
333 
334  if (send_logon_request() == false)
335  {
336  success = false;
337  }
338  }
339  else
340  {
341  m_session.set_state(SessionState::DISCONNECTED);
342  }
343 
344  return success;
345  }
346 
358  void process()
359  {
360  auto session_state = m_session.get_state();
361 
362  if(session_state > SessionState::DISCONNECTED) // In case of BSD sockets, calls on select may hang on some Linux distros if not connected
363  this->process_incoming_messages();
364 
365  process_admin_commands();
366 
367  if (is_state_live(session_state))
368  {
369  process_outgoing_resend_request_if_necessary(VDSO::nanoseconds_monotonic());
370  process_outgoing_test_request_if_necessary(VDSO::nanoseconds_monotonic());
371 
372  respond_to_resend_request_if_necessary();
373  respond_to_test_request_if_necessary();
374 
375  send_client_heartbeat_if_necessary(VDSO::nanoseconds_monotonic());
376 
377  process_schedule_validator();
378  }
379  else if(session_state == SessionState::PENDING_LOGON)
380  {
381  process_outgoing_logon_request(VDSO::nanoseconds_monotonic());
382  }
383  }
384 
385  void push_admin_command(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0) override
386  {
387  LLFIX_UNUSED(session_name);
388  auto admin_command = new (std::nothrow) ModifyingAdminCommand;
389 
390  if (llfix_likely(admin_command))
391  {
392  admin_command->type = type;
393  admin_command->arg = arg;
394 
395  m_session.get_admin_commands()->push(admin_command);
396  }
397  else
398  {
399  LLFIX_LOG_ERROR("Failed to process setter admin command for FixClient " + m_name);
400  }
401  }
402 
411  void shutdown(bool graceful_shutdown = true)
412  {
413  if(is_threaded())
414  {
415  m_thread_grace_full_exit.store(graceful_shutdown);
416  m_is_exiting.store(true);
417  return;
418  }
419 
420  do_shutdown(graceful_shutdown);
421  }
422 
431  {
432  return m_session.get_outgoing_fix_message();
433  }
434 
443  {
444  std::size_t encoded_length = 0;
445  auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
446 
447  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
448 
449  return send_bytes<true>(m_session.get_tx_encode_buffer(), encoded_length);
450  }
451 
452  std::string get_settings_as_string(const std::string& delimiter) override
453  {
454  return m_settings.to_string(delimiter);
455  }
456 
457  FixClientSettings& get_settings() { return m_settings; }
458 
464  bool connected_to_primary() const { return m_connected_to_primary; }
465 
471  bool connected_to_secondary() const { return m_connected_to_secondary; }
473 
474  // Run method in case this class is maintaining the client thread
475  virtual void run() {};
476 
477  // TCP connection
481  virtual void on_connection() {};
482 
486  virtual void on_disconnection() {};
487 
488  virtual void on_async_io_error(int error_code, int event_result) override
489  {
490  LLFIX_UNUSED(error_code);
491  LLFIX_UNUSED(event_result);
492  }
493 
494  virtual void on_socket_error(int error_code) override
495  {
496  LLFIX_UNUSED(error_code);
497  }
498  // Logon messages
504  virtual void on_logon_response(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
510  virtual void on_logout_response(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
516  virtual void on_logon_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
517  // Session/admin messages
521  virtual void on_server_heartbeat() {};
527  virtual void on_server_test_request(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
533  virtual void on_server_resend_request(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
534  // Application messages
540  virtual void on_execution_report(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
546  virtual void on_order_cancel_replace_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
552  virtual void on_session_level_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
558  virtual void on_application_level_reject(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
564  virtual void on_custom_message_type(const IncomingFixMessage* message) {LLFIX_UNUSED(message);};
566 
567  public:
568 
569  void on_connection_lost() override
570  {
571  m_session.set_state(SessionState::DISCONNECTED);
572  m_connected_to_primary = false;
573  m_connected_to_secondary = false;
575  }
576 
577  void on_data_ready() override
578  {
579  std::size_t read = this->receive();
580 
581  if (read > 0 && read <= m_settings.rx_buffer_capacity)
582  {
583  process_rx_buffer(this->get_rx_buffer(), this->get_rx_buffer_size());
584  }
585 
586  this->receive_done();
587  }
588 
595  {
596  assert(plugin);
597  m_message_persist_plugin = plugin;
598  }
599 
600  #ifdef LLFIX_AUTOMATION
601  // Used for testing outgoing msg resends and gap fills
602  bool send_fake_outgoing_message(OutgoingFixMessage* message)
603  {
604  auto int_sequence_no = m_session.get_sequence_store()->get_outgoing_seq_no() + 1;
605  std::size_t encoded_length = 0;
606 
607  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
608 
609  auto sequence_store = m_session.get_sequence_store();
610  sequence_store->increment_outgoing_seq_no();
611 
612  if(m_session.serialisation_enabled())
613  m_session.get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(m_session.get_tx_encode_buffer()), encoded_length, true, sequence_store->get_outgoing_seq_no());
614 
615  m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
616 
617  return true;
618  }
619  #endif
620 
621  protected:
623  // ADMIN MESSAGES TO SERVER
629  virtual bool send_logon_request()
630  {
631  auto logon_request = outgoing_message_instance();
632  logon_request->set_msg_type(FixConstants::MSG_TYPE_LOGON);
633 
634  // TAG 98 ENCRYPTION METHOD
635  logon_request->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
636 
637  // TAG 108 HEARTBEAT INTERVAL
638  logon_request->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(m_session.get_heartbeart_interval_in_nanoseconds() / 1'000'000'000));
639 
640  // TAG 1137 DEFAULT APP VER ID
641  auto default_app_ver_id = m_session.get_default_app_ver_id();
642 
643  if (default_app_ver_id.length() > 0)
644  {
645  logon_request->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
646  }
647 
648  // TAG 141 RESET SEQ NOS
649  if(m_session.logon_reset_sequence_numbers_flag())
650  {
651  m_session.get_sequence_store()->reset_numbers();
652  logon_request->set_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
653  }
654 
655  // TAG 789 NEXT EXPECTED SEQ NO
656  if(m_session.logon_include_next_expected_seq_no())
657  {
658  const auto next_expected_seq_no = m_session.get_sequence_store()->get_incoming_seq_no()+1;
659  logon_request->set_tag(FixConstants::TAG_NEXT_EXPECTED_SEQ_NUM, next_expected_seq_no);
660  }
661 
662  // TAG 553 SESSION USER NAME
663  auto session_username = m_session.get_username();
664 
665  if (session_username.length() > 0)
666  {
667  logon_request->set_tag(FixConstants::TAG_USERNAME, session_username);
668  }
669 
670  // TAG 554 SESSION PASSWORD
671  auto session_password = m_session.get_password();
672 
673  if (session_password.length() > 0)
674  {
675  logon_request->set_tag(FixConstants::TAG_PASSWORD, session_password);
676  }
677 
678  // TAG 925 NEW PASSWORD
679  if (m_session.logon_message_new_password().length() > 0)
680  {
681  logon_request->set_tag(FixConstants::TAG_NEW_PASSWORD, m_session.logon_message_new_password());
682  }
683 
684  return send_outgoing_message(logon_request);
685  }
686 
692  virtual bool send_logout_request()
693  {
694  m_session.build_logout_message(outgoing_message_instance());
695  return send_outgoing_message(outgoing_message_instance());
696  }
697 
704  virtual bool send_client_heartbeat(FixString* test_request_id)
705  {
706  m_session.build_heartbeat_message(outgoing_message_instance(), test_request_id);
707  return send_outgoing_message(outgoing_message_instance());
708  }
709 
715  virtual bool send_test_request()
716  {
717  m_session.build_test_request_message(outgoing_message_instance());
718  return send_outgoing_message(outgoing_message_instance());
719  }
720 
726  virtual bool send_resend_request()
727  {
728  m_session.build_resend_request_message(outgoing_message_instance(), "0");
729  return send_outgoing_message(outgoing_message_instance());
730  }
731 
738  virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
739  {
740  /*
741  This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an internal admin request
742  Therefore we can't set 123=Y
743  */
744  auto message = outgoing_message_instance();
745  m_session.build_sequence_reset_message(message, desired_sequence_no);
746 
747  // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
748  std::size_t encoded_length = 0;
749  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
750 
751  // SEND
752  return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
753  }
754 
760  virtual bool send_gap_fill_message()
761  {
762  /*
763  This message is a gap fill that responds to an incoming 35=2/resend request.
764  For "hard" gap fills that don't set 123=Y, see send_sequence_reset_message
765  */
766  auto message = outgoing_message_instance();
767  m_session.build_gap_fill_message(message);
768 
769  // ENCODE , NO NEED TO INCREMENT OUTGOING SEQ NO AS WE ARE RESETTING WITH THIS MESSAGE
770  std::size_t encoded_length = 0;
771  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, m_session.get_incoming_resend_request_begin_no(), encoded_length); // We need to encode with seq no that the peer expects hence using m_incoming_resend_request_begin_no
772 
773  // SEND
774  return send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
775  }
776 
777  virtual void resend_messages_to_server(uint32_t begin_seq_no, uint32_t end_seq_no)
778  {
779  for (uint32_t i = begin_seq_no; i <= end_seq_no; i++)
780  {
781  std::size_t message_length = 0;
782  m_session.get_outgoing_message_serialiser()->read_message(i, m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
783 
784  auto message = outgoing_message_instance();
785  message->load_from_buffer(m_session.get_tx_encode_buffer(), message_length); // t122(orig sending time) will be handled by load_from_buffer
786 
787  message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
788 
789  if (m_session.include_t97_during_resends())
790  {
791  message->template set_tag<FixMessageComponent::HEADER, bool>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
792  }
793 
794  std::size_t encoded_length = 0;
795  message->encode(m_session.get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
796  send_bytes<false>(m_session.get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
797  }
798  }
799 
800  virtual bool send_reject_message(const IncomingFixMessage& incoming_message, uint32_t reject_reason_code, const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
801  {
802  LLFIX_UNUSED(incoming_message);
803  char reject_reason_text[256];
804 
805  std::size_t text_length{ 0 };
806  FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
807 
808  if (error_tag == 0)
809  {
810  LLFIX_LOG_DEBUG("FixClient " + m_name + " received an invalid message : " + std::string(reject_reason_text) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
811  }
812  else
813  {
814  LLFIX_LOG_DEBUG("FixClient " + m_name + " received an invalid message : " + std::string(reject_reason_text) + " ,tag : " + std::to_string(error_tag) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
815  }
816 
817  m_session.build_session_level_reject_message(outgoing_message_instance(), reject_reason_code, reject_reason_text, error_tag);
819  }
820 
821  std::atomic<bool> m_is_exiting = false;
822 
823  private:
824  LLFIX_ALIGN_DATA(AlignmentConstants::CPU_CACHE_LINE_SIZE) FixSession m_session;
825 
826  std::string m_name;
827  bool m_is_ha_primary = true;
828 
829  static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4; // 10=<DELIMITER>
830 
831  uint64_t m_last_logon_attempt_timestamp = 0;
832 
833  bool m_connected_to_primary = false;
834  bool m_connected_to_secondary = false;
835 
836  FixClientSettings m_settings;
837 
838  MessagePersistPlugin* m_message_persist_plugin = nullptr;
839  std::atomic<bool> m_thread_grace_full_exit = true;
840  std::unique_ptr<std::thread> m_thread;
841 
842  bool is_threaded() const { return m_thread.get() != nullptr;}
843 
844  void thread_function()
845  {
846  initialise_thread();
847 
848  while (true)
849  {
850  if (m_is_exiting.load() == true)
851  {
852  do_shutdown(m_thread_grace_full_exit.load());
853  break;
854  }
855 
856  run();
857  }
858  }
859 
860  bool create(const std::string& name, const std::string& session_name, const FixSessionSettings& session_settings)
861  {
862  m_name = name;
863  m_is_ha_primary = m_settings.starts_as_primary_instance;
864 
865  if (m_settings.validate() == false)
866  {
867  LLFIX_LOG_ERROR("FixClientSettings for " + m_name + " validation failed : " + m_settings.validation_error);
868  return false;
869  }
870 
871  session_settings.is_server = false;
872  session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity; // Propagate
873 
874  if (m_session.initialise(session_name, session_settings) == false)
875  {
876  LLFIX_LOG_ERROR(m_name + " creation failed during session initialisation");
877  return false;
878  }
879 
880  if (m_settings.starts_as_primary_instance == false)
881  {
882  disable_session();
883  }
884 
885  LLFIX_LOG_INFO(m_name + " : Loaded client config =>\n" + m_settings.to_string());
886 
887  LLFIX_LOG_INFO("FixClient " + m_name + " creation success");
888 
889  return true;
890  }
891 
892  void process_admin_commands()
893  {
894  ModifyingAdminCommand* admin_command{ nullptr };
895 
896  if (m_session.get_admin_commands()->try_pop(&admin_command) == true)
897  {
898  switch (admin_command->type)
899  {
900  case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
901  {
902  m_session.get_sequence_store()->set_incoming_seq_no(admin_command->arg);
903  LLFIX_LOG_DEBUG(m_name + " : processed set incoming sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
904  break;
905  }
906 
907  case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
908  {
909  m_session.get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
910  LLFIX_LOG_DEBUG(m_name + " : processed set outgoing sequence number admin command , new seq no : " + std::to_string(admin_command->arg));
911  break;
912  }
913 
914  case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
915  {
916  if( m_session.get_state() == SessionState::LOGGED_ON )
917  {
918  send_sequence_reset_message(admin_command->arg);
919  LLFIX_LOG_DEBUG(m_name + " : processed send sequence reset admin command , new seq no : " + std::to_string(admin_command->arg));
920  }
921  else
922  {
923  LLFIX_LOG_ERROR(m_name + " : dropping send sequence reset admin command as not logged on");
924  }
925 
926  break;
927  }
928 
929  case ModifyingAdminCommandType::DISABLE_SESSION:
930  {
931  disable_session();
932  LLFIX_LOG_DEBUG(m_name + " : processed disable session admin command , new session state : Disabled");
933  break;
934  }
935 
936  case ModifyingAdminCommandType::ENABLE_SESSION:
937  {
938  if(m_is_ha_primary)
939  {
940  enable_session();
941  LLFIX_LOG_DEBUG(m_name + " : processed enable session admin command");
942  }
943  else
944  {
945  LLFIX_LOG_ERROR(m_name + " ignoring enable session admin command since not a primary instance");
946  }
947  break;
948  }
949 
950  case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
951  {
952  m_is_ha_primary = admin_command->arg == 1 ? true : false;
953 
954  if(admin_command->arg == 1)
955  {
956  enable_session();
957  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 1 admin command");
958  }
959  else
960  {
961  disable_session();
962  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 0 admin command");
963  }
964 
965  break;
966  }
967 
968  default: break;
969  }
970 
971  delete admin_command;
972  }
973  }
974 
975  void disable_session()
976  {
977  if(is_state_live(m_session.get_state()))
978  {
979  do_shutdown(true);
980  }
981  m_session.set_state(SessionState::DISABLED);
982  }
983 
984  void enable_session()
985  {
986  if(m_session.get_state() == SessionState::DISABLED)
987  {
988  m_session.set_state(SessionState::DISCONNECTED);
989  if(m_settings.refresh_resend_cache_during_promotion)
990  m_session.reinitialise_outgoing_serialiser();
991  }
992  }
993 
994  void process_schedule_validator()
995  {
996  if (m_session.is_now_valid_session_datetime() == false)
997  {
998  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session due to schedule settings");
999  do_shutdown(true);
1000  }
1001  }
1002 
1003  void do_shutdown(bool graceful_shutdown = true)
1004  {
1005  auto call_time_state = m_session.get_state();
1006  LLFIX_LOG_DEBUG(m_name + " : shutdown called. Graceful shutdown param : " + (graceful_shutdown?"true":"false") );
1007 
1008  on_connection_lost();
1009 
1010  if (graceful_shutdown)
1011  {
1012  if (call_time_state == SessionState::LOGGED_ON)
1013  {
1014  m_session.set_received_logout_response(false);
1015 
1017  m_session.set_state(SessionState::PENDING_LOGOUT);
1018 
1019  auto start = std::chrono::steady_clock::now();
1020  const auto timeout = std::chrono::milliseconds(m_session.settings()->logout_timeout_seconds*1000);
1021 
1022  while(true)
1023  {
1024  this->process_incoming_messages();
1025 
1026  if(m_session.received_logout_response() == true)
1027  {
1028  break;
1029  }
1030 
1031  if (m_session.get_state() == SessionState::DISCONNECTED)
1032  {
1033  break;
1034  }
1035 
1036  auto now = std::chrono::steady_clock::now();
1037  if (now - start >= timeout)
1038  {
1039  LLFIX_LOG_DEBUG("FixClient " + m_name + " shutdown timed out before receiving logout response");
1040  break;
1041  }
1042  }
1043  }
1044  }
1045 
1046  this->close();
1047  m_session.set_state(SessionState::DISCONNECTED);
1048  }
1049 
1051  // TO SERVER
1052  template <bool increment_outgoing_seq_no>
1053  LLFIX_FORCE_INLINE bool send_bytes(const char* buffer, std::size_t buffer_size)
1054  {
1055  auto sequence_store = m_session.get_sequence_store();
1056 
1057  if(llfix_likely(m_session.settings()->throttle_limit != 0))
1058  {
1059  m_session.throttler()->update();
1060  m_session.throttler()->wait();
1061  }
1062 
1063  auto ret = this->send(buffer, buffer_size);
1064 
1065  if constexpr (increment_outgoing_seq_no == true)
1066  {
1067  if(ret == true)
1068  sequence_store->increment_outgoing_seq_no();
1069  }
1070 
1071  if(m_session.serialisation_enabled())
1072  m_session.get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1073 
1074  if (m_message_persist_plugin)
1075  m_message_persist_plugin->persist_outgoing_message(m_session.get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1076 
1077  m_session.set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1078 
1079  return ret;
1080  }
1081 
1083  // ADMIN UTILITY TO SERVER
1084  void process_outgoing_logon_request(uint64_t current_timestamp_nanoseconds)
1085  {
1086  auto delta_nanoseconds = current_timestamp_nanoseconds - m_last_logon_attempt_timestamp;
1087 
1088  if (delta_nanoseconds >= (static_cast<uint64_t>(m_session.settings()->logon_timeout_seconds) * 1'000'000'000))
1089  {
1090  m_session.set_state(SessionState::DISCONNECTED);
1091  }
1092  }
1093 
1094  void send_client_heartbeat_if_necessary(uint64_t current_timestamp_nanoseconds)
1095  {
1096  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_sent_message_timestamp_nanoseconds();
1097 
1098  if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1099  {
1100  send_client_heartbeat(nullptr);
1101  }
1102  }
1103 
1104  void process_outgoing_test_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1105  {
1106  if (m_session.expecting_response_for_outgoing_test_request() == false)
1107  {
1108  // SEND IF NECESSARY
1109  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.last_received_message_timestamp_nanoseconds();
1110 
1111  if (delta_nanoseconds >= (m_session.get_outgoing_test_request_interval_in_nanoseconds()))
1112  {
1113  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending test request");
1114  send_test_request();
1115  m_session.set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1116  m_session.set_expecting_response_for_outgoing_test_request(true);
1117  }
1118  }
1119  else
1120  {
1121  // TERMINATE SESSION IF NECESSARY
1122  auto delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_test_request_timestamp_nanoseconds();
1123 
1124  if (delta_nanoseconds >= (m_session.get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1125  {
1126  m_session.set_expecting_response_for_outgoing_test_request(false);
1127  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=1/test request");
1128  do_shutdown(false);
1129  }
1130  }
1131  }
1132 
1133  void process_outgoing_resend_request_if_necessary(uint64_t current_timestamp_nanoseconds)
1134  {
1135  if (m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1136  {
1137  // TERMINATE SESSION IF NECESSARY
1138  uint64_t delta_nanoseconds = current_timestamp_nanoseconds - m_session.outgoing_resend_request_timestamp_nanoseconds();
1139  uint64_t required_delta_nanoseconds = static_cast<uint64_t>(m_session.outgoing_resend_request_expire_secs()) * 1'000'000'000;
1140 
1141  if (delta_nanoseconds >= required_delta_nanoseconds)
1142  {
1143  // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1144  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(m_session.outgoing_resend_request_expire_secs()));
1145  do_shutdown(false);
1146  }
1147  }
1148  else
1149  {
1150  // SEND IF NECESSARY
1151  if (m_session.needs_to_send_resend_request())
1152  {
1153  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending resend request , begin no : " + std::to_string(m_session.get_outgoing_resend_request_begin_no()));
1154  send_resend_request();
1155  m_session.set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1156  m_session.set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1157  m_session.set_needs_to_send_resend_request(false);
1158  }
1159  }
1160  }
1161 
1162  void respond_to_test_request_if_necessary()
1163  {
1164  if (m_session.needs_responding_to_incoming_test_request())
1165  {
1166  if (m_session.get_incoming_test_request_id()->length() > 0)
1167  {
1168  send_client_heartbeat(m_session.get_incoming_test_request_id());
1169  m_session.get_incoming_test_request_id()->set_length(0);
1170  }
1171  else
1172  {
1173  // We should not hit here , but better than risking disconnection
1174  send_client_heartbeat(nullptr);
1175  }
1176 
1177  LLFIX_LOG_DEBUG("FixClient " + m_name + " : responded to incoming test request");
1178  m_session.set_needs_responding_to_incoming_test_request(false);
1179  }
1180  }
1181 
1182  void respond_to_resend_request_if_necessary()
1183  {
1184  if (m_session.needs_responding_to_incoming_resend_request())
1185  {
1186  const auto last_outgoing_seq_no = m_session.get_sequence_store()->get_outgoing_seq_no();
1187 
1188  // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1189  bool will_replay = (m_session.serialisation_enabled()) && (m_session.replay_messages_on_incoming_resend_request() && (m_session.get_incoming_resend_request_end_no() == 0
1190  || m_session.get_incoming_resend_request_end_no() == last_outgoing_seq_no) && m_session.get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
1191 
1192  if(last_outgoing_seq_no > m_session.get_incoming_resend_request_begin_no())
1193  if(last_outgoing_seq_no-m_session.get_incoming_resend_request_begin_no()+1 > m_session.max_resend_range())
1194  will_replay = false;
1195 
1196  if(will_replay)
1197  {
1198  for (uint32_t i = m_session.get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1199  {
1200  if (m_session.get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1201  {
1202  will_replay = false; // We don't have all the messages
1203  break;
1204  }
1205  }
1206  }
1207 
1208  if(will_replay)
1209  {
1210  LLFIX_LOG_DEBUG("FixClient " + m_name + " : replaying messages , begin seq no : " + std::to_string(m_session.get_incoming_resend_request_begin_no()) + " , end seq no : " + std::to_string(last_outgoing_seq_no));
1211  resend_messages_to_server(m_session.get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1212  }
1213  else
1214  {
1215  LLFIX_LOG_DEBUG("FixClient " + m_name + " : sending gap fill message");
1216  send_gap_fill_message();
1217  }
1218 
1219  m_session.set_needs_responding_to_incoming_resend_request(false);
1220  m_session.set_state(llfix::SessionState::LOGGED_ON);
1221  }
1222  }
1223 
1225  // FROM SERVER
1226  #if defined(LLFIX_BENCHMARK) || defined(LLFIX_UNIT_TEST)
1227  public:
1228  #else
1229  private:
1230  #endif
1231  LLFIX_HOT void process_rx_buffer(char* buffer, std::size_t buffer_size)
1232  {
1233  std::size_t buffer_read_index = 0;
1234 
1235  if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1236  {
1237  this->set_incomplete_buffer(buffer, buffer_size);
1238  return;
1239  }
1241  // FIND OUT THE END
1242  int final_tag10_delimiter_index{ -1 };
1243  int current_index = static_cast<int>(buffer_size - 1);
1244 
1245  if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) == false))
1246  {
1247  // We don't have the entire message
1248  this->set_incomplete_buffer(buffer, buffer_size);
1249  return;
1250  }
1251 
1252  FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1253 
1254  if (final_tag10_delimiter_index == -1)
1255  {
1256  // We don't have the entire message
1257  this->set_incomplete_buffer(buffer, buffer_size);
1258  return;
1259  }
1260 
1262  // AT THIS POINT WE HAVE AT LEAST ONE COMPLETE MESSAGE
1263  LLFIX_ALIGN_CODE_32;
1264  while (true)
1265  {
1267  // FIND OUT THE BEGIN STRING
1268  int begin_string_offset{-1};
1269  FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1270 
1271  if(llfix_likely(begin_string_offset>=0))
1272  {
1273  buffer_read_index += begin_string_offset;
1274  }
1275  else
1276  {
1277  LLFIX_LOG_ERROR(m_name + " received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1278  return;
1279  }
1281  m_session.get_incoming_fix_message()->reset();
1282  m_session.get_fix_string_view_cache()->reset_pointer();
1283 
1284  current_index = static_cast<int>(buffer_read_index);
1285  bool looking_for_equals = true;
1286 
1287  int current_tag_start = static_cast<int>(buffer_read_index);
1288  int current_tag_len{ 0 };
1289 
1290  int current_value_start = static_cast<int>(buffer_read_index);
1291  int current_value_len{ 0 };
1292 
1293  int current_tag_10_delimiter_index = -1;
1294  int current_tag_35_tag_start_index = -1;
1295 
1296  uint32_t parser_reject_code = static_cast<uint32_t>(-1);
1297  uint32_t current_tag_index = 0;
1298 
1299  uint32_t encoded_current_message_type = static_cast<uint32_t>(-1);
1300 
1301  bool in_a_repeating_group{ false };
1302  uint32_t current_rg_count_tag{ 0 };
1303 
1304  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1305  int binary_field_length = 0;
1306  int binary_field_counter = 0;
1307  #endif
1308 
1309  LLFIX_ALIGN_CODE_32;
1310  while (true)
1311  {
1312  char current_char = buffer[current_index];
1313 
1314  if (looking_for_equals)
1315  {
1316  if (current_char == FixConstants::FIX_EQUALS)
1317  {
1318  current_tag_len = current_index - current_tag_start;
1319  current_value_start = current_index + 1;
1320  looking_for_equals = false;
1321  }
1322  else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1323  {
1324  current_tag_start = current_index + 1;
1325  parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1326  }
1327  }
1328  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1329  else
1330  {
1331  if(llfix_unlikely(binary_field_length>0))
1332  if(binary_field_counter < binary_field_length)
1333  binary_field_counter++;
1334  }
1335 
1336  if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1337  {
1338  bool reached_value_end = true;
1339 
1340  if(llfix_unlikely(binary_field_length>0))
1341  {
1342  if(binary_field_length>binary_field_counter)
1343  {
1344  reached_value_end = false;
1345  }
1346  else if(binary_field_length == binary_field_counter)
1347  {
1348  binary_field_length=0;
1349  binary_field_counter=0;
1350  }
1351  }
1352 
1353  if(llfix_likely(reached_value_end))
1354  {
1355 
1356  #else
1357  else if (current_char == FixConstants::FIX_DELIMITER)
1358  {
1359  #endif
1360  current_value_len = current_index - current_value_start;
1361 
1362  if(llfix_unlikely(current_value_len==0))
1363  {
1364  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1365  }
1366 
1367  bool is_current_tag_numeric{true};
1368  FixSession::validate_tag_format(buffer+current_tag_start, static_cast<std::size_t>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1369 
1370  if(llfix_likely(is_current_tag_numeric))
1371  {
1373  // RECORD THE CURRENT TAG VALUE PAIR
1374  uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer+ current_tag_start, static_cast<std::size_t>(current_tag_len));
1375 
1376  if(llfix_unlikely(tag == 0))
1377  {
1378  parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
1379  }
1380 
1381  FixStringView* value = m_session.get_fix_string_view_cache()->allocate();
1382  value->set_buffer(const_cast<char*>(buffer + current_value_start), static_cast<std::size_t>(current_value_len));
1383 
1385  current_tag_index++;
1386  FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
1388 
1389  // Some tags may be used as a group member but also as an individual tag so we can't fully rely on is_a_repeating_group_tag
1390  // and need to detect start and end of a group
1391  if (encoded_current_message_type != static_cast<uint32_t>(-1))
1392  {
1393  if (llfix_likely(!in_a_repeating_group))
1394  {
1395  if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1396  {
1397  current_rg_count_tag = tag;
1398  in_a_repeating_group = true;
1399  }
1400  }
1401  else
1402  {
1403  if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) == false))
1404  {
1405  in_a_repeating_group = false;
1406  }
1407  }
1408 
1409  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1410  if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1411  {
1412  binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1413  }
1414  #endif
1415  }
1416 
1417  if (llfix_likely(in_a_repeating_group == false))
1418  {
1419  if(llfix_likely(m_session.get_incoming_fix_message()->has_tag(tag) == false))
1420  {
1421  m_session.get_incoming_fix_message()->set_tag(tag, value);
1422  }
1423  else
1424  {
1425  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1426  }
1427  }
1428  else
1429  {
1430  m_session.get_incoming_fix_message()->set_repeating_group_tag(tag, value);
1431  }
1432 
1433  if (tag == FixConstants::TAG_CHECKSUM)
1434  {
1435  current_tag_10_delimiter_index = current_index;
1437  FixSession::validate_tag9_and_tag35(m_session.get_incoming_fix_message(), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1439  break;
1440  }
1441  else if (tag == FixConstants::TAG_MSG_TYPE)
1442  {
1443  current_tag_35_tag_start_index = current_tag_start;
1444  encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1445  }
1446  } // if(llfix_likely(is_current_tag_numeric))
1447 
1448  current_tag_start = current_index + 1;
1449  looking_for_equals = true;
1450 
1451  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1452  } // if(llfix_likely(reached_value_end))
1453  #endif
1454  } // else if (current_char == FixConstants::FIX_DELIMITER) or if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1455 
1456  // Apart from the check below (else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)),
1457  // we also need to check here if found last tag 10 delimiter is before the found tag8
1458  if (current_index >= static_cast<int>(buffer_size) - 1)
1459  {
1460  this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1461  return;
1462  }
1463 
1464  current_index++;
1465  }// while true
1467  auto current_message_length = current_tag_10_delimiter_index+1-buffer_read_index;
1468  process_incoming_fix_message(m_session.get_incoming_fix_message(), buffer+ buffer_read_index, current_message_length, parser_reject_code);
1469 
1470  #ifndef LLFIX_UNIT_TEST
1471  if(llfix_unlikely(m_session.get_state() == SessionState::DISCONNECTED)) // process_incoming_fix_message may terminate connection
1472  {
1473  return;
1474  }
1475  #endif
1476 
1477  buffer_read_index = current_tag_10_delimiter_index + 1;
1478 
1479  if (current_tag_10_delimiter_index == static_cast<int>(buffer_size) - 1)
1480  {
1481  this->reset_incomplete_buffer();
1482  return;
1483  }
1484  else if (static_cast<int>(buffer_read_index)> final_tag10_delimiter_index)
1485  {
1486  #ifdef LLFIX_UNIT_TEST
1487  this->m_incomplete_buffer = static_cast<char*>(malloc(65536));
1488  #endif
1489  this->set_incomplete_buffer(buffer + buffer_read_index, buffer_size - buffer_read_index);
1490  return;
1491  }
1492  }
1493  }
1494 
1495  void process_incoming_fix_message(IncomingFixMessage* incoming_message, const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
1496  {
1497  m_session.set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1498 
1499  if(m_session.serialisation_enabled())
1500  m_session.get_incoming_message_serialiser()->write(buffer_message, buffer_message_length, true);
1501 
1502  if(llfix_unlikely(m_session.expecting_response_for_outgoing_test_request()))
1503  {
1504  // We are permissive , any message not just 35=0 with expected t112, satisfies our outgoing test request
1505  m_session.set_expecting_response_for_outgoing_test_request(false);
1506  LLFIX_LOG_DEBUG(m_name + " : other end satisfied the test request");
1507  }
1509  if(m_session.validations_enabled())
1510  {
1511  uint32_t reject_message_code = static_cast<uint32_t>(-1);
1512 
1513  if (m_session.validate_fix_message(*m_session.get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) == false)
1514  {
1515  if (reject_message_code != static_cast<uint32_t>(-1))
1516  {
1517  send_reject_message(*m_session.get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, m_session.get_last_error_tag());
1518  }
1519 
1520  return;
1521  }
1522  }
1524  // SEQUENCE NO & MESSAGE PERSIST PLUGIN
1525  auto sequence_store = m_session.get_sequence_store();
1526  sequence_store->increment_incoming_seq_no();
1527  auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
1528 
1529  if (m_message_persist_plugin)
1530  m_message_persist_plugin->persist_incoming_message(m_session.get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
1531 
1533  // SEQUENCE NO CHECKS
1534  auto incoming_seq_no = incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
1535 
1536  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
1537  {
1538  if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) == false)
1539  {
1540  m_session.queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
1541  return;
1542  }
1543  }
1544  else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
1545  {
1546  if(FixSession::is_a_hard_sequence_reset_message(*incoming_message) == false)
1547  {
1548  sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no-1);
1549  LLFIX_LOG_DEBUG("FixClient " + m_name + " : terminating session as the incoming sequence no (" + std::to_string(incoming_seq_no) + ") is lower than expected ("+ std::to_string(sequence_store_incoming_seq_no) + ")");
1550  do_shutdown(false);
1551  return;
1552  }
1553  }
1554 
1555  if(llfix_unlikely(m_session.get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
1556  {
1557  if(incoming_seq_no == m_session.get_outgoing_resend_request_end_no())
1558  {
1559  m_session.set_state(llfix::SessionState::LOGGED_ON);
1560  LLFIX_LOG_DEBUG(m_name + " : other end satisfied the resend request");
1561  }
1562  }
1564  auto message_type = incoming_message->get_tag_value(FixConstants::TAG_MSG_TYPE);
1565 
1566  if (llfix_likely(message_type->length() == 1))
1567  {
1568  switch (message_type->data()[0])
1569  {
1570  case FixConstants::MSG_TYPE_EXECUTION_REPORT: on_execution_report(incoming_message); break;
1571  case FixConstants::MSG_TYPE_HEARTBEAT: on_server_heartbeat(); break;
1572  case FixConstants::MSG_TYPE_TEST_REQUEST: m_session.process_test_request_message(*incoming_message); on_server_test_request(incoming_message); break;
1573  case FixConstants::MSG_TYPE_RESEND_REQUEST: m_session.process_resend_request(*incoming_message); on_server_resend_request(incoming_message); break;
1574  case FixConstants::MSG_TYPE_ORDER_CANCEL_REJECT: on_order_cancel_replace_reject(incoming_message); break;
1575  case FixConstants::MSG_TYPE_REJECT: process_session_level_reject(incoming_message); break;
1576  case FixConstants::MSG_TYPE_BUSINESS_REJECT: process_application_level_reject(incoming_message); break;
1577  case FixConstants::MSG_TYPE_LOGON: process_logon_response(incoming_message); break;
1578  case FixConstants::MSG_TYPE_LOGOUT: process_logout_response(incoming_message); break;
1579  case FixConstants::MSG_TYPE_SEQUENCE_RESET: if (m_session.process_incoming_sequence_reset_message(*incoming_message) == false) { send_reject_message(*incoming_message, FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); }; break;
1580  // Anything else
1581  default: on_custom_message_type(incoming_message); break;
1582  }
1583  }
1584  else
1585  {
1586  on_custom_message_type(incoming_message);
1587  }
1588  }
1589 
1590  void process_session_level_reject(const IncomingFixMessage* message)
1591  {
1592  if (m_session.get_state() == SessionState::PENDING_LOGON)
1593  {
1594  process_logon_reject(message);
1595  }
1596  else
1597  {
1598  on_session_level_reject(message);
1599  }
1600  }
1601 
1602  void process_application_level_reject(const IncomingFixMessage* message)
1603  {
1604  if (m_session.get_state() == SessionState::PENDING_LOGON)
1605  {
1606  process_logon_reject(message);
1607  }
1608  else
1609  {
1610  on_application_level_reject(message);
1611  }
1612  }
1613 
1614  void process_logon_response(const IncomingFixMessage* logon_response)
1615  {
1616  m_session.set_state(SessionState::LOGGED_ON);
1617  this->on_logon_response(logon_response);
1618  }
1619 
1620  void process_logout_response(const IncomingFixMessage* logout_response)
1621  {
1622  auto state = m_session.get_state();
1623 
1624  if (state == SessionState::PENDING_LOGON)
1625  {
1626  process_logon_reject(logout_response);
1627  }
1628  else
1629  {
1630  m_session.set_received_logout_response(true);
1631  m_session.set_state(SessionState::LOGGED_OUT);
1632  this->on_logout_response(logout_response);
1633  }
1634  }
1635 
1636  void process_logon_reject(const IncomingFixMessage* logon_reject)
1637  {
1638  m_session.set_state(SessionState::LOGON_REJECTED);
1639 
1640  if (logon_reject->has_tag(FixConstants::TAG_TEXT))
1641  {
1642  [[maybe_unused]] auto reject_message = logon_reject->get_tag_value(FixConstants::TAG_TEXT);
1643  LLFIX_LOG_DEBUG("FixClient " + m_name + " logon message rejected : " + reject_message->to_string());
1644  }
1645  else
1646  {
1647  LLFIX_LOG_DEBUG("FixClient " + m_name + " logon message rejected with no reason text/tag58.");
1648  }
1649 
1650  on_logon_reject(logon_reject);
1651  }
1652 
1653  FixClient(const FixClient& other) = delete;
1654  FixClient& operator= (const FixClient& other) = delete;
1655  FixClient(FixClient&& other) = delete;
1656  FixClient& operator=(FixClient&& other) = delete;
1657 };
1658 
1659 } // namespace
llfix::OutgoingFixMessage
FIX message builder and encoder for outbound messages.
Definition: outgoing_fix_message.h:99
llfix::FixClient::on_server_test_request
virtual void on_server_test_request(const IncomingFixMessage *message)
Called upon receiving a FIX Test Request.
Definition: fix_client.h:527
llfix::FixClient::shutdown
void shutdown(bool graceful_shutdown=true)
Shutdown the FIX client.
Definition: fix_client.h:411
llfix::FixClient::send_logout_request
virtual bool send_logout_request()
Send FIX Logout request.
Definition: fix_client.h:692
llfix::FixClient::connected_to_primary
bool connected_to_primary() const
Check if the client is connected to the primary endpoint.
Definition: fix_client.h:464
llfix::FixClient::send_logon_request
virtual bool send_logon_request()
Send FIX Logon request.
Definition: fix_client.h:629
llfix::FixClient
FIX client implementation.
Definition: fix_client.h:95
llfix::FixClient::on_logon_reject
virtual void on_logon_reject(const IncomingFixMessage *message)
Called when a FIX Logon is rejected.
Definition: fix_client.h:516
llfix::FixClient::set_message_persist_plugin
void set_message_persist_plugin(MessagePersistPlugin *plugin)
Set the message persistence plugin.
Definition: fix_client.h:594
llfix::FixClient::process
void process()
Process incoming and outgoing FIX protocol activity.
Definition: fix_client.h:358
llfix::FixClient::on_logon_response
virtual void on_logon_response(const IncomingFixMessage *message)
Called upon receiving a successful FIX Logon response.
Definition: fix_client.h:504
llfix::FixClient::on_connection
virtual void on_connection()
Called when a TCP connection is successfully established.
Definition: fix_client.h:481
llfix::FixClient::create
bool create(const std::string &client_name, const FixClientSettings &settings, const std::string &session_name, const FixSessionSettings &session_settings)
Create and initialise a FIX client instance.
Definition: fix_client.h:120
llfix::FixSession
Represents a single FIX protocol session.
Definition: fix_session.h:95
llfix::FixClient::on_server_heartbeat
virtual void on_server_heartbeat()
Called upon receiving a FIX Heartbeat from the server.
Definition: fix_client.h:521
llfix::FixClient::connect
bool connect()
Establish a FIX network connection and initiate logon.
Definition: fix_client.h:255
llfix::FixClient::on_custom_message_type
virtual void on_custom_message_type(const IncomingFixMessage *message)
Called for other FIX message types.
Definition: fix_client.h:564
llfix::FixClient::get_session
FixSession * get_session(const std::string &session_name="") override
Retrieve the FIX session managed by this client.
Definition: fix_client.h:169
llfix::FixClient::outgoing_message_instance
OutgoingFixMessage * outgoing_message_instance()
Obtain a reusable outgoing FIX message instance.
Definition: fix_client.h:430
llfix::FixClient::on_logout_response
virtual void on_logout_response(const IncomingFixMessage *message)
Called upon receiving a FIX Logout response.
Definition: fix_client.h:510
llfix::FixClient::on_order_cancel_replace_reject
virtual void on_order_cancel_replace_reject(const IncomingFixMessage *message)
Called when an Order Cancel/Replace Reject (35=9) is received.
Definition: fix_client.h:546
llfix::FixClient::on_session_level_reject
virtual void on_session_level_reject(const IncomingFixMessage *message)
Called on session-level FIX rejects.
Definition: fix_client.h:552
llfix::FixClient::connected_to_secondary
bool connected_to_secondary() const
Check if the client is connected to the secondary endpoint.
Definition: fix_client.h:471
llfix::IncomingFixMessage
Represents a parsed incoming FIX message.
Definition: incoming_fix_message.h:66
llfix::FixClient::create
bool create(const std::string &client_config_file_path, const std::string &client_name, const std::string &session_config_file_path, const std::string &session_name)
Create and initialise a FIX client using configuration files.
Definition: fix_client.h:136
llfix::FixUtilities::fix_to_human_readible
static std::string fix_to_human_readible(const char *buffer, std::size_t buffer_length)
Converts a FIX message buffer to a human-readable string.
Definition: fix_utilities.h:108
llfix::MessagePersistPlugin
Interface for custom FIX message persistence plugins.
Definition: message_persist_plugin.h:49
llfix::FixClient::on_execution_report
virtual void on_execution_report(const IncomingFixMessage *message)
Called when an Execution Report (35=8) is received.
Definition: fix_client.h:540
llfix::FixClient::send_outgoing_message
virtual bool send_outgoing_message(OutgoingFixMessage *message)
Encode and send an outgoing FIX message.
Definition: fix_client.h:442
llfix::FixClient::send_sequence_reset_message
virtual bool send_sequence_reset_message(uint32_t desired_sequence_no)
Send FIX Sequence Reset message.
Definition: fix_client.h:738
llfix::FixClient::on_server_resend_request
virtual void on_server_resend_request(const IncomingFixMessage *message)
Called upon receiving a FIX Resend Request.
Definition: fix_client.h:533
llfix::FixClient::on_application_level_reject
virtual void on_application_level_reject(const IncomingFixMessage *message)
Called on application-level FIX rejects.
Definition: fix_client.h:558
llfix::FixClient::specify_repeating_group
void specify_repeating_group(Args... args)
Specify repeating group definitions for incoming FIX messages.
Definition: fix_client.h:204
llfix::FixClient::on_disconnection
virtual void on_disconnection()
Called when the TCP connection is lost or closed.
Definition: fix_client.h:486
llfix::FixClient::start
bool start()
Start the client execution thread.
Definition: fix_client.h:233