In last week's post, "Distributed data structures: Part 1 (overview)," I talked about why distributed data structures (hereinafter DDS) are needed and examined several of the options offered by the Apache Ignite distributed cache. Today I want to discuss the implementation details of specific DDS, along with a small primer on distributed caches. To begin with, at least in the case of Apache Ignite, DDS are not implemented from scratch but are built on top of a distributed cache.
To support the operation of the DDS, two caches are used: one replicated and one partitioned.
The replicated cache (ignite-sys-cache) is a system cache responsible, among other things, for storing information about the DDS registered in the system.
The partitioned cache (ignite-atomics-sys-cache) stores the data needed for the DDS to operate, as well as their state.
So, most DDS are created as follows:
- A transaction starts.
- In the ignite-sys-cache cache, by the key DATA_STRUCTURES_KEY, a Map<DDS_name, DataStructureInfo> is retrieved (and created if necessary), and a new entry describing the structure, for example an IgniteAtomicReference, is added to it.
- In the ignite-atomics-sys-cache cache, by the key from the DataStructureInfo added earlier, the element responsible for the state of the DDS is added.
- The transaction commits.
On the first request to create a DDS a new instance is created; subsequent requests return the previously created one.
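For example, obtaining atomic structures through the public API looks like this; the names refName and cntName are arbitrary:

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.Ignition;

public class AtomicsExample {
    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start()) {
            // The first call with create = true registers the structure and stores its
            // state in the system caches; subsequent calls with the same name return
            // the already existing instance.
            IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "initial", true);
            IgniteAtomicLong counter = ignite.atomicLong("cntName", 0, true);

            ref.compareAndSet("initial", "updated");
            System.out.println(counter.incrementAndGet());
        }
    }
}
```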
IgniteAtomicReference and IgniteAtomicLong (short introduction)
For both types, the third initialization step adds an object of type GridCacheAtomicReferenceValue or GridCacheAtomicLongValue to ignite-atomics-sys-cache.
Both classes contain a single field, val.
Accordingly, any change to an IgniteAtomicReference, compareAndSet for example, boils down to launching an EntryProcessor whose process method performs the update.
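A minimal sketch of such a processor, assuming GridCacheAtomicReferenceValue exposes its single val field via get() (the actual internal code differs in details):

```java
import java.util.Objects;

import javax.cache.processor.MutableEntry;

import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicReferenceValue;

// Sketch of a compareAndSet-style EntryProcessor for IgniteAtomicReference.
class ReferenceCasProcessor<T> implements
        CacheEntryProcessor<String, GridCacheAtomicReferenceValue<T>, Boolean> {

    private final T expVal;
    private final T newVal;

    ReferenceCasProcessor(T expVal, T newVal) {
        this.expVal = expVal;
        this.newVal = newVal;
    }

    @Override public Boolean process(MutableEntry<String, GridCacheAtomicReferenceValue<T>> entry,
        Object... args) {
        GridCacheAtomicReferenceValue<T> cur = entry.getValue();

        // Only replace the stored value if it matches the expected one.
        if (cur != null && Objects.equals(cur.get(), expVal)) {
            entry.setValue(new GridCacheAtomicReferenceValue<>(newVal));
            return true;
        }

        return false;
    }
}
```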
IgniteAtomicLong is de facto an extension of IgniteAtomicReference, so its compareAndSet method is implemented in a similar way.
The incrementAndGet method has no check against an expected value; it simply adds one.
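A similarly hedged sketch of what incrementAndGet comes down to (not the actual internal code):

```java
import javax.cache.processor.MutableEntry;

import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongValue;

// Sketch: unlike compareAndSet there is no expected-value check,
// the stored val is simply increased by one and returned.
class LongIncrementProcessor implements
        CacheEntryProcessor<String, GridCacheAtomicLongValue, Long> {

    @Override public Long process(MutableEntry<String, GridCacheAtomicLongValue> entry,
        Object... args) {
        long newVal = entry.getValue().get() + 1;

        entry.setValue(new GridCacheAtomicLongValue(newVal));

        return newVal;
    }
}
```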
IgniteAtomicSequence (short introduction)
When each IgniteAtomicSequence instance is created, it is allocated a pool of identifiers.
Accordingly, a call to incrementAndGet simply increments a local counter until it reaches the upper boundary of the pool.
When the boundary is reached, a new identifier pool is allocated, in the same way as when a new IgniteAtomicSequence instance is created.
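A minimal sketch of this local-pool behavior (field names are illustrative; in the real API the pool size is governed by AtomicConfiguration.setAtomicSequenceReserveSize):

```java
// Sketch of IgniteAtomicSequence's local pool: increments are purely local
// until the reserved range is exhausted, then a new range is reserved.
class SequencePoolSketch {
    private long localVal;    // current local value
    private long upperBound;  // upper boundary of the reserved identifier pool

    synchronized long incrementAndGet() {
        if (localVal < upperBound)
            return ++localVal;      // cheap, no network round-trip

        reserveNextPool();          // same path as initial instance creation
        return ++localVal;
    }

    private void reserveNextPool() {
        // Atomically advance the global counter in ignite-atomics-sys-cache by the
        // reservation size and update localVal/upperBound accordingly (omitted here).
    }
}
```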
IgniteCountDownLatch (short introduction)
Decrementing the counter, countDown, is implemented as an atomic update of the shared value in the cache.
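A rough sketch of the idea, with the remaining count modelled as a plain Integer rather than the real GridCacheCountDownLatchValue:

```java
import javax.cache.processor.MutableEntry;

import org.apache.ignite.cache.CacheEntryProcessor;

// Sketch of a countDown-style update: decrease the shared counter, never below zero.
class CountDownSketchProcessor implements CacheEntryProcessor<String, Integer, Integer> {

    @Override public Integer process(MutableEntry<String, Integer> entry, Object... args) {
        int remaining = entry.getValue();

        int updated = Math.max(0, remaining - 1);
        entry.setValue(updated);

        return updated;
    }
}
```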
Waiting for the counter to reach 0, i.e. await(), is implemented through the Continuous Queries mechanism: on every change of GridCacheCountDownLatchValue in the cache, all IgniteCountDownLatch instances are notified of the change.
Each IgniteCountDownLatch instance has a local internalLatch, and each notification counts internalLatch down to the current value. As a result, latch.await() is very simple: it essentially waits on the local internalLatch.
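A sketch of that local part, with internalLatch as a regular java.util.concurrent.CountDownLatch (the surrounding names are illustrative):

```java
import java.util.concurrent.CountDownLatch;

// Sketch: Continuous Query notifications bring the local latch down to the
// distributed count, and await() simply delegates to the local latch.
class LatchMirrorSketch {
    private final CountDownLatch internalLatch;

    LatchMirrorSketch(int initialCount) {
        internalLatch = new CountDownLatch(initialCount);
    }

    // Called from the Continuous Query listener on every change of the shared value.
    void onDistributedUpdate(int distributedCount) {
        while (internalLatch.getCount() > distributedCount)
            internalLatch.countDown();
    }

    void await() throws InterruptedException {
        internalLatch.await();   // latch.await() is therefore just a local wait
    }
}
```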
IgniteSemaphore (short introduction)
Acquiring a permit, acquire, comes down to atomically decreasing the shared permit count (see the sketch below).
Returning a permit, release, works in a similar manner, except that the new value is greater than the current one.
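A hedged sketch of the acquire side, with the permit count modelled as a plain Integer (the real code works with its own internal state class); release would set available + permits instead, i.e. a value greater than the current one:

```java
import javax.cache.processor.MutableEntry;

import org.apache.ignite.cache.CacheEntryProcessor;

// Sketch: acquire succeeds only when enough permits are available,
// in which case the shared count is decreased atomically.
class AcquireSketchProcessor implements CacheEntryProcessor<String, Integer, Boolean> {
    private final int permits;

    AcquireSketchProcessor(int permits) {
        this.permits = permits;
    }

    @Override public Boolean process(MutableEntry<String, Integer> entry, Object... args) {
        int available = entry.getValue();

        if (available < permits)
            return false;          // caller waits and retries until permits are released

        entry.setValue(available - permits);

        return true;
    }
}
```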
IgniteQueue (short introduction)
Unlike the other DDS, IgniteQueue does not use ignite-atomics-sys-cache; the cache it uses is described by the colCfg parameter.
Depending on the specified atomicity mode (TRANSACTIONAL or ATOMIC), you get different variants of IgniteQueue.
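For instance, a queue backed by an explicitly configured cache can be created like this (the queue name and settings are arbitrary):

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CollectionConfiguration;

public class QueueExample {
    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start()) {
            // colCfg describes the cache that will back the queue.
            CollectionConfiguration colCfg = new CollectionConfiguration();
            colCfg.setCacheMode(CacheMode.PARTITIONED);
            colCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
            colCfg.setBackups(1);

            // Capacity 0 means an unbounded queue; passing colCfg creates the queue
            // if it does not exist yet.
            IgniteQueue<String> queue = ignite.queue("exampleQueue", 0, colCfg);

            queue.put("first");
            System.out.println(queue.take());
        }
    }
}
```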
In both cases, the state of the IgniteQueue is controlled by a queue header stored in the cache.
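A hedged sketch of that header (in Ignite internals this role is played by GridCacheQueueHeader; only the essential fields are shown):

```java
// Sketch of the state object behind an IgniteQueue; the real internal class
// has more fields, this keeps only the essence.
class QueueHeaderSketch {
    long head;     // index of the element to be polled next
    long tail;     // index at which the next element will be added
    int capacity;  // maximum size for a bounded queue

    long head() { return head; }
    long tail() { return tail; }
}
```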
To add an element, AddProcessor is used, which in essence simply moves the pointer to the tail of the queue. After that, the new element itself is added to the queue under the reserved index.
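A hedged sketch of those two steps, reusing the QueueHeaderSketch from above (the real AddProcessor and key layout differ in details):

```java
import javax.cache.processor.MutableEntry;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;

// Step 1: an AddProcessor-style update that only advances the tail pointer
// and returns the index reserved for the new element.
class AddSketchProcessor implements CacheEntryProcessor<String, QueueHeaderSketch, Long> {
    @Override public Long process(MutableEntry<String, QueueHeaderSketch> entry, Object... args) {
        QueueHeaderSketch hdr = entry.getValue();

        long idx = hdr.tail();   // index reserved for the new element
        hdr.tail = idx + 1;      // move the pointer to the tail of the queue
        entry.setValue(hdr);

        return idx;
    }
}

// Step 2: the element itself is put into the cache under the reserved index.
class QueueAddSketch {
    static void add(IgniteCache<String, QueueHeaderSketch> headerCache,
        IgniteCache<Long, Object> itemCache, String queueKey, Object item) {
        long idx = headerCache.invoke(queueKey, new AddSketchProcessor());

        itemCache.put(idx, item);
    }
}
```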
Removing an element is similar, except that the pointer that moves is head rather than tail, after which the element itself is removed.
The difference between GridAtomicCacheQueueImpl and GridTransactionalCacheQueueImpl is that:
- GridAtomicCacheQueueImpl, when adding an element, first atomically increments hdr.tail() and then adds the element to the cache at the resulting index.
- GridTransactionalCacheQueueImpl performs both actions within a single transaction.
As a result, GridAtomicCacheQueueImpl works faster, but a data consistency problem can arise: since information about the queue size and the data itself are not stored atomically together, they cannot always be read consistently either.
It is quite possible that inside the poll method the queue appears to contain new elements while the elements themselves are not there yet. This is extremely rare, but still possible.
The problem is solved by waiting for the value with a timeout.
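A sketch of that waiting logic (the timeout constant and helper names are illustrative, not actual Ignite constants):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;

// Sketch: if the head pointer says an element exists but the element is not yet
// visible in the cache, poll keeps re-reading it for a bounded amount of time.
class PollRetrySketch {
    private static final long RETRY_TIMEOUT_MS = 3_000; // illustrative value

    static Object readWithRetry(IgniteCache<Long, Object> itemCache, long idx) {
        long deadline = System.currentTimeMillis() + RETRY_TIMEOUT_MS;

        while (System.currentTimeMillis() < deadline) {
            Object item = itemCache.getAndRemove(idx);

            if (item != null)
                return item;

            // Element not written yet: back off briefly and try again.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        }

        throw new IgniteException("Failed to read queue element within timeout: " + idx);
    }
}
```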
Instead of a conclusion
I would like to note once again that a distributed cache is, in essence, a ConcurrentHashMap spread across a set of computers joined into a cluster.
Distributed caches can be used to build many important, complex, yet reliable systems.
Distributed data structures are just one particular use case; more generally, distributed caches are used to store and process huge volumes of data in real time, with the ability to scale capacity or processing speed simply by adding new nodes.