public class JmsStreamer<T extends javax.jms.Message,K,V> extends StreamAdapter<T,K,V>
IgniteDataStreamer
instance.
This Streamer uses purely JMS semantics and it is not coupled with any JMS implementation. It uses MessageListener
to receive messages. You must provide your broker's ConnectionFactory
when
creating a JmsStreamer
.
You must also provide a MessageTransformer
to convert the incoming message into cache entries.
This Streamer has many features:
Session
objects with separate MessageListener
instances each,
therefore achieving native concurrency (in terms of the JMS standard).Session
s in parallel. Size-based closure applies individually to each Session (as transactions
are Session-bound in JMS, so it will fire when that Session
has processed that many messages.
Both options are compatible with each other, or you can disable either (see setter documentation),
but not both.Destination
objects or with names.ignite
Constructor and Description |
---|
JmsStreamer() |
Modifier and Type | Method and Description |
---|---|
void |
setBatchClosureMillis(long batchClosureMillis)
When using batched consumption, sets the time in milliseconds that will elapse before a batch is committed.
|
void |
setBatchClosureSize(int batchClosureSize)
When using batched consumption, sets the amount of messages that will be received before a batch is committed.
|
void |
setBatched(boolean batched)
Batch consumption leverages JMS Transactions to minimise round trips to the broker.
|
void |
setClientId(String clientId)
Sets the client ID of the JMS
Connection . |
void |
setConnectionFactory(javax.jms.ConnectionFactory connectionFactory)
Sets the JMS
ConnectionFactory . |
void |
setDestination(javax.jms.Destination destination)
Sets the JMS
Destination explicitly. |
void |
setDestinationName(String destinationName)
Sets the name of the JMS destination to consume from.
|
void |
setDestinationType(Class<? extends javax.jms.Destination> destinationType)
Sets the type of the destination to create, when used in combination with
setDestinationName(String) . |
void |
setDurableSubscription(boolean durableSubscription)
A true value is only accepted in combination with topic consumption.
|
void |
setDurableSubscriptionName(String durableSubscriptionName)
When using Durable Subscribers, sets the name of the durable subscriber.
|
void |
setExceptionListener(javax.jms.ExceptionListener exceptionListener)
Exception listener for queue/topic failures.
|
void |
setThreads(int threads)
Sets the number of threads to concurrently consume JMS messages.
|
void |
setTransacted(boolean transacted)
Instructs the streamer whether to use local JMS transactions or not.
|
void |
setTransformer(MessageTransformer<T,K,V> transformer)
Compulsory.
|
void |
start()
Starts streamer.
|
void |
stop()
Stops streamer.
|
addMessage, getIgnite, getMultipleTupleExtractor, getSingleTupleExtractor, getStreamer, getTupleExtractor, setIgnite, setMultipleTupleExtractor, setSingleTupleExtractor, setStreamer, setTupleExtractor
public void start() throws IgniteException
IgniteException
- If failed.public void stop() throws IgniteException
IgniteException
public void setConnectionFactory(javax.jms.ConnectionFactory connectionFactory)
ConnectionFactory
.connectionFactory
- JMS ConnectionFactory
for this streamer to use.public void setTransformer(MessageTransformer<T,K,V> transformer)
MessageTransformer
that converts an incoming JMS Message
(or subclass)
into one or multiple cache entries.transformer
- The implementation of the MessageTransformer to use.public void setDestination(javax.jms.Destination destination)
Destination
explicitly. Takes precedence over destinationName if both are set.destination
- JMS Destination
if setting it explicitly.public void setDestinationName(String destinationName)
destinationName
- The name of the destination; will be passed on directly to the broker.public void setDestinationType(Class<? extends javax.jms.Destination> destinationType)
setDestinationName(String)
. It
can be an interface or the implementation class specific to the broker.destinationType
- The class representing the destination type. Suggested values: Queue
or Topic
. Compulsory if using destinationName
.Queue
,
Topic
public void setThreads(int threads)
When working with queues, we will start as
many Session
objects as indicated by this field, i.e. you will get native concurrency.
On the other hand, when consuming from a topic, for obvious reason we will only start 1 message consumer but we will distribute the processing of received messages to as many concurrent threads as indicated.
threads
- Number of threads to use. Default: 1.public void setClientId(String clientId)
Connection
.clientId
- Client ID in case we're using durable subscribers. Default: none.public void setDurableSubscription(boolean durableSubscription)
durableSubscription
- Whether or not to use durable subscriptions. Default: false.public void setTransacted(boolean transacted)
transacted
- Whether to consume or not in a transacted manner. Default: false.public void setBatched(boolean batched)
Rather than ACKing every single message received, they will be received in the context of a JMS transaction which will be committed once the indicated batch closure size or batch closure time has elapsed.
Warning: May lead to duplicate consumption.
batched
- Whether to consume messages in batches. Value true implies transacted = true.
Default: false.setBatchClosureMillis(long)
,
setBatchClosureSize(int)
public void setBatchClosureSize(int batchClosureSize)
batchClosureSize
- The amount of messages processed before a batch is committed. Default: 50.public void setBatchClosureMillis(long batchClosureMillis)
batchClosureMillis
- Milliseconds before a batch is committed. Default: 1000ms.public void setDurableSubscriptionName(String durableSubscriptionName)
durableSubscriptionName
- Name of the durable subscriber. Default: none.public void setExceptionListener(javax.jms.ExceptionListener exceptionListener)
exceptionListener
- ExceptionListener interface implementation.
Follow @ApacheIgnite
Ignite Database and Caching Platform : ver. 2.7.2 Release Date : February 6 2019