Validation of field values and constraints is not supported in the current version.
Attention! The field marked with the @Id annotation must be filled with unique data.
It is highly recommended that you always use the @GeneratedValue annotation in
conjunction with this annotation to automatically generate unique keys.
See the Examples page for detailed information.
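For illustration, a minimal sketch of the Dept entity class used in the examples below (field names and annotation packages are assumptions; see the Examples page for the exact imports):
public class Dept {
    @Id
    @GeneratedValue // unique keys are generated automatically
    private long deptId;
    private String deptName;

    public long getDeptId() { return deptId; }
    public String getDeptName() { return deptName; }
    public void setDeptName(String deptName) { this.deptName = deptName; }
}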
Entity class registration
For any further operations, we need a user session:
final Session session = Session.getSession();
The registerClass method registers the Dept class in the system directory, then creates and
loads the corresponding proxy class for it (here we register the Dept entity class as an example):
session.registerClass(Dept.class);
Creating an entity class instance
All further insert, update, and retrieval operations must be performed on
an object obtained through the factory method newEntity(class):
Dept dept = (Dept) session.newEntity(Dept.class);
Persistent operations
The newly created object is not persistent; in order to save it in the database, you need
to execute the persist() method:
session.persist(dept);
The session supports the following methods:
Object object = session.newEntity(class); //creating a proxy instance of a user class
To repeat: in order to use all the features of the Interference engine,
all the operations described below must be performed on the proxy object constructed
by this method after registering the user class, and not on a user class instance
obtained through new. Otherwise, transactions will not be supported, and
in the current version this will most likely lead to an error.
Inserts a newly created instance into the database or saves
changes to an existing one:
session.persist(object);
To change (update) an object, use find(), make the necessary changes to the object,
then execute persist(); a sketch of this flow follows below. If identifiers are set up
by the application, an object in the database can be changed (updated) using persist both in the
above way (find - change - persist) and with a newly created object carrying the necessary
identifier. The changed object is locked against changes from other sessions (transactions)
until commit or rollback is executed (see below for transactions).
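A minimal sketch of the find - change - persist flow (the identifier value and the setDeptName setter are assumptions):
Dept dept = (Dept) session.find(Dept.class, 10);
dept.setDeptName("RESEARCH"); // make the necessary changes
session.persist(dept); // save the changes; the object is now locked
session.commit(); // apply the changes and release the lock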
It should be noted that for objects whose identifier is annotated with @NoCheck, NO check is
made for the existence of an object with the same id. In this case we rely
entirely on the correctness of the mechanism for generating identifiers.
The @NoCheck annotation was designed for use with CEP tables that do not use indexes. For
bulk inserts, this avoids the performance cost of lengthy existence checks, so this
approach is recommended for mass inserts because it is significantly faster.
Removes an object from the database:
session.delete(object);
The object is locked against changes from other sessions (transactions) until commit or rollback
(see below - transactions).
Returns an instance from the database by
identifier:
Object object = session.find(class, id);
Note: in the current version, the following types are supported for the identifier field:
int,
long,
java.lang.Integer,
java.lang.Long,
java.util.concurrent.atomic.AtomicInteger,
java.util.concurrent.atomic.AtomicLong,
java.lang.String
Transaction isolation
Interference supports transactions for read/write operations. The default isolation
level is READ COMMITTED, which means that changes made in a transaction become visible
only to retrievals that start after commit is executed in the originating transaction,
regardless of the retrieval duration.
All the above methods of extracting or saving data automatically create a transaction,
if it has not yet been created.
To complete the transaction and apply changes to this transaction for the remaining
sessions, we perform:
session.commit();
To complete the transaction and rollback the changes, we use:
session.rollback();
In addition, the following two methods can be used:
session.lock(o);
- the current transaction acquires a lock on the object and creates an
undo-snapshot for it. Returns the current object, i.e. it is similar to the find method, but
additionally acquires an object lock for subsequent possible changes. The object remains locked
until commit or rollback.
session.start();
- starts a statement: all selections in the current session will
return data consistent as of the moment the start command was executed in the current session,
until commit or rollback is executed. At the same time, it must be understood that the
above data retrieval methods (find, get) execute the start method automatically on
each call.
It is important to understand that the entity class instance returned by the get() or
find() methods is shared (Interference does not create separate instances for each
session) and supports isolation. Thus, changing an instance field in one session will not
be visible in another session until commit is called in the first one; re-retrieving the
entity class instance is not required.
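A short sketch of explicit locking and isolation, continuing the Dept example above (the identifier value and the setDeptName setter are assumptions):
Dept dept = (Dept) session.find(Dept.class, 10);
Dept locked = (Dept) session.lock(dept); // acquire the lock and an undo-snapshot
locked.setDeptName("SALES");
session.persist(locked);
// other sessions still see the old deptName value at this point (READ COMMITTED)
session.commit(); // the change becomes visible to retrievals started after this point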
SQL SELECT clauses available in 2021.1 release:
SELECT [GROUP_FUNC(alias1.group_column), ] alias1.column_name1, alias2.column_name2, ...
FROM fully_qualified_class_name1 alias1, fully_qualified_class_name2 alias2, ...
[WHERE CONDITION1 AND/OR CONDITION2 ...]
[GROUP BY alias1.column_name1, alias2.column_name2, ...]
[ORDER BY alias1.column_name1, alias2.column_name2, ...]
where CONDITIONn is a standard SQL condition, e.g.:
alias.num_column_name = 12345
alias.string_column_name = 'string constant'
alias.date_column_name = '01.01.2019' (use config.dateformat)
alias.column_name IN / NOT IN [12345, 12346, ...]
(use square brackets with this clause unlike standard SQL)
alias1.column_name1 = alias2.column_name2
Instead of the equals sign, the less-than and greater-than signs may be used,
as well as less-than or greater-than combined with the equals sign (<=, >=).
GROUP_FUNC is one of the following group functions: COUNT(), SUM(), MIN(), MAX(), AVG().
The SELECT and FROM clauses are mandatory.
Clauses shown in [ ... ] are optional.
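For illustration, a query combining these clauses (the entity and column names follow the Dept/Emp examples used below and are assumptions):
SELECT COUNT(e.empId), d.deptName
FROM su.interference.entity.Emp e, su.interference.entity.Dept d
WHERE e.deptId = d.deptId AND d.deptId IN [10, 20, 30]
GROUP BY d.deptName
ORDER BY d.deptName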
Executing SQL queries
An SQL query is run using the following method:
session.execute(query);
where query is a string constant. The execute method returns a su.interference.sql.ResultSet
object, which contains a set of su.interference.proxy.GenericResult objects holding the
result row data. Table names are indicated as fully qualified class names,
for example, su.interference.entity.Dept.
All class and field names are case sensitive.
The field names in the ResultSet are the specified names or aliases. If a table alias
was used in the field naming, the name in the ResultSet will consist of the table alias
+ field name, for example, ddeptName for the d.deptName specified in the query:
String sql = "select d.deptName, e.empName, e.descript from " +
    "su.interference.entity.Dept d, " +
    "su.interference.entity.Emp e " +
    "where d.deptId = e.deptId";
ResultSet rs = s.execute(sql);
GenericResult result = (GenericResult) rs.poll(s);
while (result != null) {
    Object o = result.getValueByName("ddeptName");
    result = (GenericResult) rs.poll(s);
}
All data inside the ResultSet will be consistent at the time the SQL query starts.
Batch event processing
Interference open cluster does not currently support the standard DML UPDATE and DELETE operations. Instead, for bulk table processing (including an optional WHERE clause), we have implemented the PROCESS and PROCESS STREAM clauses, which allow each record from a selection to be processed by one of the EventProcessor interface implementations.
On the one hand, this approach allows us to obtain results similar to those that we would achieve using UPDATE and DELETE, on the other hand, it significantly expands the possibilities for custom processing of records, allowing full event processing.
In order to create a custom EventProcessor implementation, we need to implement two methods:
boolean process(Object event);
in this method, custom event handling should be implemented; in case of successful processing, true is returned.
boolean delete();
if this method returns true, the record will be deleted from the table upon successful completion of processing (i.e. the process method returned true).
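A minimal sketch of a custom implementation, matching the SomeEventProcessor used in the example below (the EventProcessor package and the SomeEvent class are assumptions):
public class SomeEventProcessor implements EventProcessor {
    // custom event handling; return true on successful processing
    public boolean process(Object event) {
        SomeEvent e = (SomeEvent) event;
        // ... handle the event ...
        return true;
    }
    // return true to delete each successfully processed record from the table
    public boolean delete() {
        return true;
    }
}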
Next, we can use the following query:
PROCESS fully_qualified_class_name alias
WITHIN fully_qualified_event_processor_class_name
[WHERE condition1 AND/OR condition2 ...]
[ORDER BY alias.column_name ...]
For example, it might look like this:
String sql = "process su.interference.entity.SomeEvent d within su.interference.processor.SomeEventProcessor where d.eventType = 1";
ResultSet rs = s.execute(sql);
The PROCESS statement allows records from one specific table to be processed in batch mode; currently the query does not support any joins to other tables.
The PROCESS statement is a distributed operation and performs processing on all nodes of the cluster. For this purpose, it locks the table at the cluster level while the query is running, so that no other PROCESS statement can be launched against it from other nodes or other sessions.
This processing is performed inside a transaction, therefore, after execution, we need to explicitly apply commit or rollback.
Online complex event processing
Interference supports complex event processing using
the SELECT STREAM clause in an SQL statement.
The basic differences between a streaming query and a usual one are as follows:
- the session.execute(...) method returns a StreamQueue object, which is an implementation
of ResultSet;
- the query is executed asynchronously until the StreamQueue.stop() method is called
or until the application terminates;
- the StreamQueue.poll() method returns all records previously inserted into the table
that match the WHERE condition (if present) and continues to return newly added records;
- each StreamQueue.poll() call returns the next record after the last polled position
within the session, so that if the SQL query is stopped and started again within the same
session, data retrieval continues from the last fixed position; in another session, data
will be retrieved from the beginning of the table;
- unlike a usual query, a streaming query does not support transactions and always returns
actually inserted rows, regardless of the use of the commit() method in the session
inserting the data (DIRTY READS).
However, it should be kept in mind that rollback() physically deletes data, so use it in
this context with care. The session.closeStreamQueue method stops the current stream
query execution.
The simplest example is a query from a table that returns either all records or records
filtered by some condition. Such a query can be used to broadcast events from node to node
or to generate alerts of a certain type:
String sql = "select stream e.empName, e.descript from " +
    "su.interference.test.entity.Event e " +
    "where e.eventType = 1";
StreamQueue q = (StreamQueue) s.execute(sql);
while (true) {
    Object o = q.poll();
    ...
}
Tumbling windows
This example implements the so-called streaming aggregation and assumes that the inserted
records are analyzed in a strictly defined order and for each group of such records one
output record will be generated using group functions such as AVG, COUNT, MAX, MIN, SUM:
String sql = "select stream sum(e.eventValue), e.groupValue from " +
"su.interference.test.entity.Event e group by e.groupValue";
StreamQueue q = (StreamQueue) s.execute(sql);
while (true) {
Object o = q.poll();
...
}
A necessary and important note: since insertion can be carried out in several
threads, the order in which records are analyzed and grouped is based on the value of the
identifier column (@Id); therefore, we strongly recommend using the @GeneratedValue
annotation for the identifier, which ensures that identifier values increase in insertion order.
Sliding windows
In this case, unlike the previous one, an output grouped record is generated for each
newly inserted record, and the group function values are calculated over a group of
records whose size is determined using the keyword
WINDOW BY column INTERVAL = value.
It should be noted that the syntax of this keyword differs from the generally accepted
one: the single column with values and the interval that determines the window size
are set directly in WINDOW BY:
String sql = "select stream count(e.eventId), sum(e.eventValue) from " +
"su.interference.test.entity.Event e window by e.eventId interval = 100";
StreamQueue q = (StreamQueue) s.execute(sql);
while (true) {
Object o = q.poll();
...
}
If the WINDOW BY keyword contains a column marked with the @Id annotation, then the window
size (in rows) will be constant and equal to the value specified in INTERVAL.
Understanding CEP features
Below we list the features used in event processing, to help understand how it works
within a cluster:
SELECT STREAM always returns DIRTY READS data, regardless of the node on which the request
is executed and whether a commit was executed.
session.purge(object)
- this operation deletes a given object regardless of transaction state
and can be executed only on the node where the data was inserted!
The @NoDistribute annotation (entity level) disables remote data synchronization for cases when the event
stream is processed on a single node and there is no need to save this data on other
nodes.
The @Threshold annotation (entity level) limits the maximum amount of data in the table; upon reaching the
given threshold, data will be deleted from the beginning automatically.
The @NoCheck annotation (on the @Id column) means that NO check is made for the existence of an
object with the same id. In this case we rely entirely on the correctness of the
mechanism for generating identifiers. The @NoCheck annotation was designed for use with CEP
tables that do not use indexes.
@DistributedId annotation
Using the standard @Id and @GeneratedValue annotations implies the generation of unique values
within a single node. If your distributed application guarantees that data insertion
will be performed on only one specific node, then this pair of annotations is
enough. If the data can be inserted on different nodes, you must use the @DistributedId
annotation together with the above pair of annotations. This annotation guarantees the uniqueness
of the generated identifier within the cluster and is highly recommended for use with
@Id and @GeneratedValue.
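A sketch of an identifier declaration for inserts on multiple nodes (the field name and annotation packages are assumptions):
@Id
@GeneratedValue
@DistributedId // guarantees uniqueness of generated ids across cluster nodes
private long eventId;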
Configuration parameters
The current configuration is contained in the config/interference.properties file.
The following describes the configuration parameters and gives values that are optimal
for most applications; a sample configuration sketch follows the parameter list:
local.node.id - node identifier in the cluster - integer value from 1 to 64.
All nodes in the cluster must have unique identifiers.
The parameter must be specified when creating the instance and cannot be changed further.
files.amount - the number of threads that can simultaneously execute changes to the storage.
Each thread operates on its own dedicated file.
It is recommended to set a value equal to the number of processor cores.
The default value is 4.
Values from 1 to 64 can be used.
The parameter must be specified when creating the instance and cannot be changed further.
frame.size - size of the physical storage frame in bytes.
The default value is 16384.
Only the values 4096, 16384, 32768, 65536, 131072, 262144, 524288 can be used.
Higher values reduce the size of the internal system data in heap but may decrease performance,
see Performance section.
The parameter must be specified when creating the instance and cannot be changed further.
frame.size.ix - size of the physical frame for storing indexes (in bytes).
The default value is 16384.
Only the values 4096, 16384, 32768, 65536, 131072, 262144, 524288 can be used.
Higher values reduce the size of the internal system data in heap but may decrease performance,
see Performance section.
The parameter must be specified when creating the instance and cannot be changed further.
codepage - The codepage used to serialize string objects (String).
The default value is UTF-8.
The parameter must be specified when creating the instance and cannot be changed further.
dateformat - date format used in SQL queries for string constants in WHERE clause
conditions and, optionally, in the management console (not used in 2020.2)
db.path - path where data files are stored
rmport - initial numeric value defining the first server port for cluster transport
interactions (see cluster configuration rules below).
mmport - http port for access to the control console via http protocol.
The parameter must be specified when creating the instance and cannot be changed further.
diskio.mode - write mode to disk.
Possible values:
sync (write-through mode)
async (write-back mode)
By default, sync is used and it is not recommended to change it.
sync.period - time between writes of changed frames from the queue to disk, in milliseconds.
The default is 2000. For OLTP systems, it is recommended to set it to 100-1000;
for storages with rare changes, to 5000-10000.
Min value = 10, max value = 60000.
sync.lock.enable - lock data changes for the duration of a scheduled sync of frames to disk.
By default, set to true.
cluster.nodes - list of nodeIds, hosts, and ports of cluster nodes, separated by commas.
The list must contain entries of the following format:
nodeId:host:port,nodeId:host:port, ... etc.
If the value is not set, the node will function in single mode (as a local database)
retrieve.threads.amount - the number of threads for parallel processing of the SQL query.
retrieve.queue.size - size (number of elements) of the blocking queue used by the SQL retrieve
mechanism to prevent heap overload. For best performance, use the maximum possible value
depending on your heap size and the number of SQL queries running simultaneously.
Usually, 10000-100000 is enough for optimal performance.
auto.class.register - a comma-separated list of fully qualified names of entity classes that
will be verified when the service starts and, if necessary, registered automatically
(both for services operating in standalone mode and at the application level)
transport.sync.timeout - internal transport inter-node synchronization timeout (in milliseconds)
transport.read.buffer - size (in bytes) of the internal transport read buffer
transport.write.buffer - size (in bytes) of the internal transport write buffer
cleanup.enable - enable heap management/cleanup.
May be true or false; true by default. If set to false, all frames loaded into heap memory
will remain there until the instance terminates.
cleanup.tx.timeout - timeout in milliseconds for asynchronous cleanup of internal transaction data
for closed transactions
cleanup.frames.timeout - timeout for cleanup of unused frames in heap
cleanup.data.threshold - maximum number of data frames in a table below which the table
is excluded from the cleanup process
cleanup.ix.threshold - maximum number of index frames below which the index
is excluded from the cleanup process
cleanup.heap.data.threshold - threshold as a percentage of heap usage upon reaching which all data frames
will be cleaned from heap memory (if they satisfy other conditions)
cleanup.heap.ix.threshold - same for indexes
cleanup.heap.temp.threshold - same for temp tables
cleanup.heap.undo.threshold - same for undo frames
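For reference, a hypothetical configuration sketch for the first node of a three-node cluster (all hosts, ports, and paths are assumptions):
local.node.id=1
files.amount=4
frame.size=16384
frame.size.ix=16384
codepage=UTF-8
dateformat=dd.MM.yyyy
db.path=/opt/interference/data
diskio.mode=sync
sync.period=2000
rmport=8086
mmport=8080
cluster.nodes=2:host2.example.com:8086,3:host3.example.com:8086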