llfix
Low-latency FIX engine
fix_server.h
1 // DISCLAIMER_PLACEHOLDER
2 #pragma once
3 
4 #include "common.h"
5 
6 #include <cassert>
7 #include <cstdint>
8 #include <cstddef>
9 #include <string>
10 #include <string_view>
11 #include <cstdio>
12 #include <vector>
13 #include <unordered_map>
14 #include <functional>
15 #include <mutex> // for std::lock_guard
16 #include <new>
17 
18 #include "core/compiler/hints_branch_predictor.h"
19 #include "core/compiler/hints_hot_code.h"
20 #include "core/compiler/unused.h"
21 #include "core/os/vdso.h"
22 #include "core/os/thread_local_storage.h"
23 
24 #include "core/utilities/configuration.h"
25 #include "core/utilities/logger.h"
26 #include "core/utilities/object_cache.h"
27 #include "core/utilities/std_string_utilities.h"
28 #include "core/utilities/userspace_spinlock.h"
29 
30 #include "electronic_trading/common/message_persist_plugin.h"
31 #include "electronic_trading/managed_instance/modifying_admin_command.h"
32 #include "electronic_trading/managed_instance/managed_instance.h"
33 
34 #include "fix_constants.h"
35 #include "fix_string_view.h"
36 #include "fix_string.h"
37 #include "fix_session_settings.h"
38 #include "fix_utilities.h"
39 #include "incoming_fix_message.h"
40 #include "outgoing_fix_message.h"
41 #include "fix_parser_error_codes.h"
42 #include "fix_session.h"
43 #include "fix_server_settings.h"
44 
45 #include "core/utilities/tcp_reactor_options.h"
46 
47 namespace llfix
48 {
49 
50 // Thread local
51 struct UnknownSessionContext
52 {
53  IncomingFixMessage m_incoming_fix_message;
54  ObjectCache<FixStringView> m_fix_string_view_cache;
55 };
56 
57 class FixServerConnectors
58 {
59  public:
60  FixServerConnectors()
61  {
62  m_tables_lock.initialise();
63  m_peer_index_session_table.reserve(1024);
64  m_session_peer_index_table.reserve(1024);
65  }
66 
67  bool has_peer(std::size_t peer_index) const
68  {
69  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
70 
71  if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
72  {
73  return false;
74  }
75 
76  return true;
77  }
78 
79  void update(std::size_t peer_index, FixSession* session)
80  {
81  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
82 
83  // In concurrent scenarios, disconnection for an existing item can happen before its connection as they will happen on different threads
84  if (m_session_peer_index_table.find(session) != m_session_peer_index_table.end())
85  {
86  auto existing_peer_index = m_session_peer_index_table[session];
87 
88  if (existing_peer_index != peer_index)
89  {
90  m_peer_index_session_table.erase(existing_peer_index);
91  }
92  }
93 
94  m_peer_index_session_table[peer_index] = session;
95  m_session_peer_index_table[session] = peer_index;
96  }
97 
98  FixSession* get_session(std::size_t peer_index)
99  {
100  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
101 
102  if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
103  {
104  return nullptr;
105  }
106 
107  return m_peer_index_session_table[peer_index];
108  }
109 
110  std::size_t get_peer_index(FixSession* session)
111  {
112  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
113 
114  if (m_session_peer_index_table.find(session) == m_session_peer_index_table.end())
115  {
116  return static_cast<std::size_t>(-1);
117  }
118 
119  return m_session_peer_index_table[session];
120  }
121 
122  void remove(std::size_t peer_index, FixSession* session)
123  {
124  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
125 
126  m_peer_index_session_table.erase(peer_index);
127  m_session_peer_index_table.erase(session);
128  }
129 
130  private:
131  std::unordered_map<std::size_t, FixSession*> m_peer_index_session_table;
132  std::unordered_map<FixSession*, std::size_t> m_session_peer_index_table;
133  mutable UserspaceSpinlock<> m_tables_lock;
134 };
135 
145 template<typename Transport>
146 class FixServer : public Transport, public ManagedInstance
147 {
148  public:
149  FixServer() = default;
150 
151  virtual ~FixServer()
152  {
153  for (auto& entry : m_sessions)
154  {
155  delete entry.second;
156  }
157  }
158 
167  [[nodiscard]] bool create(const std::string& server_name, const std::string server_config_file_path)
168  {
169  m_name = server_name;
170 
171  if (m_settings.load_from_config_file(server_config_file_path, server_name) == false)
172  {
173  LLFIX_LOG_ERROR("Loading settings for server " + server_name + " failed : " + m_settings.config_load_error);
174  return false;
175  }
176 
177  if (m_settings.validate() == false)
178  {
179  LLFIX_LOG_ERROR("FixServerSettings for " + m_name + " validation failed : " + m_settings.validation_error);
180  return false;
181  }
182 
183  m_is_ha_primary = m_settings.starts_as_primary_instance;
184  m_unknown_session_context_lock.initialise();
185 
186  TCPReactorOptions reactor_options;
187 
188  reactor_options.m_accept_timeout_seconds = m_settings.accept_timeout_seconds;
189  reactor_options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
190  reactor_options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
191  reactor_options.m_cpu_core_id = m_settings.cpu_core_id;
192  reactor_options.m_worker_thread_count = m_settings.worker_thread_count;
193  reactor_options.m_send_try_count = m_settings.send_try_count;
194  reactor_options.m_disable_nagle = m_settings.disable_nagle;
195  reactor_options.m_enable_quick_ack = m_settings.quick_ack;
196  reactor_options.m_max_poll_events = m_settings.max_poll_events;
197  reactor_options.m_nic_interface_ip = m_settings.nic_address;
198  reactor_options.m_nic_interface_name = m_settings.nic_name;
199  reactor_options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
200  reactor_options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
201  reactor_options.m_pending_connection_queue_size = m_settings.pending_connection_queue_size;
202  reactor_options.m_port = m_settings.accept_port;
203  reactor_options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
204  reactor_options.m_receive_size = m_settings.receive_size;
205  reactor_options.m_socket_rx_size = m_settings.socket_rx_size;
206  reactor_options.m_socket_tx_size = m_settings.socket_tx_size;
207  reactor_options.m_spin_count = m_settings.spin_count;
208  #ifdef LLFIX_ENABLE_OPENSSL
209  reactor_options.m_use_ssl = m_settings.use_ssl;
210  reactor_options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
211  reactor_options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
212  reactor_options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
213  reactor_options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
214  reactor_options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
215  reactor_options.m_ssl_version = m_settings.ssl_version;
216  reactor_options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
217  reactor_options.m_ssl_crl_path = m_settings.ssl_crl_path;
218  #endif
219 
220  this->set_params(reactor_options);
221 
222  if constexpr (Transport::is_multithreaded())
223  {
224  this->register_acceptor_callback(
225  [this](std::size_t peer_index)
226  {
227  LLFIX_UNUSED(peer_index);
228  this->process_admin_commands_of_all_non_live_sessions();
229  });
230 
231  this->register_acceptor_termination_callback(
232  [this](std::size_t peer_index)
233  {
234  LLFIX_UNUSED(peer_index);
235  this->destroy_thread_local_unknown_session_context();
236  });
237 
238  this->register_worker_callback(
239  [this](std::size_t peer_index)
240  {
241  this->process_session(peer_index);
242  });
243 
244  this->register_worker_termination_callback(
245  [this](std::size_t peer_index)
246  {
247  LLFIX_UNUSED(peer_index);
248  this->destroy_thread_local_unknown_session_context();
249  });
250  }
251  else
252  {
253  this->register_callback(std::bind(&FixServer::process, this));
254  this->register_termination_callback(std::bind(&FixServer::destroy_thread_local_unknown_session_context, this));
255  }
256 
257  LLFIX_LOG_INFO(m_name + " : Loaded server config =>\n" + m_settings.to_string());
258  LLFIX_LOG_INFO("FixServer " + m_name + " creation success");
259 
260  return true;
261  }
262 
271  [[nodiscard]] bool add_session(const std::string& session_name, FixSessionSettings& session_settings)
272  {
273  return internal_add_session(session_name, session_settings);
274  }
275 
284  [[nodiscard]] bool add_session(const std::string& session_config_file_path, const std::string& session_name)
285  {
286  FixSessionSettings session_settings;
287 
288  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
289  {
290  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
291  return false;
292  }
293 
294  return internal_add_session(session_name, session_settings);
295  }
296 
307  [[nodiscard]] bool add_sessions_from(const std::string& session_config_file_path)
308  {
309  Configuration config_file;
310  std::string config_load_error;
311 
312  if (config_file.load_from_file(session_config_file_path, config_load_error) == false)
313  {
314  return false;
315  }
316 
317  std::vector<std::string> session_names;
318  config_file.get_group_names(session_names);
319 
320  int added_session_count{0};
321 
322  for (const auto& session_name : session_names)
323  {
324  auto lowered_session_name = StringUtilities::to_lower(session_name);
325 
326  if (StringUtilities::contains(lowered_session_name, "session"))
327  {
328  FixSessionSettings session_settings;
329 
330  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
331  {
332  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
333  return false;
334  }
335 
336  if (internal_add_session(session_name, session_settings) == false)
337  {
338  return false;
339  }
340 
341  added_session_count++;
342  }
343  }
344 
345  if(added_session_count == 0)
346  {
347  LLFIX_LOG_ERROR(m_name + " : Could not find any session in " + session_config_file_path + ". Make sure they have 'SESSION' in their names.");
348  return false;
349  }
350 
351  return true;
352  }
353 
359  std::size_t get_session_count() const
360  {
361  return m_sessions.size();
362  }
363 
364  bool is_instance_ha_primary() const override
365  {
366  return m_is_ha_primary;
367  }
368 
376  FixSession* get_session(const std::string& session_name) override
377  {
378  if (has_session(session_name) == false)
379  {
380  return nullptr;
381  }
382 
383  return m_sessions[session_name];
384  }
385 
393  std::string get_session_name(FixSession* session)
394  {
395  assert(session);
396 
397  for (const auto& session_entry : m_sessions)
398  {
399  if(session_entry.second == session)
400  {
401  return session_entry.first;
402  }
403  }
404 
405  return "";
406  }
407 
413  void get_session_names(std::vector<std::string>& target) override
414  {
415  for (const auto& iter : m_sessions)
416  {
417  target.push_back(iter.first);
418  }
419  }
420 
421  std::string get_name() const override
422  {
423  return m_name;
424  }
425 
434  template<typename... Args>
435  void specify_repeating_group(Args... args)
436  {
437  FixSession::get_repeating_group_specs().specify_repeating_group(args...);
438  }
439 
440  #ifdef LLFIX_ENABLE_BINARY_FIELDS
441 
450  void specify_binary_field(const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
451  {
452  FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
453  }
454  #endif
455 
464  {
465  return session->get_outgoing_fix_message();
466  }
467 
476  virtual bool send_outgoing_message(FixSession* session, OutgoingFixMessage* message)
477  {
478  std::size_t encoded_length = 0;
479  auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
480 
481  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
482  return send_bytes<true>(session, session->get_tx_encode_buffer(), encoded_length);
483  }
484 
485  void push_admin_command(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0) override
486  {
487  if(llfix_likely(session_name != "*"))
488  {
489  push_admin_command_internal(session_name, type, arg);
490  }
491  else
492  {
493  for (auto& session : m_sessions)
494  {
495  push_admin_command_internal(session.first, type, arg);
496  }
497  }
498  }
499 
500  #ifdef LLFIX_AUTOMATION
501  // Used for testing outgoing msg resends and gap fills
502  bool send_fake_outgoing_message(FixSession* session, OutgoingFixMessage* message)
503  {
504  std::size_t encoded_length = 0;
505  auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
506 
507  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
508 
509  auto sequence_store = session->get_sequence_store();
510  sequence_store->increment_outgoing_seq_no();
511 
512  if(session->serialisation_enabled())
513  session->get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(session->get_tx_encode_buffer()), encoded_length, true, sequence_store->get_outgoing_seq_no());
514 
515  session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
516 
517  return true;
518  }
519 
520  void set_replay_messages_on_incoming_resend_request_for_all_sessions(bool b)
521  {
522  for (auto& entry : m_sessions)
523  {
524  entry.second->set_replay_messages_on_incoming_resend_request(b);
525  }
526  }
527  #endif
528 
534  void shutdown() { this->stop(); }
536  // TCP connections
537  virtual void on_async_io_error(int error_code, int event_result) override
538  {
539  LLFIX_UNUSED(error_code);
540  LLFIX_UNUSED(event_result);
541  }
542 
543  virtual void on_socket_error(int error_code, int event_result) override
544  {
545  LLFIX_UNUSED(error_code);
546  LLFIX_UNUSED(event_result);
547  }
548  // Application level messages
555  virtual void on_new_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
562  virtual void on_cancel_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
569  virtual void on_replace_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
576  virtual void on_application_level_reject(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
583  virtual void on_session_level_reject(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
590  virtual void on_custom_message(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
591  // Session/admin messages
598  virtual void on_logon_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);}
605  virtual void on_logout_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);}
612  virtual void on_client_resend_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
619  virtual void on_client_test_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
625  virtual void on_client_heartbeat(FixSession* session) {LLFIX_UNUSED(session);};
626  // Others
639  virtual bool process_incoming_throttling(FixSession* session, const IncomingFixMessage* incoming_fix_message)
640  {
641  if(session->settings()->throttle_limit == 0)
642  {
643  return true;
644  }
645 
646  session->throttler()->update();
647 
648  if(session->throttler()->reached_limit() )
649  {
650  session->increment_incoming_throttler_exceed_count();
651 
652  if(session->settings()->throttle_action == IncomingThrottlerAction::WAIT)
653  {
654  session->throttler()->wait();
655  }
656  else if(session->settings()->throttle_action == IncomingThrottlerAction::DISCONNECT)
657  {
658  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end exceeded the throttle rate : " + std::to_string(session->settings()->throttle_limit));
659  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
660  return false;
661  }
662  else if(session->settings()->throttle_action == IncomingThrottlerAction::REJECT)
663  {
664  send_throttle_reject_message(session, incoming_fix_message);
665  return false;
666  }
667  }
668 
669  return true;
670  }
679  virtual void process_schedule_validator(FixSession* session)
680  {
681  if (session->is_now_valid_session_datetime() == false)
682  {
683  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " due to schedule settings");
684  this->process_connection_closure(m_fix_connectors.get_peer_index(session), true);
685  }
686  }
687 
703  virtual FixSession* accept_session(std::size_t peer_index, const IncomingFixMessage* incoming_fix_message, const char* buffer, std::size_t buffer_length, uint32_t parser_reject_code)
704  {
705  if (parser_reject_code != static_cast<uint32_t>(-1))
706  {
707  std::string message;
708  if (parser_reject_code > FixConstants::FIX_MAX_ERROR_CODE)
709  {
710  message = FixParserErrorCodes::get_internal_parser_error_description(parser_reject_code) + " : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
711  }
712  else
713  {
714  char temp_buf[512];
715  std::size_t length{ 0 };
716  FixUtilities::get_reject_reason_text(temp_buf, length, parser_reject_code);
717  message = temp_buf;
718  message += " , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
719  }
720  process_invalid_logon(peer_index, message);
721  return nullptr;
722  }
723 
724  FixSession* session = nullptr;
725 
726  if (!(incoming_fix_message->has_tag(FixConstants::TAG_BEGIN_STRING) && incoming_fix_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM) && incoming_fix_message->has_tag(FixConstants::TAG_SENDER_COMP_ID) && incoming_fix_message->has_tag(FixConstants::TAG_TARGET_COMP_ID) && incoming_fix_message->has_tag(FixConstants::TAG_MSG_TYPE)))
727  {
728  process_invalid_logon(peer_index, "One of following required tags is missing : 8,34,49,56,35 , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length));
729  return nullptr;
730  }
731  else
732  {
733  auto message_type = incoming_fix_message->get_tag_value_as<char>(FixConstants::TAG_MSG_TYPE);
734 
735  if (message_type == FixConstants::MSG_TYPE_LOGON)
736  {
737  auto incoming_begin_string = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_BEGIN_STRING);
738  auto incoming_comp_id = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID);
739  auto incoming_target_comp_id = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_TARGET_COMP_ID);
740 
741  session = find_session(incoming_begin_string, incoming_target_comp_id, incoming_comp_id);
742 
743  if (session != nullptr)
744  {
745  auto session_state = session->get_state();
746 
747  if (is_state_live(session_state)) // This check is the most important part of multithreaded FixServer
748  {
749  std::string message = "Incoming logon message for server " + m_name + " is invalid as the client is already logged on, session : " + session->get_name();
750  process_invalid_logon(peer_index, message);
751  return nullptr;
752  }
753 
754  if(session_state == SessionState::DISABLED)
755  {
756  std::string message = "Cannot accept connection attempt as session " + session->get_name() + " is disabled. Closing connection.";
757  process_invalid_logon(peer_index, message);
758  return nullptr;
759  }
760 
761  if (session->is_now_valid_session_datetime() == false)
762  {
763  std::string message = "Cannot accept connection attempt for session " + session->get_name() + " due to schedule settings. Closing connection.";
764  process_invalid_logon(peer_index, message);
765  return nullptr;
766  }
767 
769  m_fix_connectors.update(peer_index, session);
770 
771  if constexpr(Transport::is_multithreaded())
772  {
773  this->mark_connector_as_ready_for_worker_thread_dispatch(peer_index);
774  }
776  session->reset_flags();
777  session->get_incoming_fix_message()->reset();
778  session->get_incoming_fix_message()->copy_non_dirty_tag_values_from(*incoming_fix_message);
779 
780  if(incoming_fix_message->has_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG))
781  {
782  if(incoming_fix_message->get_tag_value_as<char>(FixConstants::TAG_RESET_SEQ_NUM_FLAG) == FixConstants::FIX_BOOLEAN_TRUE)
783  {
784  session->get_sequence_store()->reset_numbers();
785  }
786  }
787 
788  session->set_state(SessionState::PENDING_LOGON);
789  return session;
790  }
791  else
792  {
793  std::string message = "Could not find a session for incoming message : 8=" + incoming_begin_string + " 49=" + incoming_comp_id + " 56=" + incoming_target_comp_id + " , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
794  process_invalid_logon(peer_index, message);
795  return nullptr;
796  }
797  }
798  else
799  {
800  std::string message = std::string("Received a message with a msgtype different than A (") + message_type + ") while expecting a logon message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
801  process_invalid_logon(peer_index, message);
802  return nullptr;
803  }
804  }
805  }
806 
818  virtual void process_logon_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
819  {
820  if (validate_logon_request(session, peer_index, message) == false)
821  {
822  process_invalid_logon(peer_index, "");
823  return;
824  }
825 
826  if (authenticate_logon_request(session, message) == false)
827  {
828  std::string message = "Authentication failed for incoming logon message for server " + m_name + " for session " + session->get_name();
829  process_invalid_logon(peer_index, message);
830  return;
831  }
832 
833  auto requested_heartbeat_interval = message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT);
834  auto requested_heartbeat_interval_nanoseconds = static_cast<uint64_t>(requested_heartbeat_interval) * static_cast<uint64_t>(1'000'000'000);
835 
836  session->set_heartbeart_interval_in_nanoseconds(requested_heartbeat_interval_nanoseconds);
837  session->set_outgoing_test_request_interval_in_nanoseconds(static_cast<uint64_t>(requested_heartbeat_interval_nanoseconds * session->settings()->outgoing_test_request_interval_multiplier));
838 
839  send_logon_response(session, message);
840  session->set_state(SessionState::LOGGED_ON);
841 
842  LLFIX_LOG_DEBUG(m_name + ", session logged on : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
843 
844  do_post_logon_sequence_number_check(session);
845 
846  on_logon_request(session, message);
847  }
848 
860  virtual bool authenticate_logon_request(FixSession* session, const IncomingFixMessage* message)
861  {
862  LLFIX_UNUSED(session);
863  LLFIX_UNUSED(message);
864  return true;
865  }
867  public:
868  void on_data_ready(std::size_t peer_index) override
869  {
870  auto read = this->receive(peer_index);
871 
872  if (read > 0 && read <= static_cast<int>(this->m_options.m_rx_buffer_capacity))
873  {
874  auto buffer_size = this->get_rx_buffer_size(peer_index);
875  if(buffer_size >0 && buffer_size != static_cast<std::size_t>(-1))
876  process_rx_buffer(peer_index, this->get_rx_buffer(peer_index), buffer_size);
877  }
878 
879  this->receive_done(peer_index);
880  }
881 
882  void on_client_connected(std::size_t peer_index) override
883  {
884  LLFIX_LOG_DEBUG("FixServer " + m_name + " : new client connected , peer index : " + std::to_string(peer_index));
885  }
886 
887  void on_client_disconnected(std::size_t peer_index) override
888  {
889  Transport::on_client_disconnected(peer_index);
890 
891  if (m_fix_connectors.has_peer(peer_index) == false)
892  return;
893 
894  // A logged on client
895  auto session = m_fix_connectors.get_session(peer_index);
896 
897  if(session != nullptr)
898  {
899  LLFIX_LOG_DEBUG("FixServer " + m_name + " : session " + session->get_name() + " disconnected , peer index : " + std::to_string(peer_index));
900  on_client_disconnected(session);
901  }
902 
903  // Remove entries
904  m_fix_connectors.remove(peer_index, session);
905  }
906 
907  void on_client_disconnected(FixSession* session)
908  {
909  session->set_state(SessionState::DISCONNECTED);
910  }
911 
912  FixServerSettings& get_settings() { return m_settings; }
913 
914  std::string get_settings_as_string(const std::string& delimiter) override
915  {
916  return m_settings.to_string(delimiter);
917  }
918 
924  void set_message_persist_plugin(MessagePersistPlugin* plugin)
925  {
926  assert(plugin);
927  m_message_persist_plugin = plugin;
928  }
929 
930  protected:
932  // ADMIN
941  virtual bool send_heartbeat(FixSession* session, FixString* test_request_id)
942  {
943  session->build_heartbeat_message(outgoing_message_instance(session), test_request_id);
944  return send_outgoing_message(session, outgoing_message_instance(session));
945  }
946 
955  virtual bool send_logon_response(FixSession* session, const IncomingFixMessage* message)
956  {
957  LLFIX_UNUSED(message);
958  auto response = outgoing_message_instance(session);
959  response->set_msg_type(FixConstants::MSG_TYPE_LOGON);
960 
961  // TAG 98 ENCRYPTION METHOD
962  response->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
963 
964  // TAG 108 HEARTBEAT INTERVAL
965  response->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(session->get_heartbeart_interval_in_nanoseconds() / 1'000'000'000));
966 
967  // TAG 1137 DEFAULT APP VER ID
968  auto default_app_ver_id = session->get_default_app_ver_id();
969 
970  if (default_app_ver_id.length() > 0)
971  {
972  response->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
973  }
974 
975  return send_outgoing_message(session, response);
976  }
977 
986  virtual bool send_logout_message(FixSession* session, const std::string& reason_text = "")
987  {
988  session->build_logout_message(outgoing_message_instance(session), reason_text);
989  return send_outgoing_message(session, outgoing_message_instance(session));
990  }
991 
999  virtual bool send_test_request(FixSession* session)
1000  {
1001  session->build_test_request_message(outgoing_message_instance(session));
1002  return send_outgoing_message(session, outgoing_message_instance(session));
1003  }
1004 
1012  virtual bool send_resend_request(FixSession* session)
1013  {
1014  session->build_resend_request_message(outgoing_message_instance(session), "0");
1015  return send_outgoing_message(session, outgoing_message_instance(session));
1016  }
1017 
1026  virtual bool send_sequence_reset_message(FixSession* session, uint32_t desired_sequence_no)
1027  {
1028  /*
1029  This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an internal admin request
1030  Therefore we can't set 123=Y
1031  */
1032  auto message = outgoing_message_instance(session);
1033  session->build_sequence_reset_message(message, desired_sequence_no);
1034 
1035  // ENCODE
1036  std::size_t encoded_length = 0;
1037  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
1038 
1039  // SEND
1040  return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1041  }
1042 
1050  virtual bool send_gap_fill_message(FixSession* session)
1051  {
1052  /*
1053  This message is a gap fill that responds to an incoming 35=2/resend request.
1054  For "hard" gap fills that don't set 123=Y, see send_sequence_reset_message
1055  */
1056  auto message = outgoing_message_instance(session);
1057  session->build_gap_fill_message(message);
1058 
1059  // ENCODE
1060  std::size_t encoded_length = 0;
1061  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, session->get_incoming_resend_request_begin_no(), encoded_length); // We need to encode with seq no that the peer expects hence using m_incoming_resend_request_begin_no
1062 
1063  // SEND
1064  return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1065  }
1066 
1067  virtual void resend_messages_to_client(FixSession* session, uint32_t begining_seq_no, uint32_t end_seq_no)
1068  {
1069  for (uint32_t i = begining_seq_no; i <= end_seq_no; i++)
1070  {
1071  std::size_t message_length = 0;
1072  session->get_outgoing_message_serialiser()->read_message(i, session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
1073 
1074  auto message = outgoing_message_instance(session);
1075  message->load_from_buffer(session->get_tx_encode_buffer(), message_length); // t122(orig sending time) will be handled by load_from_buffer
1076 
1077  message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
1078 
1079  if (session->include_t97_during_resends())
1080  {
1081  message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
1082  }
1083 
1084  std::size_t encoded_length = 0;
1085  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
1086  send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1087  }
1088  }
1089 
1090  virtual bool send_reject_message(FixSession* session, const IncomingFixMessage* incoming_message, uint32_t reject_reason_code, const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
1091  {
1092  LLFIX_UNUSED(incoming_message);
1093  char reject_reason_text[256];
1094 
1095  std::size_t text_length{ 0 };
1096  FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
1097 
1098  if (error_tag == 0)
1099  {
1100  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " received an invalid message : " + std::string(reject_reason_text) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1101  }
1102  else
1103  {
1104  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " received an invalid message : " + std::string(reject_reason_text) + " ,tag : " + std::to_string(error_tag) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1105  }
1106 
1107  session->build_session_level_reject_message(outgoing_message_instance(session), reject_reason_code, reject_reason_text, error_tag);
1108  return send_outgoing_message(session, outgoing_message_instance(session));
1109  }
1110 
1111  virtual bool send_throttle_reject_message(FixSession* session, const IncomingFixMessage* incoming_message)
1112  {
1113  auto reject_message = outgoing_message_instance(session);
1114  reject_message->set_msg_type(FixConstants::MSG_TYPE_BUSINESS_REJECT);
1115 
1116  reject_message->set_tag(FixConstants::TAG_TEXT, session->settings()->throttler_reject_message);
1117 
1118  if (incoming_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM))
1119  {
1120  reject_message->set_tag(FixConstants::TAG_REF_SEQ_NUM, incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM));
1121  }
1122 
1123  if (incoming_message->has_tag(FixConstants::TAG_MSG_TYPE))
1124  {
1125  reject_message->set_tag(FixConstants::TAG_REF_MSG_TYPE, incoming_message->get_tag_value_as<std::string>(FixConstants::TAG_MSG_TYPE));
1126  }
1127 
1128  return send_outgoing_message(session, reject_message);
1129  }
1130 
1131  std::unordered_map<std::string, FixSession*> m_sessions;
1132 
1133  private:
1134  static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4; // 10=<DELIMITER>
1135  std::string m_name;
1136  bool m_is_ha_primary = true;
1137 
1138  FixServerSettings m_settings;
1139 
1140  FixServerConnectors m_fix_connectors;
1141 
1142  MessagePersistPlugin* m_message_persist_plugin = nullptr;
1143 
1144  UserspaceSpinlock<> m_unknown_session_context_lock;
1145 
1146  bool has_session(const std::string& session_name)
1147  {
1148  if (m_sessions.find(session_name) == m_sessions.end())
1149  {
1150  return false;
1151  }
1152 
1153  return true;
1154  }
1155 
1156  FixSession* find_session(const std::string& begin_string, const std::string& sender_comp_id, const std::string& target_comp_id)
1157  {
1158  FixSession* ret = nullptr;
1159 
1160  for (const auto& session_entry : m_sessions)
1161  {
1162  if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id && session_entry.second->settings()->begin_string == begin_string)
1163  {
1164  return session_entry.second;
1165  }
1166  }
1167 
1168  return ret;
1169  }
1170 
1171  bool does_any_session_use_path(const std::string& path)
1172  {
1173  for (const auto& session_entry : m_sessions)
1174  {
1175  if (session_entry.second->settings()->sequence_store_file_path == path)
1176  {
1177  return true;
1178  }
1179 
1180  if (session_entry.second->settings()->incoming_message_serialisation_path == path)
1181  {
1182  return true;
1183  }
1184 
1185  if (session_entry.second->settings()->outgoing_message_serialisation_path == path)
1186  {
1187  return true;
1188  }
1189  }
1190  return false;
1191  }
1192 
1193  bool does_any_session_use_compids(const std::string& sender_comp_id, const std::string& target_comp_id)
1194  {
1195  for (const auto& session_entry : m_sessions)
1196  {
1197  if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id)
1198  {
1199  return true;
1200  }
1201  }
1202  return false;
1203  }
1204 
1205  bool internal_add_session(const std::string& session_name, const FixSessionSettings& session_settings)
1206  {
1207  if (has_session(session_name))
1208  {
1209  LLFIX_LOG_ERROR(m_name + " : Already loaded a session with name " + session_name + " . You have to specify unique names for sessions.");
1210  return false;
1211  }
1212 
1213  if (does_any_session_use_path(session_settings.sequence_store_file_path))
1214  {
1215  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s sequence_store_file_path already being used in another session.");
1216  return false;
1217  }
1218 
1219  if (does_any_session_use_path(session_settings.incoming_message_serialisation_path))
1220  {
1221  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s incoming_message_serialisation_path already being used in another session.");
1222  return false;
1223  }
1224 
1225  if (does_any_session_use_path(session_settings.outgoing_message_serialisation_path))
1226  {
1227  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s outgoing_message_serialisation_path already being used in another session.");
1228  return false;
1229  }
1230 
1231  if (does_any_session_use_compids(session_settings.sender_comp_id, session_settings.target_comp_id))
1232  {
1233  std::string warning_message = "WARNING (" + m_name + "): session " + session_name + "'s sender and target comp ids are already being used in another session.";
1234  LLFIX_LOG_WARNING(warning_message);
1235  fprintf(stderr, "%s", warning_message.c_str());
1236  }
1237 
1238  FixSession* current_session = new (std::nothrow) FixSession;
1239 
1240  if (current_session == nullptr)
1241  {
1242  LLFIX_LOG_ERROR(m_name + " : Failed to allocate fix session for " + session_name);
1243  return false;
1244  }
1245 
1246  session_settings.is_server = true;
1247  session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity; // Propagate
1248 
1249  if (current_session->initialise(session_name, session_settings) == false)
1250  {
1251  delete current_session;
1252  return false;
1253  }
1254 
1255  m_sessions[session_name] = current_session;
1256 
1257  if (m_settings.starts_as_primary_instance == false)
1258  {
1259  disable_session(current_session);
1260  }
1261 
1262  return true;
1263  }
1264 
1265  void process_session(std::size_t peer_index)
1266  {
1267  static_assert(Transport::is_multithreaded());
1268 
1269  FixSession* sess = m_fix_connectors.get_session(peer_index);
1270 
1271  if (sess)
1272  {
1273  sess->lock();
1274  process_session(sess);
1275  sess->unlock();
1276  }
1277  }
1278 
1279  void process_session(FixSession* session)
1280  {
1281  static_assert(Transport::is_multithreaded());
1282 
1283  if (llfix_likely(this->m_is_stopping.load() == false))
1284  {
1285  process_admin_commands(session);
1286 
1287  auto current_state = session->get_state();
1288 
1289  if (is_state_live(current_state))
1290  {
1291  process_outgoing_resend_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1292  process_outgoing_test_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1293  respond_to_resend_request_if_necessary(session);
1294  respond_to_test_request_if_necessary(session);
1295  send_heartbeat_if_necessary(session, VDSO::nanoseconds_monotonic());
1296 
1297  process_schedule_validator(session);
1298  }
1299  }
1300  else
1301  {
1302  send_logout_message(session);
1303  on_client_disconnected(session);
1304  }
1305  }
1306 
1307  void process_admin_commands_of_all_non_live_sessions()
1308  {
1309  static_assert(Transport::is_multithreaded());
1310 
1311  for (auto& iter : m_sessions)
1312  {
1313  auto state = iter.second->get_state();
1314 
1315  if (is_state_live(state) == false)
1316  {
1317  iter.second->lock();
1318  process_admin_commands(iter.second);
1319  iter.second->unlock();
1320  }
1321  }
1322 
1323  }
1324 
1325  void process()
1326  {
1327  static_assert(!Transport::is_multithreaded());
1328 
1329  if (llfix_likely(this->m_is_stopping.load() == false))
1330  {
1331  for (const auto& session_entry : m_sessions)
1332  {
1333  FixSession* current_session = session_entry.second;
1334 
1335  process_admin_commands(current_session);
1336 
1337  auto current_state = current_session->get_state();
1338 
1339  if (is_state_live(current_state))
1340  {
1341  process_outgoing_resend_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1342  process_outgoing_test_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1343  respond_to_resend_request_if_necessary(current_session);
1344  respond_to_test_request_if_necessary(current_session);
1345  send_heartbeat_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1346 
1347  process_schedule_validator(current_session);
1348  }
1349  }
1350  }
1351  else
1352  {
1353  send_logout_messages_to_all_sessions();
1354  }
1355  }
1356 
1357  void push_admin_command_internal(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
1358  {
1359  auto admin_command = new (std::nothrow) ModifyingAdminCommand;
1360 
1361  if (llfix_likely(admin_command))
1362  {
1363  admin_command->type = type;
1364  admin_command->arg = arg;
1365  m_sessions[session_name]->get_admin_commands()->push(admin_command); // Session name existence is checked in management server layer
1366  }
1367  else
1368  {
1369  LLFIX_LOG_ERROR("Failed to process setter admin command for FixServer " + m_name);
1370  }
1371  }
1372 
1373  void process_admin_commands(FixSession* session)
1374  {
1375  ModifyingAdminCommand* admin_command{ nullptr };
1376 
1377  if (session->get_admin_commands()->try_pop(&admin_command) == true)
1378  {
1379  switch (admin_command->type)
1380  {
1381  case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
1382  {
1383  session->get_sequence_store()->set_incoming_seq_no(admin_command->arg);
1384  LLFIX_LOG_DEBUG(m_name + " : processed set incoming sequence number admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1385  break;
1386  }
1387 
1388  case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
1389  {
1390  session->get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
1391  LLFIX_LOG_DEBUG(m_name + " : processed set outgoing sequence number admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1392  break;
1393  }
1394 
1395  case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
1396  {
1397  if( session->get_state() == SessionState::LOGGED_ON )
1398  {
1399  send_sequence_reset_message(session, admin_command->arg);
1400  LLFIX_LOG_DEBUG(m_name + " : processed send sequence reset admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1401  }
1402  else
1403  {
1404  LLFIX_LOG_ERROR(m_name + " : dropping send sequence reset admin command as not logged on , session name : " + session->get_name());
1405  }
1406 
1407  break;
1408  }
1409 
1410  case ModifyingAdminCommandType::DISABLE_SESSION:
1411  {
1412  disable_session(session);
1413  LLFIX_LOG_DEBUG(m_name + " : processed disable session admin command for session " + session->get_name() + " , new session state : Disabled");
1414  break;
1415  }
1416 
1417  case ModifyingAdminCommandType::ENABLE_SESSION:
1418  {
1419  if(m_is_ha_primary == true)
1420  {
1421  enable_session(session);
1422  LLFIX_LOG_DEBUG(m_name + " : processed enable session admin command for " + session->get_name());
1423  }
1424  else
1425  {
1426  LLFIX_LOG_ERROR(m_name + " ignoring enable session admin command for " + session->get_name() + " since not a primary instance" );
1427  }
1428  break;
1429  }
1430 
1431  case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
1432  {
1433  m_is_ha_primary = admin_command->arg == 1 ? true : false;
1434 
1435  if(admin_command->arg == 1)
1436  {
1437  enable_session(session);
1438  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 1 admin command for " + session->get_name());
1439  }
1440  else
1441  {
1442  disable_session(session);
1443  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 0 admin command for " + session->get_name());
1444  }
1445 
1446  break;
1447  }
1448 
1449 
1450  default: break;
1451  }
1452 
1453  delete admin_command;
1454  }
1455  }
1456 
1457  void disable_session(FixSession* session)
1458  {
1459  if (is_state_live(session->get_state()))
1460  {
1461  this->process_connection_closure(m_fix_connectors.get_peer_index(session), true);
1462  }
1463  session->set_state(SessionState::DISABLED);
1464  }
1465 
1466  void enable_session(FixSession* session)
1467  {
1468  if(session->get_state() == SessionState::DISABLED)
1469  {
1470  session->set_state(SessionState::DISCONNECTED);
1471  if(m_settings.refresh_resend_cache_during_promotion)
1472  session->reinitialise_outgoing_serialiser();
1473  }
1474  }
1475 
1477  // ADMIN UTILITY
1478  void process_outgoing_resend_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1479  {
1480  if (session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1481  {
1482  // TERMINATE SESSION IF NECESSARY
1483  uint64_t delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_resend_request_timestamp_nanoseconds();
1484  uint64_t required_delta_nanoseconds = static_cast<uint64_t>(session->outgoing_resend_request_expire_secs()) * 1'000'000'000;
1485 
1486  if (delta_nanoseconds >= required_delta_nanoseconds)
1487  {
1488  // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1489  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(session->outgoing_resend_request_expire_secs()));
1490  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
1491  }
1492  }
1493  else
1494  {
1495  // SEND IF NECESSARY
1496  if (session->needs_to_send_resend_request())
1497  {
1498  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " peer " + std::to_string(m_fix_connectors.get_peer_index(session)) + " : sending resend request , begin no : " + std::to_string(session->get_outgoing_resend_request_begin_no()));
1499  send_resend_request(session);
1500  session->set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1501  session->set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1502  session->set_needs_to_send_resend_request(false);
1503  }
1504  }
1505  }
1506 
1507  void process_outgoing_test_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1508  {
1509  if (session->expecting_response_for_outgoing_test_request() == false)
1510  {
1511  // SEND IF NECESSARY
1512  auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_received_message_timestamp_nanoseconds();
1513 
1514  if (delta_nanoseconds >= (session->get_outgoing_test_request_interval_in_nanoseconds()))
1515  {
1516  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : sending test request");
1517  send_test_request(session);
1518  session->set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1519  session->set_expecting_response_for_outgoing_test_request(true);
1520  }
1521  }
1522  else
1523  {
1524  // TERMINATE SESSION IF NECESSARY
1525  auto delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_test_request_timestamp_nanoseconds();
1526 
1527  if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1528  {
1529  session->set_expecting_response_for_outgoing_test_request(false);
1530  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end didn't respond to 35=1/test request");
1531  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
1532  }
1533  }
1534  }
1535 
1536  void respond_to_resend_request_if_necessary(FixSession* session)
1537  {
1538  if (session->needs_responding_to_incoming_resend_request())
1539  {
1540  const auto last_outgoing_seq_no = session->get_sequence_store()->get_outgoing_seq_no();
1541 
1542  // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1543  bool will_replay = (session->serialisation_enabled()) && (session->replay_messages_on_incoming_resend_request() && (session->get_incoming_resend_request_end_no() == 0
1544  || session->get_incoming_resend_request_end_no() == last_outgoing_seq_no) && session->get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
1545 
1546  if(last_outgoing_seq_no > session->get_incoming_resend_request_begin_no())
1547  if(last_outgoing_seq_no-session->get_incoming_resend_request_begin_no()+1 > session->max_resend_range())
1548  will_replay = false;
1549 
1550  if(will_replay)
1551  {
1552  for (uint32_t i = session->get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1553  {
1554  if (session->get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1555  {
1556  will_replay = false;
1557  break;
1558  }
1559  }
1560  }
1561 
1562  if(will_replay)
1563  {
1564  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : replaying messages , begin seq no : " + std::to_string(session->get_incoming_resend_request_begin_no()) + " , end seq no : " + std::to_string(last_outgoing_seq_no));
1565  resend_messages_to_client(session, session->get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1566  }
1567  else
1568  {
1569  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : sending gap fill message");
1570  send_gap_fill_message(session);
1571  }
1572 
1573  session->set_needs_responding_to_incoming_resend_request(false);
1574  session->set_state(llfix::SessionState::LOGGED_ON);
1575  }
1576  }
1577 
1578  void respond_to_test_request_if_necessary(FixSession* session)
1579  {
1580  if (session->needs_responding_to_incoming_test_request())
1581  {
1582  if (session->get_incoming_test_request_id()->length() > 0)
1583  {
1584  send_heartbeat(session , session->get_incoming_test_request_id());
1585  session->get_incoming_test_request_id()->set_length(0);
1586  }
1587  else
1588  {
1589  // We should not hit here , but better than risking disconnection
1590  send_heartbeat(session, nullptr);
1591  }
1592 
1593  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : responded to incoming test request");
1594  session->set_needs_responding_to_incoming_test_request(false);
1595  }
1596  }
1597 
1598  void send_heartbeat_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1599  {
1600  auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_sent_message_timestamp_nanoseconds();
1601 
1602  if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1603  {
1604  send_heartbeat(session, nullptr);
1605  }
1606  }
1607 
1608  void send_logout_messages_to_all_sessions()
1609  {
1610  for (const auto& session_entry : m_sessions)
1611  {
1612  FixSession* current_session = session_entry.second;
1613 
1614  if (current_session->get_state() != SessionState::DISCONNECTED)
1615  {
1616  send_logout_message(current_session);
1617  on_client_disconnected(current_session);
1618  }
1619  }
1620  }
1622  // TO CLIENT
1623  template <bool increment_outgoing_seq_no>
1624  LLFIX_FORCE_INLINE bool send_bytes(FixSession* session, const char* buffer, std::size_t buffer_size)
1625  {
1626  std::size_t peer_index = m_fix_connectors.get_peer_index(session);
1627 
1628  if (peer_index == static_cast<std::size_t>(-1))
1629  return false;
1630 
1631  auto sequence_store = session->get_sequence_store();
1632 
1633  auto ret = this->send(peer_index, buffer, buffer_size);
1634 
1635  if constexpr (increment_outgoing_seq_no == true)
1636  {
1637  if(ret==true)
1638  sequence_store->increment_outgoing_seq_no();
1639  }
1640 
1641  if(session->serialisation_enabled())
1642  session->get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1643 
1644  if (m_message_persist_plugin)
1645  m_message_persist_plugin->persist_outgoing_message(session->get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1646 
1647  session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1648 
1649  return ret;
1650  }
1651 
1653  // THREAD LOCAL UNKNOWN SESSION CONTEXT
1654  UnknownSessionContext* get_thread_local_unknown_session_context()
1655  {
1656  const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1657 
1658  UnknownSessionContext* unknown_session_context = nullptr;
1659  unknown_session_context = reinterpret_cast<UnknownSessionContext*>(ThreadLocalStorage::get_instance().get());
1660 
1661  if (unknown_session_context == nullptr)
1662  {
1663  unknown_session_context = new(std::nothrow) UnknownSessionContext;
1664  ThreadLocalStorage::get_instance().set(unknown_session_context);
1665  }
1666 
1667  if (unknown_session_context)
1668  {
1669  if (unknown_session_context->m_incoming_fix_message.initialise() == false)
1670  {
1671  LLFIX_LOG_ERROR(m_name + " : Failed to initialise unknown session context's incoming_fix_message");
1672  return nullptr;
1673  }
1674 
1675  if (unknown_session_context->m_fix_string_view_cache.create(256) == false)
1676  {
1677  LLFIX_LOG_ERROR(m_name + " : Failed to create unknown session context's fix string view cache");
1678  return nullptr;
1679  }
1680  }
1681 
1682  return unknown_session_context;
1683  }
1684 
1685  void destroy_thread_local_unknown_session_context()
1686  {
1687  const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1688 
1689  UnknownSessionContext* unknown_session_context{ nullptr };
1690  unknown_session_context = reinterpret_cast<UnknownSessionContext*>(ThreadLocalStorage::get_instance().get());
1691 
1692  if (unknown_session_context)
1693  {
1694  delete unknown_session_context;
1695  ThreadLocalStorage::get_instance().set(nullptr);
1696  }
1697  }
1698 
1700  // FROM CLIENT
1701  LLFIX_HOT void process_rx_buffer(std::size_t peer_index, char* buffer, std::size_t buffer_size)
1702  {
1703  std::size_t buffer_read_index = 0;
1704 
1705  if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1706  {
1707  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1708  return;
1709  }
1710 
1711  FixSession* session = m_fix_connectors.get_session(peer_index);
1712 
1714  // FIND OUT THE END
1715  int final_tag10_delimiter_index{ -1 };
1716  int current_index = static_cast<int>(buffer_size - 1);
1717 
1718  if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) == false))
1719  {
1720  // We don't have the entire message
1721  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1722  return;
1723  }
1724 
1725  FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1726 
1727  if (final_tag10_delimiter_index == -1)
1728  {
1729  // We don't have the entire message
1730  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1731  return;
1732  }
1734  // AT THIS POINT WE HAVE AT LEAST ONE COMPLETE MESSAGE
1735  LLFIX_ALIGN_CODE_32;
1736  while (true)
1737  {
1739  // FIND OUT THE BEGIN STRING
1740  int begin_string_offset{-1};
1741  FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1742 
1743  if(llfix_likely(begin_string_offset>=0))
1744  {
1745  buffer_read_index += begin_string_offset;
1746  }
1747  else
1748  {
1749  LLFIX_LOG_ERROR(m_name + " received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1750  return;
1751  }
1753  IncomingFixMessage* incoming_fix_message{ nullptr };
1754  ObjectCache<FixStringView>* fix_string_view_cache{ nullptr };
1755 
1756  if (llfix_likely(session != nullptr))
1757  {
1758  incoming_fix_message = session->get_incoming_fix_message();
1759  fix_string_view_cache = session->get_fix_string_view_cache();
1760  }
1761  else
1762  {
1763  UnknownSessionContext* unknown_session_context = get_thread_local_unknown_session_context();
1764 
1765  if (unknown_session_context)
1766  {
1767  incoming_fix_message = &(unknown_session_context->m_incoming_fix_message);
1768  fix_string_view_cache = &(unknown_session_context->m_fix_string_view_cache);
1769  }
1770  else
1771  {
1772  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1773  return;
1774  }
1775  }
1776 
1777  incoming_fix_message->reset();
1778  fix_string_view_cache->reset_pointer();
1779 
1780  current_index = static_cast<int>(buffer_read_index);
1781  bool looking_for_equals = true;
1782 
1783  int current_tag_start = static_cast<int>(buffer_read_index);
1784  int current_tag_len{ 0 };
1785 
1786  int current_value_start = static_cast<int>(buffer_read_index);
1787  int current_value_len{ 0 };
1788 
1789  int current_tag_10_delimiter_index = -1;
1790  int current_tag_35_tag_start_index = -1;
1791 
1792  uint32_t parser_reject_code = static_cast<uint32_t>(-1);
1793  uint32_t current_tag_index = 0;
1794 
1795  uint32_t encoded_current_message_type = static_cast<uint32_t>(-1);
1796 
1797  bool in_a_repeating_group{ false };
1798  uint32_t current_rg_count_tag{ 0 };
1799 
1800  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1801  int binary_field_length = 0;
1802  int binary_field_counter = 0;
1803  #endif
1804 
1805  LLFIX_ALIGN_CODE_32;
1806  while (true)
1807  {
1808  char current_char = buffer[current_index];
1809 
1810  if (looking_for_equals)
1811  {
1812  if (current_char == FixConstants::FIX_EQUALS)
1813  {
1814  current_tag_len = current_index - current_tag_start;
1815  current_value_start = current_index + 1;
1816  looking_for_equals = false;
1817  }
1818  else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1819  {
1820  current_tag_start = current_index + 1;
1821  parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1822  }
1823  }
1824  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1825  else
1826  {
1827  if(llfix_unlikely(binary_field_length>0))
1828  if(binary_field_counter < binary_field_length)
1829  binary_field_counter++;
1830  }
1831 
1832  if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1833  {
1834  bool reached_value_end = true;
1835 
1836  if(llfix_unlikely(binary_field_length>0))
1837  {
1838  if(binary_field_length>binary_field_counter)
1839  {
1840  reached_value_end = false;
1841  }
1842  else if(binary_field_length == binary_field_counter)
1843  {
1844  binary_field_length=0;
1845  binary_field_counter=0;
1846  }
1847  }
1848 
1849  if(llfix_likely(reached_value_end))
1850  {
1851 
1852  #else
1853  else if (current_char == FixConstants::FIX_DELIMITER)
1854  {
1855  #endif
1856  current_value_len = current_index - current_value_start;
1857 
1858  if (llfix_unlikely(current_value_len == 0))
1859  {
1860  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1861  }
1862 
1863  bool is_current_tag_numeric{ true };
1864  FixSession::validate_tag_format(buffer + current_tag_start, static_cast<std::size_t>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1865 
1866  if (llfix_likely(is_current_tag_numeric))
1867  {
1869  // RECORD THE CURRENT TAG VALUE PAIR
1870  uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer + current_tag_start, static_cast<std::size_t>(current_tag_len));
1871 
1872  if(llfix_unlikely(tag == 0))
1873  {
1874  parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
1875  }
1876 
1877  FixStringView* value = fix_string_view_cache->allocate();
1878  value->set_buffer(const_cast<char*>(buffer + current_value_start), static_cast<std::size_t>(current_value_len));
1879 
1881  current_tag_index++;
1882  FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
1884 
1885  // Some tags may be used as a group member but also as an individual tag so we can't fully rely on is_a_repeating_group_tag
1886  // and need to detect start and end of a group
1887  if(encoded_current_message_type != static_cast<uint32_t>(-1))
1888  {
1889  if(llfix_likely(!in_a_repeating_group))
1890  {
1891  if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1892  {
1893  current_rg_count_tag = tag;
1894  in_a_repeating_group = true;
1895  }
1896  }
1897  else
1898  {
1899  if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) == false))
1900  {
1901  in_a_repeating_group = false;
1902  }
1903  }
1904 
1905  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1906  if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1907  {
1908  binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1909  }
1910  #endif
1911  }
1912 
1913  if (llfix_likely(in_a_repeating_group == false))
1914  {
1915  if (llfix_likely(incoming_fix_message->has_tag(tag) == false))
1916  {
1917  incoming_fix_message->set_tag(tag, value);
1918  }
1919  else
1920  {
1921  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1922  }
1923  }
1924  else
1925  {
1926  incoming_fix_message->set_repeating_group_tag(tag, value);
1927  }
1928 
1929  if (tag == FixConstants::TAG_CHECKSUM)
1930  {
1931  current_tag_10_delimiter_index = current_index;
1933  FixSession::validate_tag9_and_tag35((incoming_fix_message), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1935  break;
1936  }
1937  else if (tag == FixConstants::TAG_MSG_TYPE)
1938  {
1939  current_tag_35_tag_start_index = current_tag_start;
1940  encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1941  }
1942  } // if (llfix_likely(is_current_tag_numeric))
1943 
1944  current_tag_start = current_index + 1;
1945  looking_for_equals = true;
1946  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1947  } // if(llfix_likely(reached_value_end))
1948  #endif
1949  } // else if (current_char == FixConstants::FIX_DELIMITER) or if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1950 
1951  // Apart from the check below (else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)),
1952  // we also need to check here if found last tag 10 delimiter is before the found tag8
1953  if (current_index >= static_cast<int>(buffer_size) - 1)
1954  {
1955  this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
1956  return;
1957  }
1958 
1959  current_index++;
1960  } // while
1961 
1962  auto current_message_length = current_tag_10_delimiter_index + 1 - buffer_read_index;
1964  if (llfix_unlikely(session == nullptr))
1965  {
1966  session = accept_session(peer_index, incoming_fix_message, buffer + buffer_read_index, current_message_length, parser_reject_code);
1967 
1968  if (session == nullptr) // Means that it was an invalid logon attempt and we already closed the connection
1969  {
1970  return;
1971  }
1972  }
1974  session->lock();
1975  process_incoming_fix_message(session, peer_index, buffer + buffer_read_index, current_message_length, parser_reject_code);
1976  session->unlock();
1977 
1978  #ifndef LLFIX_UNIT_TEST
1979  if(llfix_unlikely(session->get_state() == SessionState::DISCONNECTED)) // process_incoming_fix_message may terminate connection
1980  {
1981  return;
1982  }
1983  #endif
1984 
1985  buffer_read_index = current_tag_10_delimiter_index + 1;
1986 
1987  if (current_tag_10_delimiter_index == static_cast<int>(buffer_size) - 1)
1988  {
1989  this->reset_incomplete_buffer(peer_index);
1990  return;
1991  }
1992  else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)
1993  {
1994  this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
1995  return;
1996  }
1997  }
1998  }
1999 
2000  void process_invalid_logon(std::size_t peer_index, const std::string& log_message, bool send_logout = false, const std::string& logout_reason_text = "")
2001  {
2002  if(!log_message.empty())
2003  {
2004  int peer_port{0};
2005  std::string peer_ip;
2006  this->get_peer_details(peer_index, peer_ip, peer_port);
2007 
2008  LLFIX_LOG_ERROR("Invalid logon attempt from " + peer_ip + " : " + log_message);
2009  }
2010 
2011  process_connection_closure(peer_index, send_logout, logout_reason_text);
2012  }
2013 
2014  void process_connection_closure(std::size_t peer_index, bool send_logout = false, const std::string& logout_reason_text = "")
2015  {
2016  if (send_logout)
2017  {
2018  auto session = m_fix_connectors.get_session(peer_index);
2019 
2020  if (session != nullptr)
2021  {
2022  send_logout_message(session, logout_reason_text);
2023  }
2024  }
2025 
2026  this->close_connection(peer_index);
2027  }
2028 
2029  void process_incoming_fix_message(FixSession* session, std::size_t peer_index, const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
2030  {
2031  session->set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
2032 
2033  if(session->serialisation_enabled())
2034  session->get_incoming_message_serialiser()->write(buffer_message, buffer_message_length, true);
2035 
2036  if (llfix_unlikely(session->expecting_response_for_outgoing_test_request()))
2037  {
2038  // We are permissive , any message not just 35=0 with expected t112, satisfies our outgoing test request
2039  session->set_expecting_response_for_outgoing_test_request(false);
2040  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : other end satisfied the test request");
2041  }
2043  if (session->validations_enabled())
2044  {
2045  uint32_t reject_message_code = static_cast<uint32_t>(-1);
2046 
2047  if (session->validate_fix_message(*session->get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) == false)
2048  {
2049  if (reject_message_code != static_cast<uint32_t>(-1))
2050  {
2051  send_reject_message(session, session->get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, session->get_last_error_tag());
2052  }
2053 
2054  return;
2055  }
2056  }
2058  // SEQUENCE NO & MESSAGE PERSIST PLUGIN
2059  auto sequence_store = session->get_sequence_store();
2060  sequence_store->increment_incoming_seq_no();
2061  auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
2062 
2063  if (m_message_persist_plugin)
2064  m_message_persist_plugin->persist_incoming_message(session->get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
2065 
2067  // SEQUENCE NO CHECKS
2068  auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2069 
2070  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2071  {
2072  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2073  {
2074  if (session->get_state() != SessionState::PENDING_LOGON) // on_logon_request handles this case on its own
2075  {
2076  session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
2077  return;
2078  }
2079  }
2080  }
2081  else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
2082  {
2083  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2084  {
2085  std::string logout_reason_text;
2086  bool send_logout = false;
2087 
2088  if (session->get_state() == SessionState::PENDING_LOGON)
2089  {
2090  send_logout = true;
2091  logout_reason_text = "MsgSeqNum too low, expecting " + std::to_string(sequence_store_incoming_seq_no) + " but received " + std::to_string(incoming_seq_no);
2092  }
2093 
2094  sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no - 1);
2095  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the incoming sequence no (" + std::to_string(incoming_seq_no) + ") is lower than expected (" + std::to_string(sequence_store_incoming_seq_no) + ")");
2096  this->process_connection_closure(peer_index, send_logout, logout_reason_text);
2097  return;
2098  }
2099  }
2100 
2101  if (llfix_unlikely(session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
2102  {
2103  if (incoming_seq_no == session->get_outgoing_resend_request_end_no())
2104  {
2105  LLFIX_LOG_DEBUG("FixServer " + m_name + " : other end satisfied the resend request");
2106  session->set_state(llfix::SessionState::LOGGED_ON);
2107  }
2108  }
2110  if(process_incoming_throttling(session, session->get_incoming_fix_message()) == false)
2111  {
2112  return;
2113  }
2114 
2115  auto message_type = session->get_incoming_fix_message()->get_tag_value(FixConstants::TAG_MSG_TYPE);
2116 
2117  if (llfix_likely(message_type->length() == 1))
2118  {
2119  switch (message_type->data()[0])
2120  {
2121  case FixConstants::MSG_TYPE_NEW_ORDER: on_new_order(session, session->get_incoming_fix_message()); break;
2122  case FixConstants::MSG_TYPE_ORDER_CANCEL: on_cancel_order(session, session->get_incoming_fix_message()); break;
2123  case FixConstants::MSG_TYPE_ORDER_CANCEL_REPLACE: on_replace_order(session, session->get_incoming_fix_message()); break;
2124  case FixConstants::MSG_TYPE_HEARTBEAT: on_client_heartbeat(session); break;
2125  case FixConstants::MSG_TYPE_TEST_REQUEST: session->process_test_request_message(*session->get_incoming_fix_message()); on_client_test_request(session, session->get_incoming_fix_message()); break;
2126  case FixConstants::MSG_TYPE_RESEND_REQUEST: session->process_resend_request(*session->get_incoming_fix_message()); on_client_resend_request(session, session->get_incoming_fix_message()); break;
2127  case FixConstants::MSG_TYPE_REJECT: on_session_level_reject(session, session->get_incoming_fix_message()); break;
2128  case FixConstants::MSG_TYPE_BUSINESS_REJECT: on_application_level_reject(session, session->get_incoming_fix_message()); break;
2129  case FixConstants::MSG_TYPE_LOGON: process_logon_request(session, peer_index, session->get_incoming_fix_message()); break;
2130  case FixConstants::MSG_TYPE_LOGOUT: process_logout_request(session, peer_index, session->get_incoming_fix_message()); break;
2131  case FixConstants::MSG_TYPE_SEQUENCE_RESET: if (session->process_incoming_sequence_reset_message(*session->get_incoming_fix_message()) == false) { send_reject_message(session, session->get_incoming_fix_message(), FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); }; break;
2132  // Anything else
2133  default: on_custom_message(session, session->get_incoming_fix_message()); break;
2134  }
2135  }
2136  else
2137  {
2138  on_custom_message(session, session->get_incoming_fix_message());
2139  }
2140  }
2141 
2142  bool validate_logon_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
2143  {
2144  LLFIX_UNUSED(session);
2145  int peer_port{0};
2146  std::string peer_ip;
2147  this->get_peer_details(peer_index, peer_ip, peer_port);
2148 
2149  if (message->has_tag(FixConstants::TAG_ENCRYPT_METHOD) == false)
2150  {
2151  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " does not have required t98(encryption method), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2152  return false;
2153  }
2154 
2155  if (message->has_tag(FixConstants::TAG_HEART_BT_INT) == false)
2156  {
2157  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " does not have required t108(heartbeat interval), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2158  return false;
2159  }
2160 
2161  auto t108_string = message->get_tag_value_as<std::string>(FixConstants::TAG_HEART_BT_INT);
2162 
2163  if (t108_string.find('-') != std::string::npos)
2164  {
2165  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(negative) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2166  return false;
2167  }
2168 
2169  if (t108_string.find('.') != std::string::npos)
2170  {
2171  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(decimal points) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2172  return false;
2173  }
2174 
2175  if (message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT) == 0)
2176  {
2177  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(zero) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2178  return false;
2179  }
2180 
2181  return true;
2182  }
2183 
2184  void do_post_logon_sequence_number_check(FixSession* session)
2185  {
2186  if(session->get_state() == SessionState::LOGGED_ON)
2187  {
2188  // IF LOGON WAS WITH A SEQ NO HIGHER THAN EXPECTED , WE NEED TO SEND A RESEND REQUEST
2189  auto sequence_store_incoming_seq_no = session->get_sequence_store()->get_incoming_seq_no();
2190  auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2191 
2192  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2193  {
2194  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2195  {
2196  session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
2197  }
2198  }
2199 
2200  // LOW SEQ NOS ARE HANDLED IN process_incoming_fix_message
2201  }
2202  }
2203 
2204  void process_logout_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
2205  {
2206  send_logout_message(session);
2207  session->set_state(SessionState::LOGGED_OUT);
2208  LLFIX_LOG_DEBUG(m_name + ", session logged out : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
2209  on_logout_request(session, message);
2210  }
2211 
2212  FixServer(const FixServer& other) = delete;
2213  FixServer& operator= (const FixServer& other) = delete;
2214  FixServer(FixServer&& other) = delete;
2215  FixServer& operator=(FixServer&& other) = delete;
2216 };
2217 
2218 } // namespace
llfix::FixServer::on_client_resend_request
virtual void on_client_resend_request(FixSession *session, const IncomingFixMessage *message)
Called when a Resend Request (35=2) is received from a client.
Definition: fix_server.h:612
llfix::FixServer::shutdown
void shutdown()
Shuts down the FIX server.
Definition: fix_server.h:534
llfix::FixServer::send_gap_fill_message
virtual bool send_gap_fill_message(FixSession *session)
Sends a Gap Fill message in response to a Resend Request.
Definition: fix_server.h:1050
llfix::OutgoingFixMessage
FIX message builder and encoder for outbound messages.
Definition: outgoing_fix_message.h:77
llfix::FixServer::process_schedule_validator
virtual void process_schedule_validator(FixSession *session)
Validates the session against configured schedule rules.
Definition: fix_server.h:679
llfix::FixServer::authenticate_logon_request
virtual bool authenticate_logon_request(FixSession *session, const IncomingFixMessage *message)
Authenticates an incoming Logon (35=A) request.
Definition: fix_server.h:860
llfix::FixServer::create
bool create(const std::string &server_name, const std::string server_config_file_path)
Creates and initialises the FIX server instance.
Definition: fix_server.h:167
llfix::FixServer::on_logon_request
virtual void on_logon_request(FixSession *session, const IncomingFixMessage *message)
Called when a Logon (35=A) request is received from a client.
Definition: fix_server.h:598
llfix::FixServer::get_session_name
std::string get_session_name(FixSession *session)
Retrieves the name of a FIX session.
Definition: fix_server.h:393
llfix::FixServer
FIX server implementation.
Definition: fix_server.h:146
llfix::FixSession::get_sequence_store
SequenceStore * get_sequence_store()
Provides access to the session's sequence store.
Definition: fix_session.h:341
llfix::FixServer::send_logout_message
virtual bool send_logout_message(FixSession *session, const std::string &reason_text="")
Sends a Logout (35=5) message.
Definition: fix_server.h:986
llfix::FixServer::get_session_names
void get_session_names(std::vector< std::string > &target) override
Retrieves the names of all configured FIX sessions.
Definition: fix_server.h:413
llfix::FixServer::accept_session
virtual FixSession * accept_session(std::size_t peer_index, const IncomingFixMessage *incoming_fix_message, const char *buffer, std::size_t buffer_length, uint32_t parser_reject_code)
Accepts and validates an incoming session before logon processing.
Definition: fix_server.h:703
llfix::FixServer::add_session
bool add_session(const std::string &session_name, FixSessionSettings &session_settings)
Adds a FIX session using preloaded session settings.
Definition: fix_server.h:271
llfix::FixServer::send_outgoing_message
virtual bool send_outgoing_message(FixSession *session, OutgoingFixMessage *message)
Encodes and sends an outgoing FIX message.
Definition: fix_server.h:476
llfix::IncomingFixMessage::has_tag
bool has_tag(uint32_t tag) const
Checks whether a FIX tag exists and is valid.
Definition: incoming_fix_message.h:62
llfix::FixSession
Represents a single FIX protocol session.
Definition: fix_session.h:73
llfix::FixServer::send_sequence_reset_message
virtual bool send_sequence_reset_message(FixSession *session, uint32_t desired_sequence_no)
Sends a Sequence Reset (35=4) message without gap fill.
Definition: fix_server.h:1026
llfix::FixServer::get_session
FixSession * get_session(const std::string &session_name) override
Retrieves a FIX session by name.
Definition: fix_server.h:376
llfix::FixServer::on_client_test_request
virtual void on_client_test_request(FixSession *session, const IncomingFixMessage *message)
Called when a Test Request (35=1) is received from a client.
Definition: fix_server.h:619
llfix::FixServer::on_cancel_order
virtual void on_cancel_order(FixSession *session, const IncomingFixMessage *message)
Called when an Order Cancel Request (35=F) message is received.
Definition: fix_server.h:562
llfix::FixSession::get_name
std::string get_name() const override
Returns the logical name of the FIX session.
Definition: fix_session.h:256
llfix::FixServer::on_new_order
virtual void on_new_order(FixSession *session, const IncomingFixMessage *message)
Called when a New Order (35=D) message is received.
Definition: fix_server.h:555
llfix::FixServer::on_application_level_reject
virtual void on_application_level_reject(FixSession *session, const IncomingFixMessage *message)
Called when an application-level reject message is generated.
Definition: fix_server.h:576
llfix::FixServer::add_sessions_from
bool add_sessions_from(const std::string &session_config_file_path)
Adds all FIX sessions found in a configuration file.
Definition: fix_server.h:307
llfix::FixServer::send_test_request
virtual bool send_test_request(FixSession *session)
Sends a Test Request (35=1) message.
Definition: fix_server.h:999
llfix::FixServer::on_client_heartbeat
virtual void on_client_heartbeat(FixSession *session)
Called when a Heartbeat (35=0) message is received from a client.
Definition: fix_server.h:625
llfix::FixServer::on_replace_order
virtual void on_replace_order(FixSession *session, const IncomingFixMessage *message)
Called when an Order Cancel/Replace Request (35=G) message is received.
Definition: fix_server.h:569
llfix::FixServer::add_session
bool add_session(const std::string &session_config_file_path, const std::string &session_name)
Adds a FIX session by loading settings from a configuration file.
Definition: fix_server.h:284
llfix::IncomingFixMessage
Represents a parsed incoming FIX message.
Definition: incoming_fix_message.h:44
llfix::FixUtilities::fix_to_human_readible
static std::string fix_to_human_readible(const char *buffer, std::size_t buffer_length)
Converts a FIX message buffer to a human-readable string.
Definition: fix_utilities.h:86
llfix::FixServer::specify_repeating_group
void specify_repeating_group(Args... args)
Specify repeating group definitions for incoming FIX messages for all sessions.
Definition: fix_server.h:435
llfix::FixServer::process_incoming_throttling
virtual bool process_incoming_throttling(FixSession *session, const IncomingFixMessage *incoming_fix_message)
Applies incoming message throttling logic.
Definition: fix_server.h:639
llfix::FixServer::outgoing_message_instance
OutgoingFixMessage * outgoing_message_instance(FixSession *session)
Retrieves the reusable outgoing FIX message instance for a session.
Definition: fix_server.h:463
llfix::SequenceStore::get_outgoing_seq_no
uint32_t get_outgoing_seq_no() const
Get the outgoing FIX sequence number.
Definition: sequence_store.h:105
llfix::FixServer::process_logon_request
virtual void process_logon_request(FixSession *session, std::size_t peer_index, const IncomingFixMessage *message)
Processes an incoming Logon (35=A) request.
Definition: fix_server.h:818
llfix::FixServer::on_logout_request
virtual void on_logout_request(FixSession *session, const IncomingFixMessage *message)
Called when a Logout (35=5) request is received from a client.
Definition: fix_server.h:605
llfix::FixServer::on_session_level_reject
virtual void on_session_level_reject(FixSession *session, const IncomingFixMessage *message)
Called when a session-level reject message is generated.
Definition: fix_server.h:583
llfix::FixServer::send_resend_request
virtual bool send_resend_request(FixSession *session)
Sends a Resend Request (35=2) message.
Definition: fix_server.h:1012
llfix::FixServer::get_session_count
std::size_t get_session_count() const
Returns the number of configured FIX sessions.
Definition: fix_server.h:359
llfix::FixServer::on_custom_message
virtual void on_custom_message(FixSession *session, const IncomingFixMessage *message)
Called when any other FIX message type is received.
Definition: fix_server.h:590
llfix::IncomingFixMessage::get_tag_value_as
T get_tag_value_as(uint32_t tag, std::size_t decimal_points=0) const
Retrieves a FIX tag value converted to the requested type.
Definition: incoming_fix_message.h:238