Read Me First!

Before reading this document, read Managing Adaptors, which covers the entire process of configuring and deploying an Adaptor. This document deals specifically with the off-the-shelf YOUnite Database (DB) Adaptor.

Quick Start

A quick start guide exists here: YOUnite Database Adaptor Quick Start Guide

Introduction

The YOUnite DB Adaptor is an off-the-shelf adaptor that can scan for changes in a JDBC data source, or listen for changes on a Kafka stream. In addition, the YOUnite DB Adaptor listens for messages from YOUnite and updates the data source with those changes.

The following methods of detecting changes are supported:

Method Description

Timestamp

Changes are detected based on a timestamp column in a database table.

Auto Key

Changes are detected based on new records being added to a table.

Change Table

Changes are detected by scanning a "change table" for new records. The "change table" includes the key of the record that changed and what the operation was (INSERT, UPDATE, DELETE).

Streams

Changes are detected by listening on a stream (Kafka is currently supported).

The YOUnite DB Adaptor uses Kafka to listen for incoming messages from external data sources.


See StreamOptions in the "Adaptor Metadata Configuration" section below.

There are three methods available for scanning database changes: TIMESTAMP, AUTO_KEY, and CHANGE_TABLE. The YOUnite DB Adaptor conducts scans at regular specified time intervals and handles one or more batches during each scan. A scan stops when it runs out of time or an empty batch is returned.


See Table in the "Adaptor Metadata Configuration" section below.

YOUnite DB Adaptor Processing Service

The YOUnite DB Adaptor includes a processing service that keeps track of:

  • Scanning information (what range of timestamps or keys has been scanned)

  • Incoming records from YOUnite (to prevent infinite loops)

  • Batches of changes identified by a scan

In addition to providing a place to store this information, the processing service ensures that in the event of a crash, the Adaptor can pick up where it left off and no transactions will be lost.

The processing service is implemented using Mongo DB.


See Processing Service Configuration in the "Service Configuration" section below.

Supported Data Sources

The YOUnite Database Adaptor supports many popular databases using the open source QueryDSL project to assist with translations to the various SQL dialects of different databases. See the JDBC Configuration section below for a list of databases that are supported.

Even if a database is not in the list of supported databases, it may still be compatible with the YOUnite DB adaptor if:

  1. It has a JDBC driver

  2. It meets minimal standards:

    1. Must be able to return a current timestamp (for timestamp scanning)

    2. Must support the usual SELECT, INSERT, UPDATE, DELETE operations

    3. Must support IN ( .. ) clause

  3. If it meets these criteria, choose the supported database type that uses the same current timestamp function, since the rest of the criteria are standard SQL. See the template.name property in the JDBC Configuration section.

  4. If the above does not work, a custom Java class can be developed to implement the proper translations - contact YOUnite for additional information.

In addition to JDBC data sources, the YOUnite DB Adaptor supports MongoDB, NEO4J and Kafka streams.

An important note regarding null handling and missing values

Below are a couple of important notes about how the YOUnite DB Adaptor handles null and missing values (a short example follows these lists):

  1. If a data event includes a field with the value explicitly set to null, the value in the database will be set to null.

  2. If a data event does not include a field, its value will not be set or updated in the database.

Reasons why a field may be excluded:

  1. The adaptor that originated the data event did not include data for a field.

  2. Data Governance was applied and a field was excluded.
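For illustration, here is a minimal sketch (assuming the customer domain used later in this document, with firstName and email properties, and placeholder values) of two incoming data events. In the first payload email is explicitly null, so the email column will be set to null; in the second payload email is absent, so the email column is left untouched:

{
  "firstName": "Pat",
  "email": null
}

{
  "firstName": "Pat"
}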

Configuration Overview

An adaptor’s configuration is stored in the YOUnite API server’s metadata DB and is broken out into two major sections:

  1. Adaptor Configuration:

    • Connection information e.g. credentials, message bus URL

    • Other general run-time configuration

  2. Metadata Configuration:

    • List of datasources (e.g. databases, streams, including the processing service’s Mongo DB)

    • List of tables. Each table contains:

    • A reference to a "datasource"

    • A reference to a domain:version

    • A table name if the datasource is a database

    • Mappings between domain:version properties and datasource properties (e.g. database column)

    • Database and domain:version key specifications and configuration

    • Optional, processing service collection (table) configuration

    • Optional, change scanning configuration

The first section, "Adaptor Configuration" (not "Metadata Configuration"), includes credentials and connection information. The credentials and connection information are generated automatically when the adaptor is created in the YOUnite UI or with the YOUnite API request:

POST /zone/<zone-uuid>/adaptors

The credentials and connection information can be retrieved from the YOUnite UI or the response body from the above request and must be configured either as environment variables or included in the adaptor.properties file on the adaptor system (see Creating and Managing Adaptors for more).

The "Metadata Configuration" section is configured using the YOUnite UI or the YOUnite API and forwarded to the Adaptor after it establishes a connection with the YOUnite API server.

Adaptor Configuration

Before starting on this section, read Managing Adaptors.

Required properties:

Property Environment Variable Description Example Value

adaptor.uuid

ADAPTOR_UUID

UUID of the adaptor

3dfcc03d-e5d4-4d57-9e9b-5c5d2db32f9a

client.id

CLIENT_ID

Username used to connect to message bus

8c9167a6-bb83-4f77-bdfc-1947a946f77b

client.secret

CLIENT_SECRET

Password used to connect to message bus

de02e3fa-4b23-46cb-aed6-5665a16e73d3

message.bus.url

MESSAGE_BUS_URL

Message Bus URL

tcp://message-bus-uri:61617

auth.server.url

AUTH_SERVER_URL

OAuth Server to validate adaptor access credentials. The YOUnite Server runs an embedded OAuth server that your implementation may be using. By default it runs on port 8080, so in this case the value would be http://ip-address-of-the-YOUnite-Server:8080

http://oauth-server-uri

Optional properties:

Property Environment Variable Description Default Value

hostname

HOSTNAME

Name of the host for the health check endpoint. Note that Kubernetes sets this environment variable by default.

localhost

port

PORT

HTTP port to expose for health checks. The health check endpoint is at /health, ie GET host:8001/health

8001

concurrency

CONCURRENCY

Concurrency for incoming data messages, ie, number of threads that will handle incoming messages. Note that this could lead to messages being processed out of order, though they are guaranteed to be processed in order within their message grouping (the default grouping is the unique ID of the record, ie the DR UUID).

1

ops.concurrency

OPS_CONCURRENCY

Concurrency for incoming ops messages, ie, number of threads that will handle incoming messages. These are messages with adaptor status information and metadata, request for stats, shutdown requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1).

1

assembly.concurrency

ASSEMBLY_CONCURRENCY

Concurrency for incoming assembly messages, ie, number of threads that will handle incoming messages. These are federated GET requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1).

1

session.cache.size

SESSION_CACHE_SIZE

JMS session cache size.

10

enable.metrics

ENABLE_METRICS

Enable / disable displaying metrics at a given interval.

true

metrics.log.interval

METRIC_LOGS_INTERVAL

Interval in milliseconds to display metrics.

3600000 (one hour)

n/a

LOG_LEVEL

Log level for YOUnite logback messages. Applies to younite-db-adaptor implementations, but not the Adaptor SDK. Must come from an environment variable.

INFO

n/a

ROOT_LOG_LEVEL

Log level for non-YOUnite logback messages. Applies to younite-db-adaptor implementations, but not the Adaptor SDK. Must come from an environment variable.

INFO

Example:

# Configuration

# Adaptor UUID
adaptor.uuid = 3dfcc03d-e5d4-4d57-9e9b-5c5d2db32f9a

# ClientID and Secret to be used by JMS to connect to the message bus
client.id = 8c9167a6-bb83-4f77-bdfc-1947a946f77b
client.secret = de02e3fa-4b23-46cb-aed6-5665a16e73d3

# Message Bus URL
message.bus.url = tcp://192.2.200.25:61616

# OAuth Server URL (the YOUnite server's embedded OAuth server is assumed here)
auth.server.url = http://192.2.200.25:8080

# Optional configuration
concurrency = 4
ops.concurrency = 1-4
assembly.concurrency = 2-2
session.cache.size = 10

Health Check Endpoint

The adaptor includes an HTTP endpoint to check the health of the adaptor at /health. The default port is 8001.

If the adaptor is healthy, it will return a 200 OK response when querying the endpoint. If it’s not healthy, it will return a 503 Service Unavailable. For example:

{
  "status": "UP"
}

In addition, it will return some information about the adaptor such as the status of each service and the uptime. For example, if the employee v1 domain handler is not working, the adaptor might return something like this:

{
  "status": "DOWN",
  "adaptorService": "UP",
  "upTimeMilliseconds": 100000,
  "customer:1": "UP",
  "employee:1": "DOWN"
}

Adaptor Metadata Configuration

The Adaptor Metadata Configuration contains the rest of the configuration for the YOUnite DB Adaptor. This configuration is stored in JSON format in the metadata of an adaptor. See Managing Adaptors, which discusses how to assign metadata to an adaptor; this can be done when the adaptor is created or later by updating the adaptor metadata in the UI or via the API.

You can jump ahead to the full metadata example or read the specifications below:

Metadata Schema Specification

The top level of the metadata schema contains a list of databases and tables.

{
    "databases": { "string" : {DatabaseConnection}, ... },
    "tables": [ {Table}, ... ]
}
Property Description Type Default Value Required

databases

Key = data source name, value = DatabaseConnection.

Object of String → DatabaseConnection

none

yes

tables

List of table specifications.

List of Table

none

yes

DatabaseConnection

A DatabaseConnection contains connection information for a database referenced by a Table:

{
    "type": "string",
    "properties": { "string": object, ... }
}
Property Description Type Default Value Required

type

Database type. JDBC, MONGO, NEO4J, KAFKA, etc or custom type.

String

none

yes

properties

List of properties for the connection.

Object of String → Object

none

yes

Notes:

  1. Type is case sensitive and must correspond to a built-in type (JDBC, MONGO, NEO4J, KAFKA, etc) or a custom type.

  2. Properties are specific to the database type. See JDBC Configuration, Processing Service Configuration and Kafka Database Metadata Specification for information about what the properties mean.
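For example, a JDBC DatabaseConnection entry (mirroring the PostgreSQL entry used in the full metadata example later in this document; the host, credentials and pool size are placeholder values) might look like this:

"postgres": {
    "type": "JDBC",
    "properties": {
        "jdbcUrl": "jdbc:postgresql://postgres-db:5432/customers",
        "dataSource.user": "postgres",
        "dataSource.password": "mysecretpassword",
        "maximumPoolSize": "10"
    }
}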

Table

A Table is the specification of an entity that contains data:

{
    "_ Domain and Version _": "",
    "domain": "string",
    "version": integer,

    "_ Data Source _": "",
    "dataSource": {DataSource},
    "dataColumns": { "string": {Column}, ... },
    "readOnly": boolean,
    "writeOnly": boolean,

    "_ Change Scanning and Key Management _": "",
    "scanMethod": "string",
    "scanOptions": {ScanOptions},
    "primaryKeySpecs": [ {PrimaryKeySpec}, ... ],
    "autoKeySpec": {AutoKeySpec},
    "timestampSpec": {TimestampSpec},
    "streamOptions": {StreamOptions},
    "changeTable": {ChangeTable},

    "_ Processing Service _": "",
    "processingDataSource": {DataSource},
    "localIdentifierStoreDataSource": {DataSource}
}
Property Description Type Default Value Required

domain

Data domain name defined in YOUnite that this table maps to.

String

none

yes

version

Data domain version defined in YOUnite that this table maps to.

Integer

none

yes

readOnly

Set to true if no updates are allowed on this table.

Boolean

false

no

writeOnly

Set to true if updates are allowed on this table, but YOUnite is not permitted to GET data from this table.

Boolean

false

no

dataSource

Data source for this table.

DataSource

none

yes

dataColumns

Column names and specifications of the data columns in the table that map to a YOUnite domain version's schema.

Object of String → Column

none

yes

scanMethod

Scan Method. Allowed values are: TIMESTAMP, AUTO_KEY, CHANGE_TABLE, STREAM and NONE.

String

none

yes

scanOptions

Scan options. Required when scanMethod = TIMESTAMP, AUTO_KEY or CHANGE_TABLE.

ScanOptions

none

See ScanOptions below

primaryKeySpecs

Specifications for primary keys and/or unique keys that uniquely identify records in the table.

List of PrimaryKeySpec

none

yes

autoKeySpec

Automatically generated key specifications. Required if scanMethod = AUTO_KEY, optional if scanMethod = CHANGE_TABLE.

AutoKeySpec

none

See AutoKey specification below

timestampSpec

Timestamp specification. Required if scanMethod = TIMESTAMP.

TimestampSpec

none

See TimeStamp specification below

streamOptions

Stream options. Required when scanMethod = STREAM.

StreamOptions

none

See StreamOptions below

changeTable

Change table options. Required when scanMethod = CHANGE_TABLE.

ChangeTable

none

See ChangeTable below

processingDataSource

Processing data source for this table.

DataSource

none

See "Notes" below.

localIdentifierStoreDataSource

Optional data source used to store the mapping between a data record’s UUID in YOUnite and the local identifier at this adaptor.

DataSource

none

See "Notes" below.

Notes:

  1. The domain name and version reference the domain / version defined in YOUnite.

  2. The key of dataColumns is the name of the column in the table. The Column specification defines what the name of the property is in the data domain version schema.

  3. dataColumns must ONLY include columns that are defined in the specified data domain version in YOUnite. This must include any primary key columns for a PrimaryKeySpec of type DOMAIN.

  4. When writeOnly = true, YOUnite will not be allowed to request data from this table via a Federated GET. However, if the table is set up to perform scanning, it may still send data to YOUnite in the normal way.

  5. A processing data source is always required unless the adaptor only receives incoming changes from YOUnite for this table, or if scanMethod = STREAM and streamOptions.matchIncoming = false.

  6. A local identifier store data source is required if the adaptor will be performing "deletion scanning" to look for deleted records. If the adaptor does not require this capability, the local identifier store is not required.

  7. See Appendix B for more information about primary key specifications.

DataSource

A DataSource is a reference to a DatabaseConnection with specific database, schema and table information:

{
    "name": "string",
    "databaseName": "string",
    "schema": "string",
    "tableName": "string",
    "cacheSize": 1000
}
Property Description Type Default Value Required

name

Name of a DatabaseConnection listed in databases.

String

none

yes

databaseName

Database name.

String

none

no* (except for Mongo)

schema

Schema name.

String

none

no

tableName

Table name.

String

none

yes* (except for Mongo and local identifier stores)

cacheSize

Applicable to local identifier store only. The size of the cache in memory of local identifier mappings.

Integer

1000

no

Notes:

  1. Required / optional parameters may vary depending on the database type. See JDBC Configuration, Processing Service configuration and Kafka Database Metadata Specification for more information on these database connection types.

  2. For local identifier stores, the tableName is optional. If not specified, the table name will be id_store_{table name}_{version}_{adaptor uuid with underscores instead of dashes}.

  3. If a table name is specified for a local identifier store it MUST be unique and not shared with any other adaptor or with any other table specification.
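For example, the following sketch (names follow the customer example later in this document; the cacheSize value is a placeholder) shows a table's main data source alongside a Mongo-backed local identifier store with an explicit cache size:

"dataSource": {
    "name": "postgres",
    "schema": "public",
    "tableName": "customer"
},
"localIdentifierStoreDataSource": {
    "name": "mongo",
    "databaseName": "id-store",
    "cacheSize": 5000
}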

Column

A Column is the specification of a column in a table (or property in a stream):

{
    "source": "string",
    "columnType": "string"
}
Property Description Type Default Value Required

source

Name of the column in the domain/version specification in YOUnite.

String

See "Notes" below

no

columnType

Column type. Must be one of the following: STRING, LONG, DOUBLE, DATE, LOCAL_DATE, LOCAL_TIME, TIMESTAMP.

String

STRING

no

Notes:

  1. If source is not specified, it will default to the name of the column in the database.

  2. The columnType corresponds to the Java data type (for TIMESTAMP it is the data type in the java.sql package).

  3. What’s with all the date types?

    1. DATE = an instant in time with millisecond precision relative to a time zone.

    2. LOCAL_DATE = a date (no time) with no time zone information.

    3. LOCAL_TIME = a time (no date) with no time zone information.

    4. TIMESTAMP = a higher precision date with up to nanosecond precision.
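For example, the following dataColumns sketch (column and property names follow the customer example later in this document) maps two columns explicitly and relies on the defaults for a third, assuming an empty specification is accepted; per the notes above, source defaults to the column name and columnType defaults to STRING:

"dataColumns": {
    "first_name": {
        "source": "firstName",
        "columnType": "STRING"
    },
    "birth_date": {
        "source": "birthDate",
        "columnType": "LOCAL_DATE"
    },
    "email": {}
}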

ScanOptions

The ScanOptions defines specific options for scanning for changes. These are required when using the scanning method of detecting changes.

{
    "processorConcurrency": integer,
    "maxBatchSize": integer,
    "enableIncoming": boolean,
    "enableDeletionScan": boolean,
    "enableModificationScan": boolean,
    "enableProcessor": boolean,
    "modScanPriority": double,
    "maxModScanTimeMilliseconds": integer,
    "processorFullRange": "string",
    "scanIntervalMilliseconds": long,
    "preloadNextBatch": boolean,
    "keyReadBatchSize": integer
}
Property Description Type Default Value Required

processorConcurrency

Concurrency to use when processing a batch of changes identified by a scan.

Integer

1

no

maxBatchSize

Maximum number of changes to be included in a batch to be processed. Note that multiple batches can be run in a single scan (see scanIntervalMilliseconds and maxModScanTimeMilliseconds). Set this no higher than 60,000.

Integer

10,000

no

enableIncoming

True or false indicating whether incoming changes from YOUnite should be accepted.

Boolean

false

no

enableDeletionScan

True or false indicating whether deletion scanning should be performed.

Boolean

false

no

enableModificationScan

True or false indicating whether modification scanning should be performed (inserts and updates).

Boolean

false

no

enableProcessor

True or false indicating whether processing of batches of changes should be enabled.

Boolean

false

no

modScanPriority

Priority of modification scans relative to deletion scans (if both are enabled).

Double

1.0

no

maxModScanTimeMilliseconds

Maximum time in milliseconds to spend scanning for modifications.

Integer

5,000

no

processorFullRange

Range at which a processor is considered full. Can be expressed as a single integer (ie 10) or a range (8-10). See "Notes" below.

String

8-10

no

scanIntervalMilliseconds

Minimum interval in milliseconds at which to perform a scan.

Long

1,000

no

preloadNextBatch

Pre-load a batch of changes, for efficiency, if multiple batches are awaiting processing.

Boolean

true

no

keyReadBatchSize

For processes that identify records by primary key, the batch size to use. See "Notes" below.

Integer

1,000 / # of columns in primary key

no

Notes:

  1. By default, all scanning options are disabled, as is accepting incoming changes from YOUnite. This is by design, so that only the options that are explicitly enabled are active.

  2. The scanning method of detecting changes is a two-step process. First, the primary key and timestamp of changed records are detected and enqueued for processing. Second, the identified changes are processed in timestamp order* (see note #3 below) and sent to YOUnite. These two processes (scan and process) run in parallel; however, only one scan task and one process task can be running at any given time.

  3. When processorConcurrency is greater than one, changes are not guaranteed to be sent in timestamp order; however, only one batch is processed at a time, so modifications to specific records are guaranteed to be sent in the order that they are detected.

  4. When processorFullRange is expressed as a range (ie 5-10), the processor is considered "full" when it reaches 10 pending batches, and scanning will not resume until the number of pending batches drops below 5.

  5. keyReadBatchSize is used to limit the size of queries (and prevent the system from crashing) for processes that need to read batches of records by primary key. Queries by primary key can get large as they typically involve long IN ( ... ) clauses or lots of ANDs and ORs; above a certain limit they may cause the system to crash. The default value of 1,000 divided by the number of primary key columns has been found to work well and provide good performance (much better than reading the records one at a time!).
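As an illustration (values are placeholders and should be tuned to the data source), the following scanOptions enable modification scanning, batch processing and incoming changes from YOUnite while leaving deletion scanning disabled:

"scanOptions": {
    "enableIncoming": true,
    "enableProcessor": true,
    "enableModificationScan": true,
    "enableDeletionScan": false,
    "processorConcurrency": 2,
    "maxBatchSize": 10000,
    "scanIntervalMilliseconds": 1000,
    "processorFullRange": "8-10"
}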

PrimaryKeySpec

A PrimaryKeySpec is the specification of a primary key or unique key. Up to two specifications may be included, one of type DOMAIN and one of type LOCAL or LOCAL_AUTO.

{
    "columns": { "string": {PrimaryKeyValue}, ... },
    "type": "string"
}
Property Description Type Default Value Required

columns

Columns that make up the primary or unique key

PrimaryKeyValue

none

yes

type

Type of primary / unique key. Must be one of DOMAIN, LOCAL or LOCAL_AUTO. See below.

String

none

yes

Primary Key Types:

  • DOMAIN = A primary / unique key based on values from the data domain version schema.

  • LOCAL_AUTO = A primary / unique key that is an auto-incrementing number. Only one column can be defined for this type of key, and it must be of data type LONG.

  • LOCAL = A primary / unique key that is not an auto-incrementing number.

Notes:

  1. The key of columns is the name of the column in the table.

  2. It is highly recommended that a LOCAL or LOCAL_AUTO key is defined if available and if it differs from the values that are included in the data domain schema. See Appendix B for more information about primary key specifications.
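As a sketch (the column names are placeholders, and the columns wrapper follows the schema shown above), a table identified by a domain key customer_id and a local auto-incrementing key id could be specified as:

"primaryKeySpecs": [
    {
        "type": "DOMAIN",
        "columns": {
            "customer_id": { "columnType": "STRING", "required": true }
        }
    },
    {
        "type": "LOCAL_AUTO",
        "columns": {
            "id": { "columnType": "LONG", "required": true }
        }
    }
]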

PrimaryKeyValue

A PrimaryKeyValue is the specification of a column that is part of a primary key specification.

{
    "columnType": "string",
    "required": boolean
}
Property Description Type Default Value Required

columnType

Column type. Must be one of the following: STRING, LONG, DOUBLE, DATE, LOCAL_DATE, LOCAL_TIME, TIMESTAMP.

String

none

yes

required

Is the column required, ie "not null" in the database?

Boolean

true

no

AutoKeySpec

The AutoKeySpec defines specific options for the AUTO_KEY Scan Method and optionally the CHANGE_TABLE Scan Method. This method is used for tables that have a non-negative auto-incrementing primary key:

{
    "columnName": "string",
    "defaultStartKey": long,
    "gapDetectionEnabled": boolean,
    "gapDetectionTimeout": long
}
Property Description Type Default Value Required

columnName

Column name in the database.

String

none

yes

defaultStartKey

When the adaptor runs for the first time, the key it should start with.

Long

none

yes* (See "Notes" below)

gapDetectionEnabled

Enable gap detection during scanning. See Appendix A: Gap Detection for a description.

Boolean

true

no

gapDetectionTimeout

Time to wait in milliseconds for an auto-incrementing key gap to be resolved if gap detection is enabled.

Long

5,000

no

Notes:

  1. YOUnite is only capable of handling unsigned auto-incrementing columns with a maximum precision of 64 bits which corresponds to the long data type in Java 8.

  2. defaultStartKey is required for the Adaptor to begin its first scan, unless (warning, advanced option!! do not attempt unless you know what you’re doing!!) the processing table is manually updated with previous scan information.
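For example, a minimal autoKeySpec for a table whose auto-incrementing column is named id (the column name and start key are placeholders) might look like this:

"autoKeySpec": {
    "columnName": "id",
    "defaultStartKey": 1,
    "gapDetectionEnabled": true,
    "gapDetectionTimeout": 5000
}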

TimestampSpec

The TimestampSpec defines specific options for the TIMESTAMP Scan Method. This method is used for tables that have a column with the date/time that the row was created or modified. The precision of this column may vary, but it must be a data type that includes a date and optionally a time.

{
    "columnName": "string",
    "type": "string",
    "precision": "string",
    "setOnAddOrUpdate": boolean,
    "scanStartTime": "string",
    "lagTimeMilliseconds": long,
    "overlapMilliseconds": long
}
Property Description Type Default Value Required

columnName

Name of the timestamp column in the database.

String

none

yes

type

Date / time type. Must be either DATE or TIMESTAMP and corresponds to the data type that is produced by a JDBC query against the database. See "Notes" below.

String

DATE

yes

precision

Precision of the column, regardless of type. Must be NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS or MINUTES.

IMPORTANT: This must exactly match the precision of the timestamp column (columnName) in the underlying data source.

See "Notes" below.

String

MILLISECONDS

no

setOnAddOrUpdate

Set the timestamp when adding or updating records, for data sources / tables that don’t have triggers that automatically set the timestamp. The adaptor's system time will be used to set the timestamp.

Boolean

false

no

scanStartTime

The timestamp at which to start scanning when the adaptor runs for the first time. Must be in ISO-8601 format with optional precision up to a nanosecond: YYYY-MM-DDThh:mm:ss.nnnZ.

String

none

yes* (See "Notes" below)

lagTimeMilliseconds

Milliseconds to "lag" behind the current time (as reported by the database) for scanning.

Long

0

no

overlapMilliseconds

Milliseconds to overlap scans to accommodate pending transactions. Recommended! It is important to read all notes in this section (especially the "Very Important Notes" below) and the ScanOptions section.

Long

0

no

Notes:

  1. The DATE type is capable of millisecond precision and the TIMESTAMP type is capable of nanosecond precision. DATE should work for nearly all cases, regardless of the data type in the database, and is recommended. Some databases are capable of higher precision, such as microsecond or even nanosecond precision in which case TIMESTAMP can be used.

  2. If the type is DATE the precision may not be finer than MILLISECONDS.

  3. The precision should exactly represent the precision of the database column. For example, if it is only capable of representing the date/time down to hours:minutes:seconds, then SECONDS precision should be specified. Precision coarser than MINUTES is not supported.

  4. scanStartTime is required for the Adaptor to begin its first scan, unless (warning, advanced option!! do not attempt unless you know what you’re doing!!) the processing table is manually updated with previous scan information.

Important
Very Important Notes Regarding OverlapMilliseconds
  1. There will typically be a delay between the time a "this record was updated" timestamp is calculated and when it is committed to the database. In fact, for databases that provide transactional support (most of them), it is entirely possible that this old timestamp value could be sitting in an uncommitted transaction for quite some time before it is finally committed to disk. To mitigate this, the overlapMilliseconds property should be set to the maximum amount of time a transaction may be pending before it is committed to disk.

  2. The higher the value of overlapMilliseconds, the higher the performance penalty on scanning; however, unless the system has an extremely large number of records, the effect should be minimal as only primary key and timestamp information for overlapping records is read on each scan (and those that have already been processed are discarded).

  3. Another consideration with overlapping scans should be maxBatchSize in ScanOptions. It is recommended that maxBatchSize be set to the number of estimated transactions during overlapMilliseconds plus the maximum number of changes expected per scan to reduce the number of queries required to find new changes. For example, if overlapMilliseconds is set at one hour (or 3,600,000ms), and the database typically sees 5,000 transactions per hour, maxBatchSize should be set greater than 5,000 (say at least 6,000 to be safe). If uncertain of what value to set, the default of 10,000 is recommended.

Important
Very Important Notes Regarding MaxBatchSize and Timestamps

Database applications typically use the INSERT statement to create a new record in a database table and each INSERT typically gets a unique timestamp or at most a few INSERTs may have the same timestamp (but not more than a large maxBatchSize).

If the number of rows created or updated in a table with identical timestamps is greater than maxBatchSize, an adaptor overflow data event exception will be generated and the batch will be ignored.

This can occur in the following scenarios:

  • A single SQL COPY command with a CSV file with more rows than maxBatchSize:

COPY your_table (your_table_columns) FROM 'YOUR_LARGE_CSV_FILE' WITH (YOUR_CSV_FORMAT)
  • A bulk UPDATE or INSERT larger than maxBatchSize where an exact timestamp value is specified, e.g.:

UPDATE your_table SET timestamp_column = CURRENT_TIMESTAMP WHERE condition;

If this is a possibility, set maxBatchSize to an amount higher than the largest possible batch or use another change scanMethod such as CHANGE_TABLE.
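For reference, the timestampSpec used in the full metadata example later in this document looks like this (the column name, start time and overlap are values to adapt to your environment):

"timestampSpec": {
    "columnName": "updated",
    "type": "TIMESTAMP",
    "precision": "MICROSECONDS",
    "scanStartTime": "2020-01-09T20:42:47.936Z",
    "lagTimeMilliseconds": 0,
    "overlapMilliseconds": 60000
}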

StreamOptions

The StreamOptions defines specific options for a streaming service such as Kafka:

{
    "concurrency": integer,
    "maxDeliveryAttempts": integer,
    "redeliveryDelay": long,
    "incomingTopic": "string",
    "outgoingTopic": "string",
    "incomingDataRequestTopic": "string",
    "outgoingDataRequestTopic": "string",
    "matchIncoming": boolean
}
Property Description Type Default Value Required

concurrency

Number of concurrent Kafka consumers to start, that will listen to incoming messages from Kafka.

Integer

1

no

maxDeliveryAttempts

Maximum number of times to attempt to deliver a message from the Kafka stream to YOUnite before abandoning the message.

Integer

5

no

redeliveryDelay

Delay in milliseconds between redelivery attempts.

Long

1,000

no

incomingTopic

Name of the Kafka topic to poll for incoming messages. If no topic is specified then the adaptor will not listen for messages.

String

none

no* (either incoming or outgoing is required)

outgoingTopic

Name of the Kafka topic to send data events received from YOUnite to. If no topic is specified then the adaptor will not listen for YOUnite data events.

String

none

no* (either incoming or outgoing is required)

incomingDataRequestTopic

Name of the Kafka topic to poll for incoming messages related to data requests (such as federated GET, or retry of data event exception).

String

none

no

outgoingDataRequestTopic

Name of the Kafka topic to send data requests to (such as federated GET, or retry of data event exception).

String

none

no

matchIncoming

True or false indicating whether messages from YOUnite that are sent to the outgoingTopic should be matched against messages received on the incomingTopic to prevent an infinite loop.

Boolean

True if both an incoming and an outgoing topic are specified, otherwise false.

no
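As an illustration (the topic names are placeholders), a streamOptions block that both listens to and publishes on Kafka, with loop protection enabled, might look like this:

"streamOptions": {
    "incomingTopic": "customer-changes",
    "outgoingTopic": "customer-updates",
    "concurrency": 2,
    "maxDeliveryAttempts": 5,
    "redeliveryDelay": 1000,
    "matchIncoming": true
}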

ChangeTable

The ChangeTable defines specific options for the CHANGE_TABLE scan method. Note that these specifications are for the change table that identifies changes, whereas the other settings in the Table are for the table with the actual data:

{
    "autoKeyColumnName": "string",
    "defaultStartKey": long,
    "changeTypeColumnName": "string",
    "dataSource": {DataSource},
    "gapDetectionEnabled": boolean,
    "gapDetectionTimeout": long
}
Property Description Type Default Value Required

autoKeyColumnName

Name of the auto-incrementing column in the change table.

String

none

yes

defaultStartKey

The key the adaptor starts with when it runs for the first time.

Long

none

yes* (See "Notes" below)

changeTypeColumnName

Name of the column in the change table that indicates the type of change that was made. See "Notes" below.

String

none

yes

gapDetectionEnabled

Enable gap detection during scanning. See Appendix A: Gap Detection for a description.

Boolean

true

no

gapDetectionTimeout

Time to wait in milliseconds for an auto-incrementing key gap to be resolved if gap detection is enabled.

Long

5,000

no

Notes:

  1. YOUnite is only capable of handling unsigned auto-incrementing columns with a maximum precision of 64 bits which corresponds to the long data type in Java 8.

  2. defaultStartKey is required for the Adaptor to begin its first scan, unless (warning, advanced option!! do not attempt unless you know what you’re doing!!) the processing table is manually updated with previous scan information.

  3. changeTypeColumnName indicates the column in the change table that specifies the type of change made. The values in this column must correspond to the SQL operations INSERT, UPDATE and DELETE, or the first letter of each operation (I, U or D).

Inbound and Outbound Transformations

Inbound and outbound transformations provide utilities for performing data transformations in the DB adaptor on payloads originating from:

  • Source systems (outbound) destined for YOUnite

  • YOUnite (inbound) to source systems

Transformations use a Javascript engine, which follows the ES5 standard for the most part (it’s not fully ES6 compatible).

A full example can be found in the YOUnite Off-the-Shelf Adaptor Transformation Example

Overview

A transformation job includes a list of "steps". There are three types of steps - branch, rest and script:

  • A branch step executes a script and sets the next step to execute based on the result.

  • A rest step executes a REST request, storing the results as a value in the _requests object named after the name of the step. For example, if the name of the step is get_token, the result is stored in _requests.get_token or _requests['get_token'] (either syntax is valid in JavaScript). The result from the request is stored as an object, so for example if the response has an access_token, it can be accessed by _requests.get_token.access_token. Alternatively, use the custom "path" function to safely get a value from the payload or null if the value cannot be found, ie var token = path(_requests, 'get_token.access_token').

  • A script step executes some JavaScript. This can be used to set variables, alter the next step that is executed conditionally, set an error condition, etc. Note that a rest step also has the ability to include pre and post-processing scripts.

Special Variables

There are several special variables that are used throughout the processing of the transformation:

  • _record (object): The data record to be transformed. Elements in this object may be added, removed and/or updated and will be returned as the result of the transformation.

  • _action (string): The action that is being performed. This will be POST, PUT, GET or LINK_DR. The most common usage of this would be to skip certain steps on a GET, which is a request for the data record from YOUnite, typically for federated DR assembly. On a GET, you might not want to run certain data quality tools, for example, and instead just return the data as-is.

  • _response (string): Response from the last executed REST request, if it was successful. Note that this value is only temporary and will be wiped out each time a new REST request is made.

  • _requests (object): Responses to all executed REST requests. When a REST request succeeds, its response is parsed and stored by step name in this object. If jsonResponse=true for the REST request, the response will be parsed into an object; otherwise the raw string of the response is stored.

  • _error (boolean): A flag indicating whether an error has occurred. This is set automatically if any step fails. It may be updated by any script step to set or unset the error condition. Note that setting the error condition itself does not stop the transformation; the _next_step variable must be set to null to stop it.

  • _error_message (string): The error message that should be set whenever _error is set to true.

  • _next_step (string): The name of the next step to be performed. For a rest or script step, this is set automatically at the beginning of the step based on the value in successStep and may be modified as needed in scripts performed in the step. If an error occurs during a step (ie the REST request fails or a script causes an exception), this value is changed to the value of errorStep. For a branch step, this value is set to the value of trueStep or falseStep depending on the result of the conditional statement. Setting this value to null will stop the transformation.

Scripting Notes & Tips
  • A script that is executed operates in the same context as the last; therefore, any variable assignment in one script can be referenced in a later step.

  • Substitution can be performed in the various parts of a REST request (body, headers, query parameters, etc) by using the ${variable-name} syntax. Paths are supported and will be safely evaluated, returning null if the element does not exist, ie ${_requests.token.access_token} or ${_requests['token']['access_token']}. For more complex scenarios where variable substitution is not enough, use the pre-processing ("pre") script to run some code that sets up variables for the request, then use variable substitution.

  • By default each transformation step can only be run a single time, to avoid infinite loops. If a step needs to be executed multiple times, its maximumInteractions value can be set to allow this (be careful of infinite loops!!).

Full Adaptor Metadata Example

The following example is an adaptor for the PostgreSQL table customer. A Mongo database is used as the processing table and the TIMESTAMP method of scanning is used.

{
    "databases": {
        "mongo": {
            "type": "MONGO",
            "properties": {
                "connectionString": "mongodb://admin:admin@mongo:27017/admin"
            }
        },
        "postgres": {
            "type": "JDBC",
            "properties": {
                "jdbcUrl": "jdbc:postgresql://postgres-db:5432/customers",
                "dataSource.password": "mysecretpassword",
                "dataSource.user": "postgres",
                "maximumPoolSize": "10"
            }
        }
    },
    "tables": [
        {
            "domain": "customer",
            "version": 1,
            "primaryKeySpecs": [
                {
                    "type": "DOMAIN",
                    "customer_id": {
                        "columnType": "STRING",
                        "required": true
                    }
                },
                {
                    "type": "LOCAL_AUTO",
                    "id": {
                        "columnType": "LONG",
                        "required": true
                    }
                }
            ],
            "timestampSpec": {
                "columnName": "updated",
                "type": "TIMESTAMP",
                "precision": "MICROSECONDS",
                "scanStartTime": "2020-01-09T20:42:47.936Z",
                "lagTimeMilliseconds": 0,
                "overlapMilliseconds": 60000
            },
            "dataColumns": {
                "customer_id": {
                    "source": "customerId",
                    "columnType": "STRING",
                    "updatable": false
                },
                "first_name": {
                    "source": "firstName",
                    "columnType": "STRING"
                },
                "last_name": {
                    "source": "lastName",
                    "columnType": "STRING"
                },
                "email": {
                    "source": "email",
                    "columnType": "STRING"
                },
                "gender": {
                    "source": "gender",
                    "columnType": "STRING"
                },
                "birth_date": {
                    "source": "birthDate",
                    "columnType": "LOCAL_DATE"
                },
                "phone": {
                    "source": "phone",
                    "columnType": "STRING"
                },
                "address": {
                    "source": "address",
                    "columnType": "STRING"
                },
                "city": {
                    "source": "city",
                    "columnType": "STRING"
                },
                "state": {
                    "source": "state",
                    "columnType": "STRING"
                },
                "zip": {
                    "source": "zip",
                    "columnType": "STRING"
                },
                "service_rep": {
                    "source": "serviceRep",
                    "columnType": "STRING"
                },
                "last_visit": {
                    "source": "lastVisit",
                    "columnType": "LOCAL_DATE"
                },
                "account_balance": {
                    "source": "accountBalance",
                    "columnType": "DOUBLE"
                }
            },
            "dataSource": {
                "name": "postgres",
                "schema": "public",
                "tableName": "customer"
            },
            "processingDataSource": {
                "name": "mongo",
                "databaseName": "processing"
            },
            "localIdentifierStoreDataSource": {
                "name": "mongo",
                "databaseName": "id-store"
            },
            "scanMethod": "TIMESTAMP",
            "scanOptions": {
                "processorConcurrency": 4,
                "maxBatchSize": 10000,
                "enableIncoming": true,
                "enableProcessor": true,
                "enableModificationScan": true,
                "enableDeletionScan": true,
                "scanIntervalMilliseconds": 1000
            }
        }
    ]
}

Service Configuration

JDBC Configuration

When configuring a DatabaseConnection that is of type JDBC, the following are some required and common options to use in properties:

Property Description Required

jdbcUrl

JDBC URL for the data source, ie jdbc:postgresql://localhost:5432/customers

yes

dataSource.user

Username to connect to the database as

usually

dataSource.password

Password to use

usually

maximumPoolSize

Connection pool size

no

template.name

Database template name. Leave blank to auto-detect. See Template Names below.

no

template.printschema

Print schema name when running queries? (ie SELECT ... FROM public.customer vs SELECT ... FROM customer)

no

More details on options:

  1. HikariCP is used to pool JDBC connections, and all of the properties are passed to it, with the exception of properties that start with "template.", which are used to set certain options in the QueryDSL template used.

  2. HikariCP uses properties that start with "dataSource." to configure the JDBC driver. All other properties (except "template." properties) are used to configure HikariCP itself. So in the example above (see Full Adaptor Metadata Example - "databases.postgres") "dataSource.user" and "dataSource.password" are passed to the JDBC driver to connect to the database whereas "maximumPoolSize" is a configuration property of HikariCP.

  3. These are the configuration options for templates:

    1. template.printschema: true or false to specify whether the schema name should be prefixed to table names in SQL queries.

    2. template.quote: true or false indicating whether names should be quoted.

    3. template.newlinetosinglespace: specifies that newlines should be converted to single spaces.

    4. template.escape: specifies a single character that should be used as the escape character.

HikariCP documentation can be found here: https://github.com/brettwooldridge/HikariCP
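For example, a JDBC DatabaseConnection that explicitly selects a template and enables schema printing might look like the following sketch (the connection name, URL and credentials are placeholders; the template.name value is assumed to match one of the template names listed below):

"oracle": {
    "type": "JDBC",
    "properties": {
        "jdbcUrl": "jdbc:oracle:thin:@//oracle-host:1521/ORCL",
        "dataSource.user": "app_user",
        "dataSource.password": "app_password",
        "maximumPoolSize": "5",
        "template.name": "Oracle",
        "template.printschema": "true"
    }
}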

Template Names

QueryDSL templates handle translations for the various SQL dialects. The template.name property can be used to explicitly use a template (otherwise, if it’s a supported database it should be auto-detected). The following templates are available:

  • CUBRID

  • DB2

  • Derby

  • Firebird

  • HSQLDB

  • H2

  • MySQL

  • Oracle

  • PostgreSQL

  • SQLite

  • SQLServer2005

  • SQLServer2008

  • SQLServer2012

  • Teradata

If none of these templates matches the data source, the closest one can be chosen and may work. For timestamp scanning it’s important that the template used knows how to retrieve the current timestamp; apart from that, the SQL used to SELECT, INSERT, UPDATE and DELETE records is standard and should be supported by nearly any database.

In addition, custom templates may be generated for sources not listed above. Contact YOUnite for more information.

Dynamically Integrating a JDBC Connector

The YOUnite DB Adaptor is designed to support a wide range of databases through the versatile HikariCP connection pool. However, there are scenarios where specific JDBC drivers required by certain databases or database versions are not directly incorporated into the YOUnite DB Adaptor. To address this situation, users must ensure that the necessary JDBC driver is included when deploying the adaptor, particularly when using the adaptor in different environments.

For implementations requiring a JDBC driver not pre-integrated into the YOUnite DB Adaptor, the required JDBC driver .jar file must be explicitly specified. This is achieved by appending the -classpath <path-to-jar-file> option to the command line when launching the YOUnite DB Adaptor. This ensures that the adaptor can establish a reliable connection pool to the database using HikariCP, leveraging the external JDBC driver.

Given that the YOUnite DB Adaptor operates within a Docker container, the following steps are required to create a suitable Docker image that includes the necessary JDBC driver:

  1. Request the latest YOUnite DB Adaptor .jar file from your YOUnite representative

  2. Download the required JDBC connector .jar file

  3. Build a Dockerfile to create a YOUnite DB Adaptor Docker image

  4. Use the Dockerfile to build a YOUnite DB Adaptor Docker image

Below is an example Dockerfile setup:

FROM openjdk:8

# Argument to specify the name of the JAR file for the adaptor
ARG JAR_FILE

# Add the supporting libraries, the JDBC driver and the YOUnite DB Adaptor JAR file to the container
ADD target/lib /
ADD target/<db-vendor-JDBC.jar> /<db-vendor-JDBC.jar>
ADD target/${JAR_FILE} /younite-db-adaptor.jar

# Entry point to execute the adaptor, including the necessary classpath settings.
# Shell form is used so that $JAVA_OPTS is expanded at run time.
ENTRYPOINT exec java $JAVA_OPTS -jar /younite-db-adaptor.jar -classpath <path-to-jdbc-jar-file>

This Dockerfile template uses build arguments to facilitate the dynamic specification of the YOUnite DB Adaptor JAR file. The ADD commands place the necessary libraries and the adaptor JAR into the container. The ENTRYPOINT defines the command used to start the adaptor, incorporating environment variables and options for Java execution, including where to find the JDBC driver on the classpath.

It is crucial to replace <path-to-jdbc-jar-file> and <db-vendor-JDBC.jar> in the Dockerfile with the actual path to the JDBC driver .jar file within the Docker container. This setup ensures that the YOUnite DB Adaptor can access the database driver needed to connect to the specific database instance, thereby enhancing the adaptor’s flexibility and compatibility with various database technologies.

Neo4J Configuration

When configuring a DatabaseConnection that is of type NEO4J, the following are the options to use in properties:

Property Description Required

connectionString

Neo4J Connection string. See https://neo4j.com/developer/java/#driver-configuration

yes

username

For basic authentication, the username

no

password

For basic authentication, the password

no

token

For bearer token authentication, the token

no

kerberos

For kerberos authentication, the base64 encoded ticket

no

connectionAcquisitionTimeout

Connection acquisition timeout in seconds

no

connectionLivenessCheckTimeout

Connection liveness check timeout in seconds

no

connectionTimeout

Connection timeout in seconds

no

eventLoopThreads

Event loop threads

no

maxConnectionLifetime

Maximum connection lifetime

no

maxConnectionPoolSize

Maximum connection pool size

no

maxTransactionRetryTime

Maximum transaction retry time

no

userAgent

User agent

no

trustAllCertificates

Set to "true" to trust all certificates. Not recommended in production. If not specified, or "false", only certificates loaded in the system are trusted.

no

encryption

Set to "true" to use encryption. If "false" or not specified, encryption is not used.

no

driverMetrics

Set to "true" to enable driver metrics

no
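As a sketch (the host, credentials and pool size are placeholders), a NEO4J DatabaseConnection using basic authentication might look like this:

"neo4j": {
    "type": "NEO4J",
    "properties": {
        "connectionString": "bolt://neo4j-host:7687",
        "username": "neo4j",
        "password": "secret",
        "encryption": "true",
        "maxConnectionPoolSize": "20"
    }
}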

MongoDB Configuration

When configuring a DatabaseConnection that is of type MONGO, the following are the options to use in properties:

Property Description Required

connectionString

Full MongoDB Connection string, including username and password

yes

See https://docs.mongodb.com/manual/reference/connection-string/ for information about this connection string.

Processing Service Configuration (MongoDB)

Currently MongoDB is required as the "Processing" service. Most Adaptors require a processing service; however, there are two instances where they do not:

  1. When the Adaptor only accepts incoming changes from YOUnite.

  2. When the Adaptor is for Kafka streams and has matchIncoming set to false.

What Does the Processing Service Do?

The processing service keeps track of:

  • Scanning information (what range of timestamps or keys has been scanned)

  • Incoming records from YOUnite (to prevent infinite loops)

  • Batches of changes identified by a scan

In addition to providing a place to store this information, the processing service ensures that in the event of a crash, the Adaptor can pick up where it left off and no transactions will be lost.

Deploying Mongo DB

When load testing and deploying Mongo DB you may want to take advantage of CPU Advanced Vector Extensions if they are available.

Important
By default, YOUnite uses MongoDB 4.4.6 since MongoDB 5.0+ requires a CPU with AVX support. If you want to use MongoDB 5.0+, then perform an online search to determine whether your CPU supports AVX, as various operating systems and CPUs have distinct ways of indicating AVX compatibility. If your CPU supports AVX, then you can use the latest version of MongoDB by making the following change to the Docker Compose file before deploying the processing service:

Change:

image: mongo:4.4.6

To

image: mongo

Local Identifier Store Configuration

The DatabaseConnection for a Local Identifier Store can be of type MONGO or JDBC for one of the following databases only:

  • PostgreSQL

  • H2

What Does the Local Identifier Store Do?

The Local Identifier Store stores key/value pairs of a data record’s unique UUID in YOUnite and the local identifier of the data record at the adaptor. This database is automatically populated with data as data events occur. This store is used by deletion scanning to determine whether a record that once existed has now been deleted.

Processing and Local Identifier Store Example

{
	"databases": {
		"mongo": {
			"type": "MONGO",
			"properties": {
				"connectionString": "mongodb://admin:admin@mongo:27017/admin"
			}
		},

        ...

	},
	"tables": [
		{
			...

			"processingDataSource": {
				"name": "mongo",
				"databaseName": "processing"
			},
			"localIdentifierStoreDataSource": {
				"name": "mongo",
				"databaseName": "local-id-store"
			}
		}
	]
}

Timestamp Scanning

When scanMethod = TIMESTAMP, the timestamp method of scanning will be used for that table. See TimestampSpec and ScanOptions for information on how to set up timestamp scanning.

Deletion Scanning

Deletion scanning is supported for the TIMESTAMP method of scanning. Since deletions cannot be detected by looking at timestamps, a separate process is run that checks to see if records that previously existed (based on their presence in the DR to ID cache) are still in the database.

On larger databases, deletion scanning can take a long time. To ensure that deletion scanning does not interfere with modification scanning (the two do not run concurrently; one runs, then the other), the following ScanOptions impact how long the deletion scan can run each time it is invoked:

  1. modScanPriority specifies how much time should be spent on scanning for modifications vs deletions (i.e. a value of 2.0 means twice as much time on modifications as deletions and a value of 0.5 means half as much time on modifications as deletions).

  2. maxModScanTimeMilliseconds specifies how much time a modification scan should run (at maximum).

  3. scanIntervalMilliseconds specifies the minimum interval between scans.

How these options relate to deletion scanning:

  1. Generally, the amount of time spent on modification scanning, divided by modScanPriority, is given to deletion scanning.

  2. If scanIntervalMilliseconds is not exceeded by modification scanning, the remaining time will be given to deletion scanning (if it needs it).

  3. If deletion scanning misbehaves and runs for too long it will be penalized on subsequent runs and given less time to ensure that it doesn’t take priority away from modification scanning.
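As an illustration (values are placeholders), the following scanOptions enable both scans and give modification scanning twice as much time as deletion scanning:

"scanOptions": {
    "enableProcessor": true,
    "enableModificationScan": true,
    "enableDeletionScan": true,
    "modScanPriority": 2.0,
    "maxModScanTimeMilliseconds": 5000,
    "scanIntervalMilliseconds": 1000
}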

Auto Key Scanning

When scanMethod = AUTO_KEY, the auto-increment key method of scanning will be used for that table. See AutoKeySpec and ScanOptions for information on how to set up auto key scanning.

Important notes:

  1. Auto-incrementing key scanning is useful for data sources that only produce new records and whose prior records never change (such as a log). Detection of modifications to records and deletion of records is not supported.

  2. See the Appendix A: Gap Detection for important information regarding gap detection.

Change Table Scanning

When scanMethod = CHANGE_TABLE, the change table method of scanning will be used for that table. See ChangeTable and ScanOptions for information on how to set up change table scanning.

Notes:

  1. Change table scanning requires a change table with an auto-incrementing key, the primary key columns of the changed record, and the type of modification. The change table itself should not include the data that was changed or the complete record, only the primary key information and the type of change.

  2. If AutoKeySpec or TimestampSpec is included in the Table specification, optimizations may be used when reading batches of records based on this information. In this case, the auto-key and/or timestamp of the source table should be included in the change table as well as the primary key columns.

  3. See the Appendix A: Gap Detection for important information regarding gap detection.

Change Table Example

The following example is for a table named customer which has a primary key of customer_id but is also identified by an auto-incrementing key id. The syntax below is for PostgreSQL (note: "bigserial" indicates an auto-incrementing key):

-- Customer table
CREATE TABLE customer
(
    id bigserial primary key not null,
    customer_id varchar(10) unique not null,
    first_name varchar(100),
    last_name varchar(100) not null
);

-- Customer change table
-- Note the column names from customer (id and customer_id) are named
-- identically with the same data type (this is a requirement!)
CREATE TABLE customer_change_table
(
    customer_change_table_id bigserial primary key not null,
    id bigint not null,
    customer_id varchar(10) not null,
    change_type varchar(20) not null
);

-- Function to populate change table
CREATE OR REPLACE FUNCTION update_customer_change_table()
    RETURNS trigger
    LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO public.customer_change_table (id, customer_id, change_type)
    VALUES(CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
           CASE WHEN TG_OP = 'DELETE' THEN OLD.customer_id ELSE NEW.customer_id END,
           TG_OP);

    RETURN NEW;
END;
$$;

-- Trigger that calls the function to populate the change table
CREATE TRIGGER customer_changed AFTER UPDATE OR INSERT OR DELETE ON customer
FOR EACH ROW EXECUTE PROCEDURE update_customer_change_table();

The metadata would look like this:

{
    "databases": {
        "mongo": {
            "type": "MONGO",
            "properties": {
                "connectionString": "mongodb://admin:admin@mongo:27017/admin"
            }
        },
        "postgres": {
            "type": "JDBC",
            "properties": {
                "jdbcUrl": "jdbc:postgresql://{postgres-db:5432/customers",
                "dataSource.password": "mysecretpassword",
                "dataSource.user": "postgres",
                "maximumPoolSize": "10"
            }
        }
    },
    "tables": [
        {
            "domain": "customer",
            "version": 1,
            "primaryKeySpecs": [
                {
                    "type": "DOMAIN",
                    "customer_id": {
                        "columnType": "STRING",
                        "required": true
                    }
                },
                {
                    "type": "LOCAL_AUTO",
                    "id": {
                        "columnType": "LONG",
                        "required": true
                    }
                }
            ],
            "autoKeySpec": {
                "columnName": "id"
            },
            "dataColumns": {
                "customer_id": {
                    "source": "customerId",
                    "columnType": "STRING"
                },
                "first_name": {
                    "source": "firstName",
                    "columnType": "STRING"
                },
                "last_name": {
                    "source": "lastName",
                    "columnType": "STRING"
                }
            },
            "dataSource": {
                "name": "postgres",
                "schema": "public",
                "tableName": "customer"
            },
            "processingDataSource": {
                "name": "mongo",
                "databaseName": "processing"
            },
            "localIdentifierStoreDataSource": {
                "name": "mongo",
                "databaseName": "id-store"
            },
            "scanMethod": "CHANGE_TABLE",
            "scanOptions": {
                "processorConcurrency": 4,
                "maxBatchSize": 10000,
                "enableIncoming": true,
                "enableProcessor": true,
                "enableModificationScan": true,
                "enableDeletionScan": true,
                "scanIntervalMilliseconds": 1000
            },
            "changeTable": {
                "autoKeyColumnName": "customer_change_table_id",
                "defaultStartKey": 1,
                "changeTypeColumnName": "change_type",
                "dataSource": {
                    "name": "postgres",
                    "schema": "public",
                    "tableName": "customer_change_table"
                },
                "gapDetectionEnabled": true,
                "gapDetectionTimeout": 120000
            }
        }
    ]
}

Kafka Streams

The YOUnite DB Adaptor can use Kafka as a data source to listen for incoming messages and to send outgoing messages, much like any other data source, except the ultimate source / destination of the data is not known.

Like other data sources, the adaptor metadata configuration for Kafka includes a database entry (which includes the IP of the server and configuration properties for the consumer / producer) as well as a table entry (which includes column information and the Kafka topics to listen to / send to).

Kafka Message format

Messages sent to/from Kafka follow a common format.

Example:

{
  "dr": {
    "uuid": "b3f1c201-0c58-4055-8baf-da217b00c81a",
    "domainVersionUuid": "fd115b83-06dc-48c3-9a77-27a0567d231f",
    "drKeyValues": {
      "customerId": "ABC123"
    }
  },
  "id": {
    "customerId": "ABC123"
  },
  "action": "POST",
  "transactionUuid": "46856e99-c701-4be4-9de8-4de02c0fcfa0",
  "additionalParams": {
    "additional": "param"
  },
  "data": {
    "customerId": "ABC123",
    "firstName": "Ima",
    "lastName": "Customer"
  }
}

Outgoing Kafka Messages

Messages sent to the outgoing topics will include the following:

  1. dr: uuid, domainVersionUuid and drKeyValues

  2. id: local identifier based on the table specification

  3. action: POST, PUT or DELETE are sent to the outgoingTopic. Other requests such as GET are sent to the outgoingDataRequestTopic.

  4. data: the actual data of the record, if applicable

  5. transactionUuid: GET requests will include the transactionUuid, which must be included in the message responding to the request.

  6. additionalParams: Currently not used.

Incoming Kafka Messages

DELETE:

Messages sent to an incoming topic for deleted data records MUST include the following:

  1. action: The action code being performed (DELETE)

  2. one of: dr uuid, dr key values, id or data. One of these is required to uniquely identify the record.
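
For example, a minimal incoming DELETE message that identifies the record by its local identifier might look like this (the values are illustrative):

{
  "action": "DELETE",
  "id": {
    "customerId": "ABC123"
  }
}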

POST, PUT, GET, etc:

All non-delete messages MUST include the following:

  1. action: The action code being performed

  2. data: The data of the record

  3. transactionUuid: GET responses MUST include the transactionUuid. Not required for other message types.

Optionally, if known, the following may also be supplied; if the local identifier cannot be determined from the data, one of these is required to identify the record:

  1. dr: the uuid of the data record

  2. id: local identifier
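
For example, a minimal incoming POST message, where the record can be identified from the data alone, might look like this (the values are illustrative):

{
  "action": "POST",
  "data": {
    "customerId": "ABC123",
    "firstName": "Ima",
    "lastName": "Customer"
  }
}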

Requests on the outgoingDataRequestTopic and incomingDataRequestTopic

For a Kafka adaptor to be able to handle data virtualization (Federated GET) as well as respond to other requests for data (for example, re-processing data event exceptions), the outgoingDataRequestTopic and incomingDataRequestTopic must be configured and the application that reads the data from these topics must know how to respond.

In general, responses to messages on the outgoingDataRequestTopic should include all the information from the request, along with the data of the data record that is being requested.

Example GET request:

{
  "dr": {
    "uuid": "b3f1c201-0c58-4055-8baf-da217b00c81a",
    "domainVersionUuid": "fd115b83-06dc-48c3-9a77-27a0567d231f",
    "drKeyValues": {
      "customerId": "ABC123"
    }
  },
  "id": {
    "customerId": "ABC123"
  },
  "action": "GET",
  "transactionUuid": "46856e99-c701-4be4-9de8-4de02c0fcfa0"
}

The response should include all information from the request, plus the data of the record.

If not all information can be passed back, at a minimum transactionUuid MUST be included for a GET. In addition, unless the record can be uniquely identified by values in data, one of the following must be supplied to identify the record:

  1. dr.uuid

  2. dr.drKeyValues

  3. id

Example Response:

{
  "dr": {
    "uuid": "b3f1c201-0c58-4055-8baf-da217b00c81a",
    "domainVersionUuid": "fd115b83-06dc-48c3-9a77-27a0567d231f",
    "drKeyValues": {
      "customerId": "ABC123"
    }
  },
  "id": {
    "customerId": "ABC123"
  },
  "action": "GET",
  "transactionUuid": "46856e99-c701-4be4-9de8-4de02c0fcfa0",
  "data": {
    "customerId": "ABC123",
    "firstName": "Ima",
    "lastName": "Customer"
  }
}

Kafka Database Metadata Specification

A Kafka database configuration has a type of KAFKA and a list of properties that correspond to the configuration options for the Consumer (for incoming messages - see https://kafka.apache.org/documentation/#consumerconfigs) and Producer (for outgoing messages - see https://kafka.apache.org/documentation/#producerconfigs).

The only required property is bootstrap.servers; however, it is also recommended that auto.offset.reset be set to specify where the adaptor should start reading the stream when it starts up. Some properties have default values and some cannot be modified (see the two tables below).

Note that the only type of serialization supported by the YOUnite DB Adaptor is String serialization / deserialization. The key of the message is ignored and the value must be a JSON-encoded String.
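
As an illustration (not part of the adaptor itself), an external application publishing to one of the adaptor's incoming topics could use the standard Kafka producer API with String serialization, sending the JSON message as the record value. The class name, topic name and server address below are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IncomingCustomerPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder server
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The value must be a JSON-encoded String; the message key is ignored by the adaptor.
        String message = "{\"action\":\"POST\",\"data\":{\"customerId\":\"ABC123\","
                + "\"firstName\":\"Ima\",\"lastName\":\"Customer\"}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("incoming-customer-topic", message)); // placeholder topic
        }
    }
}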

Common configuration options and their defaults:

| Property | Description | Default Value | Example |
|---|---|---|---|
| bootstrap.servers | Kafka server(s) IP address and port information. Required; no default. | (none) | localhost:9092 |
| auto.offset.reset | Behavior for the first time the adaptor reads the topic. | latest | latest, earliest or none |
| isolation.level | Isolation level. | read_committed | read_committed, read_uncommitted |
| max.poll.records | Maximum number of records to receive in a single poll. | 1,000 | 100 |
| max.poll.interval.ms | Maximum delay between invocations of poll before the consumer is considered failed and its partitions are reassigned. | 60,000 | 300,000 |

The following properties cannot be updated:

| Property | Description | Value |
|---|---|---|
| group.id | Identifier of the consumer group the adaptor belongs to. Ensures concurrency and transactional support, and that on restart the adaptor picks up where it left off. | adaptor:{adaptor UUID} |
| enable.auto.commit | Indicates whether auto commit is enabled. Disabled to ensure transactional support. | false |
| key.serializer | Serializer for keys of outgoing messages. | org.apache.kafka.common.serialization.StringSerializer |
| key.deserializer | Deserializer for keys of incoming messages. | org.apache.kafka.common.serialization.StringDeserializer |
| value.serializer | Serializer for values of outgoing messages. | org.apache.kafka.common.serialization.StringSerializer |
| value.deserializer | Deserializer for values of incoming messages. | org.apache.kafka.common.serialization.StringDeserializer |

Example metadata configuration:

{
	"name": "Kafka Adaptor",
	"description": "Example of a Kafka database definition",
	"metadata": {
		"databases": {
			"kafka": {
				"type": "KAFKA",
				"properties": {
					"bootstrap.servers": "kakfa-server:9092",
					"auto.offset.reset": "earliest",
					"max.poll.records": 100
				}
			}
		}
	}
}

Kafka Table Metadata Specification

A Kafka table specification will have a scanMethod of STREAM and include streamOptions with configuration information.

Either an incomingTopic or an outgoingTopic must be specified, as must changeTypeColumnName.

See StreamOptions for more information about these options.

Metadata example for a Kafka Adaptor:

{
    "name": "Kafka Adaptor",
    "description": "Example of a Kafka table definition",
    "metadata": {
        "tables": [
            {
                "domain": "customer",
                "version": 1,
                "primaryKeySpecs": [
                    {
                        "type": "DOMAIN",
                        "customer_id": {
                            "columnType": "STRING"
                        }
                    }
                ],
                "dataColumns": {
                    "customer_id": {
                        "source": "customerId",
                        "columnType": "STRING"
                    },
                    "first_name": {
                        "source": "firstName",
                        "columnType": "STRING"
                    },
                    "last_name": {
                        "source": "lastName",
                        "columnType": "STRING"
                    }
                },
                "dataSource": {
                    "name": "kafka",
                    "tableName": "customer"
                },
                "processingDataSource": {
                    "name": "mongo",
                    "databaseName": "processing-customer"
                },
                "scanMethod": "STREAM",
                "streamOptions": {
                    "concurrency": 4,
                    "maxDeliveryAttempts": 10,
                    "redeliveryDelay": 1000,
                    "incomingTopic": "incoming-customer-topic",
                    "outgoingTopic": "outgoing-customer-topic",
                    "incomingDataRequestTopic": "incoming-customer-request-topic",
                    "outgoingDataRequestTopic": "outgoing-customer-request-topic",
                    "matchIncoming": true
                }
            }
        ]
    }
}

Custom Data Services

The YOUnite Database Adaptor comes with a number of built-in data services for the data sources it supports. These can be extended by implementing the DataService interface of the YOUnite Database Adaptor SDK. Typically, AbstractTableDataService is extended, as it contains common functionality for data services.

Contact YOUnite for access to the YOUnite Database Adaptor SDK.

Requirements

  1. Implement DataService or AbstractTableDataService

  2. Include a constructor with the following parameters, in this order:

    1. DatabaseTable

    2. Map<String, DatabaseConnection>

    3. TableIDFactory<?>

  3. Include a static String ALIAS that identifies the database type, e.g. private static final String ALIAS = "JDBC";

  4. Copy the JAR file containing the custom data service into the Docker container on the class path (currently the root directory)

  5. Add an environment variable CUSTOM_CLASS_PATH with a comma-delimited list of class paths to scan for custom data services. This is REQUIRED or the custom implementation will not be detected.
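
For example, if the custom JAR is copied to the container's root directory as in step 4, the environment variable might be set as follows (the JAR name is illustrative):

CUSTOM_CLASS_PATH=/my-data-service.jar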

Example

public class MyDataService extends AbstractTableDataService {
    // ALIAS identifies the database type handled by this data service (requirement 3)
    private static final String ALIAS = "MY_DATA_SERVICE";

    // Constructor parameters must appear in this order (requirement 2):
    // DatabaseTable, Map<String, DatabaseConnection>, TableIDFactory<?>
    public MyDataService(DatabaseTable table, Map<String, DatabaseConnection> databaseConnections, TableIDFactory<?> idFactory) {
        super(table, idFactory);
    }

    // implement abstract methods here ...
}

Appendix A: Gap Detection

When scanning a table or change table that has an auto-incrementing key, gap detection is recommended to mitigate the possibility of the scanner missing records that were written to disk out of order or "late". For example, if records 1, 2, 3, 4 and 5 are all committed at the same time, the scanner might detect records 1, 2, 4 and 5 (missing 3) if the commit on record #3 takes longer than the rest. The scanner will think that it has completed all records up through #5 when in fact it missed #3.

To mitigate this problem, gap detection should be turned on to pause scanning for up to a specified interval while waiting for the missing record to appear.

To turn gap detection on, set these properties in AutoKeySpec for the AUTO_KEY scan method or in ChangeTable for the CHANGE_TABLE method:

  1. gapDetectionEnabled - set to true

  2. gapDetectionTimeout - set to the number of milliseconds to wait for the gap to resolve

Appendix B: Primary Key Specifications Explained

There are three types of primary key specifications: DOMAIN, LOCAL_AUTO and LOCAL.

Typically, one DOMAIN and one LOCAL or LOCAL_AUTO specification should be included, unless they are the same in which case a single DOMAIN specification is sufficient.

Why have a LOCAL or LOCAL_AUTO primary key?

There are a few reasons why a LOCAL or LOCAL_AUTO primary key may be useful and/or necessary:

  1. LOCAL_AUTO primary keys allow for some optimizations when processing data events and scanning for changes.

  2. When the DOMAIN primary key is something that can be updated in the system, another identifier that will not change is needed, i.e. the LOCAL or LOCAL_AUTO primary key.

  3. When matching rules are used for a data domain to determine whether a record already exists at an adaptor. In this case, YOUnite performs the matching and, if a match is found, the data event includes the LOCAL / LOCAL_AUTO primary key information so that the adaptor can find and update the record.

DOMAIN Primary Key Specifications

A DOMAIN primary key specification indicates the column(s) from the data domain schema that are used to uniquely identify a record in the table. This allows the adaptor to determine whether an incoming record already exists in the source system or if it is a new record.

Note that if no DOMAIN primary key specification is given, there will be no way to determine if an incoming record exists already unless YOUnite has already been made aware of the linkage and sends that information in the data event. This may be suitable for systems that only receive new records and do not need to update existing records. On the other hand, if records do need to be updated, then YOUnite must be made aware of all existing records in the system FIRST before it can begin processing incoming data events.

LOCAL_AUTO Primary Key Specifications

A LOCAL_AUTO primary key specification indicates the column that is an auto-incrementing number. If such a column exists for the table, it is highly recommended that its primary key specification be included, as it makes processing more efficient.

LOCAL Primary Key Specifications

A LOCAL primary key specification indicates the column(s) in the table that uniquely identify a record. At least one of these columns must not be in the data domain schema; otherwise, a DOMAIN primary key specification should be used instead.
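
For example, assuming a table whose records are also uniquely identified by an internal account_number column that is not part of the data domain schema (the column name is purely illustrative), a LOCAL specification could look like this:

{
    "type": "LOCAL",
    "account_number": {
        "columnType": "STRING",
        "required": true
    }
}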