Net

(coming soon…)

  • File directory: (../libraries/net/xxx.cpp)

  • File directory: (../libraries/net/include/graphene/net/xxx.hpp)


config

#pragma once

#define GRAPHENE_NET_PROTOCOL_VERSION                        106

/**
 * Define this to enable debugging code in the p2p network interface.
 * This is code that would never be executed in normal operation, but is
 * used for automated testing (creating artificial net splits,
 * tracking where messages came from and when)
 */
#define ENABLE_P2P_DEBUGGING_API                             1

/**
 * Maximum size of a single message: 2 MiB.
 *
 * The expression is parenthesized so that the macro expands safely when it
 * is used inside a larger expression (e.g. `x / MAX_MESSAGE_SIZE` or
 * `x % MAX_MESSAGE_SIZE`).
 */
#define MAX_MESSAGE_SIZE                                     (1024*1024*2)
#define GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME      30 // seconds

/**
 * After trying all peers, how long to wait before we check to
 * see if there are peers we can try again.
 */
#define GRAPHENE_PEER_DATABASE_RETRY_DELAY                   15 // seconds

// NOTE(review): no unit is given for the two timeouts below; presumably
// seconds, like the neighbouring values -- confirm against their users.
#define GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT       5

#define GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT                 20

/* uncomment next line to use testnet seed ip and port */
//#define GRAPHENE_TEST_NETWORK                                1

#define GRAPHENE_NET_TEST_SEED_IP                            "104.236.44.210" // autogenerated
#define GRAPHENE_NET_TEST_P2P_PORT                           1700
#define GRAPHENE_NET_DEFAULT_P2P_PORT                        1776
#define GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS             20
#define GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS                 200

#define GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES        (1024 * 1024)

/**
 * When we receive a message from the network, we advertise it to
 * our peers and save a copy in a cache where we will find it if
 * a peer requests it.  We expire old items out of the cache
 * after this number of blocks go by.
 *
 * Recently lowered from 30 to match the default expiration time
 * the web wallet imposes on transactions.
 */
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS        5

/**
 * We prevent a peer from offering us a list of blocks which, if we fetched them
 * all, would result in a blockchain that extended into the future.
 * This parameter gives us some wiggle room, allowing a peer to give us blocks
 * that would put our blockchain up to an hour in the future, just in case
 * our clock is a bit off.
 */
#define GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC     (60 * 60)

#define GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES           2

#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING      200

/**
 * During normal operation, how many items will be fetched from each
 * peer at a time.  This will only come into play when the network
 * is being flooded -- typically transactions will be fetched as soon
 * as we find out about them, so only one item will be requested
 * at a time.
 *
 * No tests have been done to find the optimal value for this
 * parameter, so consider increasing or decreasing it if performance
 * during flooding is lacking.
 */
#define GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION  1

/**
 * Instead of fetching all item IDs from a peer, then fetching all blocks
 * from a peer, we will interleave them.  Fetch at least this many block IDs,
 * then switch into block-fetching mode until the number of blocks we know about
 * but haven't yet fetched drops below this
 */
#define GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH               10000

#define GRAPHENE_NET_MAX_TRX_PER_SECOND                      1000

#define GRAPHENE_NET_MAX_NESTED_OBJECTS                      (250)

#define MAXIMUM_PEERDB_SIZE 1000

core_messages

// Bring the chain types referenced by the network messages into scope.
using graphene::chain::signed_transaction;
using graphene::chain::block_id_type;
using graphene::chain::transaction_id_type;
using graphene::chain::signed_block;
// A node is identified by ECC public key data.
typedef fc::ecc::public_key_data node_id_t;
// Items are identified by a ripemd160 hash.
typedef fc::ripemd160 item_hash_t;
/**
 * Identifies one item by the pair (item type, item hash).
 */
struct item_id
{
    uint32_t      item_type;
    item_hash_t   item_hash;

    // Zero the type so that a default-constructed id never carries an
    // indeterminate value (reading one would be undefined behavior).
    item_id() : item_type(0) {}
    item_id(uint32_t type, const item_hash_t& hash) :
      item_type(type),
      item_hash(hash)
    {}

    /// Two ids are equal when both the type and the hash match.
    bool operator==(const item_id& other) const
    {
      return item_type == other.item_type &&
             item_hash == other.item_hash;
    }
};
/**
 * Numeric identifiers for the message types.
 *
 * Transaction and block messages use 1000 and 1001.  The remaining
 * identifiers lie between the markers core_message_type_first (5000) and
 * core_message_type_last (5099); identifiers 5018..5098 are currently
 * unassigned.
 */
enum core_message_type_enum
{
  trx_message_type                             = 1000,
  block_message_type                           = 1001,
  core_message_type_first                      = 5000,
  item_ids_inventory_message_type              = 5001,
  blockchain_item_ids_inventory_message_type   = 5002,
  fetch_blockchain_item_ids_message_type       = 5003,
  fetch_items_message_type                     = 5004,
  item_not_available_message_type              = 5005,
  hello_message_type                           = 5006,
  connection_accepted_message_type             = 5007,
  connection_rejected_message_type             = 5008,
  address_request_message_type                 = 5009,
  address_message_type                         = 5010,
  closing_connection_message_type              = 5011,
  current_time_request_message_type            = 5012,
  current_time_reply_message_type              = 5013,
  check_firewall_message_type                  = 5014,
  check_firewall_reply_message_type            = 5015,
  get_current_connections_request_message_type = 5016,
  get_current_connections_reply_message_type   = 5017,
  core_message_type_last                       = 5099
};

// Protocol version constant, taken from GRAPHENE_NET_PROTOCOL_VERSION.
const uint32_t core_protocol_version = GRAPHENE_NET_PROTOCOL_VERSION;

trx_message

/**
 * Message carrying a single signed transaction.
 */
struct trx_message
{
   /// Message type tag; only declared here.
   static const core_message_type_enum type;

   signed_transaction trx;

   trx_message()
   {}

   /// Takes the transaction by value and moves it into the member.
   trx_message(signed_transaction transaction)
      : trx(std::move(transaction))
   {}
};

block_message

/**
 * Message carrying a signed block together with its id.
 */
struct block_message
{
   /// Message type tag; only declared here.
   static const core_message_type_enum type;

   signed_block    block;
   block_id_type   block_id;

   block_message()
   {}

   /// Copies the block and stores the id obtained from blk.id().
   block_message(const signed_block& blk)
      : block(blk),
        block_id(blk.id())
   {}
};

item_ids_inventory_message

/**
 * Message listing the hashes of items of one type that the sender has
 * available.
 */
struct item_ids_inventory_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  uint32_t item_type;
  std::vector<item_hash_t> item_hashes_available;

  // Zero the scalar so a default-constructed message holds no
  // indeterminate value.
  item_ids_inventory_message() : item_type(0) {}
  item_ids_inventory_message(uint32_t item_type, const std::vector<item_hash_t>& item_hashes_available) :
    item_type(item_type),
    item_hashes_available(item_hashes_available)
  {}
};

blockchain_item_ids_inventory_message

/**
 * Message listing available item hashes of one type, together with a count
 * of items still remaining.
 */
struct blockchain_item_ids_inventory_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  uint32_t total_remaining_item_count;
  uint32_t item_type;
  std::vector<item_hash_t> item_hashes_available;

  // Zero both scalars so a default-constructed message holds no
  // indeterminate values.
  blockchain_item_ids_inventory_message() :
    total_remaining_item_count(0),
    item_type(0)
  {}
  blockchain_item_ids_inventory_message(uint32_t total_remaining_item_count,
                                        uint32_t item_type,
                                        const std::vector<item_hash_t>& item_hashes_available) :
    total_remaining_item_count(total_remaining_item_count),
    item_type(item_type),
    item_hashes_available(item_hashes_available)
  {}
};

fetch_blockchain_item_ids_message

/**
 * Request message carrying an item type and a blockchain synopsis
 * (a list of item hashes).
 */
struct fetch_blockchain_item_ids_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  uint32_t item_type;
  std::vector<item_hash_t> blockchain_synopsis;

  // Zero the scalar so a default-constructed message holds no
  // indeterminate value.
  fetch_blockchain_item_ids_message() : item_type(0) {}
  fetch_blockchain_item_ids_message(uint32_t item_type, const std::vector<item_hash_t>& blockchain_synopsis) :
    item_type(item_type),
    blockchain_synopsis(blockchain_synopsis)
  {}
};

fetch_items_message

/**
 * Request message naming the items (by hash) of one type to fetch.
 */
struct fetch_items_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  uint32_t item_type;
  std::vector<item_hash_t> items_to_fetch;

  // Zero the scalar so a default-constructed message holds no
  // indeterminate value.
  fetch_items_message() : item_type(0) {}
  fetch_items_message(uint32_t item_type, const std::vector<item_hash_t>& items_to_fetch) :
    item_type(item_type),
    items_to_fetch(items_to_fetch)
  {}
};

item_not_available_message

/**
 * Message naming a requested item that is not available.
 */
struct item_not_available_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  item_id requested_item;

  item_not_available_message()
  {}

  item_not_available_message(const item_id& requested_item)
    : requested_item(requested_item)
  {}
};

hello_message

/**
 * Handshake message describing the sending node: user agent, protocol
 * version, inbound address and ports, node public key, a signed shared
 * secret, the chain id and free-form user data.
 */
struct hello_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;

  std::string                user_agent;
  uint32_t                   core_protocol_version;
  fc::ip::address            inbound_address;
  uint16_t                   inbound_port;
  uint16_t                   outbound_port;
  node_id_t                  node_public_key;
  fc::ecc::compact_signature signed_shared_secret;
  fc::sha256                 chain_id;
  fc::variant_object         user_data;

  // Zero the scalar members so a default-constructed message holds no
  // indeterminate values; the class-type members use their own defaults.
  hello_message() :
    core_protocol_version(0),
    inbound_port(0),
    outbound_port(0)
  {}
  hello_message(const std::string& user_agent,
                uint32_t core_protocol_version,
                const fc::ip::address& inbound_address,
                uint16_t inbound_port,
                uint16_t outbound_port,
                const node_id_t& node_public_key,
                const fc::ecc::compact_signature& signed_shared_secret,
                const fc::sha256& chain_id_arg,
                const fc::variant_object& user_data ) :
    user_agent(user_agent),
    core_protocol_version(core_protocol_version),
    inbound_address(inbound_address),
    inbound_port(inbound_port),
    outbound_port(outbound_port),
    node_public_key(node_public_key),
    signed_shared_secret(signed_shared_secret),
    chain_id(chain_id_arg),
    user_data(user_data)
  {}
};

connection_accepted_message

/**
 * Message with no payload; only its type tag distinguishes it.
 */
struct connection_accepted_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;

  connection_accepted_message() {}
};

// Reasons a connection can be rejected.  The enumerators have implicit
// values starting at 0 (unspecified), so their order determines their
// numeric values.
enum class rejection_reason_code { unspecified,
                                   different_chain,
                                   already_connected,
                                   connected_to_self,
                                   not_accepting_connections,
                                   blocked,
                                   invalid_hello_message,
                                   client_too_old };

connection_rejected_message

struct connection_rejected_message
{
  static const core_message_type_enum type;

  std::string                                   user_agent;
  uint32_t                                      core_protocol_version;
  fc::ip::endpoint                              remote_endpoint;
  std::string                                   reason_string;
  fc::enum_type<uint8_t, rejection_reason_code> reason_code;

  connection_rejected_message() {}
  connection_rejected_message(const std::string& user_agent, uint32_t core_protocol_version,
                              const fc::ip::endpoint& remote_endpoint, rejection_reason_code reason_code,
                              const std::string& reason_string) :
    user_agent(user_agent),
    core_protocol_version(core_protocol_version),
    remote_endpoint(remote_endpoint),
    reason_string(reason_string),
    reason_code(reason_code)
  {}
};

address_request_message

/**
 * Message with no payload; only its type tag distinguishes it.
 */
struct address_request_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;

  address_request_message() {}
};

// Both enums use implicit values starting at 0, with `unknown` first.
enum class peer_connection_direction { unknown, inbound, outbound };
enum class firewalled_state { unknown, firewalled, not_firewalled };

address_info

/**
 * Record describing one peer address: endpoint, last-seen time, latency,
 * node id, connection direction and firewalled state.
 */
struct address_info
{
  fc::ip::endpoint          remote_endpoint;
  fc::time_point_sec        last_seen_time;
  fc::microseconds          latency;
  node_id_t                 node_id;
  fc::enum_type<uint8_t, peer_connection_direction> direction;
  fc::enum_type<uint8_t, firewalled_state> firewalled;

  address_info()
  {}

  /// Member-wise constructor; every argument is copied into its field.
  address_info(const fc::ip::endpoint& remote_endpoint,
               const fc::time_point_sec last_seen_time,
               const fc::microseconds latency,
               const node_id_t& node_id,
               peer_connection_direction direction,
               firewalled_state firewalled)
    : remote_endpoint(remote_endpoint),
      last_seen_time(last_seen_time),
      latency(latency),
      node_id(node_id),
      direction(direction),
      firewalled(firewalled)
  {}
};

address_message

/**
 * Message carrying a list of address_info records.
 */
struct address_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;

  std::vector<address_info> addresses;
};

closing_connection_message

struct closing_connection_message
{
  static const core_message_type_enum type;

  std::string        reason_for_closing;
  bool               closing_due_to_error;
  fc::oexception     error;

  closing_connection_message() : closing_due_to_error(false) {}
  closing_connection_message(const std::string& reason_for_closing,
                             bool closing_due_to_error = false,
                             const fc::oexception& error = fc::oexception()) :
    reason_for_closing(reason_for_closing),
    closing_due_to_error(closing_due_to_error),
    error(error)
  {}
};

current_time_request_message

/**
 * Time request message carrying the time at which it was sent.
 */
struct current_time_request_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;
  fc::time_point request_sent_time;

  current_time_request_message()
  {}

  current_time_request_message(const fc::time_point request_sent_time)
    : request_sent_time(request_sent_time)
  {}
};

current_time_reply_message

/**
 * Time reply message carrying three timestamps: when the request was sent,
 * when it was received, and when the reply was transmitted.
 */
struct current_time_reply_message
{
  /// Message type tag; only declared here.
  static const core_message_type_enum type;
  fc::time_point request_sent_time;
  fc::time_point request_received_time;
  fc::time_point reply_transmitted_time;

  current_time_reply_message()
  {}

  /// reply_transmitted_time defaults to a default-constructed time_point.
  current_time_reply_message(const fc::time_point request_sent_time,
                             const fc::time_point request_received_time,
                             const fc::time_point reply_transmitted_time = fc::time_point())
    : request_sent_time(request_sent_time),
      request_received_time(request_received_time),
      reply_transmitted_time(reply_transmitted_time)
  {}
};

check_firewall_message

/**
 * Firewall check request: a node id and the endpoint to check.
 */
struct check_firewall_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;
  node_id_t node_id;
  fc::ip::endpoint endpoint_to_check;
};

// Possible outcomes of a firewall check.  Values are implicit, starting at
// 0 (unable_to_check), so the enumerator order determines them.
enum class firewall_check_result
{
  unable_to_check,
  unable_to_connect,
  connection_successful
};

check_firewall_reply_message

/**
 * Firewall check reply: the node id, the endpoint that was checked and the
 * result of the check.
 */
struct check_firewall_reply_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;
  node_id_t node_id;
  fc::ip::endpoint endpoint_checked;
  fc::enum_type<uint8_t, firewall_check_result> result;
};

get_current_connections_request_message

/**
 * Message with no payload; only its type tag distinguishes it.
 */
struct get_current_connections_request_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;
};

current_connection_data

/**
 * Description of one current connection, used as the element type of
 * get_current_connections_reply_message::current_connections.
 * The struct declares no constructor, so the members are only initialized
 * if the code creating it does so.
 */
struct current_connection_data
{
  uint32_t           connection_duration; // in seconds
  fc::ip::endpoint   remote_endpoint;
  node_id_t          node_id;
  fc::microseconds   clock_offset;
  fc::microseconds   round_trip_delay;
  fc::enum_type<uint8_t, peer_connection_direction> connection_direction;
  fc::enum_type<uint8_t, firewalled_state> firewalled;
  fc::variant_object user_data;
};

get_current_connections_reply_message

/**
 * Reply carrying upload/download rates over three windows (one minute,
 * fifteen minutes, one hour) and a list of the current connections.
 * NOTE(review): the unit of the rate fields is not stated here --
 * presumably bytes per second; confirm against the code that fills them.
 */
struct get_current_connections_reply_message
{
  // Message type tag; only declared here.
  static const core_message_type_enum type;
  uint32_t upload_rate_one_minute;
  uint32_t download_rate_one_minute;
  uint32_t upload_rate_fifteen_minutes;
  uint32_t download_rate_fifteen_minutes;
  uint32_t upload_rate_one_hour;
  uint32_t download_rate_one_hour;
  std::vector<current_connection_data> current_connections;
};

exceptions

// registered in node.cpp
//
// Networking exceptions use the error codes 90000..90006; net_exception is
// the base and every other exception below derives from it.

FC_DECLARE_EXCEPTION( net_exception, 90000, "P2P Networking Exception" );
FC_DECLARE_DERIVED_EXCEPTION( send_queue_overflow,                   graphene::net::net_exception, 90001, "send queue for this peer exceeded maximum size" );
FC_DECLARE_DERIVED_EXCEPTION( insufficient_relay_fee,                graphene::net::net_exception, 90002, "insufficient relay fee" );
FC_DECLARE_DERIVED_EXCEPTION( already_connected_to_requested_peer,   graphene::net::net_exception, 90003, "already connected to requested peer" );
FC_DECLARE_DERIVED_EXCEPTION( block_older_than_undo_history,         graphene::net::net_exception, 90004, "block is older than our undo history allows us to process" );
FC_DECLARE_DERIVED_EXCEPTION( peer_is_on_an_unreachable_fork,        graphene::net::net_exception, 90005, "peer is on another fork" );
FC_DECLARE_DERIVED_EXCEPTION( unlinkable_block_exception,            graphene::net::net_exception, 90006, "unlinkable block" );

message

message_header

  • Defines an 8-byte header that is always present because the minimum encrypted packet size is 8 bytes (blowfish). The maximum message size is defined in config.hpp. The channel and message type are also included because almost every channel will have a message type field, so we might as well include it in the 8-byte header to save space.

/**
 * Fixed-size header that precedes every message: two 32-bit fields,
 * the payload size and the message type.
 */
struct message_header
{
   uint32_t  size;   // number of bytes in message, capped at MAX_MESSAGE_SIZE
   uint32_t  msg_type;  // message type specifier (stored in a 32-bit field)

   // Start from a well-defined empty header so that default-constructed
   // headers (and the messages derived from them) never expose
   // indeterminate size/type values.
   message_header() : size(0), msg_type(0) {}
};

// 160-bit value type used for message hashes.
typedef fc::uint160_t message_hash_type;

message

  • Abstracts the process of packing/unpacking a message for a particular channel.

/**
 * A message_header plus the serialized payload bytes.  Typed messages are
 * converted to and from this form with the templated constructor and as().
 */
struct message : public message_header
{
   std::vector<char> data;

   message()
   {}

   /// Move: copies the header fields and takes over the payload buffer.
   message( message&& m )
      : message_header( m ),
        data( std::move( m.data ) )
   {}

   /// Copy: duplicates the header fields and the payload.
   message( const message& m )
      : message_header( m ),
        data( m.data )
   {}

   /**
    *  Serializes m with fc::raw::pack and fills in the header.
    *  T::type must name the message type.
    */
   template<typename T>
   message( const T& m )
   {
      msg_type = T::type;
      data     = fc::raw::pack(m);
      size     = static_cast<uint32_t>( data.size() );
   }

   /// ripemd160 hash computed over the payload bytes only.
   fc::uint160_t id()const
   {
      const uint32_t payload_length = static_cast<uint32_t>( data.size() );
      return fc::ripemd160::hash( data.data(), payload_length );
   }

   /**
    *  Reverse of the templated constructor: asserts that the stored type
    *  equals T::type, then unpacks the payload into a T.
    */
   template<typename T>
   T as()const
   {
       try {
        FC_ASSERT( msg_type == T::type );
        T unpacked;
        // An empty payload is still run through unpack, using a null
        // zero-length stream, to make sure T expects no data.
        const bool has_payload = !data.empty();
        fc::datastream<const char*> ds( has_payload ? data.data() : nullptr,
                                        has_payload ? data.size() : 0 );
        fc::raw::unpack( ds, unpacked );
        return unpacked;
       } FC_RETHROW_EXCEPTIONS( warn,
            "error unpacking network message as a '${type}'  ${x} !=? ${msg_type}",
            ("type", fc::get_typename<T>::name() )
            ("x", T::type)
            ("msg_type", msg_type)
            );
   }
};

message_oriented_connection

message_oriented_connection_delegate

  • receives incoming messages from a message_oriented_connection object

/**
 * Callback interface that receives incoming messages and the
 * connection-closed notification from a message_oriented_connection.
 */
class message_oriented_connection_delegate
{
public:
  // Polymorphic interface: a virtual destructor makes destruction through a
  // pointer to this base well-defined (matches node_delegate).
  virtual ~message_oriented_connection_delegate() {}

  /// Called with the connection that produced the message and the message itself.
  virtual void on_message(message_oriented_connection* originating_connection, const message& received_message) = 0;
  /// Called with the connection that has been closed.
  virtual void on_connection_closed(message_oriented_connection* originating_connection) = 0;
};

message_oriented_connection

  • uses a secure socket to create a connection that reads and writes a stream of fc::net::message objects

/**
 * Connection that reads and writes a stream of message objects.
 * All state lives in the implementation object owned through `my`; only
 * the public interface is declared here.
 */
class message_oriented_connection
{
   public:
     // The delegate is optional (defaults to nullptr).
     // NOTE(review): presumably the delegate is not owned by the
     // connection -- confirm in the implementation.
     message_oriented_connection(message_oriented_connection_delegate* delegate = nullptr);
     ~message_oriented_connection();
     // Access to the underlying TCP socket.
     fc::tcp_socket& get_socket();

     void accept();
     void bind(const fc::ip::endpoint& local_endpoint);
     void connect_to(const fc::ip::endpoint& remote_endpoint);

     void send_message(const message& message_to_send);
     void close_connection();
     void destroy_connection();

     // Read-only statistics and state accessors.
     uint64_t       get_total_bytes_sent() const;
     uint64_t       get_total_bytes_received() const;
     fc::time_point get_last_message_sent_time() const;
     fc::time_point get_last_message_received_time() const;
     fc::time_point get_connection_time() const;
     fc::sha512     get_shared_secret() const;
   private:
     // Owning pointer to the implementation object.
     std::unique_ptr<detail::message_oriented_connection_impl> my;
};
// Shared-ownership handle to a message_oriented_connection.
typedef std::shared_ptr<message_oriented_connection> message_oriented_connection_ptr;

node

message_propagation_data

  • during network development, we need to track message propagation across the network using a structure like this:

/**
 * Propagation record for one message: when it was received, when it was
 * validated, and the id of the peer it originated from.
 */
struct message_propagation_data
{
  fc::time_point received_time;
  fc::time_point validated_time;
  node_id_t originating_peer;
};

node_delegate

  • Used by the node to report status to the client or to fetch data from the client.

/**
 * Callback interface through which the node reports status to, and fetches
 * data from, the client.  Every member other than the destructor is pure
 * virtual, so an implementation has to provide all of them.
 */
class node_delegate
{
   public:
      virtual ~node_delegate(){}

      /**
       *  If delegate has the item, the network has no need to fetch it.
       */
      virtual bool has_item( const net::item_id& id ) = 0;

      /**
       *  @brief Called when a new block comes in from the network
       *
       *  @param blk_msg the block message that was received
       *  @param sync_mode true if the message was fetched through the sync process, false during normal operation
       *  @param contained_transaction_message_ids output parameter.
       *         NOTE(review): presumably filled with the message ids of the
       *         transactions contained in the block -- confirm against the
       *         implementation.
       *  @returns true if this message caused the blockchain to switch forks, false if it did not
       *
       *  @throws exception if error validating the item, otherwise the item is
       *          safe to broadcast on.
       */
      virtual bool handle_block( const graphene::net::block_message& blk_msg, bool sync_mode,
                                 std::vector<fc::uint160_t>& contained_transaction_message_ids ) = 0;

      /**
       *  @brief Called when a new transaction comes in from the network
       *
       *  @throws exception if error validating the item, otherwise the item is
       *          safe to broadcast on.
       */
      virtual void handle_transaction( const graphene::net::trx_message& trx_msg ) = 0;

      /**
       *  @brief Called when a new message comes in from the network other than a
       *         block or a transaction.  Currently there are no other possible
       *         messages, so this should never be called.
       *
       *  @throws exception if error validating the item, otherwise the item is
       *          safe to broadcast on.
       */
      virtual void handle_message( const message& message_to_process ) = 0;

      /**
       *  Assuming all data elements are ordered in some way, this method should
       *  return up to limit ids that occur *after* from_id.
       *  On return, remaining_item_count will be set to the number of items
       *  in our blockchain after the last item returned in the result,
       *  or 0 if the result contains the last item in the blockchain
       *
       *  NOTE(review): `from_id` is not a parameter of this function;
       *  presumably the starting point is derived from blockchain_synopsis --
       *  confirm against the implementation.
       *
       *  @param blockchain_synopsis list of item hashes supplied by the caller
       *  @param remaining_item_count output parameter, see above
       *  @param limit maximum number of ids to return (defaults to 2000)
       */
      virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
                                                     uint32_t& remaining_item_count,
                                                     uint32_t limit = 2000) = 0;

      /**
       *  Given the hash of the requested data, fetch the body.
       */
      virtual message get_item( const item_id& id ) = 0;

      /** Returns the id of the chain the delegate is operating on. */
      virtual chain_id_type get_chain_id()const = 0;

      /**
       * Returns a synopsis of the blockchain used for syncing.
       * This consists of a list of selected item hashes from our current preferred
       * blockchain, exponentially falling off into the past.
       *
       * If the blockchain is empty, it will return the empty list.
       * If the blockchain has one block, it will return a list containing just that block.
       * If it contains more than one block:
       *   the first element in the list will be the hash of the highest numbered block that
       *       we cannot undo
       *   the second element will be the hash of an item at the half way point in the undoable
       *       segment of the blockchain
       *   the third will be ~3/4 of the way through the undoable segment of the block chain
       *   the fourth will be at ~7/8...
       *     &c.
       *   the last item in the list will be the hash of the most recent block on our preferred chain
       *
       * NOTE(review): the roles of reference_point and
       * number_of_blocks_after_reference_point are not described here --
       * confirm against the implementation.
       */
      virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
                                                               uint32_t number_of_blocks_after_reference_point) = 0;

      /**
       *  Call this after the call to handle_message succeeds.
       *
       *  @param item_type the type of the item we're synchronizing, will be the same as item passed to the sync_from() call
       *  @param item_count the number of items known to the node that haven't been sent to handle_item() yet.
       *                    After `item_count` more calls to handle_item(), the node will be in sync
       *
       *  NOTE(review): handle_item() is not declared in this interface;
       *  presumably it refers to the handle_block()/handle_transaction()
       *  callbacks -- confirm.
       */
      virtual void     sync_status( uint32_t item_type, uint32_t item_count ) = 0;

      /**
       *  Call any time the number of connected peers changes.
       *
       *  @param c NOTE(review): presumably the new number of connected peers -- confirm
       */
      virtual void     connection_count_changed( uint32_t c ) = 0;

      /** Returns the block number for the given block id. */
      virtual uint32_t get_block_number(const item_hash_t& block_id) = 0;

      /**
       * Returns the time a block was produced (if block_id = 0, returns genesis time).
       * If we don't know about the block, returns time_point_sec::min()
       */
      virtual fc::time_point_sec get_block_time(const item_hash_t& block_id) = 0;

      /** Returns the id of the current head block. */
      virtual item_hash_t get_head_block_id() const = 0;

      virtual uint32_t estimate_last_known_fork_from_git_revision_timestamp(uint32_t unix_timestamp) const = 0;

      /** Reports an error as a text message plus an optional exception. */
      virtual void error_encountered(const std::string& message, const fc::oexception& error) = 0;
      virtual uint8_t get_current_block_interval_in_seconds() const = 0;

};

peer_status

  • Information about connected peers that the client may want to make available to the user.

/**
 * Information about one connected peer that the client may want to show to
 * the user.  The struct declares no constructor, so `version` is only
 * initialized if the code creating it does so.
 */
struct peer_status
{
   uint32_t         version;
   fc::ip::endpoint host;
   /** info contains the fields required by bitcoin-rpc's getpeerinfo call, we will likely
       extend it with our own fields. */
   fc::variant_object info;
};

node

  • provides application independent P2P broadcast and data synchronization

/**
 * Provides application independent P2P broadcast and data synchronization.
 *
 * The class is a polymorphic base (see simulated_network): several member
 * functions are virtual so that a subclass can replace the real network
 * behavior.  All state lives in the implementation object owned through `my`.
 */
class node : public std::enable_shared_from_this<node>
{
   public:
     node(const std::string& user_agent);
     // Virtual because node has virtual functions and is used as a base
     // class; destroying a derived object through a node pointer would
     // otherwise be undefined behavior.
     virtual ~node();

     void close();

     void      set_node_delegate( node_delegate* del );

     void      load_configuration( const fc::path& configuration_directory );

     virtual void      listen_to_p2p_network();
     virtual void      connect_to_p2p_network();

     /**
      *  Add endpoint to internal level_map database of potential nodes
      *  to attempt to connect to.  This database is consulted any time
      *  the number connected peers falls below the target.
      */
     void      add_node( const fc::ip::endpoint& ep );

     /**
      *  Attempt to connect to the specified endpoint immediately.
      */
     virtual void connect_to_endpoint( const fc::ip::endpoint& ep );

     /**
      *  Specifies the network interface and port upon which incoming
      *  connections should be accepted.
      */
     void      listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available );

     /**
      *  Call with true to enable listening for incoming connections
      */
     void accept_incoming_connections(bool accept);

     /**
      *  Specifies the port upon which incoming connections should be accepted.
      *  @param port the port to listen on
      *  @param wait_if_not_available if true and the port is not available, enter a
      *                               sleep and retry loop to wait for it to become
      *                               available.  If false and the port is not available,
      *                               just choose a random available port
      */
     void      listen_on_port(uint16_t port, bool wait_if_not_available);

     /**
      * Returns the endpoint the node is listening on.  This is usually the same
      * as the value previously passed in to listen_on_endpoint, unless we
      * were unable to bind to that port.
      */
     virtual fc::ip::endpoint get_actual_listening_endpoint() const;

     /**
      *  @return a list of peers that are currently connected.
      */
     std::vector<peer_status> get_connected_peers() const;

     /** return the number of peers we're actively connected to */
     virtual uint32_t get_connection_count() const;

     /**
      *  Add message to outgoing inventory list, notify peers that
      *  I have a message ready.
      */
     virtual void  broadcast( const message& item_to_broadcast );

     /**
      *  Convenience wrapper: wraps the transaction in a trx_message and
      *  passes it to broadcast().
      */
     virtual void  broadcast_transaction( const signed_transaction& trx )
     {
        broadcast( trx_message(trx) );
     }

     /**
      *  Node starts the process of fetching all items after item_id of the
      *  given item_type.   During this process messages are not broadcast.
      */
     virtual void      sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers);

     bool      is_connected() const;

     void set_advanced_node_parameters(const fc::variant_object& params);
     fc::variant_object get_advanced_node_parameters();
     message_propagation_data get_transaction_propagation_data(const graphene::chain::transaction_id_type& transaction_id);
     message_propagation_data get_block_propagation_data(const graphene::chain::block_id_type& block_id);
     node_id_t get_node_id() const;
     void set_allowed_peers(const std::vector<node_id_t>& allowed_peers);

     /**
      * Instructs the node to forget everything in its peer database, mostly for debugging
      * problems where nodes are failing to connect to the network
      */
     void clear_peer_database();

     void set_total_bandwidth_limit(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);

     fc::variant_object network_get_info() const;
     fc::variant_object network_get_usage_stats() const;

     std::vector<potential_peer_record> get_potential_peers() const;

     void disable_peer_advertising();
     fc::variant_object get_call_statistics() const;
   private:
     // Owning pointer to the implementation, released through a custom deleter.
     std::unique_ptr<detail::node_impl, detail::node_impl_deleter> my;
};

simulated_network

class simulated_network : public node
 {
 public:
   ~simulated_network();
   simulated_network(const std::string& user_agent) : node(user_agent) {}
   void      listen_to_p2p_network() override {}
   void      connect_to_p2p_network() override {}
   void      connect_to_endpoint(const fc::ip::endpoint& ep) override {}

   fc::ip::endpoint get_actual_listening_endpoint() const override { return fc::ip::endpoint(); }

   void      sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers) override {}
   void      broadcast(const message& item_to_broadcast) override;
   void      add_node_delegate(node_delegate* node_delegate_to_add);

   virtual uint32_t get_connection_count() const override { return 8; }
 private:
   struct node_info;
   void message_sender(node_info* destination_node);
   std::list<node_info*> network_nodes;
 };


// Shared-ownership handles for node and simulated_network.
typedef std::shared_ptr<node> node_ptr;
typedef std::shared_ptr<simulated_network> simulated_network_ptr;

peer_connection

firewall_check_state_data

/**
 * State kept while a firewall check is in progress.
 */
struct firewall_check_state_data
{
  node_id_t        expected_node_id;
  fc::ip::endpoint endpoint_to_test;

  // If we're coordinating a firewall check for another node, these are the
  // helper nodes we've already had do the test (if this structure is still
  // relevant, that means they have all had indeterminate results).
  std::set<node_id_t> nodes_already_tested;

  // If we're just a helper node, this is the node we report back to
  // when we have a result.
  node_id_t        requesting_peer;
};

peer_connection

        class peer_connection;
/**
 * Callback interface used by a peer_connection: it receives incoming
 * messages and the connection-closed notification, and is asked for the
 * message that corresponds to an item id.
 */
class peer_connection_delegate
{
public:
  // Polymorphic interface: a virtual destructor makes destruction through a
  // pointer to this base well-defined (matches node_delegate).
  virtual ~peer_connection_delegate() {}

  /// Called with the peer that produced the message and the message itself.
  virtual void on_message(peer_connection* originating_peer,
                          const message& received_message) = 0;
  /// Called with the peer whose connection has been closed.
  virtual void on_connection_closed(peer_connection* originating_peer) = 0;
  /// Returns the message for the given item id.
  virtual message get_message_for_item(const item_id& item) = 0;
};
// Forward declaration so the shared-ownership handle can be named before
// the class is defined.
class peer_connection;
typedef std::shared_ptr<peer_connection> peer_connection_ptr;

class peer_connection : public message_oriented_connection_delegate,
                        public std::enable_shared_from_this<peer_connection>
{
public:
  enum class our_connection_state
  {
    disconnected,
    just_connected, // if in this state, we have sent a hello_message
    connection_accepted, // remote side has sent us a connection_accepted, we're operating normally with them
    connection_rejected // remote side has sent us a connection_rejected, we may be exchanging address with them or may just be waiting for them to close
  };
  enum class their_connection_state
  {
    disconnected,
    just_connected, // we have not yet received a hello_message
    connection_accepted, // we have sent them a connection_accepted
    connection_rejected // we have sent them a connection_rejected
  };
  enum class connection_negotiation_status
  {
    disconnected,
    connecting,
    connected,
    accepting,
    accepted,
    hello_sent,
    peer_connection_accepted,
    peer_connection_rejected,
    negotiation_complete,
    closing,
    closed
  };
private:
  peer_connection_delegate*      _node;
  fc::optional<fc::ip::endpoint> _remote_endpoint;
  message_oriented_connection    _message_connection;

  /* a base class for messages on the queue, to hide the fact that some
   * messages are complete messages and some are only hashes of messages.
   */
  struct queued_message
  {
    fc::time_point enqueue_time;
    fc::time_point transmission_start_time;
    fc::time_point transmission_finish_time;

    queued_message(fc::time_point enqueue_time = fc::time_point::now()) :
      enqueue_time(enqueue_time)
    {}

    virtual message get_message(peer_connection_delegate* node) = 0;
    /** returns roughly the number of bytes of memory the message is consuming while
     * it is sitting on the queue
     */
    virtual size_t get_size_in_queue() = 0;
    virtual ~queued_message() {}
  };

  /* when you queue up a 'real_queued_message', a full copy of the message is
   * stored on the heap until it is sent
   */
  struct real_queued_message : queued_message
  {
    message        message_to_send;
    size_t         message_send_time_field_offset;

    real_queued_message(message message_to_send,
                        size_t message_send_time_field_offset = (size_t)-1) :
      message_to_send(std::move(message_to_send)),
      message_send_time_field_offset(message_send_time_field_offset)
    {}

    message get_message(peer_connection_delegate* node) override;
    size_t get_size_in_queue() override;
  };

  /* when you queue up a 'virtual_queued_message', we just queue up the hash of the
   * item we want to send.  When it reaches the top of the queue, we make a callback
   * to the node to generate the message.
   */
  struct virtual_queued_message : queued_message
  {
    item_id item_to_send;

    virtual_queued_message(item_id item_to_send) :
      item_to_send(std::move(item_to_send))
    {}

    message get_message(peer_connection_delegate* node) override;
    size_t get_size_in_queue() override;
  };


  size_t _total_queued_messages_size;
  std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
  fc::future<void> _send_queued_messages_done;
public:
  fc::time_point connection_initiation_time;
  fc::time_point connection_closed_time;
  fc::time_point connection_terminated_time;
  peer_connection_direction direction;
  //connection_state state;
  firewalled_state is_firewalled;
  fc::microseconds clock_offset;
  fc::microseconds round_trip_delay;

  our_connection_state our_state;
  bool they_have_requested_close;
  their_connection_state their_state;
  bool we_have_requested_close;

  connection_negotiation_status negotiation_status;
  fc::oexception connection_closed_error;

  fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
  fc::time_point get_connection_terminated_time()const { return connection_terminated_time; }

  /// data about the peer node
  /// @{
  /** node_public_key from the hello message, zero-initialized before we get the hello */
  node_id_t        node_public_key;
  /** the unique identifier we'll use to refer to the node with.  zero-initialized before
   * we receive the hello message, at which time it will be filled with either the "node_id"
   * from the user_data field of the hello, or if none is present it will be filled with a
   * copy of node_public_key */
  node_id_t        node_id;
  uint32_t         core_protocol_version;
  std::string      user_agent;
  fc::optional<std::string> graphene_git_revision_sha;
  fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
  fc::optional<std::string> fc_git_revision_sha;
  fc::optional<fc::time_point_sec> fc_git_revision_unix_timestamp;
  fc::optional<std::string> platform;
  fc::optional<uint32_t> bitness;

  // for inbound connections, these fields record what the peer sent us in
  // its hello message.  For outbound, they record what we sent the peer
  // in our hello message
  fc::ip::address inbound_address;
  uint16_t inbound_port;
  uint16_t outbound_port;
  /// @}

  typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;

  /// blockchain synchronization state data
  /// @{
  boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
  std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
  uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
  bool peer_needs_sync_items_from_us;
  bool we_need_sync_items_from_peer;
  fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
  fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
  std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync.  fetch from another peer if this peer disconnects
  item_hash_t last_block_delegate_has_seen; /// the hash of the last block  this peer has told us about that the peer knows
  fc::time_point_sec last_block_time_delegate_has_seen;
  bool inhibit_fetching_sync_blocks;
  /// @}

  /// non-synchronization state data
  /// @{
  struct timestamped_item_id
  {
    item_id            item;
    fc::time_point_sec timestamp;
    timestamped_item_id(const item_id& item, const fc::time_point_sec timestamp) :
      item(item),
      timestamp(timestamp)
    {}
  };
  struct timestamp_index{};
  typedef boost::multi_index_container<timestamped_item_id,
                                       boost::multi_index::indexed_by<boost::multi_index::hashed_unique<boost::multi_index::member<timestamped_item_id, item_id, &timestamped_item_id::item>,
                                       std::hash<item_id> >,
                                       boost::multi_index::ordered_non_unique<boost::multi_index::tag<timestamp_index>,
                                       boost::multi_index::member<timestamped_item_id,
                                                                               fc::time_point_sec,
                                                                               &timestamped_item_id::timestamp> > > > timestamped_items_set_type;

  timestamped_items_set_type inventory_peer_advertised_to_us;
  timestamped_items_set_type inventory_advertised_to_peer;

  item_to_time_map_type items_requested_from_peer;  /// items we've requested from this peer during normal operation.  fetch from another peer if this peer disconnects
  /// @}

  // if they're flooding us with transactions, we set this to avoid fetching for a few seconds to let the
  // blockchain catch up
  fc::time_point transaction_fetching_inhibited_until;

  uint32_t last_known_fork_block_number;

  fc::future<void> accept_or_connect_task_done;

  firewall_check_state_data *firewall_check_state;

    #ifndef NDEBUG
private:
  fc::thread* _thread;
  unsigned _send_message_queue_tasks_running; // temporary debugging

    #endif
  bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
private:
  peer_connection(peer_connection_delegate* delegate);
  void destroy();
public:
  static peer_connection_ptr make_shared(peer_connection_delegate* delegate); // use this instead of the constructor
  virtual ~peer_connection();

  fc::tcp_socket& get_socket();
  void accept_connection();
  void connect_to(const fc::ip::endpoint& remote_endpoint, fc::optional<fc::ip::endpoint> local_endpoint = fc::optional<fc::ip::endpoint>());

  void on_message(message_oriented_connection* originating_connection, const message& received_message) override;
  void on_connection_closed(message_oriented_connection* originating_connection) override;

  void send_queueable_message(std::unique_ptr<queued_message>&& message_to_send);
  void send_message(const message& message_to_send, size_t message_send_time_field_offset = (size_t)-1);
  void send_item(const item_id& item_to_send);
  void close_connection();
  void destroy_connection();

  uint64_t get_total_bytes_sent() const;
  uint64_t get_total_bytes_received() const;

  fc::time_point get_last_message_sent_time() const;
  fc::time_point get_last_message_received_time() const;

  fc::optional<fc::ip::endpoint> get_remote_endpoint();
  fc::ip::endpoint get_local_endpoint();
  void set_remote_endpoint(fc::optional<fc::ip::endpoint> new_remote_endpoint);

  bool busy() const;
  bool idle() const;
  bool is_currently_handling_message() const;

  bool is_transaction_fetching_inhibited() const;
  fc::sha512 get_shared_secret() const;
  void clear_old_inventory();
  bool is_inventory_advertised_to_us_list_full_for_transactions() const;
  bool is_inventory_advertised_to_us_list_full() const;
  bool performing_firewall_check() const;
  fc::optional<fc::ip::endpoint> get_endpoint_for_connecting() const;
private:
  void send_queued_messages_task();
  void accept_connection_task();
  void connect_to_task(const fc::ip::endpoint& remote_endpoint);
};
// Repeats the peer_connection_ptr alias already declared ahead of the class body; both name the same type.
typedef std::shared_ptr<peer_connection> peer_connection_ptr;

peer_database

potential_peer_last_connection_disposition

/**
 * Result of the most recent connection attempt to a potential peer.
 *
 * The numeric values are spelled out explicitly; they are the same values
 * the compiler assigns when the enumerators are left unnumbered.
 */
enum potential_peer_last_connection_disposition
{
  never_attempted_to_connect         = 0,
  last_connection_failed             = 1,
  last_connection_rejected           = 2,
  last_connection_handshaking_failed = 3,
  last_connection_succeeded          = 4
};

potential_peer_record

struct potential_peer_record
{
  fc::ip::endpoint                  endpoint;
  fc::time_point_sec                last_seen_time;
  fc::enum_type<uint8_t,potential_peer_last_connection_disposition> last_connection_disposition;
  fc::time_point_sec                last_connection_attempt_time;
  uint32_t                          number_of_successful_connection_attempts;
  uint32_t                          number_of_failed_connection_attempts;
  fc::optional<fc::exception>       last_error;

  potential_peer_record() :
    number_of_successful_connection_attempts(0),
  number_of_failed_connection_attempts(0){}

  potential_peer_record(fc::ip::endpoint endpoint,
                        fc::time_point_sec last_seen_time = fc::time_point_sec(),
                        potential_peer_last_connection_disposition last_connection_disposition = never_attempted_to_connect) :
    endpoint(endpoint),
    last_seen_time(last_seen_time),
    last_connection_disposition(last_connection_disposition),
    number_of_successful_connection_attempts(0),
    number_of_failed_connection_attempts(0)
  {}
};
namespace detail
{
  class peer_database_impl;
  class peer_database_iterator_impl;

  /**
   * Forward-traversal iterator yielding `const potential_peer_record&`.
   * The actual iteration state lives in peer_database_iterator_impl, which is
   * only forward-declared here and held through `my`.
   */
  class peer_database_iterator
    : public boost::iterator_facade<peer_database_iterator,
                                    const potential_peer_record,
                                    boost::forward_traversal_tag>
  {
  public:
    peer_database_iterator();
    explicit peer_database_iterator(peer_database_iterator_impl* impl);
    peer_database_iterator( const peer_database_iterator& c );
    ~peer_database_iterator();

  private:
    // boost::iterator_facade reaches the three primitives below through
    // iterator_core_access, hence the friend declaration
    friend class boost::iterator_core_access;

    const potential_peer_record& dereference() const;
    bool equal(const peer_database_iterator& other) const;
    void increment();

    std::unique_ptr<peer_database_iterator_impl> my;
  };
}

peer_database

/**
 * Collection of potential_peer_record entries that can be looked up, updated
 * and erased by endpoint, and iterated with begin()/end().
 * The implementation is hidden behind the forward-declared
 * detail::peer_database_impl held in `my`.
 */
class peer_database
{
public:
  using iterator = detail::peer_database_iterator;

  peer_database();
  ~peer_database();

  /// @name Storage lifecycle
  /// @{
  void open(const fc::path& databaseFilename);
  void close();
  void clear();
  /// @}

  /// @name Per-endpoint access
  /// @{
  void erase(const fc::ip::endpoint& endpointToErase);
  void update_entry(const potential_peer_record& updatedRecord);
  /// Always yields a record.
  potential_peer_record lookup_or_create_entry_for_endpoint(const fc::ip::endpoint& endpointToLookup);
  /// Yields an optional record, so the result may be empty.
  fc::optional<potential_peer_record> lookup_entry_for_endpoint(const fc::ip::endpoint& endpointToLookup);
  /// @}

  /// @name Iteration
  /// @{
  iterator begin() const;
  iterator end() const;
  size_t size() const;
  /// @}

private:
  std::unique_ptr<detail::peer_database_impl> my;
};

stcp_socket

  • Uses ECDH to negotiate an AES key for communicating with other nodes on the network.

stcp_socket

/**
 * An fc::iostream built on top of an fc::tcp_socket.  In addition to the
 * socket it holds a private key, a shared secret and AES encoder/decoder
 * state.
 * NOTE(review): presumably do_key_exchange() derives the shared secret and
 * everything read/written afterwards goes through the AES members -- the
 * implementation is not visible here, confirm in the .cpp
 */
class stcp_socket : public virtual fc::iostream
{
  public:
        stcp_socket();
        ~stcp_socket();
        /// direct access to the wrapped TCP socket
        fc::tcp_socket&  get_socket() { return _sock; }
        void             accept();

        void             connect_to( const fc::ip::endpoint& remote_endpoint );
        void             bind( const fc::ip::endpoint& local_endpoint );

        // stream input
        virtual size_t   readsome( char* buffer, size_t max );
        virtual size_t   readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
        virtual bool     eof()const;

        // stream output
        virtual size_t   writesome( const char* buffer, size_t len );
        virtual size_t   writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );

        virtual void     flush();
        virtual void     close();

        // keep the base-class get() overloads visible next to the one declared below
        using istream::get;
        /// reads exactly one character into c via read()
        void             get( char& c ) { read( &c, 1 ); }
        /// returns a copy of the stored shared secret
        fc::sha512       get_shared_secret() const { return _shared_secret; }
  private:
        void do_key_exchange();

        fc::sha512           _shared_secret;
        fc::ecc::private_key _priv_key;
        fc::array<char,8>    _buf;
        //uint32_t             _buf_len;
        fc::tcp_socket       _sock;
        fc::aes_encoder      _send_aes;
        fc::aes_decoder      _recv_aes;
        std::shared_ptr<char> _read_buffer;
        std::shared_ptr<char> _write_buffer;
        // the two flags below exist only when NDEBUG is not defined, so the
        // object layout differs between debug and release builds
#ifndef NDEBUG
        bool _read_buffer_in_use;
        bool _write_buffer_in_use;
#endif
};

/// Shared-ownership handle for an stcp_socket.
using stcp_socket_ptr = std::shared_ptr<stcp_socket>;