Class TcpCommunicationSpi

  • All Implemented Interfaces:
    CommunicationSpi<Message>, IgniteSpi

    @IgniteSpiMultipleInstancesSupport(true)
    @IgniteSpiConsistencyChecked(optional=false)
    public class TcpCommunicationSpi
    extends org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConfigInitializer
    TcpCommunicationSpi is default communication SPI which uses TCP/IP protocol and Java NIO to communicate with other nodes.

    To enable communication with other nodes, this SPI adds ATTR_ADDRS and ATTR_PORT local node attributes (see ClusterNode.attributes().

    At startup, this SPI tries to start listening to local port specified by TcpCommunicationConfigInitializer.setLocalPort(int) method. If local port is occupied, then SPI will automatically increment the port number until it can successfully bind for listening. TcpCommunicationConfigInitializer.setLocalPortRange(int) configuration parameter controls maximum number of ports that SPI will try before it fails. Port range comes very handy when starting multiple grid nodes on the same machine or even in the same VM. In this case all nodes can be brought up without a single change in configuration.

    This SPI caches connections to remote nodes so it does not have to reconnect every time a message is sent. By default, idle connections are kept active for DFLT_IDLE_CONN_TIMEOUT period and then are closed. Use TcpCommunicationConfigInitializer.setIdleConnectionTimeout(long) configuration parameter to configure you own idle connection timeout.

    Failure Detection

    Configuration defaults (see Configuration section below and IgniteConfiguration.getFailureDetectionTimeout()) for details) are chosen to make possible for communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection time worse.

    If it's needed to tune failure detection then it's highly recommended to do this using IgniteConfiguration.setFailureDetectionTimeout(long). This failure timeout automatically controls the following parameters: TcpCommunicationConfigInitializer.getConnectTimeout(), TcpCommunicationConfigInitializer.getMaxConnectTimeout(), TcpCommunicationConfigInitializer.getReconnectCount(). If any of those parameters is set explicitly, then the failure timeout setting will be ignored.

    If it's required to perform advanced settings of failure detection and IgniteConfiguration.getFailureDetectionTimeout() is unsuitable then various TcpCommunicationSpi configuration parameters may be used.

    Configuration

    Mandatory

    This SPI has no mandatory configuration parameters.

    Optional

    The following configuration parameters are optional:
    • Address resolver (see TcpCommunicationConfigInitializer.setAddressResolver(AddressResolver)
    • Node local IP address (see TcpCommunicationConfigInitializer.setLocalAddress(String))
    • Node local port number (see TcpCommunicationConfigInitializer.setLocalPort(int))
    • Local port range (see TcpCommunicationConfigInitializer.setLocalPortRange(int)
    • Use paired connections (see TcpCommunicationConfigInitializer.setUsePairedConnections(boolean)
    • Connections per node (see TcpCommunicationConfigInitializer.setConnectionsPerNode(int))
    • Idle connection timeout (see TcpCommunicationConfigInitializer.setIdleConnectionTimeout(long))
    • Direct or heap buffer allocation (see TcpCommunicationConfigInitializer.setDirectBuffer(boolean))
    • Direct or heap buffer allocation for sending (see TcpCommunicationConfigInitializer.setDirectSendBuffer(boolean))
    • Count of selectors and selector threads for NIO server (see TcpCommunicationConfigInitializer.setSelectorsCount(int))
    • Selector thread busy-loop iterations (see TcpCommunicationConfigInitializer.setSelectorSpins(long)
    • TCP_NODELAY socket option for sockets (see TcpCommunicationConfigInitializer.setTcpNoDelay(boolean))
    • Filter reachable addresses (see TcpCommunicationConfigInitializer.setFilterReachableAddresses(boolean)
    • Message queue limit (see TcpCommunicationConfigInitializer.setMessageQueueLimit(int))
    • Slow client queue limit (see TcpCommunicationConfigInitializer.setSlowClientQueueLimit(int))
    • Connect timeout (see TcpCommunicationConfigInitializer.setConnectTimeout(long))
    • Maximum connect timeout (see TcpCommunicationConfigInitializer.setMaxConnectTimeout(long))
    • Reconnect attempts count (see TcpCommunicationConfigInitializer.setReconnectCount(int))
    • Socket receive buffer size (see TcpCommunicationConfigInitializer.setSocketReceiveBuffer(int))
    • Socket send buffer size (see TcpCommunicationConfigInitializer.setSocketSendBuffer(int))
    • Socket write timeout (see TcpCommunicationConfigInitializer.setSocketWriteTimeout(long))
    • Number of received messages after which acknowledgment is sent (see TcpCommunicationConfigInitializer.setAckSendThreshold(int))
    • Maximum number of unacknowledged messages (see TcpCommunicationConfigInitializer.setUnacknowledgedMessagesBufferSize(int))

    Java Example

    TcpCommunicationSpi is used by default and should be explicitly configured only if some SPI configuration parameters need to be overridden.
     TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
    
     // Override local port.
     commSpi.setLocalPort(4321);
    
     IgniteConfiguration cfg = new IgniteConfiguration();
    
     // Override default communication SPI.
     cfg.setCommunicationSpi(commSpi);
    
     // Start grid.
     Ignition.start(cfg);
     

    Spring Example

    TcpCommunicationSpi can be configured from Spring XML configuration file:
     <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
             ...
             <property name="communicationSpi">
                 <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                     <!-- Override local port. -->
                     <property name="localPort" value="4321"/>
                 </bean>
             </property>
             ...
     </bean>
     


    For information about Spring framework visit www.springframework.org

    See Also:
    CommunicationSpi
    • Field Detail

      • OUT_OF_RESOURCES_TCP_MSG

        @Deprecated
        public static final String OUT_OF_RESOURCES_TCP_MSG
        Deprecated.
        This constant is not used and will be removed in future releases.
        See Also:
        Constant Field Values
      • ATTR_ADDRS

        public static final String ATTR_ADDRS
        Node attribute that is mapped to node IP addresses (value is comm.tcp.addrs).
        See Also:
        Constant Field Values
      • ATTR_HOST_NAMES

        public static final String ATTR_HOST_NAMES
        Node attribute that is mapped to node host names (value is comm.tcp.host.names).
        See Also:
        Constant Field Values
      • ATTR_PORT

        public static final String ATTR_PORT
        Node attribute that is mapped to node port number (value is comm.tcp.port).
        See Also:
        Constant Field Values
      • ATTR_EXT_ADDRS

        public static final String ATTR_EXT_ADDRS
        Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs).
        See Also:
        Constant Field Values
      • DFLT_PORT

        public static final int DFLT_PORT
        Default port which node sets listener to (value is 47100).
        See Also:
        Constant Field Values
      • DFLT_SHMEM_PORT

        @Deprecated
        public static final int DFLT_SHMEM_PORT
        Deprecated.
        This constant is not used and will be removed in future releases.
        See Also:
        Constant Field Values
      • DFLT_IDLE_CONN_TIMEOUT

        public static final long DFLT_IDLE_CONN_TIMEOUT
        Default idle connection timeout (value is 10min).
        See Also:
        Constant Field Values
      • DFLT_SOCK_BUF_SIZE

        public static final int DFLT_SOCK_BUF_SIZE
        Default socket send and receive buffer size.
        See Also:
        Constant Field Values
      • DFLT_CONN_TIMEOUT

        public static final long DFLT_CONN_TIMEOUT
        Default connection timeout (value is 5000ms).
        See Also:
        Constant Field Values
      • DFLT_MAX_CONN_TIMEOUT

        public static final long DFLT_MAX_CONN_TIMEOUT
        Default Maximum connection timeout (value is 600,000ms).
        See Also:
        Constant Field Values
      • DFLT_RECONNECT_CNT

        public static final int DFLT_RECONNECT_CNT
        Default reconnect attempts count (value is 10).
        See Also:
        Constant Field Values
      • DFLT_MSG_QUEUE_LIMIT

        public static final int DFLT_MSG_QUEUE_LIMIT
        Default message queue limit per connection (for incoming and outgoing .
        See Also:
        Constant Field Values
      • DFLT_SELECTORS_CNT

        public static final int DFLT_SELECTORS_CNT
        Default count of selectors for TCP server equals to "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)".
      • CONN_IDX_META

        public static final int CONN_IDX_META
        Connection index meta for session.
      • CONSISTENT_ID_META

        public static final int CONSISTENT_ID_META
        Node consistent id meta for session.
      • DFLT_PORT_RANGE

        public static final int DFLT_PORT_RANGE
        Default local port range (value is 100). See TcpCommunicationConfigInitializer.setLocalPortRange(int) for details.
        See Also:
        Constant Field Values
      • DFLT_TCP_NODELAY

        public static final boolean DFLT_TCP_NODELAY
        Default value for TCP_NODELAY socket option (value is true).
        See Also:
        Constant Field Values
      • DFLT_FILTER_REACHABLE_ADDRESSES

        public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES
        Default value for FILTER_REACHABLE_ADDRESSES socket option (value is false).
        See Also:
        Constant Field Values
      • DFLT_ACK_SND_THRESHOLD

        public static final int DFLT_ACK_SND_THRESHOLD
        Default received messages threshold for sending ack.
        See Also:
        Constant Field Values
      • DFLT_SOCK_WRITE_TIMEOUT

        public static final long DFLT_SOCK_WRITE_TIMEOUT
        Default socket write timeout.
        See Also:
        Constant Field Values
      • DFLT_CONN_PER_NODE

        public static final int DFLT_CONN_PER_NODE
        Default connections per node.
        See Also:
        Constant Field Values
      • NODE_ID_MSG_TYPE

        public static final short NODE_ID_MSG_TYPE
        Node ID message type.
        See Also:
        Constant Field Values
      • RECOVERY_LAST_ID_MSG_TYPE

        public static final short RECOVERY_LAST_ID_MSG_TYPE
        Recovery last received ID message type.
        See Also:
        Constant Field Values
      • HANDSHAKE_MSG_TYPE

        public static final short HANDSHAKE_MSG_TYPE
        Handshake message type.
        See Also:
        Constant Field Values
      • HANDSHAKE_WAIT_MSG_TYPE

        public static final short HANDSHAKE_WAIT_MSG_TYPE
        Handshake wait message type.
        See Also:
        Constant Field Values
      • COMMUNICATION_METRICS_GROUP_NAME

        public static final String COMMUNICATION_METRICS_GROUP_NAME
        Communication metrics group name.
      • SENT_MESSAGES_METRIC_NAME

        public static final String SENT_MESSAGES_METRIC_NAME
        Sent messages metric name.
        See Also:
        Constant Field Values
      • SENT_MESSAGES_METRIC_DESC

        public static final String SENT_MESSAGES_METRIC_DESC
        Sent messages metric description.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_METRIC_NAME

        public static final String RECEIVED_MESSAGES_METRIC_NAME
        Received messages metric name.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_METRIC_DESC

        public static final String RECEIVED_MESSAGES_METRIC_DESC
        Received messages metric description.
        See Also:
        Constant Field Values
      • SENT_MESSAGES_BY_TYPE_METRIC_NAME

        public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME
        Sent messages by type metric name.
        See Also:
        Constant Field Values
      • SENT_MESSAGES_BY_TYPE_METRIC_DESC

        public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC
        Sent messages by type metric description.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME

        public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME
        Received messages by type metric name.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC

        public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
        Received messages by type metric description.
        See Also:
        Constant Field Values
      • SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME

        public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
        Sent messages by node consistent id metric name.
        See Also:
        Constant Field Values
      • SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC

        public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
        Sent messages by node consistent id metric description.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME

        public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
        Received messages by node consistent id metric name.
        See Also:
        Constant Field Values
      • RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC

        public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
        Received messages by node consistent id metric description.
        See Also:
        Constant Field Values
      • DISABLED_CLIENT_PORT

        public static final Integer DISABLED_CLIENT_PORT
        Client nodes might have port 0 if they have no server socket opened.
      • ATTR_FORCE_CLIENT_SERVER_CONNECTIONS

        public static final String ATTR_FORCE_CLIENT_SERVER_CONNECTIONS
        See Also:
        Constant Field Values
    • Constructor Detail

      • TcpCommunicationSpi

        public TcpCommunicationSpi()
    • Method Detail

      • setListener

        @Deprecated
        public void setListener​(CommunicationListener<Message> lsnr)
        Deprecated.
        Set communication listener. This call should be change after refactoring. It produces dependency hell. Because GridIoManager set it after self construct.
        Parameters:
        lsnr - Listener to set or null to unset the listener.
      • getSentMessagesCount

        public int getSentMessagesCount()
        Gets sent messages count.
        Returns:
        Sent messages count.
      • getSentBytesCount

        public long getSentBytesCount()
        Gets sent bytes count.
        Returns:
        Sent bytes count.
      • getReceivedMessagesCount

        public int getReceivedMessagesCount()
        Gets received messages count.
        Returns:
        Received messages count.
      • getReceivedBytesCount

        public long getReceivedBytesCount()
        Gets received bytes count.
        Returns:
        Received bytes count.
      • getReceivedMessagesByType

        public Map<String,​Long> getReceivedMessagesByType()
        Gets received messages counts (grouped by type).
        Returns:
        Map containing message types and respective counts.
      • getReceivedMessagesByNode

        public Map<UUID,​Long> getReceivedMessagesByNode()
        Gets received messages counts (grouped by node).
        Returns:
        Map containing sender nodes and respective counts.
      • getSentMessagesByType

        public Map<String,​Long> getSentMessagesByType()
        Gets sent messages counts (grouped by type).
        Returns:
        Map containing message types and respective counts.
      • getSentMessagesByNode

        public Map<UUID,​Long> getSentMessagesByNode()
        Gets sent messages counts (grouped by node).
        Returns:
        Map containing receiver nodes and respective counts.
      • getOutboundMessagesQueueSize

        public int getOutboundMessagesQueueSize()
        Gets outbound messages queue size.
        Returns:
        Outbound messages queue size.
      • resetMetrics

        public void resetMetrics()
        Resets metrics for this SPI instance.
      • dumpNodeStatistics

        public org.apache.ignite.internal.IgniteInternalFuture<String> dumpNodeStatistics​(UUID nodeId)
        Parameters:
        nodeId - Target node ID.
        Returns:
        Future.
      • dumpStats

        public void dumpStats()
        Dumps SPI per-connection stats to logs.
      • spiStart

        public void spiStart​(String igniteInstanceName)
                      throws IgniteSpiException
        This method is called to start SPI. After this method returns successfully kernel assumes that SPI is fully operational.
        Parameters:
        igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
        Throws:
        IgniteSpiException - Throws in case of any error during SPI start.
      • spiStop

        public void spiStop()
                     throws IgniteSpiException
        This method is called to stop SPI. After this method returns kernel assumes that this SPI is finished and all resources acquired by it are released.

        Note that this method can be called at any point including during recovery of failed start. It should make no assumptions on what state SPI will be in when this method is called.

        Throws:
        IgniteSpiException - Thrown in case of any error during SPI stop.
      • onContextDestroyed0

        protected void onContextDestroyed0()
        Method to be called in the beginning of onContextDestroyed() method.
        Overrides:
        onContextDestroyed0 in class IgniteSpiAdapter
      • onClientReconnected

        public void onClientReconnected​(boolean clusterRestarted)
        Client node reconnected callback.
        Specified by:
        onClientReconnected in interface IgniteSpi
        Overrides:
        onClientReconnected in class IgniteSpiAdapter
        Parameters:
        clusterRestarted - True if all cluster nodes restarted while client was disconnected.
      • openChannel

        public org.apache.ignite.internal.IgniteInternalFuture<Channel> openChannel​(ClusterNode remote,
                                                                                    Message initMsg)
                                                                             throws IgniteSpiException
        Parameters:
        remote - Destination cluster node to communicate with.
        initMsg - Configuration channel attributes wrapped into the message.
        Returns:
        The future, which will be finished on channel ready.
        Throws:
        IgniteSpiException - If fails.
      • sendMessage

        public void sendMessage​(ClusterNode node,
                                Message msg)
                         throws IgniteSpiException
        Sends given message to destination node. Note that characteristics of the exchange such as durability, guaranteed delivery or error notification is dependant on SPI implementation.
        Parameters:
        node - Destination node.
        msg - Message to send.
        Throws:
        IgniteSpiException - Thrown in case of any error during sending the message. Note that this is not guaranteed that failed communication will result in thrown exception as this is dependant on SPI implementation.
      • checkConnection

        public IgniteFuture<BitSet> checkConnection​(List<ClusterNode> nodes)
        Parameters:
        nodes - Nodes to check connection with.
        Returns:
        Result future (each bit in result BitSet contains connection status to corresponding node).
      • sendMessage

        public void sendMessage​(ClusterNode node,
                                Message msg,
                                IgniteInClosure<IgniteException> ackC)
                         throws IgniteSpiException
        Sends given message to destination node. Note that characteristics of the exchange such as durability, guaranteed delivery or error notification is dependant on SPI implementation.
        Parameters:
        node - Destination node.
        msg - Message to send.
        ackC - Ack closure.
        Throws:
        IgniteSpiException - Thrown in case of any error during sending the message. Note that this is not guaranteed that failed communication will result in thrown exception as this is dependant on SPI implementation.
      • createTcpClient

        protected org.apache.ignite.internal.util.nio.GridCommunicationClient createTcpClient​(ClusterNode node,
                                                                                              int connIdx)
                                                                                       throws IgniteCheckedException
        Establish TCP connection to remote node and returns client.
        Parameters:
        node - Remote node.
        connIdx - Connection index.
        Returns:
        Client.
        Throws:
        IgniteCheckedException - If failed.
      • notifyListener

        protected void notifyListener​(UUID sndId,
                                      Message msg,
                                      IgniteRunnable msgC)
        Parameters:
        sndId - Sender ID.
        msg - Communication message.
        msgC - Closure to call when message processing finished.
      • simulateNodeFailure

        @TestOnly
        @Deprecated
        public void simulateNodeFailure()
        Deprecated.
        you should you DI and get instances of [nioSrvWrapper, commWorker, clientPool] via it.
        Stops service threads to simulate node failure. FOR TEST PURPOSES ONLY!!!
      • writeMessageType

        public static void writeMessageType​(ByteBuffer buf,
                                            short type)
        Write message type to byte buffer.
        Parameters:
        buf - Byte buffer.
        type - Message type.
      • makeMessageType

        public static short makeMessageType​(byte b0,
                                            byte b1)
        Concatenates the two parameter bytes to form a message type value.
        Parameters:
        b0 - The first byte.
        b1 - The second byte.
        Returns:
        Message type.