@IgniteSpiMultipleInstancesSupport(value=true) @IgniteSpiConsistencyChecked(optional=false) public class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConfigInitializer
To enable communication with other nodes, this SPI adds ATTR_ADDRS
and ATTR_PORT
local node attributes (see ClusterNode.attributes()
.
At startup, this SPI tries to start listening to local port specified by
TcpCommunicationConfigInitializer.setLocalPort(int)
method. If local port is occupied, then SPI will
automatically increment the port number until it can successfully bind for
listening. TcpCommunicationConfigInitializer.setLocalPortRange(int)
configuration parameter controls
maximum number of ports that SPI will try before it fails. Port range comes
very handy when starting multiple grid nodes on the same machine or even
in the same VM. In this case all nodes can be brought up without a single
change in configuration.
This SPI caches connections to remote nodes so it does not have to reconnect every
time a message is sent. By default, idle connections are kept active for
DFLT_IDLE_CONN_TIMEOUT
period and then are closed. Use
TcpCommunicationConfigInitializer.setIdleConnectionTimeout(long)
configuration parameter to configure
you own idle connection timeout.
IgniteConfiguration.getFailureDetectionTimeout()
) for details) are chosen to make possible for
communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection
time worse.
If it's needed to tune failure detection then it's highly recommended to do this using
IgniteConfiguration.setFailureDetectionTimeout(long)
. This failure timeout automatically controls the
following parameters: TcpCommunicationConfigInitializer.getConnectTimeout()
, TcpCommunicationConfigInitializer.getMaxConnectTimeout()
,
TcpCommunicationConfigInitializer.getReconnectCount()
. If any of those parameters is set explicitly, then the failure timeout setting will be
ignored.
If it's required to perform advanced settings of failure detection and
IgniteConfiguration.getFailureDetectionTimeout()
is unsuitable then various TcpCommunicationSpi
configuration parameters may be used.
TcpCommunicationConfigInitializer.setAddressResolver(AddressResolver)
TcpCommunicationConfigInitializer.setLocalAddress(String)
)TcpCommunicationConfigInitializer.setLocalPort(int)
)TcpCommunicationConfigInitializer.setLocalPortRange(int)
TcpCommunicationConfigInitializer.setUsePairedConnections(boolean)
TcpCommunicationConfigInitializer.setConnectionsPerNode(int)
)TcpCommunicationConfigInitializer.setSharedMemoryPort(int)
TcpCommunicationConfigInitializer.setIdleConnectionTimeout(long)
)TcpCommunicationConfigInitializer.setDirectBuffer(boolean)
)TcpCommunicationConfigInitializer.setDirectSendBuffer(boolean)
)TcpCommunicationConfigInitializer.setSelectorsCount(int)
)TcpCommunicationConfigInitializer.setSelectorSpins(long)
TCP_NODELAY
socket option for sockets (see TcpCommunicationConfigInitializer.setTcpNoDelay(boolean)
)TcpCommunicationConfigInitializer.setFilterReachableAddresses(boolean)
TcpCommunicationConfigInitializer.setMessageQueueLimit(int)
)TcpCommunicationConfigInitializer.setSlowClientQueueLimit(int)
)TcpCommunicationConfigInitializer.setConnectTimeout(long)
)TcpCommunicationConfigInitializer.setMaxConnectTimeout(long)
)TcpCommunicationConfigInitializer.setReconnectCount(int)
)TcpCommunicationConfigInitializer.setSocketReceiveBuffer(int)
)TcpCommunicationConfigInitializer.setSocketSendBuffer(int)
)TcpCommunicationConfigInitializer.setSocketWriteTimeout(long)
)TcpCommunicationConfigInitializer.setAckSendThreshold(int)
)TcpCommunicationConfigInitializer.setAckSendThresholdBytes(long)
)TcpCommunicationConfigInitializer.setAckSendThresholdMillis(long)
)TcpCommunicationConfigInitializer.setUnacknowledgedMessagesBufferSize(int)
)TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); // Override local port. commSpi.setLocalPort(4321); IgniteConfiguration cfg = new IgniteConfiguration(); // Override default communication SPI. cfg.setCommunicationSpi(commSpi); // Start grid. Ignition.start(cfg);
<bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"> ... <property name="communicationSpi"> <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"> <!-- Override local port. --> <property name="localPort" value="4321"/> </bean> </property> ... </bean>
For information about Spring framework visit www.springframework.org
CommunicationSpi
Modifier and Type | Field and Description |
---|---|
static String |
ATTR_ADDRS
Node attribute that is mapped to node IP addresses (value is comm.tcp.addrs).
|
static String |
ATTR_EXT_ADDRS
Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs).
|
static String |
ATTR_FORCE_CLIENT_SERVER_CONNECTIONS |
static String |
ATTR_HOST_NAMES
Node attribute that is mapped to node host names (value is comm.tcp.host.names).
|
static String |
ATTR_PAIRED_CONN
Attr paired connection.
|
static String |
ATTR_PORT
Node attribute that is mapped to node port number (value is comm.tcp.port).
|
static String |
ATTR_SHMEM_PORT
Node attribute that is mapped to node port number (value is comm.shmem.tcp.port).
|
static String |
COMMUNICATION_METRICS_GROUP_NAME
Communication metrics group name.
|
static int |
CONN_IDX_META
Connection index meta for session.
|
static short |
CONNECTION_CHECK_MSG_TYPE
Heartbeat message type.
|
static int |
CONSISTENT_ID_META
Node consistent id meta for session.
|
static int |
DFLT_ACK_SND_THRESHOLD
Default received messages threshold (for the number of messages) for sending ack.
|
static long |
DFLT_ACK_SND_THRESHOLD_BYTES
Default received messages threshold (in bytes) for sending ack.
|
static long |
DFLT_ACK_SND_THRESHOLD_MILLIS
Default received messages timeout (in millis) for sending ack.
|
static int |
DFLT_CONN_PER_NODE
Default connections per node.
|
static long |
DFLT_CONN_TIMEOUT
Default connection timeout (value is 5000ms).
|
static boolean |
DFLT_FILTER_REACHABLE_ADDRESSES
Default value for
FILTER_REACHABLE_ADDRESSES socket option (value is false). |
static long |
DFLT_IDLE_CONN_TIMEOUT
Default idle connection timeout (value is 10min).
|
static long |
DFLT_MAX_CONN_TIMEOUT
Default Maximum connection timeout (value is 600,000ms).
|
static int |
DFLT_MSG_QUEUE_LIMIT
Default message queue limit for incoming connections.
|
static int |
DFLT_PORT
Default port which node sets listener to (value is 47100).
|
static int |
DFLT_PORT_RANGE
Default local port range (value is 100).
|
static int |
DFLT_RECONNECT_CNT
Default reconnect attempts count (value is 10).
|
static int |
DFLT_SELECTORS_CNT
Default count of selectors for TCP server equals to
"Math.max(4, Runtime.getRuntime().availableProcessors()
/ 2)" . |
static int |
DFLT_SHMEM_PORT
Default port which node sets listener for shared memory connections (value is 48100).
|
static int |
DFLT_SOCK_BUF_SIZE
Default socket send and receive buffer size.
|
static long |
DFLT_SOCK_WRITE_TIMEOUT
Default socket write timeout.
|
static boolean |
DFLT_TCP_NODELAY
Default value for
TCP_NODELAY socket option (value is true). |
static Integer |
DISABLED_CLIENT_PORT
Client nodes might have port
0 if they have no server socket opened. |
static short |
HANDSHAKE_MSG_TYPE
Handshake message type.
|
static short |
HANDSHAKE_WAIT_MSG_TYPE
Handshake wait message type.
|
static short |
NODE_ID_MSG_TYPE
Node ID message type.
|
static String |
OUT_OF_RESOURCES_TCP_MSG
IPC error message.
|
static String |
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
Received messages by node consistent id metric description.
|
static String |
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
Received messages by node consistent id metric name.
|
static String |
RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
Received messages by type metric description.
|
static String |
RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME
Received messages by type metric name.
|
static String |
RECEIVED_MESSAGES_METRIC_DESC
Received messages metric description.
|
static String |
RECEIVED_MESSAGES_METRIC_NAME
Received messages metric name.
|
static short |
RECOVERY_LAST_ID_MSG_TYPE
Recovery last received ID message type.
|
static String |
SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
Sent messages by node consistent id metric description.
|
static String |
SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
Sent messages by node consistent id metric name.
|
static String |
SENT_MESSAGES_BY_TYPE_METRIC_DESC
Sent messages by type metric description.
|
static String |
SENT_MESSAGES_BY_TYPE_METRIC_NAME
Sent messages by type metric name.
|
static String |
SENT_MESSAGES_METRIC_DESC
Sent messages metric description.
|
static String |
SENT_MESSAGES_METRIC_NAME
Sent messages metric name.
|
attributeNames, cfg, connPlc, metricsLsnr, shmemSrv, tracing
ignite, igniteInstanceName
Constructor and Description |
---|
TcpCommunicationSpi() |
Modifier and Type | Method and Description |
---|---|
protected void |
checkConfigurationConsistency0(IgniteSpiContext spiCtx,
ClusterNode node,
boolean starting)
Method which is called in the end of checkConfigurationConsistency() method.
|
IgniteFuture<BitSet> |
checkConnection(List<ClusterNode> nodes) |
protected org.apache.ignite.internal.util.nio.GridCommunicationClient |
createTcpClient(ClusterNode node,
int connIdx)
Establish TCP connection to remote node and returns client.
|
org.apache.ignite.internal.IgniteInternalFuture<String> |
dumpNodeStatistics(UUID nodeId) |
void |
dumpStats()
Dumps SPI per-connection stats to logs.
|
CommunicationListener |
getListener() |
int |
getOutboundMessagesQueueSize()
Gets outbound messages queue size.
|
long |
getReceivedBytesCount()
Gets received bytes count.
|
Map<UUID,Long> |
getReceivedMessagesByNode()
Gets received messages counts (grouped by node).
|
Map<String,Long> |
getReceivedMessagesByType()
Gets received messages counts (grouped by type).
|
int |
getReceivedMessagesCount()
Gets received messages count.
|
long |
getSentBytesCount()
Gets sent bytes count.
|
Map<UUID,Long> |
getSentMessagesByNode()
Gets sent messages counts (grouped by node).
|
Map<String,Long> |
getSentMessagesByType()
Gets sent messages counts (grouped by type).
|
int |
getSentMessagesCount()
Gets sent messages count.
|
IgniteSpiContext |
getSpiContext()
Gets SPI context.
|
static short |
makeMessageType(byte b0,
byte b1)
Concatenates the two parameter bytes to form a message type value.
|
Collection<InetSocketAddress> |
nodeAddresses(ClusterNode node,
boolean filterReachableAddrs) |
protected void |
notifyListener(UUID sndId,
Message msg,
IgniteRunnable msgC) |
void |
onClientDisconnected(IgniteFuture<?> reconnectFut)
Client node disconnected callback.
|
void |
onClientReconnected(boolean clusterRestarted)
Client node reconnected callback.
|
protected void |
onContextDestroyed0()
Method to be called in the beginning of onContextDestroyed() method.
|
void |
onContextInitialized0(IgniteSpiContext spiCtx)
Method to be called in the end of onContextInitialized method.
|
protected void |
processSessionCreationError(ClusterNode node,
Collection<InetSocketAddress> addrs,
IgniteCheckedException errs)
Process errors if TCP/IP
GridNioSession creation to remote node hasn't been performed. |
void |
resetMetrics()
Resets metrics for this SPI instance.
|
void |
sendMessage(ClusterNode node,
Message msg)
Sends given message to destination node.
|
void |
sendMessage(ClusterNode node,
Message msg,
IgniteInClosure<IgniteException> ackC)
Sends given message to destination node.
|
void |
setConnectionRequestor(org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor connectionRequestor) |
void |
setListener(CommunicationListener<Message> lsnr)
Deprecated.
|
void |
simulateNodeFailure()
Deprecated.
you should you DI and get instances of [nioSrvWrapper, commWorker, clientPool] via it.
|
void |
spiStart(String igniteInstanceName)
This method is called to start SPI.
|
void |
spiStop()
This method is called to stop SPI.
|
String |
toString() |
static void |
writeMessageType(ByteBuffer buf,
short type)
Write message type to byte buffer.
|
boundPort, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, forceClientToServerConnections, getAckSendThreshold, getAckSendThresholdBytes, getAckSendThresholdMillis, getAddressResolver, getConnectionsPerNode, getConnectTimeout, getIdleConnectionTimeout, getLocalAddress, getLocalPort, getLocalPortRange, getMaxConnectTimeout, getMessageQueueLimit, getNetworkInterfacesBlacklist, getNodeAttributes, getReconnectCount, getSelectorsCount, getSelectorSpins, getSharedMemoryPort, getSlowClientQueueLimit, getSocketReceiveBuffer, getSocketSendBuffer, getSocketWriteTimeout, getUnacknowledgedMessagesBufferSize, injectResources, isDirectBuffer, isDirectSendBuffer, isFilterReachableAddresses, isTcpNoDelay, isUsePairedConnections, resetShmemServer, setAckSendThreshold, setAckSendThresholdBytes, setAckSendThresholdMillis, setAddressResolver, setConnectionsPerNode, setConnectTimeout, setDirectBuffer, setDirectSendBuffer, setEnableConnectionCheck, setFilterReachableAddresses, setForceClientToServerConnections, setIdleConnectionTimeout, setLocalAddress, setLocalPort, setLocalPortRange, setMaxConnectTimeout, setMessageQueueLimit, setName, setNetworkInterfacesBlacklist, setReconnectCount, setSelectorsCount, setSelectorSpins, setSharedMemoryPort, setSlowClientQueueLimit, setSocketReceiveBuffer, setSocketSendBuffer, setSocketWriteTimeout, setTcpNoDelay, setUnacknowledgedMessagesBufferSize, setUsePairedConnections
addTimeoutObject, assertParameter, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, getConsistentAttributeNames, getExceptionRegistry, getLocalNode, getName, ignite, initFailureDetectionTimeout, injectables, isNodeStopping, onBeforeStart, onContextDestroyed, onContextInitialized, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getName, onContextDestroyed, onContextInitialized
public static final String OUT_OF_RESOURCES_TCP_MSG
public static final String ATTR_ADDRS
public static final String ATTR_HOST_NAMES
public static final String ATTR_PORT
public static final String ATTR_SHMEM_PORT
public static final String ATTR_EXT_ADDRS
public static final String ATTR_PAIRED_CONN
public static final String ATTR_FORCE_CLIENT_SERVER_CONNECTIONS
public static final int DFLT_PORT
public static final int DFLT_SHMEM_PORT
public static final long DFLT_IDLE_CONN_TIMEOUT
public static final int DFLT_SOCK_BUF_SIZE
public static final long DFLT_CONN_TIMEOUT
public static final long DFLT_MAX_CONN_TIMEOUT
public static final int DFLT_RECONNECT_CNT
public static final int DFLT_MSG_QUEUE_LIMIT
public static final int DFLT_SELECTORS_CNT
"Math.max(4, Runtime.getRuntime().availableProcessors()
/ 2)"
.public static final int CONN_IDX_META
public static final int CONSISTENT_ID_META
public static final int DFLT_PORT_RANGE
TcpCommunicationConfigInitializer.setLocalPortRange(int)
for details.public static final boolean DFLT_TCP_NODELAY
TCP_NODELAY
socket option (value is true).public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES
FILTER_REACHABLE_ADDRESSES
socket option (value is false).public static final int DFLT_ACK_SND_THRESHOLD
public static final long DFLT_ACK_SND_THRESHOLD_BYTES
public static final long DFLT_ACK_SND_THRESHOLD_MILLIS
public static final long DFLT_SOCK_WRITE_TIMEOUT
public static final int DFLT_CONN_PER_NODE
public static final short NODE_ID_MSG_TYPE
public static final short RECOVERY_LAST_ID_MSG_TYPE
public static final short HANDSHAKE_MSG_TYPE
public static final short HANDSHAKE_WAIT_MSG_TYPE
public static final short CONNECTION_CHECK_MSG_TYPE
public static final String COMMUNICATION_METRICS_GROUP_NAME
public static final String SENT_MESSAGES_METRIC_NAME
public static final String SENT_MESSAGES_METRIC_DESC
public static final String RECEIVED_MESSAGES_METRIC_NAME
public static final String RECEIVED_MESSAGES_METRIC_DESC
public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME
public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC
public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME
public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
public static final Integer DISABLED_CLIENT_PORT
0
if they have no server socket opened.@Deprecated public void setListener(CommunicationListener<Message> lsnr)
GridIoManager
set it after self construct.lsnr
- Listener to set or null
to unset the listener.public CommunicationListener getListener()
public void setConnectionRequestor(org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor connectionRequestor)
public int getSentMessagesCount()
public long getSentBytesCount()
public int getReceivedMessagesCount()
public long getReceivedBytesCount()
public Map<String,Long> getReceivedMessagesByType()
public Map<UUID,Long> getReceivedMessagesByNode()
public Map<String,Long> getSentMessagesByType()
public Map<UUID,Long> getSentMessagesByNode()
public int getOutboundMessagesQueueSize()
public void resetMetrics()
public org.apache.ignite.internal.IgniteInternalFuture<String> dumpNodeStatistics(UUID nodeId)
nodeId
- Target node ID.public void dumpStats()
public void spiStart(String igniteInstanceName) throws IgniteSpiException
igniteInstanceName
- Name of Ignite instance this SPI is being started for
(null
for default Ignite instance).IgniteSpiException
- Throws in case of any error during SPI start.public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException
onContextInitialized0
in class IgniteSpiAdapter
spiCtx
- SPI context.IgniteSpiException
- In case of errors.public IgniteSpiContext getSpiContext()
getSpiContext
in class IgniteSpiAdapter
public void spiStop() throws IgniteSpiException
Note that this method can be called at any point including during recovery of failed start. It should make no assumptions on what state SPI will be in when this method is called.
IgniteSpiException
- Thrown in case of any error during SPI stop.protected void onContextDestroyed0()
onContextDestroyed0
in class IgniteSpiAdapter
public void onClientDisconnected(IgniteFuture<?> reconnectFut)
onClientDisconnected
in interface IgniteSpi
onClientDisconnected
in class IgniteSpiAdapter
reconnectFut
- Future that will be completed when client reconnected.public void onClientReconnected(boolean clusterRestarted)
onClientReconnected
in interface IgniteSpi
onClientReconnected
in class IgniteSpiAdapter
clusterRestarted
- True
if all cluster nodes restarted while client was disconnected.protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting) throws IgniteSpiException
checkConfigurationConsistency0
in class IgniteSpiAdapter
spiCtx
- SPI context.node
- Remote node.starting
- If this node is starting or not.IgniteSpiException
- in case of errors.public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException
node
- Destination node.msg
- Message to send.IgniteSpiException
- Thrown in case of any error during sending the message.
Note that this is not guaranteed that failed communication will result
in thrown exception as this is dependant on SPI implementation.public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes)
nodes
- Nodes to check connection with.public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
node
- Destination node.msg
- Message to send.ackC
- Ack closure.IgniteSpiException
- Thrown in case of any error during sending the message. Note
that this is not guaranteed that failed communication will result in thrown exception as this is dependant on SPI
implementation.public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddrs) throws IgniteCheckedException
node
- Node.filterReachableAddrs
- Filter addresses flag.IgniteCheckedException
- If node does not have addresses.protected org.apache.ignite.internal.util.nio.GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException
node
- Remote node.connIdx
- Connection index.IgniteCheckedException
- If failed.protected void processSessionCreationError(ClusterNode node, Collection<InetSocketAddress> addrs, IgniteCheckedException errs) throws IgniteCheckedException
GridNioSession
creation to remote node hasn't been performed.node
- Remote node.addrs
- Remote node addresses.errs
- TCP client creation errors.IgniteCheckedException
- If failed.protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC)
sndId
- Sender ID.msg
- Communication message.msgC
- Closure to call when message processing finished.@TestOnly @Deprecated public void simulateNodeFailure()
public static void writeMessageType(ByteBuffer buf, short type)
buf
- Byte buffer.type
- Message type.public static short makeMessageType(byte b0, byte b1)
b0
- The first byte.b1
- The second byte.
GridGain In-Memory Computing Platform : ver. 8.9.15 Release Date : December 3 2024