22 Support for Reactive Streams Ingestion

Oracle Database Release 19c introduces a Java library that provides support for reactive streams ingestion, which enables customers to efficiently stream data into Oracle Database.

This chapter contains the following topics:

22.1 Overview of Java Library for Reactive Streams Ingestion

The new Java library enables Java applications to continuously receive and ingest data from a large group of clients. IoT agents or Telco applications that need to persist a large number of records as rows in an Oracle Database table can use this library. Because it uses the direct path load method of Oracle Database to insert data, the ingestion process is nonblocking and fast. Through Universal Connection Pool (UCP), the library also supports several high-availability and scalability features of the database, such as table partitions, Oracle RAC connection affinity, and sharding.
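
The nonblocking hand-off described above can be pictured with the standard java.util.concurrent.Flow API. The following is only a minimal sketch that uses JDK classes, not the library's own API: the class names CollectingSubscriber and NonBlockingIngestSketch are hypothetical, and the subscriber keeps rows in memory where a real ingestion component would load them into the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustrative only: stands in for the ingestion side and keeps rows in memory
// instead of writing them to a database.
final class CollectingSubscriber implements Flow.Subscriber<Object[]> {
  final List<Object[]> received = new ArrayList<>();
  final CountDownLatch done = new CountDownLatch(1);
  private Flow.Subscription subscription;

  @Override public void onSubscribe(Flow.Subscription s) {
    subscription = s;
    s.request(1);               // signal demand for the first row
  }

  @Override public void onNext(Object[] row) {
    received.add(row);          // a real subscriber would buffer the row for loading
    subscription.request(1);    // ask for the next row
  }

  @Override public void onError(Throwable t) { done.countDown(); }

  @Override public void onComplete() { done.countDown(); }
}

final class NonBlockingIngestSketch {
  // Publishes the rows asynchronously and returns how many the subscriber received.
  static int publishAll(List<Object[]> rows) {
    CollectingSubscriber subscriber = new CollectingSubscriber();
    try (SubmissionPublisher<Object[]> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (Object[] row : rows) {
        // Hands the row off for delivery on another thread; submit() waits
        // only if the publisher's internal buffer is full.
        publisher.submit(row);
      }
    } // close() signals onComplete once the pending rows have been delivered
    try {
      subscriber.done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for delivery", e);
    }
    return subscriber.received.size();
  }

  public static void main(String[] args) {
    List<Object[]> rows = List.of(new Object[] {1, "KING"}, new Object[] {2, "SCOTT"});
    System.out.println(publishAll(rows) + " rows received");
  }
}
```

The client thread only submits rows and never waits for a per-row response, which is the property the overview refers to as nonblocking.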

22.2 Architecture of Java Library for Reactive Streams Ingestion

The new Java library enables customers to efficiently stream data into the Oracle Database when a large number of clients use the database to persist information in the form of table rows and do not want to be blocked waiting for a synchronous response from the database. This library consists of the following modules:

  • Reactive Streams Ingestion (RSI) Library

    This is the core module that exposes the API to handle the ingestion logic. It supports the Memoptimized RowStore Fast Ingest feature of Oracle Database Release 19c to persist the user information in the database quickly and seamlessly. It uses UCP for connection pooling and management, including sharding topology knowledge and Fast Application Notification (FAN) awareness for an Oracle Real Application Clusters (Oracle RAC) database.

    To use this library, add the following JAR files to your CLASSPATH:
    • rsi.jar
    • ojdbc8.jar
    • ucp.jar
    • ons.jar

  • JDBC Library

    This module is a collection of enhanced JDBC APIs that add support for direct path load through the new prepareDirectPath method of the OracleConnection interface. The method returns a statement whose interface is derived from the PreparedStatement interface. Using this interface is transparent to the end user, and performing a direct path load operation requires very little change to your application code.

    The following code snippet shows how to use this feature:

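    PreparedStatement preparedStatement = oracleConnection.prepareDirectPath(
        "SCOTT", "EMP", new String[] {"EMPNO", "ENAME"});
    for (int i = 0; i < rowCount; i++) {  // rowCount: number of rows to load (placeholder)
      preparedStatement.setInt(1, someIntValue);        // EMPNO
      preparedStatement.setString(2, someStringValue);  // ENAME
      preparedStatement.addBatch();                     // assumed: standard JDBC batching
    }
    preparedStatement.executeBatch();                   // assumed: standard JDBC batching
    oracleConnection.commit(); // or oracleConnection.rollback();
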
  • UCP Library

    This module extends the existing UCP APIs. It extracts the sharding topology for a sharded database and listens for FAN events. The RSI library uses UCP to reuse connections properly.

    The UCP library recognizes the sharding keys specified by the users and allows them to connect to the specific shard and chunk. As new connections are created, UCP caches the mapping from sharding-key ranges to shard locations. Later connection requests can then bypass the Global Service Manager (shard director), which enables faster key-based access. When sharding keys are provided, UCP can also select an available connection from the pool. This enables reuse of connections to various shards by using the cached routing topology.

    The RSI library uses the sharding APIs exposed by UCP to get a proper connection for the specified sharding key. Each record in RSI can be mapped to a unique chunk ID, and the records can then be grouped by that chunk ID. When the RSI library has enough records to send to the database, it borrows a chunk-specific connection from UCP and uses that connection to insert the records into the sharded database.
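
The routing idea in the UCP description (cache sharding-key ranges, resolve each key locally, group records by chunk) can be sketched with plain Java collections. This is a hedged illustration only: the class ChunkRoutingSketch, the use of integer sharding keys, and the example ranges are all hypothetical and do not reflect the UCP or RSI APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustrative only: a local cache of sharding-key ranges, not a UCP API.
final class ChunkRoutingSketch {
  // Cached topology: lowest sharding key of each range -> chunk ID.
  private final TreeMap<Integer, Integer> chunkByRangeStart = new TreeMap<>();

  void cacheRange(int rangeStart, int chunkId) {
    chunkByRangeStart.put(rangeStart, chunkId);
  }

  // Resolves a sharding key to its chunk ID using only the local cache,
  // so no round trip to a shard director is needed.
  int chunkFor(int shardingKey) {
    Map.Entry<Integer, Integer> range = chunkByRangeStart.floorEntry(shardingKey);
    if (range == null) {
      throw new IllegalArgumentException("No cached range for key " + shardingKey);
    }
    return range.getValue();
  }

  // Groups sharding keys (standing in for records) by chunk ID so that each
  // group can be sent over one chunk-specific connection.
  Map<Integer, List<Integer>> groupByChunk(List<Integer> shardingKeys) {
    return shardingKeys.stream()
        .collect(Collectors.groupingBy(key -> chunkFor(key)));
  }

  public static void main(String[] args) {
    ChunkRoutingSketch routing = new ChunkRoutingSketch();
    routing.cacheRange(0, 1);     // keys 0 to 999 map to chunk 1
    routing.cacheRange(1000, 2);  // keys 1000 and above map to chunk 2
    System.out.println(routing.groupByChunk(List.of(5, 1500, 42, 1000)));
  }
}
```

A sorted map keyed by range start keeps each lookup logarithmic in the number of cached ranges, which is one plausible way to make the cached path cheaper than asking the shard director for every record.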

22.3 Limitations of Java Library for Reactive Streams Ingestion

Reactive streams ingestion may not ingest all the data into the database if an unexpected crash or error occurs, because records that are still held in memory can be lost before they are stored in the database. The amount of data that can be lost depends on the following two values:

  • MaxNoOfRecord

    This is the maximum number of records to buffer in memory, and therefore the maximum number of records that might be lost if the database crashes. A larger value improves performance but increases the potential loss of records.

  • MaxBufferInterval

    This is the maximum amount of time to hold a record in memory. The lower this value, the fewer records are held in memory and might be lost. A higher value improves performance but increases the potential loss of records.
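
How the two values bound the potential loss can be seen in a small buffering sketch. It is a simplified, single-threaded illustration and not the library's implementation: the class BoundedBufferSketch and its members are hypothetical, with maxRecords and maxIntervalMillis playing the roles of MaxNoOfRecord and MaxBufferInterval, and a callback standing in for the database write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Illustrative only: not thread-safe and not the RSI implementation.
final class BoundedBufferSketch<T> {
  private final int maxRecords;             // role of MaxNoOfRecord
  private final long maxIntervalMillis;     // role of MaxBufferInterval
  private final LongSupplier clockMillis;   // injectable clock, useful for testing
  private final Consumer<List<T>> flusher;  // stands in for the database write
  private final List<T> buffer = new ArrayList<>();
  private long oldestRecordMillis;

  BoundedBufferSketch(int maxRecords, long maxIntervalMillis,
                      LongSupplier clockMillis, Consumer<List<T>> flusher) {
    if (maxRecords < 1) {
      throw new IllegalArgumentException("maxRecords must be at least 1");
    }
    this.maxRecords = maxRecords;
    this.maxIntervalMillis = maxIntervalMillis;
    this.clockMillis = clockMillis;
    this.flusher = flusher;
  }

  // Buffers a record and flushes if either limit has been reached.
  void accept(T record) {
    if (buffer.isEmpty()) {
      oldestRecordMillis = clockMillis.getAsLong();
    }
    buffer.add(record);
    flushIfDue();
  }

  // Called on every arrival; a periodic timer would also call it (omitted here).
  void flushIfDue() {
    boolean full = buffer.size() >= maxRecords;
    boolean stale = !buffer.isEmpty()
        && clockMillis.getAsLong() - oldestRecordMillis >= maxIntervalMillis;
    if (full || stale) {
      flusher.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  // Records that would be lost if the process stopped now.
  int atRisk() {
    return buffer.size();
  }

  public static void main(String[] args) {
    BoundedBufferSketch<String> sketch = new BoundedBufferSketch<>(
        2, 1000L, System::currentTimeMillis,
        batch -> System.out.println("flushing " + batch));
    sketch.accept("r1");
    sketch.accept("r2");  // reaches maxRecords, so the batch is flushed
    System.out.println("at risk: " + sketch.atRisk());
  }
}
```

In this sketch, larger limits mean fewer and larger flushes, and also more records sitting in memory between flushes, which is the tradeoff both parameters describe.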

Apart from the possibility of data loss due to database crashes, this library has the following limitations:

  • Triggers are not supported.
  • Referential integrity is not checked.
  • Clustered tables are not supported.
  • Loading of remote objects is not supported.
  • LONG data type must be specified as the last column.
  • LONG data type with streams is not supported.
  • Loading of VARRAY columns is not supported.
  • All partitioning columns must appear before any LOB data type.