@IgniteSpiMultipleInstancesSupport(value=true) @IgniteSpiConsistencyChecked(optional=false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message>
To enable communication with other nodes, this SPI adds ATTR_ADDRS and ATTR_PORT local node attributes (see ClusterNode.attributes()).
At startup, this SPI tries to start listening on the local port specified by the setLocalPort(int) method. If the local port is occupied, the SPI automatically increments the port number until it can successfully bind for listening. The setLocalPortRange(int) configuration parameter controls the maximum number of ports that the SPI will try before it fails. A port range comes in very handy when starting multiple grid nodes on the same machine or even in the same VM. In this case all nodes can be brought up without a single change in configuration.
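The port-probing behaviour described above can be pictured with plain java.net classes. This is only a sketch under stated assumptions, not Ignite's implementation: the class PortRangeProbe and the method bindFirstFree are hypothetical names, and the upper bound follows the setLocalPortRange(int) wording that the port is incremented "for as long as it is less than initial value plus this range".

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Illustrative helper (hypothetical, not part of Ignite). */
final class PortRangeProbe {
    private PortRangeProbe() {}

    /**
     * Tries basePort first, then each following port while it stays below
     * basePort + portRange. A portRange of 0 means only basePort is tried.
     */
    static ServerSocket bindFirstFree(int basePort, int portRange) throws IOException {
        // Last candidate port, capped at the highest valid TCP port.
        int lastPort = Math.min(65535, portRange == 0 ? basePort : basePort + portRange - 1);
        IOException lastError = null;

        for (int port = basePort; port <= lastPort; port++) {
            try {
                // Bind on loopback only; a real node would bind its configured local address.
                return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
            }
            catch (IOException e) {
                // Port is occupied or otherwise unavailable: remember the error, try the next one.
                lastError = e;
            }
        }

        throw new IOException("No free port in [" + basePort + ", " + lastPort + "]", lastError);
    }
}
```

Starting two such probes with the same base port and a non-zero range yields two different bound ports, which is the property that lets several nodes share one unchanged configuration on a single machine.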
This SPI caches connections to remote nodes so it does not have to reconnect every time a message is sent. By default, idle connections are kept active for the DFLT_IDLE_CONN_TIMEOUT period and are then closed. Use the setIdleConnectionTimeout(long) configuration parameter to configure your own idle connection timeout.
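The idle-timeout idea can be modelled in a few lines. This is a hypothetical sketch, not Ignite's connection cache: IdleConnectionCache, touch and closeIdle are illustrative names, node IDs are plain strings, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of an idle-connection cache; it is not Ignite's implementation. */
final class IdleConnectionCache {
    /** Idle period after which a cached connection is dropped, in milliseconds. */
    private final long idleTimeoutMs;

    /** Last-use timestamp per remote node ID (string keys are an assumption of this sketch). */
    private final Map<String, Long> lastUsed = new ConcurrentHashMap<>();

    IdleConnectionCache(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records that a message was sent over the cached connection to the node. */
    void touch(String nodeId, long nowMs) {
        lastUsed.put(nodeId, nowMs);
    }

    /** Drops connections idle for at least the timeout and returns how many were dropped. */
    int closeIdle(long nowMs) {
        int closed = 0;

        for (Iterator<Map.Entry<String, Long>> it = lastUsed.entrySet().iterator(); it.hasNext(); ) {
            if (nowMs - it.next().getValue() >= idleTimeoutMs) {
                it.remove();
                closed++;
            }
        }

        return closed;
    }

    /** Tells whether a connection to the node is still cached. */
    boolean isCached(String nodeId) {
        return lastUsed.containsKey(nodeId);
    }
}
```

With a 10 minute timeout, a connection last used at time 0 is dropped by a sweep at 600,000 ms, while one used at 500,000 ms stays cached and can be reused without reconnecting.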
Configuration defaults (see IgniteConfiguration.getFailureDetectionTimeout() for details) are chosen so that the communication SPI works reliably on most hardware and virtual deployments, but this has made failure detection time worse.
If failure detection needs to be tuned, it is highly recommended to do this using IgniteConfiguration.setFailureDetectionTimeout(long). This failure timeout automatically controls the following parameters: getConnectTimeout(), getMaxConnectTimeout(), getReconnectCount(). If any of those parameters is set explicitly, then the failure timeout setting will be ignored.
If advanced failure detection settings are required and IgniteConfiguration.getFailureDetectionTimeout() is unsuitable, then the various TcpCommunicationSpi configuration parameters may be used.
setAddressResolver(AddressResolver)
setLocalAddress(String)
setLocalPort(int)
setLocalPortRange(int)
setUsePairedConnections(boolean)
setConnectionsPerNode(int)
setSharedMemoryPort(int)
setIdleConnectionTimeout(long)
setDirectBuffer(boolean)
setDirectSendBuffer(boolean)
setSelectorsCount(int)
setSelectorSpins(long)
TCP_NODELAY socket option for sockets (see setTcpNoDelay(boolean))
setFilterReachableAddresses(boolean)
setMessageQueueLimit(int)
setSlowClientQueueLimit(int)
setConnectTimeout(long)
setMaxConnectTimeout(long)
setReconnectCount(int)
setSocketReceiveBuffer(int)
setSocketSendBuffer(int)
setSocketWriteTimeout(long)
setAckSendThreshold(int)
setUnacknowledgedMessagesBufferSize(int)
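    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;

    TcpCommunicationSpi commSpi = new TcpCommunicationSpi();

    // Override local port.
    commSpi.setLocalPort(4321);

    IgniteConfiguration cfg = new IgniteConfiguration();

    // Override default communication SPI.
    cfg.setCommunicationSpi(commSpi);

    // Start grid.
    Ignition.start(cfg);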
    <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
        ...
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <!-- Override local port. -->
                <property name="localPort" value="4321"/>
            </bean>
        </property>
        ...
    </bean>
For information about Spring framework visit www.springframework.org
CommunicationSpi
Modifier and Type | Field and Description |
---|---|
static String | ATTR_ADDRS: Node attribute that is mapped to node IP addresses (value is comm.tcp.addrs). |
static String | ATTR_EXT_ADDRS: Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs). |
static String | ATTR_HOST_NAMES: Node attribute that is mapped to node host names (value is comm.tcp.host.names). |
static String | ATTR_PAIRED_CONN |
static String | ATTR_PORT: Node attribute that is mapped to node port number (value is comm.tcp.port). |
static String | ATTR_SHMEM_PORT: Node attribute that is mapped to node port number (value is comm.shmem.tcp.port). |
static int | CONN_IDX_META: Connection index meta for session. |
static int | DFLT_ACK_SND_THRESHOLD: Default received messages threshold for sending ack. |
static int | DFLT_CONN_PER_NODE: Default connections per node. |
static long | DFLT_CONN_TIMEOUT: Default connection timeout (value is 5000ms). |
static boolean | DFLT_FILTER_REACHABLE_ADDRESSES: Default value for FILTER_REACHABLE_ADDRESSES socket option (value is false). |
static long | DFLT_IDLE_CONN_TIMEOUT: Default idle connection timeout (value is 10min). |
static long | DFLT_MAX_CONN_TIMEOUT: Default maximum connection timeout (value is 600,000ms). |
static int | DFLT_MSG_QUEUE_LIMIT: Default message queue limit per connection (for incoming and outgoing messages). |
static int | DFLT_PORT: Default port which node sets listener to (value is 47100). |
static int | DFLT_PORT_RANGE: Default local port range (value is 100). |
static int | DFLT_RECONNECT_CNT: Default reconnect attempts count (value is 10). |
static int | DFLT_SELECTORS_CNT: Default count of selectors for TCP server, equal to "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)". |
static int | DFLT_SHMEM_PORT: Default port which node sets listener for shared memory connections (value is 48100). |
static int | DFLT_SOCK_BUF_SIZE: Default socket send and receive buffer size. |
static long | DFLT_SOCK_WRITE_TIMEOUT: Default socket write timeout. |
static boolean | DFLT_TCP_NODELAY: Default value for TCP_NODELAY socket option (value is true). |
static short | HANDSHAKE_MSG_TYPE: Handshake message type. |
static short | NODE_ID_MSG_TYPE: Node ID message type. |
static String | OUT_OF_RESOURCES_TCP_MSG: IPC error message. |
static short | RECOVERY_LAST_ID_MSG_TYPE: Recovery last received ID message type. |
ignite, igniteInstanceName
Constructor and Description |
---|
TcpCommunicationSpi() |
Modifier and Type | Method and Description |
---|---|
int | boundPort() |
protected void | checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting): Method which is called at the end of the checkConfigurationConsistency() method. |
IgniteFuture<BitSet> | checkConnection(List<ClusterNode> nodes) |
protected org.apache.ignite.internal.util.nio.GridCommunicationClient | createTcpClient(ClusterNode node, int connIdx): Establishes a TCP connection to the remote node and returns the client. |
org.apache.ignite.internal.IgniteInternalFuture<String> | dumpNodeStatistics(UUID nodeId) |
void | dumpStats(): Dumps SPI per-connection stats to logs. |
int | getAckSendThreshold() |
AddressResolver | getAddressResolver() |
int | getConnectionsPerNode() |
long | getConnectTimeout() |
long | getIdleConnectionTimeout() |
CommunicationListener | getListener() |
String | getLocalAddress() |
int | getLocalPort(): See setLocalPort(int). |
int | getLocalPortRange() |
long | getMaxConnectTimeout(): Gets maximum connect timeout. |
int | getMessageQueueLimit(): Gets message queue limit for incoming and outgoing messages. |
Map<String,Object> | getNodeAttributes(): This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). |
int | getOutboundMessagesQueueSize(): Gets outbound messages queue size. |
long | getReceivedBytesCount(): Gets received bytes count. |
Map<UUID,Long> | getReceivedMessagesByNode(): Gets received messages counts (grouped by node). |
Map<String,Long> | getReceivedMessagesByType(): Gets received messages counts (grouped by type). |
int | getReceivedMessagesCount(): Gets received messages count. |
int | getReconnectCount(): Gets maximum number of reconnect attempts used when establishing connection with remote nodes. |
int | getSelectorsCount() |
long | getSelectorSpins() |
long | getSentBytesCount(): Gets sent bytes count. |
Map<UUID,Long> | getSentMessagesByNode(): Gets sent messages counts (grouped by node). |
Map<String,Long> | getSentMessagesByType(): Gets sent messages counts (grouped by type). |
int | getSentMessagesCount(): Gets sent messages count. |
int | getSharedMemoryPort() |
int | getSlowClientQueueLimit() |
int | getSocketReceiveBuffer() |
int | getSocketSendBuffer() |
long | getSocketWriteTimeout() |
IgniteSpiContext | getSpiContext(): Gets SPI context. |
int | getUnacknowledgedMessagesBufferSize() |
protected void | injectResources(Ignite ignite): Injects resources. |
boolean | isDirectBuffer(): Gets flag that indicates whether direct or heap allocated buffer is used. |
boolean | isDirectSendBuffer(): Gets flag defining whether direct send buffer should be used. |
boolean | isFilterReachableAddresses(): Gets value for FILTER_REACHABLE_ADDRESSES socket option. |
boolean | isTcpNoDelay(): Gets value for TCP_NODELAY socket option. |
boolean | isUsePairedConnections() |
Collection<InetSocketAddress> | nodeAddresses(ClusterNode node, boolean filterReachableAddresses) |
protected void | notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) |
void | onClientDisconnected(IgniteFuture<?> reconnectFut): Client node disconnected callback. |
void | onClientReconnected(boolean clusterRestarted): Client node reconnected callback. |
protected void | onContextDestroyed0(): Method to be called at the beginning of the onContextDestroyed() method. |
void | onContextInitialized0(IgniteSpiContext spiCtx): Method to be called at the end of the onContextInitialized method. |
protected void | processClientCreationError(ClusterNode node, Collection<InetSocketAddress> addrs, IgniteCheckedException errs): Processes errors if a TCP client to the remote node hasn't been created. |
void | resetMetrics(): Resets metrics for this SPI instance. |
void | sendMessage(ClusterNode node, Message msg): Sends given message to destination node. |
void | sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC): Sends given message to destination node. |
TcpCommunicationSpi | setAckSendThreshold(int ackSndThreshold): Sets number of received messages per connection to node after which acknowledgment message is sent. |
TcpCommunicationSpi | setAddressResolver(AddressResolver addrRslvr): Sets address resolver. |
TcpCommunicationSpi | setConnectionsPerNode(int maxConnectionsPerNode): Sets number of connections to each remote node. If isUsePairedConnections() is true then the number of connections is doubled and half is used for incoming and half for outgoing messages. |
TcpCommunicationSpi | setConnectTimeout(long connTimeout): Sets connect timeout used when establishing connection with remote nodes. |
TcpCommunicationSpi | setDirectBuffer(boolean directBuf): Sets flag to allocate direct or heap buffer in SPI. |
TcpCommunicationSpi | setDirectSendBuffer(boolean directSndBuf): Sets whether to use direct buffer for sending. |
TcpCommunicationSpi | setFilterReachableAddresses(boolean filterReachableAddresses): Setting this option to true enables filter for reachable addresses on creating tcp client. |
TcpCommunicationSpi | setIdleConnectionTimeout(long idleConnTimeout): Sets maximum idle connection timeout upon which a connection to client will be closed. |
void | setListener(CommunicationListener<Message> lsnr): Set communication listener. |
TcpCommunicationSpi | setLocalAddress(String locAddr): Sets local host address for socket binding. |
TcpCommunicationSpi | setLocalPort(int locPort): Sets local port for socket binding. |
TcpCommunicationSpi | setLocalPortRange(int locPortRange): Sets local port range for local host ports (value must be greater than or equal to 0). |
TcpCommunicationSpi | setMaxConnectTimeout(long maxConnTimeout): Sets maximum connect timeout. |
TcpCommunicationSpi | setMessageQueueLimit(int msgQueueLimit): Sets message queue limit for incoming and outgoing messages. |
TcpCommunicationSpi | setName(String name): Sets SPI name. |
TcpCommunicationSpi | setReconnectCount(int reconCnt): Sets maximum number of reconnect attempts used when establishing connection with remote nodes. |
TcpCommunicationSpi | setSelectorsCount(int selectorsCnt): Sets the count of selectors to be used in TCP server. |
TcpCommunicationSpi | setSelectorSpins(long selectorSpins): Defines how many non-blocking selector.selectNow() calls should be made before falling into selector.select(long) in NIO server. |
TcpCommunicationSpi | setSharedMemoryPort(int shmemPort): Sets local port to accept shared memory connections. |
TcpCommunicationSpi | setSlowClientQueueLimit(int slowClientQueueLimit): Sets slow client queue limit. |
TcpCommunicationSpi | setSocketReceiveBuffer(int sockRcvBuf): Sets receive buffer size for sockets created or accepted by this SPI. |
TcpCommunicationSpi | setSocketSendBuffer(int sockSndBuf): Sets send buffer size for sockets created or accepted by this SPI. |
TcpCommunicationSpi | setSocketWriteTimeout(long sockWriteTimeout): Sets socket write timeout for TCP connection. |
TcpCommunicationSpi | setTcpNoDelay(boolean tcpNoDelay): Sets value for TCP_NODELAY socket option. |
TcpCommunicationSpi | setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize): Sets maximum number of stored unacknowledged messages per connection to node. |
TcpCommunicationSpi | setUsePairedConnections(boolean usePairedConnections): Set this to true if TcpCommunicationSpi should maintain connection for outgoing and incoming messages separately. |
void | simulateNodeFailure(): Stops service threads to simulate node failure. |
void | spiStart(String igniteInstanceName): This method is called to start SPI. |
void | spiStop(): This method is called to stop SPI. |
String | toString() |
static void | writeMessageType(ByteBuffer buf, short type): Write message type to byte buffer. |
addTimeoutObject, assertParameter, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, getConsistentAttributeNames, getExceptionRegistry, getLocalNode, getName, ignite, initFailureDetectionTimeout, injectables, isNodeStopping, onBeforeStart, onContextDestroyed, onContextInitialized, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getName, onContextDestroyed, onContextInitialized
public static final String OUT_OF_RESOURCES_TCP_MSG
public static final String ATTR_ADDRS
public static final String ATTR_HOST_NAMES
public static final String ATTR_PORT
public static final String ATTR_SHMEM_PORT
public static final String ATTR_EXT_ADDRS
public static final String ATTR_PAIRED_CONN
public static final int DFLT_PORT
public static final int DFLT_SHMEM_PORT
public static final long DFLT_IDLE_CONN_TIMEOUT
public static final int DFLT_SOCK_BUF_SIZE
public static final long DFLT_CONN_TIMEOUT
public static final long DFLT_MAX_CONN_TIMEOUT
public static final int DFLT_RECONNECT_CNT
public static final int DFLT_MSG_QUEUE_LIMIT
public static final int DFLT_SELECTORS_CNT
Default count of selectors for TCP server, equal to "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)".
public static final int CONN_IDX_META
public static final int DFLT_PORT_RANGE
Default local port range (value is 100). See setLocalPortRange(int) for details.
public static final boolean DFLT_TCP_NODELAY
Default value for TCP_NODELAY socket option (value is true).
public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES
Default value for FILTER_REACHABLE_ADDRESSES socket option (value is false).
public static final int DFLT_ACK_SND_THRESHOLD
public static final long DFLT_SOCK_WRITE_TIMEOUT
public static final int DFLT_CONN_PER_NODE
public static final short NODE_ID_MSG_TYPE
public static final short RECOVERY_LAST_ID_MSG_TYPE
public static final short HANDSHAKE_MSG_TYPE
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr)
Sets address resolver.
Parameters: addrRslvr - Address resolver.
Returns: this for chaining.
public AddressResolver getAddressResolver()
@IgniteInstanceResource protected void injectResources(Ignite ignite)
Injects resources.
Overrides: injectResources in class IgniteSpiAdapter
Parameters: ignite - Ignite.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setLocalAddress(String locAddr)
Sets local host address for socket binding.
Parameters: locAddr - IP address. Default value is any available local IP address.
Returns: this for chaining.
public String getLocalAddress()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setLocalPort(int locPort)
Sets local port for socket binding.
If not provided, default value is DFLT_PORT.
Parameters: locPort - Port number.
Returns: this for chaining.
public int getLocalPort()
See setLocalPort(int).
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setLocalPortRange(int locPortRange)
Sets local port range for local host ports (value must be greater than or equal to 0). If the local port (see setLocalPort(int)) is occupied, the implementation will try to increment the port number for as long as it is less than the initial value plus this range.
If the port range value is 0, then the implementation will try to bind only to the port provided by the setLocalPort(int) method and fail if binding to this port did not succeed.
Local port range is very useful during development when more than one grid node needs to run on the same physical machine.
If not provided, default value is DFLT_PORT_RANGE.
Parameters: locPortRange - New local port range.
Returns: this for chaining.
public int getLocalPortRange()
public boolean isUsePairedConnections()
Returns: true to use paired connections and false otherwise.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections)
Set this to true if TcpCommunicationSpi should maintain connection for outgoing and incoming messages separately. In this case the total number of connections between the local node and each remote node is getConnectionsPerNode() * 2.
Set this to false if each connection of getConnectionsPerNode() should be used for outgoing and incoming messages. In this case the total number of connections between the local node and each remote node is getConnectionsPerNode().
Default is false.
Parameters: usePairedConnections - true to use paired connections and false otherwise.
Returns: this for chaining.
See Also: getConnectionsPerNode()
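The connection arithmetic stated for setUsePairedConnections(boolean) can be written down directly. The sketch below is only a worked restatement of that rule; ConnectionMath and totalConnections are hypothetical names and do not exist in Ignite.

```java
/** Worked restatement of the documented rule (hypothetical helper, not part of Ignite). */
final class ConnectionMath {
    private ConnectionMath() {}

    /**
     * Total connections between the local node and one remote node:
     * connectionsPerNode * 2 when paired connections are used, connectionsPerNode otherwise.
     */
    static int totalConnections(int connectionsPerNode, boolean usePairedConnections) {
        return usePairedConnections ? connectionsPerNode * 2 : connectionsPerNode;
    }
}
```

For example, four connections per node give eight connections to each remote node with paired connections enabled (four incoming, four outgoing) and four when it is disabled.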
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode)
Sets number of connections to each remote node. If isUsePairedConnections() is true then the number of connections is doubled and half is used for incoming and half for outgoing messages.
Parameters: maxConnectionsPerNode - Number of connections per node.
Returns: this for chaining.
See Also: isUsePairedConnections()
public int getConnectionsPerNode()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSharedMemoryPort(int shmemPort)
Sets local port to accept shared memory connections.
If set to -1 shared memory communication will be disabled.
If not provided, default value is DFLT_SHMEM_PORT.
Parameters: shmemPort - Port number.
Returns: this for chaining.
public int getSharedMemoryPort()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout)
Sets maximum idle connection timeout upon which a connection to client will be closed.
If not provided, default value is DFLT_IDLE_CONN_TIMEOUT.
Parameters: idleConnTimeout - Maximum idle connection time.
Returns: this for chaining.
public long getIdleConnectionTimeout()
public long getSocketWriteTimeout()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout)
Sets socket write timeout for TCP connection.
Defaults to DFLT_SOCK_WRITE_TIMEOUT.
Parameters: sockWriteTimeout - Socket write timeout for TCP connection.
Returns: this for chaining.
public int getAckSendThreshold()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold)
Sets number of received messages per connection to node after which acknowledgment message is sent.
Defaults to DFLT_ACK_SND_THRESHOLD.
Parameters: ackSndThreshold - Number of received messages after which acknowledgment is sent.
Returns: this for chaining.
public int getUnacknowledgedMessagesBufferSize()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize)
Sets maximum number of stored unacknowledged messages per connection to node.
Parameters: unackedMsgsBufSize - Maximum number of unacknowledged messages.
Returns: this for chaining.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setConnectTimeout(long connTimeout)
Sets connect timeout used when establishing connection with remote nodes.
0 is interpreted as infinite timeout.
If not provided, default value is DFLT_CONN_TIMEOUT.
When this property is explicitly set, IgniteConfiguration.getFailureDetectionTimeout() is ignored.
Parameters: connTimeout - Connect timeout.
Returns: this for chaining.
public long getConnectTimeout()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout)
Sets maximum connect timeout.
0 is interpreted as infinite timeout.
If not provided, default value is DFLT_MAX_CONN_TIMEOUT.
When this property is explicitly set, IgniteConfiguration.getFailureDetectionTimeout() is ignored.
Parameters: maxConnTimeout - Maximum connect timeout.
Returns: this for chaining.
public long getMaxConnectTimeout()
Gets maximum connect timeout.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setReconnectCount(int reconCnt)
Sets maximum number of reconnect attempts used when establishing connection with remote nodes.
If not provided, default value is DFLT_RECONNECT_CNT.
When this property is explicitly set, IgniteConfiguration.getFailureDetectionTimeout() is ignored.
Parameters: reconCnt - Maximum number of reconnection attempts.
Returns: this for chaining.
public int getReconnectCount()
Gets maximum number of reconnect attempts used when establishing connection with remote nodes.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setDirectBuffer(boolean directBuf)
Sets flag to allocate direct or heap buffer in SPI. If the value is true, then SPI will use the ByteBuffer.allocateDirect(int) call. Otherwise, SPI will use the ByteBuffer.allocate(int) call.
If not provided, default value is true.
Parameters: directBuf - Flag indicates to allocate direct or heap buffer in SPI.
Returns: this for chaining.
public boolean isDirectBuffer()
Gets flag that indicates whether direct or heap allocated buffer is used.
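The two allocation calls named for setDirectBuffer(boolean) are standard java.nio methods, so the choice can be shown in isolation. A minimal sketch, assuming a hypothetical helper class BufferChoice that is not part of Ignite:

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing the documented choice between direct and heap buffers. */
final class BufferChoice {
    private BufferChoice() {}

    /** Returns a direct buffer when directBuf is true, a heap buffer otherwise. */
    static ByteBuffer allocate(boolean directBuf, int capacity) {
        return directBuf
            ? ByteBuffer.allocateDirect(capacity) // Off-heap memory, outside the Java heap.
            : ByteBuffer.allocate(capacity);      // Backed by a byte[] on the Java heap.
    }
}
```

ByteBuffer.isDirect() reports which kind was allocated, and only the heap variant is guaranteed to expose a backing array through hasArray().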
public boolean isDirectSendBuffer()
Gets flag defining whether direct send buffer should be used.
Returns: True if direct buffers should be used.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf)
Sets whether to use direct buffer for sending.
Default is false.
Parameters: directSndBuf - True to use direct buffers for send.
Returns: this for chaining.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSelectorsCount(int selectorsCnt)
Sets the count of selectors to be used in TCP server.
If not provided, default value is DFLT_SELECTORS_CNT.
Parameters: selectorsCnt - Selectors count.
Returns: this for chaining.
public int getSelectorsCount()
public long getSelectorSpins()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSelectorSpins(long selectorSpins)
Defines how many non-blocking selector.selectNow() calls should be made before falling into selector.select(long) in NIO server. Long value. Default is 0. Can be set to Long.MAX_VALUE so selector threads will never block.
Parameters: selectorSpins - Selector thread busy-loop iterations.
Returns: this for chaining.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay)
Sets value for TCP_NODELAY socket option. Each socket will be opened using provided value.
Setting this option to true disables Nagle's algorithm for the socket, decreasing latency and delivery time for small messages.
For systems that work under heavy network load it is advisable to set this value to false.
If not provided, default value is DFLT_TCP_NODELAY.
Parameters: tcpNoDelay - True to disable TCP delay.
Returns: this for chaining.
public boolean isTcpNoDelay()
Gets value for TCP_NODELAY socket option.
Returns: True if TCP delay is disabled.
public boolean isFilterReachableAddresses()
Gets value for FILTER_REACHABLE_ADDRESSES socket option.
Returns: True if needed to filter reachable addresses.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses)
Setting this option to true enables filter for reachable addresses on creating tcp client.
Usually it's advised to set this value to false.
If not provided, default value is DFLT_FILTER_REACHABLE_ADDRESSES.
Parameters: filterReachableAddresses - True to filter reachable addresses.
Returns: this for chaining.
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf)
Sets receive buffer size for sockets created or accepted by this SPI.
If not provided, default is DFLT_SOCK_BUF_SIZE.
Parameters: sockRcvBuf - Socket receive buffer size.
Returns: this for chaining.
public int getSocketReceiveBuffer()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf)
Sets send buffer size for sockets created or accepted by this SPI.
If not provided, default is DFLT_SOCK_BUF_SIZE.
Parameters: sockSndBuf - Socket send buffer size.
Returns: this for chaining.
public int getSocketSendBuffer()
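The socket-level settings in this group (setTcpNoDelay, setSocketReceiveBuffer, setSocketSendBuffer) correspond to standard java.net.Socket options. The sketch below shows those options on a plain, unconnected socket; SocketOptions and configure are hypothetical names, and how Ignite applies the values internally is not shown here.

```java
import java.io.IOException;
import java.net.Socket;

/** Hypothetical helper applying the three socket options to a plain java.net.Socket. */
final class SocketOptions {
    private SocketOptions() {}

    /** Creates an unconnected socket with the given TCP_NODELAY flag and buffer size hints. */
    static Socket configure(boolean tcpNoDelay, int rcvBufBytes, int sndBufBytes) throws IOException {
        Socket sock = new Socket();

        // true disables Nagle's algorithm, so small messages are sent without coalescing delay.
        sock.setTcpNoDelay(tcpNoDelay);

        // Buffer sizes are hints: the operating system may round or cap them.
        sock.setReceiveBufferSize(rcvBufBytes);
        sock.setSendBufferSize(sndBufBytes);

        return sock;
    }
}
```

Because the buffer sizes are only hints to the operating system, reading them back with getReceiveBufferSize() or getSendBufferSize() may return a different value from the one requested.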
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit)
Sets message queue limit for incoming and outgoing messages.
When set to a positive number, the send queue is limited to the configured value. 0 disables the size limitations.
If not provided, default is DFLT_MSG_QUEUE_LIMIT.
Parameters: msgQueueLimit - Send queue size limit.
Returns: this for chaining.
public int getMessageQueueLimit()
Gets message queue limit for incoming and outgoing messages.
public int getSlowClientQueueLimit()
@IgniteSpiConfiguration(optional=true) public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit)
Sets slow client queue limit.
See getMessageQueueLimit(), which controls message back-pressure for server nodes. The default value for this parameter is 0, which means unlimited.
Parameters: slowClientQueueLimit - Slow client queue limit.
Returns: this for chaining.
public void setListener(CommunicationListener<Message> lsnr)
Set communication listener.
Specified by: setListener in interface CommunicationSpi<Message>
Parameters: lsnr - Listener to set or null to unset the listener.
public CommunicationListener getListener()
public int getSentMessagesCount()
Gets sent messages count.
Specified by: getSentMessagesCount in interface CommunicationSpi<Message>
public long getSentBytesCount()
Gets sent bytes count.
Specified by: getSentBytesCount in interface CommunicationSpi<Message>
public int getReceivedMessagesCount()
Gets received messages count.
Specified by: getReceivedMessagesCount in interface CommunicationSpi<Message>
public long getReceivedBytesCount()
Gets received bytes count.
Specified by: getReceivedBytesCount in interface CommunicationSpi<Message>
public Map<String,Long> getReceivedMessagesByType()
Gets received messages counts (grouped by type).
public Map<UUID,Long> getReceivedMessagesByNode()
Gets received messages counts (grouped by node).
public Map<String,Long> getSentMessagesByType()
Gets sent messages counts (grouped by type).
public Map<UUID,Long> getSentMessagesByNode()
Gets sent messages counts (grouped by node).
public int getOutboundMessagesQueueSize()
Gets outbound messages queue size.
Specified by: getOutboundMessagesQueueSize in interface CommunicationSpi<Message>
public void resetMetrics()
Resets metrics for this SPI instance.
Specified by: resetMetrics in interface CommunicationSpi<Message>
public org.apache.ignite.internal.IgniteInternalFuture<String> dumpNodeStatistics(UUID nodeId)
Parameters: nodeId - Target node ID.
public void dumpStats()
Dumps SPI per-connection stats to logs.
public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). It allows SPI implementation to add attributes to a local node. Kernal collects these attributes from all SPI implementations loaded up and then passes them to discovery SPI so that they can be exchanged with other nodes.
Specified by: getNodeAttributes in interface IgniteSpi
Overrides: getNodeAttributes in class IgniteSpiAdapter
Throws: IgniteSpiException - Throws in case of any error.
public int boundPort()
public void spiStart(String igniteInstanceName) throws IgniteSpiException
This method is called to start SPI.
Specified by: spiStart in interface IgniteSpi
Parameters: igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
Throws: IgniteSpiException - Throws in case of any error during SPI start.
public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException
Method to be called at the end of the onContextInitialized method.
Overrides: onContextInitialized0 in class IgniteSpiAdapter
Parameters: spiCtx - SPI context.
Throws: IgniteSpiException - In case of errors.
public IgniteSpiContext getSpiContext()
Gets SPI context.
Overrides: getSpiContext in class IgniteSpiAdapter
public void spiStop() throws IgniteSpiException
This method is called to stop SPI.
Note that this method can be called at any point, including during recovery of a failed start. It should make no assumptions about what state the SPI will be in when this method is called.
Specified by: spiStop in interface IgniteSpi
Throws: IgniteSpiException - Thrown in case of any error during SPI stop.
protected void onContextDestroyed0()
Method to be called at the beginning of the onContextDestroyed() method.
Overrides: onContextDestroyed0 in class IgniteSpiAdapter
public void onClientDisconnected(IgniteFuture<?> reconnectFut)
Client node disconnected callback.
Specified by: onClientDisconnected in interface IgniteSpi
Overrides: onClientDisconnected in class IgniteSpiAdapter
Parameters: reconnectFut - Future that will be completed when client reconnected.
public void onClientReconnected(boolean clusterRestarted)
Client node reconnected callback.
Specified by: onClientReconnected in interface IgniteSpi
Overrides: onClientReconnected in class IgniteSpiAdapter
Parameters: clusterRestarted - True if all cluster nodes restarted while client was disconnected.
protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting) throws IgniteSpiException
Method which is called at the end of the checkConfigurationConsistency() method.
Overrides: checkConfigurationConsistency0 in class IgniteSpiAdapter
Parameters: spiCtx - SPI context. node - Remote node. starting - If this node is starting or not.
Throws: IgniteSpiException - in case of errors.
public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException
Sends given message to destination node.
Specified by: sendMessage in interface CommunicationSpi<Message>
Parameters: node - Destination node. msg - Message to send.
Throws: IgniteSpiException - Thrown in case of any error during sending the message. Note that it is not guaranteed that failed communication will result in a thrown exception, as this is dependent on the SPI implementation.
public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes)
Parameters: nodes - Nodes to check connection with.
public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
Sends given message to destination node.
Parameters: node - Destination node. msg - Message to send. ackC - Ack closure.
Throws: IgniteSpiException - Thrown in case of any error during sending the message. Note that it is not guaranteed that failed communication will result in a thrown exception, as this is dependent on the SPI implementation.
public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses) throws IgniteCheckedException
Parameters: node - Node. filterReachableAddresses - Filter addresses flag.
Throws: IgniteCheckedException - If node does not have addresses.
protected org.apache.ignite.internal.util.nio.GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException
Establishes a TCP connection to the remote node and returns the client.
Parameters: node - Remote node. connIdx - Connection index.
Throws: IgniteCheckedException - If failed.
protected void processClientCreationError(ClusterNode node, Collection<InetSocketAddress> addrs, IgniteCheckedException errs) throws IgniteCheckedException
Processes errors if a TCP client to the remote node hasn't been created.
Parameters: node - Remote node. addrs - Remote node addresses. errs - TCP client creation errors.
Throws: IgniteCheckedException - If failed.
protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC)
Parameters: sndId - Sender ID. msg - Communication message. msgC - Closure to call when message processing finished.
public void simulateNodeFailure()
Stops service threads to simulate node failure.
public TcpCommunicationSpi setName(String name)
Sets SPI name.
Overrides: setName in class IgniteSpiAdapter
Parameters: name - SPI name.
Returns: this for chaining.
public static void writeMessageType(ByteBuffer buf, short type)
Write message type to byte buffer.
Parameters: buf - Byte buffer. type - Message type.
Ignite Database and Caching Platform : ver. 2.7.2 Release Date : February 6 2019