llfix
Low-latency FIX engine
fix_server.h
1 /*
2 MIT License
3 
4 Copyright (c) 2026 Coreware Limited
5 
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12 
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 SOFTWARE.
23 */
24 #pragma once
25 
26 #include "common.h"
27 
28 #include <cassert>
29 #include <cstdint>
30 #include <cstddef>
31 #include <string>
32 #include <string_view>
33 #include <cstdio>
34 #include <vector>
35 #include <unordered_map>
36 #include <functional>
37 #include <mutex> // for std::lock_guard
38 #include <new>
39 
40 #include "core/compiler/hints_branch_predictor.h"
41 #include "core/compiler/hints_hot_code.h"
42 #include "core/compiler/unused.h"
43 #include "core/os/vdso.h"
44 #include "core/os/thread_local_storage.h"
45 
46 #include "core/utilities/configuration.h"
47 #include "core/utilities/logger.h"
48 #include "core/utilities/object_cache.h"
49 #include "core/utilities/std_string_utilities.h"
50 #include "core/utilities/userspace_spinlock.h"
51 
52 #include "electronic_trading/common/message_persist_plugin.h"
53 #include "electronic_trading/managed_instance/modifying_admin_command.h"
54 #include "electronic_trading/managed_instance/managed_instance.h"
55 
56 #include "fix_constants.h"
57 #include "fix_string_view.h"
58 #include "fix_string.h"
59 #include "fix_session_settings.h"
60 #include "fix_utilities.h"
61 #include "incoming_fix_message.h"
62 #include "outgoing_fix_message.h"
63 #include "fix_parser_error_codes.h"
64 #include "fix_session.h"
65 #include "fix_server_settings.h"
66 
67 #include "core/utilities/tcp_reactor_options.h"
68 
69 namespace llfix
70 {
71 
72 // Thread local
73 struct UnknownSessionContext
74 {
75  IncomingFixMessage m_incoming_fix_message;
76  ObjectCache<FixStringView> m_fix_string_view_cache;
77 };
78 
79 class FixServerConnectors
80 {
81  public:
82  FixServerConnectors()
83  {
84  m_tables_lock.initialise();
85  m_peer_index_session_table.reserve(1024);
86  m_session_peer_index_table.reserve(1024);
87  }
88 
89  bool has_peer(std::size_t peer_index) const
90  {
91  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
92 
93  if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
94  {
95  return false;
96  }
97 
98  return true;
99  }
100 
101  void update(std::size_t peer_index, FixSession* session)
102  {
103  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
104 
105  // In concurrent scenarios, disconnection for an existing item can happen before its connection as they will happen on different threads
106  if (m_session_peer_index_table.find(session) != m_session_peer_index_table.end())
107  {
108  auto existing_peer_index = m_session_peer_index_table[session];
109 
110  if (existing_peer_index != peer_index)
111  {
112  m_peer_index_session_table.erase(existing_peer_index);
113  }
114  }
115 
116  m_peer_index_session_table[peer_index] = session;
117  m_session_peer_index_table[session] = peer_index;
118  }
119 
120  FixSession* get_session(std::size_t peer_index)
121  {
122  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
123 
124  if (m_peer_index_session_table.find(peer_index) == m_peer_index_session_table.end())
125  {
126  return nullptr;
127  }
128 
129  return m_peer_index_session_table[peer_index];
130  }
131 
132  std::size_t get_peer_index(FixSession* session)
133  {
134  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
135 
136  if (m_session_peer_index_table.find(session) == m_session_peer_index_table.end())
137  {
138  return static_cast<std::size_t>(-1);
139  }
140 
141  return m_session_peer_index_table[session];
142  }
143 
144  void remove(std::size_t peer_index, FixSession* session)
145  {
146  const std::lock_guard<UserspaceSpinlock<>> lock(m_tables_lock);
147 
148  m_peer_index_session_table.erase(peer_index);
149  m_session_peer_index_table.erase(session);
150  }
151 
152  private:
153  std::unordered_map<std::size_t, FixSession*> m_peer_index_session_table;
154  std::unordered_map<FixSession*, std::size_t> m_session_peer_index_table;
155  mutable UserspaceSpinlock<> m_tables_lock;
156 };
157 
167 template<typename Transport>
168 class FixServer : public Transport, public ManagedInstance
169 {
170  public:
171  FixServer() = default;
172 
173  virtual ~FixServer()
174  {
175  for (auto& entry : m_sessions)
176  {
177  delete entry.second;
178  }
179  }
180 
189  [[nodiscard]] bool create(const std::string& server_name, const std::string server_config_file_path)
190  {
191  m_name = server_name;
192 
193  if (m_settings.load_from_config_file(server_config_file_path, server_name) == false)
194  {
195  LLFIX_LOG_ERROR("Loading settings for server " + server_name + " failed : " + m_settings.config_load_error);
196  return false;
197  }
198 
199  if (m_settings.validate() == false)
200  {
201  LLFIX_LOG_ERROR("FixServerSettings for " + m_name + " validation failed : " + m_settings.validation_error);
202  return false;
203  }
204 
205  m_is_ha_primary = m_settings.starts_as_primary_instance;
206  m_unknown_session_context_lock.initialise();
207 
208  TCPReactorOptions reactor_options;
209 
210  reactor_options.m_accept_timeout_seconds = m_settings.accept_timeout_seconds;
211  reactor_options.m_async_io_timeout_nanoseconds = m_settings.async_io_timeout_nanoseconds;
212  reactor_options.m_busy_poll_microseconds = m_settings.busy_poll_microseconds;
213  reactor_options.m_cpu_core_id = m_settings.cpu_core_id;
214  reactor_options.m_worker_thread_count = m_settings.worker_thread_count;
215  reactor_options.m_send_try_count = m_settings.send_try_count;
216  reactor_options.m_disable_nagle = m_settings.disable_nagle;
217  reactor_options.m_enable_quick_ack = m_settings.quick_ack;
218  reactor_options.m_max_poll_events = m_settings.max_poll_events;
219  reactor_options.m_nic_interface_ip = m_settings.nic_address;
220  reactor_options.m_nic_interface_name = m_settings.nic_name;
221  reactor_options.m_nic_ringbuffer_rx_size = m_settings.nic_ringbuffer_rx_size;
222  reactor_options.m_nic_ringbuffer_tx_size = m_settings.nic_ringbuffer_tx_size;
223  reactor_options.m_pending_connection_queue_size = m_settings.pending_connection_queue_size;
224  reactor_options.m_port = m_settings.accept_port;
225  reactor_options.m_rx_buffer_capacity = m_settings.rx_buffer_capacity;
226  reactor_options.m_receive_size = m_settings.receive_size;
227  reactor_options.m_socket_rx_size = m_settings.socket_rx_size;
228  reactor_options.m_socket_tx_size = m_settings.socket_tx_size;
229  reactor_options.m_spin_count = m_settings.spin_count;
230  #ifdef LLFIX_ENABLE_OPENSSL
231  reactor_options.m_use_ssl = m_settings.use_ssl;
232  reactor_options.m_ssl_verify_peer = m_settings.ssl_verify_peer;
233  reactor_options.m_ssl_ca_pem_file = m_settings.ssl_ca_pem_file;
234  reactor_options.m_ssl_cert_pem_file = m_settings.ssl_certificate_pem_file;
235  reactor_options.m_ssl_private_key_pem_file = m_settings.ssl_private_key_pem_file;
236  reactor_options.m_ssl_private_key_password = m_settings.ssl_private_key_password;
237  reactor_options.m_ssl_version = m_settings.ssl_version;
238  reactor_options.m_ssl_cipher_suite = m_settings.ssl_cipher_suite;
239  reactor_options.m_ssl_crl_path = m_settings.ssl_crl_path;
240  #endif
241 
242  this->set_params(reactor_options);
243 
244  if constexpr (Transport::is_multithreaded())
245  {
246  this->register_acceptor_callback(
247  [this](std::size_t peer_index)
248  {
249  LLFIX_UNUSED(peer_index);
250  this->process_admin_commands_of_all_non_live_sessions();
251  });
252 
253  this->register_acceptor_termination_callback(
254  [this](std::size_t peer_index)
255  {
256  LLFIX_UNUSED(peer_index);
257  this->destroy_thread_local_unknown_session_context();
258  });
259 
260  this->register_worker_callback(
261  [this](std::size_t peer_index)
262  {
263  this->process_session(peer_index);
264  });
265 
266  this->register_worker_termination_callback(
267  [this](std::size_t peer_index)
268  {
269  LLFIX_UNUSED(peer_index);
270  this->destroy_thread_local_unknown_session_context();
271  });
272  }
273  else
274  {
275  this->register_callback(std::bind(&FixServer::process, this));
276  this->register_termination_callback(std::bind(&FixServer::destroy_thread_local_unknown_session_context, this));
277  }
278 
279  LLFIX_LOG_INFO(m_name + " : Loaded server config =>\n" + m_settings.to_string());
280  LLFIX_LOG_INFO("FixServer " + m_name + " creation success");
281 
282  return true;
283  }
284 
293  [[nodiscard]] bool add_session(const std::string& session_name, FixSessionSettings& session_settings)
294  {
295  return internal_add_session(session_name, session_settings);
296  }
297 
306  [[nodiscard]] bool add_session(const std::string& session_config_file_path, const std::string& session_name)
307  {
308  FixSessionSettings session_settings;
309 
310  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
311  {
312  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
313  return false;
314  }
315 
316  return internal_add_session(session_name, session_settings);
317  }
318 
329  [[nodiscard]] bool add_sessions_from(const std::string& session_config_file_path)
330  {
331  Configuration config_file;
332  std::string config_load_error;
333 
334  if (config_file.load_from_file(session_config_file_path, config_load_error) == false)
335  {
336  return false;
337  }
338 
339  std::vector<std::string> session_names;
340  config_file.get_group_names(session_names);
341 
342  int added_session_count{0};
343 
344  for (const auto& session_name : session_names)
345  {
346  auto lowered_session_name = StringUtilities::to_lower(session_name);
347 
348  if (StringUtilities::contains(lowered_session_name, "session"))
349  {
350  FixSessionSettings session_settings;
351 
352  if (session_settings.load_from_config_file(session_config_file_path, session_name) == false)
353  {
354  LLFIX_LOG_ERROR("Loading settings for session " + session_name + " failed : " + session_settings.config_load_error);
355  return false;
356  }
357 
358  if (internal_add_session(session_name, session_settings) == false)
359  {
360  return false;
361  }
362 
363  added_session_count++;
364  }
365  }
366 
367  if(added_session_count == 0)
368  {
369  LLFIX_LOG_ERROR(m_name + " : Could not find any session in " + session_config_file_path + ". Make sure they have 'SESSION' in their names.");
370  return false;
371  }
372 
373  return true;
374  }
375 
381  std::size_t get_session_count() const
382  {
383  return m_sessions.size();
384  }
385 
386  bool is_instance_ha_primary() const override
387  {
388  return m_is_ha_primary;
389  }
390 
398  FixSession* get_session(const std::string& session_name) override
399  {
400  if (has_session(session_name) == false)
401  {
402  return nullptr;
403  }
404 
405  return m_sessions[session_name];
406  }
407 
415  std::string get_session_name(FixSession* session)
416  {
417  assert(session);
418 
419  for (const auto& session_entry : m_sessions)
420  {
421  if(session_entry.second == session)
422  {
423  return session_entry.first;
424  }
425  }
426 
427  return "";
428  }
429 
435  void get_session_names(std::vector<std::string>& target) override
436  {
437  for (const auto& iter : m_sessions)
438  {
439  target.push_back(iter.first);
440  }
441  }
442 
443  std::string get_name() const override
444  {
445  return m_name;
446  }
447 
456  template<typename... Args>
457  void specify_repeating_group(Args... args)
458  {
459  FixSession::get_repeating_group_specs().specify_repeating_group(args...);
460  }
461 
462  #ifdef LLFIX_ENABLE_BINARY_FIELDS
463 
472  void specify_binary_field(const std::string& message_type, uint32_t tag_length, uint32_t tag_data)
473  {
474  FixSession::get_binary_field_specs().specify_binary_field(message_type, tag_length, tag_data);
475  }
476  #endif
477 
486  {
487  return session->get_outgoing_fix_message();
488  }
489 
498  virtual bool send_outgoing_message(FixSession* session, OutgoingFixMessage* message)
499  {
500  std::size_t encoded_length = 0;
501  auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
502 
503  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
504  return send_bytes<true>(session, session->get_tx_encode_buffer(), encoded_length);
505  }
506 
507  void push_admin_command(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0) override
508  {
509  if(llfix_likely(session_name != "*"))
510  {
511  push_admin_command_internal(session_name, type, arg);
512  }
513  else
514  {
515  for (auto& session : m_sessions)
516  {
517  push_admin_command_internal(session.first, type, arg);
518  }
519  }
520  }
521 
522  #ifdef LLFIX_AUTOMATION
523  // Used for testing outgoing msg resends and gap fills
524  bool send_fake_outgoing_message(FixSession* session, OutgoingFixMessage* message)
525  {
526  std::size_t encoded_length = 0;
527  auto int_sequence_no = session->get_sequence_store()->get_outgoing_seq_no() + 1;
528 
529  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, int_sequence_no, encoded_length);
530 
531  auto sequence_store = session->get_sequence_store();
532  sequence_store->increment_outgoing_seq_no();
533 
534  if(session->serialisation_enabled())
535  session->get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(session->get_tx_encode_buffer()), encoded_length, true, sequence_store->get_outgoing_seq_no());
536 
537  session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
538 
539  return true;
540  }
541 
542  void set_replay_messages_on_incoming_resend_request_for_all_sessions(bool b)
543  {
544  for (auto& entry : m_sessions)
545  {
546  entry.second->set_replay_messages_on_incoming_resend_request(b);
547  }
548  }
549  #endif
550 
556  void shutdown() { this->stop(); }
558  // TCP connections
559  virtual void on_async_io_error(int error_code, int event_result) override
560  {
561  LLFIX_UNUSED(error_code);
562  LLFIX_UNUSED(event_result);
563  }
564 
565  virtual void on_socket_error(int error_code, int event_result) override
566  {
567  LLFIX_UNUSED(error_code);
568  LLFIX_UNUSED(event_result);
569  }
570  // Application level messages
577  virtual void on_new_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
584  virtual void on_cancel_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
591  virtual void on_replace_order(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
598  virtual void on_application_level_reject(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
605  virtual void on_session_level_reject(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
612  virtual void on_custom_message(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
613  // Session/admin messages
620  virtual void on_logon_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);}
627  virtual void on_logout_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);}
634  virtual void on_client_resend_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
641  virtual void on_client_test_request(FixSession* session, const IncomingFixMessage* message) {LLFIX_UNUSED(session); LLFIX_UNUSED(message);};
647  virtual void on_client_heartbeat(FixSession* session) {LLFIX_UNUSED(session);};
648  // Others
661  virtual bool process_incoming_throttling(FixSession* session, const IncomingFixMessage* incoming_fix_message)
662  {
663  if(session->settings()->throttle_limit == 0)
664  {
665  return true;
666  }
667 
668  session->throttler()->update();
669 
670  if(session->throttler()->reached_limit() )
671  {
672  session->increment_incoming_throttler_exceed_count();
673 
674  if(session->settings()->throttle_action == IncomingThrottlerAction::WAIT)
675  {
676  session->throttler()->wait();
677  }
678  else if(session->settings()->throttle_action == IncomingThrottlerAction::DISCONNECT)
679  {
680  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end exceeded the throttle rate : " + std::to_string(session->settings()->throttle_limit));
681  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
682  return false;
683  }
684  else if(session->settings()->throttle_action == IncomingThrottlerAction::REJECT)
685  {
686  send_throttle_reject_message(session, incoming_fix_message);
687  return false;
688  }
689  }
690 
691  return true;
692  }
701  virtual void process_schedule_validator(FixSession* session)
702  {
703  if (session->is_now_valid_session_datetime() == false)
704  {
705  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " due to schedule settings");
706  this->process_connection_closure(m_fix_connectors.get_peer_index(session), true);
707  }
708  }
709 
725  virtual FixSession* accept_session(std::size_t peer_index, const IncomingFixMessage* incoming_fix_message, const char* buffer, std::size_t buffer_length, uint32_t parser_reject_code)
726  {
727  if (parser_reject_code != static_cast<uint32_t>(-1))
728  {
729  std::string message;
730  if (parser_reject_code > FixConstants::FIX_MAX_ERROR_CODE)
731  {
732  message = FixParserErrorCodes::get_internal_parser_error_description(parser_reject_code) + " : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
733  }
734  else
735  {
736  char temp_buf[512];
737  std::size_t length{ 0 };
738  FixUtilities::get_reject_reason_text(temp_buf, length, parser_reject_code);
739  message = temp_buf;
740  message += " , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
741  }
742  process_invalid_logon(peer_index, message);
743  return nullptr;
744  }
745 
746  FixSession* session = nullptr;
747 
748  if (!(incoming_fix_message->has_tag(FixConstants::TAG_BEGIN_STRING) && incoming_fix_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM) && incoming_fix_message->has_tag(FixConstants::TAG_SENDER_COMP_ID) && incoming_fix_message->has_tag(FixConstants::TAG_TARGET_COMP_ID) && incoming_fix_message->has_tag(FixConstants::TAG_MSG_TYPE)))
749  {
750  process_invalid_logon(peer_index, "One of following required tags is missing : 8,34,49,56,35 , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length));
751  return nullptr;
752  }
753  else
754  {
755  auto message_type = incoming_fix_message->get_tag_value_as<char>(FixConstants::TAG_MSG_TYPE);
756 
757  if (message_type == FixConstants::MSG_TYPE_LOGON)
758  {
759  auto incoming_begin_string = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_BEGIN_STRING);
760  auto incoming_comp_id = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID);
761  auto incoming_target_comp_id = incoming_fix_message->get_tag_value_as<std::string>(FixConstants::TAG_TARGET_COMP_ID);
762 
763  session = find_session(incoming_begin_string, incoming_target_comp_id, incoming_comp_id);
764 
765  if (session != nullptr)
766  {
767  auto session_state = session->get_state();
768 
769  if (is_state_live(session_state)) // This check is the most important part of multithreaded FixServer
770  {
771  std::string message = "Incoming logon message for server " + m_name + " is invalid as the client is already logged on, session : " + session->get_name();
772  process_invalid_logon(peer_index, message);
773  return nullptr;
774  }
775 
776  if(session_state == SessionState::DISABLED)
777  {
778  std::string message = "Cannot accept connection attempt as session " + session->get_name() + " is disabled. Closing connection.";
779  process_invalid_logon(peer_index, message);
780  return nullptr;
781  }
782 
783  if (session->is_now_valid_session_datetime() == false)
784  {
785  std::string message = "Cannot accept connection attempt for session " + session->get_name() + " due to schedule settings. Closing connection.";
786  process_invalid_logon(peer_index, message);
787  return nullptr;
788  }
789 
791  m_fix_connectors.update(peer_index, session);
792 
793  if constexpr(Transport::is_multithreaded())
794  {
795  this->mark_connector_as_ready_for_worker_thread_dispatch(peer_index);
796  }
798  session->reset_flags();
799  session->get_incoming_fix_message()->reset();
800  session->get_incoming_fix_message()->copy_non_dirty_tag_values_from(*incoming_fix_message);
801 
802  if(incoming_fix_message->has_tag(FixConstants::TAG_RESET_SEQ_NUM_FLAG))
803  {
804  if(incoming_fix_message->get_tag_value_as<char>(FixConstants::TAG_RESET_SEQ_NUM_FLAG) == FixConstants::FIX_BOOLEAN_TRUE)
805  {
806  session->get_sequence_store()->reset_numbers();
807  }
808  }
809 
810  session->set_state(SessionState::PENDING_LOGON);
811  return session;
812  }
813  else
814  {
815  std::string message = "Could not find a session for incoming message : 8=" + incoming_begin_string + " 49=" + incoming_comp_id + " 56=" + incoming_target_comp_id + " , message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
816  process_invalid_logon(peer_index, message);
817  return nullptr;
818  }
819  }
820  else
821  {
822  std::string message = std::string("Received a message with a msgtype different than A (") + message_type + ") while expecting a logon message : " + FixUtilities::fix_to_human_readible(buffer, buffer_length);
823  process_invalid_logon(peer_index, message);
824  return nullptr;
825  }
826  }
827  }
828 
840  virtual void process_logon_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
841  {
842  if (validate_logon_request(session, peer_index, message) == false)
843  {
844  process_invalid_logon(peer_index, "");
845  return;
846  }
847 
848  if (authenticate_logon_request(session, message) == false)
849  {
850  std::string message = "Authentication failed for incoming logon message for server " + m_name + " for session " + session->get_name();
851  process_invalid_logon(peer_index, message);
852  return;
853  }
854 
855  auto requested_heartbeat_interval = message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT);
856  auto requested_heartbeat_interval_nanoseconds = static_cast<uint64_t>(requested_heartbeat_interval) * static_cast<uint64_t>(1'000'000'000);
857 
858  session->set_heartbeart_interval_in_nanoseconds(requested_heartbeat_interval_nanoseconds);
859  session->set_outgoing_test_request_interval_in_nanoseconds(static_cast<uint64_t>(requested_heartbeat_interval_nanoseconds * session->settings()->outgoing_test_request_interval_multiplier));
860 
861  send_logon_response(session, message);
862  session->set_state(SessionState::LOGGED_ON);
863 
864  LLFIX_LOG_DEBUG(m_name + ", session logged on : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
865 
866  do_post_logon_sequence_number_check(session);
867 
868  on_logon_request(session, message);
869  }
870 
882  virtual bool authenticate_logon_request(FixSession* session, const IncomingFixMessage* message)
883  {
884  LLFIX_UNUSED(session);
885  LLFIX_UNUSED(message);
886  return true;
887  }
889  public:
890  void on_data_ready(std::size_t peer_index) override
891  {
892  auto read = this->receive(peer_index);
893 
894  if (read > 0 && read <= static_cast<int>(this->m_options.m_rx_buffer_capacity))
895  {
896  auto buffer_size = this->get_rx_buffer_size(peer_index);
897  if(buffer_size >0 && buffer_size != static_cast<std::size_t>(-1))
898  process_rx_buffer(peer_index, this->get_rx_buffer(peer_index), buffer_size);
899  }
900 
901  this->receive_done(peer_index);
902  }
903 
904  void on_client_connected(std::size_t peer_index) override
905  {
906  LLFIX_LOG_DEBUG("FixServer " + m_name + " : new client connected , peer index : " + std::to_string(peer_index));
907  }
908 
909  void on_client_disconnected(std::size_t peer_index) override
910  {
911  Transport::on_client_disconnected(peer_index);
912 
913  if (m_fix_connectors.has_peer(peer_index) == false)
914  return;
915 
916  // A logged on client
917  auto session = m_fix_connectors.get_session(peer_index);
918 
919  if(session != nullptr)
920  {
921  LLFIX_LOG_DEBUG("FixServer " + m_name + " : session " + session->get_name() + " disconnected , peer index : " + std::to_string(peer_index));
922  on_client_disconnected(session);
923  }
924 
925  // Remove entries
926  m_fix_connectors.remove(peer_index, session);
927  }
928 
929  void on_client_disconnected(FixSession* session)
930  {
931  session->set_state(SessionState::DISCONNECTED);
932  }
933 
934  FixServerSettings& get_settings() { return m_settings; }
935 
936  std::string get_settings_as_string(const std::string& delimiter) override
937  {
938  return m_settings.to_string(delimiter);
939  }
940 
946  void set_message_persist_plugin(MessagePersistPlugin* plugin)
947  {
948  assert(plugin);
949  m_message_persist_plugin = plugin;
950  }
951 
952  protected:
954  // ADMIN
963  virtual bool send_heartbeat(FixSession* session, FixString* test_request_id)
964  {
965  session->build_heartbeat_message(outgoing_message_instance(session), test_request_id);
966  return send_outgoing_message(session, outgoing_message_instance(session));
967  }
968 
977  virtual bool send_logon_response(FixSession* session, const IncomingFixMessage* message)
978  {
979  LLFIX_UNUSED(message);
980  auto response = outgoing_message_instance(session);
981  response->set_msg_type(FixConstants::MSG_TYPE_LOGON);
982 
983  // TAG 98 ENCRYPTION METHOD
984  response->set_tag(FixConstants::TAG_ENCRYPT_METHOD, 0);
985 
986  // TAG 108 HEARTBEAT INTERVAL
987  response->set_tag(FixConstants::TAG_HEART_BT_INT, static_cast<uint32_t>(session->get_heartbeart_interval_in_nanoseconds() / 1'000'000'000));
988 
989  // TAG 1137 DEFAULT APP VER ID
990  auto default_app_ver_id = session->get_default_app_ver_id();
991 
992  if (default_app_ver_id.length() > 0)
993  {
994  response->set_tag(FixConstants::TAG_DEFAULT_APPL_VER_ID, default_app_ver_id);
995  }
996 
997  return send_outgoing_message(session, response);
998  }
999 
1008  virtual bool send_logout_message(FixSession* session, const std::string& reason_text = "")
1009  {
1010  session->build_logout_message(outgoing_message_instance(session), reason_text);
1011  return send_outgoing_message(session, outgoing_message_instance(session));
1012  }
1013 
1021  virtual bool send_test_request(FixSession* session)
1022  {
1023  session->build_test_request_message(outgoing_message_instance(session));
1024  return send_outgoing_message(session, outgoing_message_instance(session));
1025  }
1026 
1034  virtual bool send_resend_request(FixSession* session)
1035  {
1036  session->build_resend_request_message(outgoing_message_instance(session), "0");
1037  return send_outgoing_message(session, outgoing_message_instance(session));
1038  }
1039 
1048  virtual bool send_sequence_reset_message(FixSession* session, uint32_t desired_sequence_no)
1049  {
1050  /*
1051  This message is a "hard" gap fill that doesn't respond to an incoming 35=2/resend request but to an internal admin request
1052  Therefore we can't set 123=Y
1053  */
1054  auto message = outgoing_message_instance(session);
1055  session->build_sequence_reset_message(message, desired_sequence_no);
1056 
1057  // ENCODE
1058  std::size_t encoded_length = 0;
1059  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, desired_sequence_no, encoded_length);
1060 
1061  // SEND
1062  return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1063  }
1064 
1072  virtual bool send_gap_fill_message(FixSession* session)
1073  {
1074  /*
1075  This message is a gap fill that responds to an incoming 35=2/resend request.
1076  For "hard" gap fills that don't set 123=Y, see send_sequence_reset_message
1077  */
1078  auto message = outgoing_message_instance(session);
1079  session->build_gap_fill_message(message);
1080 
1081  // ENCODE
1082  std::size_t encoded_length = 0;
1083  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, session->get_incoming_resend_request_begin_no(), encoded_length); // We need to encode with seq no that the peer expects hence using m_incoming_resend_request_begin_no
1084 
1085  // SEND
1086  return send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1087  }
1088 
1089  virtual void resend_messages_to_client(FixSession* session, uint32_t begining_seq_no, uint32_t end_seq_no)
1090  {
1091  for (uint32_t i = begining_seq_no; i <= end_seq_no; i++)
1092  {
1093  std::size_t message_length = 0;
1094  session->get_outgoing_message_serialiser()->read_message(i, session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, message_length);
1095 
1096  auto message = outgoing_message_instance(session);
1097  message->load_from_buffer(session->get_tx_encode_buffer(), message_length); // t122(orig sending time) will be handled by load_from_buffer
1098 
1099  message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_DUP_FLAG, FixConstants::FIX_BOOLEAN_TRUE);
1100 
1101  if (session->include_t97_during_resends())
1102  {
1103  message->template set_tag<FixMessageComponent::HEADER>(FixConstants::TAG_POSS_RESEND, FixConstants::FIX_BOOLEAN_TRUE);
1104  }
1105 
1106  std::size_t encoded_length = 0;
1107  message->encode(session->get_tx_encode_buffer(), m_settings.tx_encode_buffer_capacity, i, encoded_length);
1108  send_bytes<false>(session, session->get_tx_encode_buffer(), encoded_length); // false is for not incrementing seq no for this message
1109  }
1110  }
1111 
1112  virtual bool send_reject_message(FixSession* session, const IncomingFixMessage* incoming_message, uint32_t reject_reason_code, const char* buffer_message, std::size_t buffer_message_length, uint32_t error_tag=0)
1113  {
1114  LLFIX_UNUSED(incoming_message);
1115  char reject_reason_text[256];
1116 
1117  std::size_t text_length{ 0 };
1118  FixUtilities::get_reject_reason_text(reject_reason_text, text_length, reject_reason_code);
1119 
1120  if (error_tag == 0)
1121  {
1122  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " received an invalid message : " + std::string(reject_reason_text) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1123  }
1124  else
1125  {
1126  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " received an invalid message : " + std::string(reject_reason_text) + " ,tag : " + std::to_string(error_tag) + " ,message : " + FixUtilities::fix_to_human_readible(buffer_message, buffer_message_length));
1127  }
1128 
1129  session->build_session_level_reject_message(outgoing_message_instance(session), reject_reason_code, reject_reason_text, error_tag);
1130  return send_outgoing_message(session, outgoing_message_instance(session));
1131  }
1132 
1133  virtual bool send_throttle_reject_message(FixSession* session, const IncomingFixMessage* incoming_message)
1134  {
1135  auto reject_message = outgoing_message_instance(session);
1136  reject_message->set_msg_type(FixConstants::MSG_TYPE_BUSINESS_REJECT);
1137 
1138  reject_message->set_tag(FixConstants::TAG_TEXT, session->settings()->throttler_reject_message);
1139 
1140  if (incoming_message->has_tag(FixConstants::TAG_MSG_SEQ_NUM))
1141  {
1142  reject_message->set_tag(FixConstants::TAG_REF_SEQ_NUM, incoming_message->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM));
1143  }
1144 
1145  if (incoming_message->has_tag(FixConstants::TAG_MSG_TYPE))
1146  {
1147  reject_message->set_tag(FixConstants::TAG_REF_MSG_TYPE, incoming_message->get_tag_value_as<std::string>(FixConstants::TAG_MSG_TYPE));
1148  }
1149 
1150  return send_outgoing_message(session, reject_message);
1151  }
1152 
1153  std::unordered_map<std::string, FixSession*> m_sessions;
1154 
1155  private:
1156  static inline constexpr std::size_t MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING = 4; // 10=<DELIMITER>
1157  std::string m_name;
1158  bool m_is_ha_primary = true;
1159 
1160  FixServerSettings m_settings;
1161 
1162  FixServerConnectors m_fix_connectors;
1163 
1164  MessagePersistPlugin* m_message_persist_plugin = nullptr;
1165 
1166  UserspaceSpinlock<> m_unknown_session_context_lock;
1167 
1168  bool has_session(const std::string& session_name)
1169  {
1170  if (m_sessions.find(session_name) == m_sessions.end())
1171  {
1172  return false;
1173  }
1174 
1175  return true;
1176  }
1177 
1178  FixSession* find_session(const std::string& begin_string, const std::string& sender_comp_id, const std::string& target_comp_id)
1179  {
1180  FixSession* ret = nullptr;
1181 
1182  for (const auto& session_entry : m_sessions)
1183  {
1184  if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id && session_entry.second->settings()->begin_string == begin_string)
1185  {
1186  return session_entry.second;
1187  }
1188  }
1189 
1190  return ret;
1191  }
1192 
1193  bool does_any_session_use_path(const std::string& path)
1194  {
1195  for (const auto& session_entry : m_sessions)
1196  {
1197  if (session_entry.second->settings()->sequence_store_file_path == path)
1198  {
1199  return true;
1200  }
1201 
1202  if (session_entry.second->settings()->incoming_message_serialisation_path == path)
1203  {
1204  return true;
1205  }
1206 
1207  if (session_entry.second->settings()->outgoing_message_serialisation_path == path)
1208  {
1209  return true;
1210  }
1211  }
1212  return false;
1213  }
1214 
1215  bool does_any_session_use_compids(const std::string& sender_comp_id, const std::string& target_comp_id)
1216  {
1217  for (const auto& session_entry : m_sessions)
1218  {
1219  if (session_entry.second->settings()->sender_comp_id == sender_comp_id && session_entry.second->settings()->target_comp_id == target_comp_id)
1220  {
1221  return true;
1222  }
1223  }
1224  return false;
1225  }
1226 
1227  bool internal_add_session(const std::string& session_name, const FixSessionSettings& session_settings)
1228  {
1229  if (has_session(session_name))
1230  {
1231  LLFIX_LOG_ERROR(m_name + " : Already loaded a session with name " + session_name + " . You have to specify unique names for sessions.");
1232  return false;
1233  }
1234 
1235  if (does_any_session_use_path(session_settings.sequence_store_file_path))
1236  {
1237  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s sequence_store_file_path already being used in another session.");
1238  return false;
1239  }
1240 
1241  if (does_any_session_use_path(session_settings.incoming_message_serialisation_path))
1242  {
1243  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s incoming_message_serialisation_path already being used in another session.");
1244  return false;
1245  }
1246 
1247  if (does_any_session_use_path(session_settings.outgoing_message_serialisation_path))
1248  {
1249  LLFIX_LOG_ERROR(m_name + " : session " + session_name + "'s outgoing_message_serialisation_path already being used in another session.");
1250  return false;
1251  }
1252 
1253  if (does_any_session_use_compids(session_settings.sender_comp_id, session_settings.target_comp_id))
1254  {
1255  std::string warning_message = "WARNING (" + m_name + "): session " + session_name + "'s sender and target comp ids are already being used in another session.";
1256  LLFIX_LOG_WARNING(warning_message);
1257  fprintf(stderr, "%s", warning_message.c_str());
1258  }
1259 
1260  FixSession* current_session = new (std::nothrow) FixSession;
1261 
1262  if (current_session == nullptr)
1263  {
1264  LLFIX_LOG_ERROR(m_name + " : Failed to allocate fix session for " + session_name);
1265  return false;
1266  }
1267 
1268  session_settings.is_server = true;
1269  session_settings.tx_encode_buffer_capacity = m_settings.tx_encode_buffer_capacity; // Propagate
1270 
1271  if (current_session->initialise(session_name, session_settings) == false)
1272  {
1273  delete current_session;
1274  return false;
1275  }
1276 
1277  m_sessions[session_name] = current_session;
1278 
1279  if (m_settings.starts_as_primary_instance == false)
1280  {
1281  disable_session(current_session);
1282  }
1283 
1284  return true;
1285  }
1286 
1287  void process_session(std::size_t peer_index)
1288  {
1289  static_assert(Transport::is_multithreaded());
1290 
1291  FixSession* sess = m_fix_connectors.get_session(peer_index);
1292 
1293  if (sess)
1294  {
1295  sess->lock();
1296  process_session(sess);
1297  sess->unlock();
1298  }
1299  }
1300 
1301  void process_session(FixSession* session)
1302  {
1303  static_assert(Transport::is_multithreaded());
1304 
1305  if (llfix_likely(this->m_is_stopping.load() == false))
1306  {
1307  process_admin_commands(session);
1308 
1309  auto current_state = session->get_state();
1310 
1311  if (is_state_live(current_state))
1312  {
1313  process_outgoing_resend_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1314  process_outgoing_test_request_if_necessary(session, VDSO::nanoseconds_monotonic());
1315  respond_to_resend_request_if_necessary(session);
1316  respond_to_test_request_if_necessary(session);
1317  send_heartbeat_if_necessary(session, VDSO::nanoseconds_monotonic());
1318 
1319  process_schedule_validator(session);
1320  }
1321  }
1322  else
1323  {
1324  send_logout_message(session);
1325  on_client_disconnected(session);
1326  }
1327  }
1328 
1329  void process_admin_commands_of_all_non_live_sessions()
1330  {
1331  static_assert(Transport::is_multithreaded());
1332 
1333  for (auto& iter : m_sessions)
1334  {
1335  auto state = iter.second->get_state();
1336 
1337  if (is_state_live(state) == false)
1338  {
1339  iter.second->lock();
1340  process_admin_commands(iter.second);
1341  iter.second->unlock();
1342  }
1343  }
1344 
1345  }
1346 
1347  void process()
1348  {
1349  static_assert(!Transport::is_multithreaded());
1350 
1351  if (llfix_likely(this->m_is_stopping.load() == false))
1352  {
1353  for (const auto& session_entry : m_sessions)
1354  {
1355  FixSession* current_session = session_entry.second;
1356 
1357  process_admin_commands(current_session);
1358 
1359  auto current_state = current_session->get_state();
1360 
1361  if (is_state_live(current_state))
1362  {
1363  process_outgoing_resend_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1364  process_outgoing_test_request_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1365  respond_to_resend_request_if_necessary(current_session);
1366  respond_to_test_request_if_necessary(current_session);
1367  send_heartbeat_if_necessary(current_session, VDSO::nanoseconds_monotonic());
1368 
1369  process_schedule_validator(current_session);
1370  }
1371  }
1372  }
1373  else
1374  {
1375  send_logout_messages_to_all_sessions();
1376  }
1377  }
1378 
1379  void push_admin_command_internal(const std::string& session_name, ModifyingAdminCommandType type, uint32_t arg = 0)
1380  {
1381  auto admin_command = new (std::nothrow) ModifyingAdminCommand;
1382 
1383  if (llfix_likely(admin_command))
1384  {
1385  admin_command->type = type;
1386  admin_command->arg = arg;
1387  m_sessions[session_name]->get_admin_commands()->push(admin_command); // Session name existence is checked in management server layer
1388  }
1389  else
1390  {
1391  LLFIX_LOG_ERROR("Failed to process setter admin command for FixServer " + m_name);
1392  }
1393  }
1394 
1395  void process_admin_commands(FixSession* session)
1396  {
1397  ModifyingAdminCommand* admin_command{ nullptr };
1398 
1399  if (session->get_admin_commands()->try_pop(&admin_command) == true)
1400  {
1401  switch (admin_command->type)
1402  {
1403  case ModifyingAdminCommandType::SET_INCOMING_SEQUENCE_NUMBER:
1404  {
1405  session->get_sequence_store()->set_incoming_seq_no(admin_command->arg);
1406  LLFIX_LOG_DEBUG(m_name + " : processed set incoming sequence number admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1407  break;
1408  }
1409 
1410  case ModifyingAdminCommandType::SET_OUTGOING_SEQUENCE_NUMBER:
1411  {
1412  session->get_sequence_store()->set_outgoing_seq_no(admin_command->arg);
1413  LLFIX_LOG_DEBUG(m_name + " : processed set outgoing sequence number admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1414  break;
1415  }
1416 
1417  case ModifyingAdminCommandType::SEND_SEQUENCE_RESET:
1418  {
1419  if( session->get_state() == SessionState::LOGGED_ON )
1420  {
1421  send_sequence_reset_message(session, admin_command->arg);
1422  LLFIX_LOG_DEBUG(m_name + " : processed send sequence reset admin command , session name : " + session->get_name() + " , new seq no : " + std::to_string(admin_command->arg) );
1423  }
1424  else
1425  {
1426  LLFIX_LOG_ERROR(m_name + " : dropping send sequence reset admin command as not logged on , session name : " + session->get_name());
1427  }
1428 
1429  break;
1430  }
1431 
1432  case ModifyingAdminCommandType::DISABLE_SESSION:
1433  {
1434  disable_session(session);
1435  LLFIX_LOG_DEBUG(m_name + " : processed disable session admin command for session " + session->get_name() + " , new session state : Disabled");
1436  break;
1437  }
1438 
1439  case ModifyingAdminCommandType::ENABLE_SESSION:
1440  {
1441  if(m_is_ha_primary == true)
1442  {
1443  enable_session(session);
1444  LLFIX_LOG_DEBUG(m_name + " : processed enable session admin command for " + session->get_name());
1445  }
1446  else
1447  {
1448  LLFIX_LOG_ERROR(m_name + " ignoring enable session admin command for " + session->get_name() + " since not a primary instance" );
1449  }
1450  break;
1451  }
1452 
1453  case ModifyingAdminCommandType::SET_IS_HA_PRIMARY_INSTANCE:
1454  {
1455  m_is_ha_primary = admin_command->arg == 1 ? true : false;
1456 
1457  if(admin_command->arg == 1)
1458  {
1459  enable_session(session);
1460  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 1 admin command for " + session->get_name());
1461  }
1462  else
1463  {
1464  disable_session(session);
1465  LLFIX_LOG_DEBUG(m_name + " : processed set is ha primary instance 0 admin command for " + session->get_name());
1466  }
1467 
1468  break;
1469  }
1470 
1471 
1472  default: break;
1473  }
1474 
1475  delete admin_command;
1476  }
1477  }
1478 
1479  void disable_session(FixSession* session)
1480  {
1481  if (is_state_live(session->get_state()))
1482  {
1483  this->process_connection_closure(m_fix_connectors.get_peer_index(session), true);
1484  }
1485  session->set_state(SessionState::DISABLED);
1486  }
1487 
1488  void enable_session(FixSession* session)
1489  {
1490  if(session->get_state() == SessionState::DISABLED)
1491  {
1492  session->set_state(SessionState::DISCONNECTED);
1493  if(m_settings.refresh_resend_cache_during_promotion)
1494  session->reinitialise_outgoing_serialiser();
1495  }
1496  }
1497 
1499  // ADMIN UTILITY
1500  void process_outgoing_resend_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1501  {
1502  if (session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF)
1503  {
1504  // TERMINATE SESSION IF NECESSARY
1505  uint64_t delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_resend_request_timestamp_nanoseconds();
1506  uint64_t required_delta_nanoseconds = static_cast<uint64_t>(session->outgoing_resend_request_expire_secs()) * 1'000'000'000;
1507 
1508  if (delta_nanoseconds >= required_delta_nanoseconds)
1509  {
1510  // We need to terminate the session as the other side didn't respond properly to our outgoing resend request
1511  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end didn't respond to 35=2/resend request in pre-configured timeout (outgoing_resend_request_expire_secs) : " + std::to_string(session->outgoing_resend_request_expire_secs()));
1512  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
1513  }
1514  }
1515  else
1516  {
1517  // SEND IF NECESSARY
1518  if (session->needs_to_send_resend_request())
1519  {
1520  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " peer " + std::to_string(m_fix_connectors.get_peer_index(session)) + " : sending resend request , begin no : " + std::to_string(session->get_outgoing_resend_request_begin_no()));
1521  send_resend_request(session);
1522  session->set_outgoing_resend_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1523  session->set_state(llfix::SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF);
1524  session->set_needs_to_send_resend_request(false);
1525  }
1526  }
1527  }
1528 
1529  void process_outgoing_test_request_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1530  {
1531  if (session->expecting_response_for_outgoing_test_request() == false)
1532  {
1533  // SEND IF NECESSARY
1534  auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_received_message_timestamp_nanoseconds();
1535 
1536  if (delta_nanoseconds >= (session->get_outgoing_test_request_interval_in_nanoseconds()))
1537  {
1538  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : sending test request");
1539  send_test_request(session);
1540  session->set_outgoing_test_request_timestamp_nanoseconds(current_timestamp_nanoseconds);
1541  session->set_expecting_response_for_outgoing_test_request(true);
1542  }
1543  }
1544  else
1545  {
1546  // TERMINATE SESSION IF NECESSARY
1547  auto delta_nanoseconds = current_timestamp_nanoseconds - session->outgoing_test_request_timestamp_nanoseconds();
1548 
1549  if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(2)))
1550  {
1551  session->set_expecting_response_for_outgoing_test_request(false);
1552  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the other end didn't respond to 35=1/test request");
1553  this->process_connection_closure(m_fix_connectors.get_peer_index(session));
1554  }
1555  }
1556  }
1557 
1558  void respond_to_resend_request_if_necessary(FixSession* session)
1559  {
1560  if (session->needs_responding_to_incoming_resend_request())
1561  {
1562  const auto last_outgoing_seq_no = session->get_sequence_store()->get_outgoing_seq_no();
1563 
1564  // Partial replays not supported therefore incoming end no should be either 0 or same as last outgoing seq no
1565  bool will_replay = (session->serialisation_enabled()) && (session->replay_messages_on_incoming_resend_request() && (session->get_incoming_resend_request_end_no() == 0
1566  || session->get_incoming_resend_request_end_no() == last_outgoing_seq_no) && session->get_incoming_resend_request_begin_no()<=last_outgoing_seq_no);
1567 
1568  if(last_outgoing_seq_no > session->get_incoming_resend_request_begin_no())
1569  if(last_outgoing_seq_no-session->get_incoming_resend_request_begin_no()+1 > session->max_resend_range())
1570  will_replay = false;
1571 
1572  if(will_replay)
1573  {
1574  for (uint32_t i = session->get_incoming_resend_request_begin_no(); i <= last_outgoing_seq_no; i++)
1575  {
1576  if (session->get_outgoing_message_serialiser()->has_message_in_memory(i) == false)
1577  {
1578  will_replay = false;
1579  break;
1580  }
1581  }
1582  }
1583 
1584  if(will_replay)
1585  {
1586  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : replaying messages , begin seq no : " + std::to_string(session->get_incoming_resend_request_begin_no()) + " , end seq no : " + std::to_string(last_outgoing_seq_no));
1587  resend_messages_to_client(session, session->get_incoming_resend_request_begin_no(), last_outgoing_seq_no);
1588  }
1589  else
1590  {
1591  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : sending gap fill message");
1592  send_gap_fill_message(session);
1593  }
1594 
1595  session->set_needs_responding_to_incoming_resend_request(false);
1596  session->set_state(llfix::SessionState::LOGGED_ON);
1597  }
1598  }
1599 
1600  void respond_to_test_request_if_necessary(FixSession* session)
1601  {
1602  if (session->needs_responding_to_incoming_test_request())
1603  {
1604  if (session->get_incoming_test_request_id()->length() > 0)
1605  {
1606  send_heartbeat(session , session->get_incoming_test_request_id());
1607  session->get_incoming_test_request_id()->set_length(0);
1608  }
1609  else
1610  {
1611  // We should not hit here , but better than risking disconnection
1612  send_heartbeat(session, nullptr);
1613  }
1614 
1615  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : responded to incoming test request");
1616  session->set_needs_responding_to_incoming_test_request(false);
1617  }
1618  }
1619 
1620  void send_heartbeat_if_necessary(FixSession* session, uint64_t current_timestamp_nanoseconds)
1621  {
1622  auto delta_nanoseconds = current_timestamp_nanoseconds - session->last_sent_message_timestamp_nanoseconds();
1623 
1624  if (delta_nanoseconds >= (session->get_heartbeart_interval_in_nanoseconds() * static_cast<uint64_t>(9) / static_cast<uint64_t>(10))) // 0.9 for safety
1625  {
1626  send_heartbeat(session, nullptr);
1627  }
1628  }
1629 
1630  void send_logout_messages_to_all_sessions()
1631  {
1632  for (const auto& session_entry : m_sessions)
1633  {
1634  FixSession* current_session = session_entry.second;
1635 
1636  if (current_session->get_state() != SessionState::DISCONNECTED)
1637  {
1638  send_logout_message(current_session);
1639  on_client_disconnected(current_session);
1640  }
1641  }
1642  }
1644  // TO CLIENT
1645  template <bool increment_outgoing_seq_no>
1646  LLFIX_FORCE_INLINE bool send_bytes(FixSession* session, const char* buffer, std::size_t buffer_size)
1647  {
1648  std::size_t peer_index = m_fix_connectors.get_peer_index(session);
1649 
1650  if (peer_index == static_cast<std::size_t>(-1))
1651  return false;
1652 
1653  auto sequence_store = session->get_sequence_store();
1654 
1655  auto ret = this->send(peer_index, buffer, buffer_size);
1656 
1657  if constexpr (increment_outgoing_seq_no == true)
1658  {
1659  if(ret==true)
1660  sequence_store->increment_outgoing_seq_no();
1661  }
1662 
1663  if(session->serialisation_enabled())
1664  session->get_outgoing_message_serialiser()->write(reinterpret_cast<const void*>(buffer), buffer_size, ret, sequence_store->get_outgoing_seq_no());
1665 
1666  if (m_message_persist_plugin)
1667  m_message_persist_plugin->persist_outgoing_message(session->get_name(), sequence_store->get_outgoing_seq_no(), buffer, buffer_size, ret);
1668 
1669  session->set_last_sent_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
1670 
1671  return ret;
1672  }
1673 
1675  // THREAD LOCAL UNKNOWN SESSION CONTEXT
1676  UnknownSessionContext* get_thread_local_unknown_session_context()
1677  {
1678  const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1679 
1680  UnknownSessionContext* unknown_session_context = nullptr;
1681  unknown_session_context = reinterpret_cast<UnknownSessionContext*>(ThreadLocalStorage::get_instance().get());
1682 
1683  if (unknown_session_context == nullptr)
1684  {
1685  unknown_session_context = new(std::nothrow) UnknownSessionContext;
1686  ThreadLocalStorage::get_instance().set(unknown_session_context);
1687  }
1688 
1689  if (unknown_session_context)
1690  {
1691  if (unknown_session_context->m_incoming_fix_message.initialise() == false)
1692  {
1693  LLFIX_LOG_ERROR(m_name + " : Failed to initialise unknown session context's incoming_fix_message");
1694  return nullptr;
1695  }
1696 
1697  if (unknown_session_context->m_fix_string_view_cache.create(256) == false)
1698  {
1699  LLFIX_LOG_ERROR(m_name + " : Failed to create unknown session context's fix string view cache");
1700  return nullptr;
1701  }
1702  }
1703 
1704  return unknown_session_context;
1705  }
1706 
1707  void destroy_thread_local_unknown_session_context()
1708  {
1709  const std::lock_guard<UserspaceSpinlock<>> lock(m_unknown_session_context_lock);
1710 
1711  UnknownSessionContext* unknown_session_context{ nullptr };
1712  unknown_session_context = reinterpret_cast<UnknownSessionContext*>(ThreadLocalStorage::get_instance().get());
1713 
1714  if (unknown_session_context)
1715  {
1716  delete unknown_session_context;
1717  ThreadLocalStorage::get_instance().set(nullptr);
1718  }
1719  }
1720 
1722  // FROM CLIENT
1723  LLFIX_HOT void process_rx_buffer(std::size_t peer_index, char* buffer, std::size_t buffer_size)
1724  {
1725  std::size_t buffer_read_index = 0;
1726 
1727  if (llfix_unlikely(buffer_size < MINIMUM_REQUIRED_INITIAL_BUFFER_FOR_PARSING))
1728  {
1729  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1730  return;
1731  }
1732 
1733  FixSession* session = m_fix_connectors.get_session(peer_index);
1734 
1736  // FIND OUT THE END
1737  int final_tag10_delimiter_index{ -1 };
1738  int current_index = static_cast<int>(buffer_size - 1);
1739 
1740  if(llfix_unlikely(FixUtilities::find_delimiter_from_end(buffer, buffer_size, current_index) == false))
1741  {
1742  // We don't have the entire message
1743  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1744  return;
1745  }
1746 
1747  FixUtilities::find_tag10_start_from_end(buffer, buffer_size, current_index, final_tag10_delimiter_index);
1748 
1749  if (final_tag10_delimiter_index == -1)
1750  {
1751  // We don't have the entire message
1752  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1753  return;
1754  }
1756  // AT THIS POINT WE HAVE AT LEAST ONE COMPLETE MESSAGE
1757  LLFIX_ALIGN_CODE_32;
1758  while (true)
1759  {
1761  // FIND OUT THE BEGIN STRING
1762  int begin_string_offset{-1};
1763  FixUtilities::find_begin_string_position(buffer+buffer_read_index, buffer_size-buffer_read_index, begin_string_offset);
1764 
1765  if(llfix_likely(begin_string_offset>=0))
1766  {
1767  buffer_read_index += begin_string_offset;
1768  }
1769  else
1770  {
1771  LLFIX_LOG_ERROR(m_name + " received a message with no begin string : " + FixUtilities::fix_to_human_readible(buffer+buffer_read_index, buffer_size-buffer_read_index));
1772  return;
1773  }
1775  IncomingFixMessage* incoming_fix_message{ nullptr };
1776  ObjectCache<FixStringView>* fix_string_view_cache{ nullptr };
1777 
1778  if (llfix_likely(session != nullptr))
1779  {
1780  incoming_fix_message = session->get_incoming_fix_message();
1781  fix_string_view_cache = session->get_fix_string_view_cache();
1782  }
1783  else
1784  {
1785  UnknownSessionContext* unknown_session_context = get_thread_local_unknown_session_context();
1786 
1787  if (unknown_session_context)
1788  {
1789  incoming_fix_message = &(unknown_session_context->m_incoming_fix_message);
1790  fix_string_view_cache = &(unknown_session_context->m_fix_string_view_cache);
1791  }
1792  else
1793  {
1794  this->set_incomplete_buffer(peer_index, buffer, buffer_size);
1795  return;
1796  }
1797  }
1798 
1799  incoming_fix_message->reset();
1800  fix_string_view_cache->reset_pointer();
1801 
1802  current_index = static_cast<int>(buffer_read_index);
1803  bool looking_for_equals = true;
1804 
1805  int current_tag_start = static_cast<int>(buffer_read_index);
1806  int current_tag_len{ 0 };
1807 
1808  int current_value_start = static_cast<int>(buffer_read_index);
1809  int current_value_len{ 0 };
1810 
1811  int current_tag_10_delimiter_index = -1;
1812  int current_tag_35_tag_start_index = -1;
1813 
1814  uint32_t parser_reject_code = static_cast<uint32_t>(-1);
1815  uint32_t current_tag_index = 0;
1816 
1817  uint32_t encoded_current_message_type = static_cast<uint32_t>(-1);
1818 
1819  bool in_a_repeating_group{ false };
1820  uint32_t current_rg_count_tag{ 0 };
1821 
1822  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1823  int binary_field_length = 0;
1824  int binary_field_counter = 0;
1825  #endif
1826 
1827  LLFIX_ALIGN_CODE_32;
1828  while (true)
1829  {
1830  char current_char = buffer[current_index];
1831 
1832  if (looking_for_equals)
1833  {
1834  if (current_char == FixConstants::FIX_EQUALS)
1835  {
1836  current_tag_len = current_index - current_tag_start;
1837  current_value_start = current_index + 1;
1838  looking_for_equals = false;
1839  }
1840  else if (llfix_unlikely(current_char == FixConstants::FIX_DELIMITER))
1841  {
1842  current_tag_start = current_index + 1;
1843  parser_reject_code = FixParserErrorCodes::NO_EQUALS_SIGN;
1844  }
1845  }
1846  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1847  else
1848  {
1849  if(llfix_unlikely(binary_field_length>0))
1850  if(binary_field_counter < binary_field_length)
1851  binary_field_counter++;
1852  }
1853 
1854  if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1855  {
1856  bool reached_value_end = true;
1857 
1858  if(llfix_unlikely(binary_field_length>0))
1859  {
1860  if(binary_field_length>binary_field_counter)
1861  {
1862  reached_value_end = false;
1863  }
1864  else if(binary_field_length == binary_field_counter)
1865  {
1866  binary_field_length=0;
1867  binary_field_counter=0;
1868  }
1869  }
1870 
1871  if(llfix_likely(reached_value_end))
1872  {
1873 
1874  #else
1875  else if (current_char == FixConstants::FIX_DELIMITER)
1876  {
1877  #endif
1878  current_value_len = current_index - current_value_start;
1879 
1880  if (llfix_unlikely(current_value_len == 0))
1881  {
1882  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_WITHOUT_VALUE;
1883  }
1884 
1885  bool is_current_tag_numeric{ true };
1886  FixSession::validate_tag_format(buffer + current_tag_start, static_cast<std::size_t>(current_tag_len), is_current_tag_numeric, parser_reject_code);
1887 
1888  if (llfix_likely(is_current_tag_numeric))
1889  {
1891  // RECORD THE CURRENT TAG VALUE PAIR
1892  uint32_t tag = Converters::chars_to_unsigned_int<uint32_t>(buffer + current_tag_start, static_cast<std::size_t>(current_tag_len));
1893 
1894  if(llfix_unlikely(tag == 0))
1895  {
1896  parser_reject_code = FixConstants::FIX_ERROR_CODE_INVALID_TAG_NUMBER;
1897  }
1898 
1899  FixStringView* value = fix_string_view_cache->allocate();
1900  value->set_buffer(const_cast<char*>(buffer + current_value_start), static_cast<std::size_t>(current_value_len));
1901 
1903  current_tag_index++;
1904  FixSession::validate_header_tags_order(tag, current_tag_index, parser_reject_code);
1906 
1907  // Some tags may be used as a group member but also as an individual tag so we can't fully rely on is_a_repeating_group_tag
1908  // and need to detect start and end of a group
1909  if(encoded_current_message_type != static_cast<uint32_t>(-1))
1910  {
1911  if(llfix_likely(!in_a_repeating_group))
1912  {
1913  if (llfix_unlikely(FixSession::get_repeating_group_specs().is_a_repeating_group_count_tag(encoded_current_message_type, tag)))
1914  {
1915  current_rg_count_tag = tag;
1916  in_a_repeating_group = true;
1917  }
1918  }
1919  else
1920  {
1921  if (llfix_likely(FixSession::get_repeating_group_specs().is_a_repeating_group_tag(encoded_current_message_type, current_rg_count_tag, tag) == false))
1922  {
1923  in_a_repeating_group = false;
1924  }
1925  }
1926 
1927  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1928  if (llfix_unlikely(FixSession::get_binary_field_specs().is_binary_data_length_tag(encoded_current_message_type, tag)))
1929  {
1930  binary_field_length = Converters::chars_to_int<int>(value->data(), value->length());
1931  }
1932  #endif
1933  }
1934 
1935  if (llfix_likely(in_a_repeating_group == false))
1936  {
1937  if (llfix_likely(incoming_fix_message->has_tag(tag) == false))
1938  {
1939  incoming_fix_message->set_tag(tag, value);
1940  }
1941  else
1942  {
1943  parser_reject_code = FixConstants::FIX_ERROR_CODE_TAG_APPEARS_MORE_THAN_ONCE;
1944  }
1945  }
1946  else
1947  {
1948  incoming_fix_message->set_repeating_group_tag(tag, value);
1949  }
1950 
1951  if (tag == FixConstants::TAG_CHECKSUM)
1952  {
1953  current_tag_10_delimiter_index = current_index;
1955  FixSession::validate_tag9_and_tag35((incoming_fix_message), current_tag_start, current_tag_35_tag_start_index, parser_reject_code);
1957  break;
1958  }
1959  else if (tag == FixConstants::TAG_MSG_TYPE)
1960  {
1961  current_tag_35_tag_start_index = current_tag_start;
1962  encoded_current_message_type = FixUtilities::pack_message_type(value->to_string_view());
1963  }
1964  } // if (llfix_likely(is_current_tag_numeric))
1965 
1966  current_tag_start = current_index + 1;
1967  looking_for_equals = true;
1968  #ifdef LLFIX_ENABLE_BINARY_FIELDS
1969  } // if(llfix_likely(reached_value_end))
1970  #endif
1971  } // else if (current_char == FixConstants::FIX_DELIMITER) or if (looking_for_equals == false && current_char == FixConstants::FIX_DELIMITER)
1972 
1973  // Apart from the check below (else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)),
1974  // we also need to check here if found last tag 10 delimiter is before the found tag8
1975  if (current_index >= static_cast<int>(buffer_size) - 1)
1976  {
1977  this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
1978  return;
1979  }
1980 
1981  current_index++;
1982  } // while
1983 
1984  auto current_message_length = current_tag_10_delimiter_index + 1 - buffer_read_index;
1986  if (llfix_unlikely(session == nullptr))
1987  {
1988  session = accept_session(peer_index, incoming_fix_message, buffer + buffer_read_index, current_message_length, parser_reject_code);
1989 
1990  if (session == nullptr) // Means that it was an invalid logon attempt and we already closed the connection
1991  {
1992  return;
1993  }
1994  }
1996  session->lock();
1997  process_incoming_fix_message(session, peer_index, buffer + buffer_read_index, current_message_length, parser_reject_code);
1998  session->unlock();
1999 
2000  #ifndef LLFIX_UNIT_TEST
2001  if(llfix_unlikely(session->get_state() == SessionState::DISCONNECTED)) // process_incoming_fix_message may terminate connection
2002  {
2003  return;
2004  }
2005  #endif
2006 
2007  buffer_read_index = current_tag_10_delimiter_index + 1;
2008 
2009  if (current_tag_10_delimiter_index == static_cast<int>(buffer_size) - 1)
2010  {
2011  this->reset_incomplete_buffer(peer_index);
2012  return;
2013  }
2014  else if (static_cast<int>(buffer_read_index) > final_tag10_delimiter_index)
2015  {
2016  this->set_incomplete_buffer(peer_index, buffer + buffer_read_index, buffer_size - buffer_read_index);
2017  return;
2018  }
2019  }
2020  }
2021 
2022  void process_invalid_logon(std::size_t peer_index, const std::string& log_message, bool send_logout = false, const std::string& logout_reason_text = "")
2023  {
2024  if(!log_message.empty())
2025  {
2026  int peer_port{0};
2027  std::string peer_ip;
2028  this->get_peer_details(peer_index, peer_ip, peer_port);
2029 
2030  LLFIX_LOG_ERROR("Invalid logon attempt from " + peer_ip + " : " + log_message);
2031  }
2032 
2033  process_connection_closure(peer_index, send_logout, logout_reason_text);
2034  }
2035 
2036  void process_connection_closure(std::size_t peer_index, bool send_logout = false, const std::string& logout_reason_text = "")
2037  {
2038  if (send_logout)
2039  {
2040  auto session = m_fix_connectors.get_session(peer_index);
2041 
2042  if (session != nullptr)
2043  {
2044  send_logout_message(session, logout_reason_text);
2045  }
2046  }
2047 
2048  this->close_connection(peer_index);
2049  }
2050 
2051  void process_incoming_fix_message(FixSession* session, std::size_t peer_index, const char* buffer_message, std::size_t buffer_message_length, uint32_t parser_reject_code)
2052  {
2053  session->set_last_received_message_timestamp_nanoseconds(VDSO::nanoseconds_monotonic());
2054 
2055  if(session->serialisation_enabled())
2056  session->get_incoming_message_serialiser()->write(buffer_message, buffer_message_length, true);
2057 
2058  if (llfix_unlikely(session->expecting_response_for_outgoing_test_request()))
2059  {
2060  // We are permissive , any message not just 35=0 with expected t112, satisfies our outgoing test request
2061  session->set_expecting_response_for_outgoing_test_request(false);
2062  LLFIX_LOG_DEBUG("FixServer " + m_name + " session " + session->get_name() + " : other end satisfied the test request");
2063  }
2065  if (session->validations_enabled())
2066  {
2067  uint32_t reject_message_code = static_cast<uint32_t>(-1);
2068 
2069  if (session->validate_fix_message(*session->get_incoming_fix_message(), buffer_message, buffer_message_length, parser_reject_code, reject_message_code) == false)
2070  {
2071  if (reject_message_code != static_cast<uint32_t>(-1))
2072  {
2073  send_reject_message(session, session->get_incoming_fix_message(), reject_message_code, buffer_message, buffer_message_length, session->get_last_error_tag());
2074  }
2075 
2076  return;
2077  }
2078  }
2080  // SEQUENCE NO & MESSAGE PERSIST PLUGIN
2081  auto sequence_store = session->get_sequence_store();
2082  sequence_store->increment_incoming_seq_no();
2083  auto sequence_store_incoming_seq_no = sequence_store->get_incoming_seq_no();
2084 
2085  if (m_message_persist_plugin)
2086  m_message_persist_plugin->persist_incoming_message(session->get_name(), sequence_store_incoming_seq_no, buffer_message, buffer_message_length);
2087 
2089  // SEQUENCE NO CHECKS
2090  auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2091 
2092  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2093  {
2094  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2095  {
2096  if (session->get_state() != SessionState::PENDING_LOGON) // on_logon_request handles this case on its own
2097  {
2098  session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
2099  return;
2100  }
2101  }
2102  }
2103  else if (llfix_unlikely(incoming_seq_no < sequence_store_incoming_seq_no))
2104  {
2105  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2106  {
2107  std::string logout_reason_text;
2108  bool send_logout = false;
2109 
2110  if (session->get_state() == SessionState::PENDING_LOGON)
2111  {
2112  send_logout = true;
2113  logout_reason_text = "MsgSeqNum too low, expecting " + std::to_string(sequence_store_incoming_seq_no) + " but received " + std::to_string(incoming_seq_no);
2114  }
2115 
2116  sequence_store->set_incoming_seq_no(sequence_store_incoming_seq_no - 1);
2117  LLFIX_LOG_DEBUG("FixServer " + m_name + " : terminating session " + session->get_name() + " as the incoming sequence no (" + std::to_string(incoming_seq_no) + ") is lower than expected (" + std::to_string(sequence_store_incoming_seq_no) + ")");
2118  this->process_connection_closure(peer_index, send_logout, logout_reason_text);
2119  return;
2120  }
2121  }
2122 
2123  if (llfix_unlikely(session->get_state() == SessionState::IN_RETRANSMISSION_INITIATED_BY_SELF))
2124  {
2125  if (incoming_seq_no == session->get_outgoing_resend_request_end_no())
2126  {
2127  LLFIX_LOG_DEBUG("FixServer " + m_name + " : other end satisfied the resend request");
2128  session->set_state(llfix::SessionState::LOGGED_ON);
2129  }
2130  }
2132  if(process_incoming_throttling(session, session->get_incoming_fix_message()) == false)
2133  {
2134  return;
2135  }
2136 
2137  auto message_type = session->get_incoming_fix_message()->get_tag_value(FixConstants::TAG_MSG_TYPE);
2138 
2139  if (llfix_likely(message_type->length() == 1))
2140  {
2141  switch (message_type->data()[0])
2142  {
2143  case FixConstants::MSG_TYPE_NEW_ORDER: on_new_order(session, session->get_incoming_fix_message()); break;
2144  case FixConstants::MSG_TYPE_ORDER_CANCEL: on_cancel_order(session, session->get_incoming_fix_message()); break;
2145  case FixConstants::MSG_TYPE_ORDER_CANCEL_REPLACE: on_replace_order(session, session->get_incoming_fix_message()); break;
2146  case FixConstants::MSG_TYPE_HEARTBEAT: on_client_heartbeat(session); break;
2147  case FixConstants::MSG_TYPE_TEST_REQUEST: session->process_test_request_message(*session->get_incoming_fix_message()); on_client_test_request(session, session->get_incoming_fix_message()); break;
2148  case FixConstants::MSG_TYPE_RESEND_REQUEST: session->process_resend_request(*session->get_incoming_fix_message()); on_client_resend_request(session, session->get_incoming_fix_message()); break;
2149  case FixConstants::MSG_TYPE_REJECT: on_session_level_reject(session, session->get_incoming_fix_message()); break;
2150  case FixConstants::MSG_TYPE_BUSINESS_REJECT: on_application_level_reject(session, session->get_incoming_fix_message()); break;
2151  case FixConstants::MSG_TYPE_LOGON: process_logon_request(session, peer_index, session->get_incoming_fix_message()); break;
2152  case FixConstants::MSG_TYPE_LOGOUT: process_logout_request(session, peer_index, session->get_incoming_fix_message()); break;
2153  case FixConstants::MSG_TYPE_SEQUENCE_RESET: if (session->process_incoming_sequence_reset_message(*session->get_incoming_fix_message()) == false) { send_reject_message(session, session->get_incoming_fix_message(), FixConstants::FIX_ERROR_CODE_VALUE_INCORRECT_FOR_TAG, buffer_message, buffer_message_length, FixConstants::TAG_NEW_SEQ_NO); }; break;
2154  // Anything else
2155  default: on_custom_message(session, session->get_incoming_fix_message()); break;
2156  }
2157  }
2158  else
2159  {
2160  on_custom_message(session, session->get_incoming_fix_message());
2161  }
2162  }
2163 
2164  bool validate_logon_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
2165  {
2166  LLFIX_UNUSED(session);
2167  int peer_port{0};
2168  std::string peer_ip;
2169  this->get_peer_details(peer_index, peer_ip, peer_port);
2170 
2171  if (message->has_tag(FixConstants::TAG_ENCRYPT_METHOD) == false)
2172  {
2173  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " does not have required t98(encryption method), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2174  return false;
2175  }
2176 
2177  if (message->has_tag(FixConstants::TAG_HEART_BT_INT) == false)
2178  {
2179  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " does not have required t108(heartbeat interval), compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2180  return false;
2181  }
2182 
2183  auto t108_string = message->get_tag_value_as<std::string>(FixConstants::TAG_HEART_BT_INT);
2184 
2185  if (t108_string.find('-') != std::string::npos)
2186  {
2187  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(negative) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2188  return false;
2189  }
2190 
2191  if (t108_string.find('.') != std::string::npos)
2192  {
2193  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(decimal points) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2194  return false;
2195  }
2196 
2197  if (message->get_tag_value_as<uint32_t>(FixConstants::TAG_HEART_BT_INT) == 0)
2198  {
2199  LLFIX_LOG_ERROR("Incoming logon message for server " + m_name + " from " + peer_ip + " has invalid(zero) t108(heartbeat interval) value, compid : " + message->get_tag_value_as<std::string>(FixConstants::TAG_SENDER_COMP_ID));
2200  return false;
2201  }
2202 
2203  return true;
2204  }
2205 
2206  void do_post_logon_sequence_number_check(FixSession* session)
2207  {
2208  if(session->get_state() == SessionState::LOGGED_ON)
2209  {
2210  // IF LOGON WAS WITH A SEQ NO HIGHER THAN EXPECTED , WE NEED TO SEND A RESEND REQUEST
2211  auto sequence_store_incoming_seq_no = session->get_sequence_store()->get_incoming_seq_no();
2212  auto incoming_seq_no = session->get_incoming_fix_message()->get_tag_value_as<uint32_t>(FixConstants::TAG_MSG_SEQ_NUM);
2213 
2214  if (llfix_unlikely(incoming_seq_no > sequence_store_incoming_seq_no))
2215  {
2216  if(FixSession::is_a_hard_sequence_reset_message(*session->get_incoming_fix_message()) == false)
2217  {
2218  session->queue_outgoing_resend_request(sequence_store_incoming_seq_no, incoming_seq_no);
2219  }
2220  }
2221 
2222  // LOW SEQ NOS ARE HANDLED IN process_incoming_fix_message
2223  }
2224  }
2225 
2226  void process_logout_request(FixSession* session, std::size_t peer_index, const IncomingFixMessage* message)
2227  {
2228  send_logout_message(session);
2229  session->set_state(SessionState::LOGGED_OUT);
2230  LLFIX_LOG_DEBUG(m_name + ", session logged out : " + session->get_name() + " , peer index : " + std::to_string(peer_index));
2231  on_logout_request(session, message);
2232  }
2233 
2234  FixServer(const FixServer& other) = delete;
2235  FixServer& operator= (const FixServer& other) = delete;
2236  FixServer(FixServer&& other) = delete;
2237  FixServer& operator=(FixServer&& other) = delete;
2238 };
2239 
2240 } // namespace
llfix::FixServer::on_client_resend_request
virtual void on_client_resend_request(FixSession *session, const IncomingFixMessage *message)
Called when a Resend Request (35=2) is received from a client.
Definition: fix_server.h:634
llfix::FixServer::shutdown
void shutdown()
Shuts down the FIX server.
Definition: fix_server.h:556
llfix::FixServer::send_gap_fill_message
virtual bool send_gap_fill_message(FixSession *session)
Sends a Gap Fill message in response to a Resend Request.
Definition: fix_server.h:1072
llfix::OutgoingFixMessage
FIX message builder and encoder for outbound messages.
Definition: outgoing_fix_message.h:99
llfix::FixServer::process_schedule_validator
virtual void process_schedule_validator(FixSession *session)
Validates the session against configured schedule rules.
Definition: fix_server.h:701
llfix::FixServer::authenticate_logon_request
virtual bool authenticate_logon_request(FixSession *session, const IncomingFixMessage *message)
Authenticates an incoming Logon (35=A) request.
Definition: fix_server.h:882
llfix::FixServer::create
bool create(const std::string &server_name, const std::string server_config_file_path)
Creates and initialises the FIX server instance.
Definition: fix_server.h:189
llfix::FixServer::on_logon_request
virtual void on_logon_request(FixSession *session, const IncomingFixMessage *message)
Called when a Logon (35=A) request is received from a client.
Definition: fix_server.h:620
llfix::FixServer::get_session_name
std::string get_session_name(FixSession *session)
Retrieves the name of a FIX session.
Definition: fix_server.h:415
llfix::FixServer
FIX server implementation.
Definition: fix_server.h:168
llfix::FixSession::get_sequence_store
SequenceStore * get_sequence_store()
Provides access to the session's sequence store.
Definition: fix_session.h:363
llfix::FixServer::send_logout_message
virtual bool send_logout_message(FixSession *session, const std::string &reason_text="")
Sends a Logout (35=5) message.
Definition: fix_server.h:1008
llfix::FixServer::get_session_names
void get_session_names(std::vector< std::string > &target) override
Retrieves the names of all configured FIX sessions.
Definition: fix_server.h:435
llfix::FixServer::accept_session
virtual FixSession * accept_session(std::size_t peer_index, const IncomingFixMessage *incoming_fix_message, const char *buffer, std::size_t buffer_length, uint32_t parser_reject_code)
Accepts and validates an incoming session before logon processing.
Definition: fix_server.h:725
llfix::FixServer::add_session
bool add_session(const std::string &session_name, FixSessionSettings &session_settings)
Adds a FIX session using preloaded session settings.
Definition: fix_server.h:293
llfix::FixServer::send_outgoing_message
virtual bool send_outgoing_message(FixSession *session, OutgoingFixMessage *message)
Encodes and sends an outgoing FIX message.
Definition: fix_server.h:498
llfix::IncomingFixMessage::has_tag
bool has_tag(uint32_t tag) const
Checks whether a FIX tag exists and is valid.
Definition: incoming_fix_message.h:84
llfix::FixSession
Represents a single FIX protocol session.
Definition: fix_session.h:95
llfix::FixServer::send_sequence_reset_message
virtual bool send_sequence_reset_message(FixSession *session, uint32_t desired_sequence_no)
Sends a Sequence Reset (35=4) message without gap fill.
Definition: fix_server.h:1048
llfix::FixServer::get_session
FixSession * get_session(const std::string &session_name) override
Retrieves a FIX session by name.
Definition: fix_server.h:398
llfix::FixServer::on_client_test_request
virtual void on_client_test_request(FixSession *session, const IncomingFixMessage *message)
Called when a Test Request (35=1) is received from a client.
Definition: fix_server.h:641
llfix::FixServer::on_cancel_order
virtual void on_cancel_order(FixSession *session, const IncomingFixMessage *message)
Called when an Order Cancel Request (35=F) message is received.
Definition: fix_server.h:584
llfix::FixSession::get_name
std::string get_name() const override
Returns the logical name of the FIX session.
Definition: fix_session.h:278
llfix::FixServer::on_new_order
virtual void on_new_order(FixSession *session, const IncomingFixMessage *message)
Called when a New Order (35=D) message is received.
Definition: fix_server.h:577
llfix::FixServer::on_application_level_reject
virtual void on_application_level_reject(FixSession *session, const IncomingFixMessage *message)
Called when an application-level reject message is generated.
Definition: fix_server.h:598
llfix::FixServer::add_sessions_from
bool add_sessions_from(const std::string &session_config_file_path)
Adds all FIX sessions found in a configuration file.
Definition: fix_server.h:329
llfix::FixServer::send_test_request
virtual bool send_test_request(FixSession *session)
Sends a Test Request (35=1) message.
Definition: fix_server.h:1021
llfix::FixServer::on_client_heartbeat
virtual void on_client_heartbeat(FixSession *session)
Called when a Heartbeat (35=0) message is received from a client.
Definition: fix_server.h:647
llfix::FixServer::on_replace_order
virtual void on_replace_order(FixSession *session, const IncomingFixMessage *message)
Called when an Order Cancel/Replace Request (35=G) message is received.
Definition: fix_server.h:591
llfix::FixServer::add_session
bool add_session(const std::string &session_config_file_path, const std::string &session_name)
Adds a FIX session by loading settings from a configuration file.
Definition: fix_server.h:306
llfix::IncomingFixMessage
Represents a parsed incoming FIX message.
Definition: incoming_fix_message.h:66
llfix::FixUtilities::fix_to_human_readible
static std::string fix_to_human_readible(const char *buffer, std::size_t buffer_length)
Converts a FIX message buffer to a human-readable string.
Definition: fix_utilities.h:108
llfix::FixServer::specify_repeating_group
void specify_repeating_group(Args... args)
Specify repeating group definitions for incoming FIX messages for all sessions.
Definition: fix_server.h:457
llfix::FixServer::process_incoming_throttling
virtual bool process_incoming_throttling(FixSession *session, const IncomingFixMessage *incoming_fix_message)
Applies incoming message throttling logic.
Definition: fix_server.h:661
llfix::FixServer::outgoing_message_instance
OutgoingFixMessage * outgoing_message_instance(FixSession *session)
Retrieves the reusable outgoing FIX message instance for a session.
Definition: fix_server.h:485
llfix::SequenceStore::get_outgoing_seq_no
uint32_t get_outgoing_seq_no() const
Get the outgoing FIX sequence number.
Definition: sequence_store.h:127
llfix::FixServer::process_logon_request
virtual void process_logon_request(FixSession *session, std::size_t peer_index, const IncomingFixMessage *message)
Processes an incoming Logon (35=A) request.
Definition: fix_server.h:840
llfix::FixServer::on_logout_request
virtual void on_logout_request(FixSession *session, const IncomingFixMessage *message)
Called when a Logout (35=5) request is received from a client.
Definition: fix_server.h:627
llfix::FixServer::on_session_level_reject
virtual void on_session_level_reject(FixSession *session, const IncomingFixMessage *message)
Called when a session-level reject message is generated.
Definition: fix_server.h:605
llfix::FixServer::send_resend_request
virtual bool send_resend_request(FixSession *session)
Sends a Resend Request (35=2) message.
Definition: fix_server.h:1034
llfix::FixServer::get_session_count
std::size_t get_session_count() const
Returns the number of configured FIX sessions.
Definition: fix_server.h:381
llfix::FixServer::on_custom_message
virtual void on_custom_message(FixSession *session, const IncomingFixMessage *message)
Called when any other FIX message type is received.
Definition: fix_server.h:612
llfix::IncomingFixMessage::get_tag_value_as
T get_tag_value_as(uint32_t tag, std::size_t decimal_points=0) const
Retrieves a FIX tag value converted to the requested type.
Definition: incoming_fix_message.h:260