Read Me First!
Before reading this guide, read Adaptor SDK Summary, which gives a higher-level view of the Adaptor SDK used to build adaptors.
You might also want to read YOUnite DB Adaptor (off the shelf adaptor), a guide to setting up the off-the-shelf adaptor that covers many concepts relevant to developing adaptors.
Setting up Your Development Environment
Maven setup
Set up a Maven project with this example pom.xml file.
In addition to the Adaptor SDK dependency, it includes a build configuration that has been tested to build a working Docker image. If not deploying to Docker, the Docker-specific build configuration can be excluded.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>com.example</groupId>
<artifactId>example-adaptor</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<sdk.version>1.0.30-SNAPSHOT</sdk.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<compilerArgument>-parameters</compilerArgument>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>us.younite.adaptor.Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- Copy Maven dependencies into target/lib/ -->
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<!-- this is required or the snapshot jar files are not named
the same as the class path included in maven-jar-plugin above -->
<useBaseVersion>false</useBaseVersion>
<includeScope>runtime</includeScope>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Build docker image -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>docker</executable>
<arguments>
<argument>build</argument>
<argument>--build-arg</argument>
<argument>JAR_FILE=${project.build.finalName}.jar</argument>
<argument>-t</argument>
<argument>example-adaptor</argument>
<argument>.</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Adaptor SDK -->
<dependency>
<groupId>us.younite</groupId>
<artifactId>adaptor-sdk</artifactId>
<version>${sdk.version}</version>
</dependency>
</dependencies>
</project>
Dockerfile
If deploying to Docker, use this Dockerfile. Along with the pom.xml above, this will build a Docker image named example-adaptor during the Maven package phase.
FROM openjdk:17
ARG JAR_FILE
ADD target/lib /
ADD target/${JAR_FILE} /example-adaptor.jar
ENTRYPOINT exec java $JAVA_OPTS -jar /example-adaptor.jar
Source Code: Starting Point
The following is a starting point for source code that defines a Main class and an ExampleService that does the rest.
The Main class has two purposes: start the ExampleService and wait for a shutdown request so that it can gracefully stop the application.
The ExampleService class creates all the elements necessary to create and start an adaptor, including loading the adaptor’s credentials (UUID, clientId and clientSecret) and the message bus URL from the environment.
This code is a starting point; the rest of this document discusses how to fill in the details.
Main.java
package us.younite.adaptor;
public class Main {
private static boolean shutdown = false;
private static final Object lock = new Object();
// not final: assigned in main() and read by the shutdown hook
private static ExampleService service;
public static void main(String[] args) throws Exception {
synchronized (lock) {
Runtime.getRuntime().addShutdownHook(new Thread(Main::shutdown));
service = new ExampleService();
// wait for a shutdown request to exit
while (!shutdown) {
lock.wait();
}
}
}
private static void shutdown() {
synchronized (lock) {
shutdown = true;
if (service != null)
service.close();
lock.notifyAll();
}
}
}
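The wait/notify loop in Main can also be expressed with a java.util.concurrent.CountDownLatch, which removes the explicit lock and flag. The following is a minimal sketch of that alternative and is not code from the SDK; the ShutdownGate class and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not part of the Adaptor SDK): blocks until shutdown is requested. */
final class ShutdownGate {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean requested = new AtomicBoolean(false);
    private final Runnable onShutdown;

    ShutdownGate(Runnable onShutdown) {
        this.onShutdown = onShutdown;
    }

    /** Intended to be called from a shutdown hook: runs the cleanup once, then releases waiters. */
    void requestShutdown() {
        if (requested.compareAndSet(false, true)) {
            try {
                onShutdown.run();
            } finally {
                latch.countDown();
            }
        }
    }

    /** Returns true once the cleanup action has finished. */
    boolean isShutdownComplete() {
        return latch.getCount() == 0;
    }

    /** Blocks the calling thread until shutdown has been requested and cleanup has run. */
    void await() throws InterruptedException {
        latch.await();
    }
}
```

In Main, this could be wired up by creating the gate with `service::close` as the cleanup action, registering `gate::requestShutdown` as the shutdown hook, and calling `gate.await()` at the end of main.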
ExampleService.java
package us.younite.adaptor;
import us.younite.adaptor.sdk.builder.AdaptorServiceBuilder;
import us.younite.adaptor.sdk.listener.ds.AbstractDatasource;
import us.younite.adaptor.sdk.listener.ds.Datasource;
import us.younite.adaptor.sdk.model.DataEventResponse;
import us.younite.adaptor.sdk.model.DataRequest;
import us.younite.adaptor.sdk.model.DataResponse;
import us.younite.adaptor.sdk.model.IncomingDataEvent;
import us.younite.adaptor.sdk.model.id.LocalIdentifierFactory;
import us.younite.adaptor.sdk.model.id.simple.LongIdentifier;
import us.younite.adaptor.sdk.service.AdaptorService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import java.io.Closeable;
import java.util.Optional;
import java.util.UUID;
/**
* This class is used in the YOUnite-Adaptor-Guide-for-Java-Developers example.
*/
@ParametersAreNonnullByDefault
public class ExampleService implements Closeable {
private final AdaptorService adaptorService;
public ExampleService() throws Exception {
//
// get configuration from the environment
//
String adaptorUuid = System.getenv("ADAPTOR_UUID");
String clientId = System.getenv("ADAPTOR_CLIENT_ID");
String clientSecret = System.getenv("ADAPTOR_CLIENT_SECRET");
String messageBusUrl = System.getenv("MESSAGE_BUS_URL");
String messageBusConcurrency = System.getenv("MESSAGE_BUS_CONCURRENCY");
String messageBusOpsConcurrency = System.getenv("MESSAGE_BUS_OPS_CONCURRENCY");
String messageBusAssemblyConcurrency = System.getenv("MESSAGE_BUS_ASSEMBLY_CONCURRENCY");
// records in the data source will be identified by a long, for example,
// a database table with an auto-incrementing primary key.
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
// create an AbstractDatasource implementation
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>("person", 1, localIdentifierFactory) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source
return Optional.empty();
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
return DataEventResponse.failure("TODO");
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
return DataEventResponse.failure("TODO");
}
};
// build the adaptor service
adaptorService = new AdaptorServiceBuilder()
.uuid(UUID.fromString(adaptorUuid))
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(Integer.parseInt(messageBusConcurrency))
.opsConcurrency(messageBusOpsConcurrency)
.assemblyConcurrency(messageBusAssemblyConcurrency)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
}
@Override
public void close() {
adaptorService.close();
}
}
Source Code: Configuration and Services
The example code above creates skeleton versions of the services that are required to run the Adaptor. This section details how to create those services.
Below is a summary of the tasks to perform and the services to build when creating an Adaptor using the SDK:
Task / Service | Description |
---|---|
Essential and Optional Configuration | Essential configuration includes the URL of the message bus as well as the adaptor’s UUID, client id and secret that were generated when the adaptor was registered. Optional configuration includes message bus concurrency, metrics logging, etc. This configuration may be loaded from the environment (recommended for Docker), or a properties file, or somewhere else. |
Local Identifier Factories | A LocalIdentifierFactory creates identifiers that uniquely identify a data record locally. |
Datasource Implementation | A Datasource implementation handles interactions with a data source for a domain version. |
LocalIdentifierStore Implementations (optional) | A LocalIdentifierStore stores a list of all data records at the adaptor that have been matched or linked to a data record in YOUnite. This is an optional feature used by adaptors that need to know what records have been linked with YOUnite and what have not. For the YOUnite Off-the-Shelf Database Adaptor it is required for certain features such as deletion scanning. |
Essential and Optional Configuration
The Adaptor’s UUID, clientId, clientSecret and the URL of the message bus are required to build an adaptor. This information can be found on the Adaptors page of the YOUnite UI under Adaptor Credentials.
There are also a number of optional configuration items including message bus concurrency, metrics logging interval, etc. See Appendix C: Adaptor Service Builder Configuration Values for a full list of configuration values.
Typical sources for this configuration include:
- Environment variables (recommended for Docker)
- Properties file
- A persisted source, such as a database or file share
Example using environment variables:
String adaptorUuid = System.getenv("ADAPTOR_UUID");
String clientId = System.getenv("ADAPTOR_CLIENT_ID");
String clientSecret = System.getenv("ADAPTOR_CLIENT_SECRET");
String messageBusUrl = System.getenv("MESSAGE_BUS_URL");
String messageBusConcurrency = System.getenv("MESSAGE_BUS_CONCURRENCY");
String messageBusOpsConcurrency = System.getenv("MESSAGE_BUS_OPS_CONCURRENCY");
String messageBusAssemblyConcurrency = System.getenv("MESSAGE_BUS_ASSEMBLY_CONCURRENCY");
AdaptorServiceBuilder builder = new AdaptorServiceBuilder();
// essential configuration
builder
.uuid(UUID.fromString(adaptorUuid))
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl);
// optional configuration
// note: concurrency for data events is a fixed integer (e.g. 5),
// whereas concurrency for the ops and assembly queues may be a range (e.g. 1-5) or a single value indicating the maximum (e.g. 1)
builder
.concurrency(Integer.parseInt(messageBusConcurrency))
.opsConcurrency(messageBusOpsConcurrency)
.assemblyConcurrency(messageBusAssemblyConcurrency);
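The snippet above passes whatever System.getenv returns straight to the builder, so an unset variable only surfaces later as a NullPointerException or NumberFormatException. The following is a minimal sketch of validating the values first, assuming the variable names used above. AdaptorEnv is a hypothetical helper, not part of the Adaptor SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of the Adaptor SDK): validates configuration before use. */
final class AdaptorEnv {
    private final Map<String, String> env;

    AdaptorEnv(Map<String, String> env, String... requiredNames) {
        List<String> missing = new ArrayList<>();
        for (String name : requiredNames) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        if (!missing.isEmpty()) {
            // report every missing variable at once rather than failing one at a time
            throw new IllegalStateException("Missing required configuration: " + missing);
        }
        this.env = env;
    }

    String get(String name) {
        return env.get(name);
    }

    /** Parses an optional integer setting, falling back to a default when it is unset. */
    int getInt(String name, int defaultValue) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException ex) {
            throw new IllegalStateException(name + " must be an integer but was: " + value, ex);
        }
    }

    public static void main(String[] args) {
        // fails fast with a descriptive message if any required variable is unset
        AdaptorEnv config = new AdaptorEnv(System.getenv(),
                "ADAPTOR_UUID", "ADAPTOR_CLIENT_ID", "ADAPTOR_CLIENT_SECRET", "MESSAGE_BUS_URL");
        System.out.println("message bus: " + config.get("MESSAGE_BUS_URL"));
        System.out.println("concurrency: " + config.getInt("MESSAGE_BUS_CONCURRENCY", 1));
    }
}
```

Taking the environment as a Map keeps the helper testable, and the same class works for a properties file once its entries are copied into a Map.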
Local Identifier Factories
Local identifier factories create identifiers that uniquely identify a data record locally. For example, if the data source is a database, this would normally be the primary key of the database.
Several factories are provided by the SDK to handle pretty much any situation:
Type | Description | Java Equivalent | Factory |
---|---|---|---|
StringIdentifier | Identifies a record by a string | | StringIdentifier.getFactory() |
LongIdentifier | Identifies a record by a long integer | | LongIdentifier.getFactory() |
 | Identifies a record by a UUID | | |
 | Identifies a record by a value of a specific type. Must be a | | |
 | Identifies a record by a collection of key/value pairs. Values must be Numeric types are converted to the highest precision: | | |
 | Identifies a record by an ordered list of values. Values must be Numeric types are converted to the highest precision: | | |
 | Identifies a record by arbitrary key/value pairs. Stored in JSON format. | | |
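The table above notes that multi-value identifiers convert numeric types to the highest precision. The sketch below illustrates why that kind of normalization matters for an ordered list of key values: in Java, a boxed Integer and a boxed Long holding the same number are not equal. CompositeKey is a hypothetical class written for this illustration, and the rule it applies (integral types widen to Long) is an assumption, not the SDK's documented behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical ordered composite key; illustrates numeric normalization only. */
final class CompositeKey {
    private final List<Object> values;

    CompositeKey(Object... parts) {
        List<Object> normalized = new ArrayList<>(parts.length);
        for (Object part : parts) {
            normalized.add(normalize(part));
        }
        this.values = Collections.unmodifiableList(normalized);
    }

    /** Assumption for this sketch: Byte, Short and Integer all widen to Long. */
    static Object normalize(Object value) {
        if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
            return ((Number) value).longValue();
        }
        return value;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof CompositeKey && values.equals(((CompositeKey) other).values);
    }

    @Override
    public int hashCode() {
        return values.hashCode();
    }

    @Override
    public String toString() {
        return values.toString();
    }
}
```

Without the normalization step, a key read from a JDBC driver as an Integer would not match the same key deserialized from JSON as a Long, even though both refer to the same record.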
Datasource Implementation
A Datasource implementation handles interactions with a data source for a domain version.
A Datasource has three main jobs:
- Add, update and remove data records in a data source
- Retrieve a data record from a data source
- Identify local changes in a data source
A custom Datasource extends AbstractDatasource.
By default, local changes are handled by a DefaultChangeListener, which has a method receive(Change<T> change) that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener for a guide to creating a custom ChangeListener for an event-driven or similar system.
See Polling for Changes for a custom Datasource implementation that has a custom ChangeListener that polls for changes at a given interval.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
localIdentifierFactory) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source, or return Optional.empty() if not found
return Optional.empty();
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
return DataEventResponse.failure("TODO");
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
return DataEventResponse.failure("TODO");
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
// send a local change to YOUnite via the DefaultChangeListener
// (id, data and type are placeholders for the details of the change)
DefaultChangeListener changeListener = (DefaultChangeListener)datasource.getChangeListener();
changeListener.receive(new Change<>(id, data, type));
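The TODO methods in the example above need somewhere to read and write records. The following is a minimal sketch of the semantics they are expected to provide, using an in-memory map with an auto-incrementing long key. InMemoryRecordStore is hypothetical and uses plain Java types rather than the SDK's request and response classes, so a real Datasource would wrap calls like these in the SDK types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory store (not part of the Adaptor SDK). */
final class InMemoryRecordStore {
    private final Map<Long, Map<String, Object>> records = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    /** Mirrors get(): an empty Optional signals that the record was not found. */
    Optional<Map<String, Object>> get(long id) {
        Map<String, Object> record = records.get(id);
        if (record == null) {
            return Optional.empty();
        }
        // return a copy so callers cannot modify the stored record
        Map<String, Object> copy = new HashMap<>(record);
        return Optional.of(copy);
    }

    /** Mirrors put(): inserts when id is null, updates otherwise; returns the local identifier. */
    long put(Long id, Map<String, Object> data) {
        if (id == null) {
            long newId = nextId.getAndIncrement();
            records.put(newId, new HashMap<>(data));
            return newId;
        }
        if (records.replace(id, new HashMap<>(data)) == null) {
            throw new IllegalArgumentException("No record with local id " + id);
        }
        return id;
    }

    /** Mirrors remove(): returns true if a record was deleted. */
    boolean remove(long id) {
        return records.remove(id) != null;
    }
}
```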
Creating a Custom ChangeListener
A custom ChangeListener extends AbstractChangeListener. This is particularly useful for an event-driven system.
Its responsibilities are:
- Starting and stopping any process(es) which will be listening for events
- Handling events and calling the receive(Change<T> change) method to send changes to the YOUnite Server
If the adaptor should poll for changes instead, see Polling for Changes.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
@Override
public void start() {
// TODO - add start up code here, for example:
service.start(this::onEvent);
}
@Override
public void startAsync() {
// TODO - add asynchronous start up code here, for example:
service.startAsync(this::onEvent);
}
@Override
public boolean stop(long timeout) {
// TODO - add stop code here, for example:
return service.stop(timeout);
}
@Override
public boolean isRunning() {
// TODO - test whether the service is running, for example:
return service.isRunning();
}
private void onEvent(Event event) {
try {
// TODO - convert event to a Change
Change<LongIdentifier> change = new Change<>( ... );
receive(change);
} catch (ChangeListenerException ex) {
// TODO handle exception
}
}
};
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
localIdentifierFactory) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source, or return Optional.empty() if not found
return Optional.empty();
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
return DataEventResponse.failure("TODO");
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
return DataEventResponse.failure("TODO");
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
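// the change listener should automatically start when the adaptor starts
// (this assumes changeListener has been registered for this domain version, which this snippet
// does not show; see the changeListener builder property in Adaptor Service Builder Configuration Values)
assert changeListener.isRunning();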
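The two responsibilities listed above (lifecycle and event handling) can be sketched without the SDK as follows. EventForwarder is a hypothetical class: its converter stands in for the "convert event to a Change" step and its receiver stands in for the listener's receive method. Dropping events while stopped and swallowing conversion failures are assumptions of this sketch, not documented SDK behavior.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical event-driven listener skeleton (not part of the Adaptor SDK). */
final class EventForwarder<E, C> {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Function<E, C> converter;
    private final Consumer<C> receiver;

    EventForwarder(Function<E, C> converter, Consumer<C> receiver) {
        this.converter = converter;
        this.receiver = receiver;
    }

    void start() {
        running.set(true);
    }

    /** Returns true when the forwarder is no longer accepting events. */
    boolean stop() {
        running.set(false);
        return true;
    }

    boolean isRunning() {
        return running.get();
    }

    /** Callback for the event source; returns true if the event was forwarded. */
    boolean onEvent(E event) {
        if (!running.get()) {
            return false; // events that arrive while stopped are dropped in this sketch
        }
        try {
            receiver.accept(converter.apply(event));
            return true;
        } catch (RuntimeException ex) {
            // a real listener would log the failure and decide whether to retry
            return false;
        }
    }
}
```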
Polling for Changes
AbstractPollingDatasource extends AbstractDatasource with a custom ChangeListener that polls for changes at a given interval, as well as methods to check for changes and commit or roll back each change as it is processed.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener runs at an interval to poll for new changes.
//
String domainName = "person";
long versionNumber = 1L;
int pollingInterval = 10_000;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");
Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
versionNumber, localIdentifierFactory, pollingInterval) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source, or return Optional.empty() if not found
return Optional.empty();
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
return DataEventResponse.failure("TODO");
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
return DataEventResponse.failure("TODO");
}
//
// methods used for polling are below
//
@Override
public boolean hasChanges() {
// TODO - return true if there are changes waiting
return false;
}
@Nullable
@Override
public Change<LongIdentifier> next() {
// TODO - get the next available change or null if there are no changes
return null;
}
@Override
public void commit(Change<LongIdentifier> change) {
// TODO - mark a change previously retrieved with next() as successfully processed
}
@Override
public void rollback(Change<LongIdentifier> change) {
// TODO - mark a change previously retrieved with next() as unsuccessfully processed
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
// the polling change listener should automatically start when the adaptor starts
assert datasource.getChangeListener().isRunning();
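The four polling methods in the example above (hasChanges, next, commit, rollback) imply a loop like the one sketched below. This is an illustration of one polling cycle under stated assumptions, not the SDK's implementation: PollableSource, PollingCycle and QueueSource are hypothetical, and stopping the cycle after a rollback so the change is retried on the next interval is a design choice of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the polling methods of a polling data source. */
interface PollableSource<T> {
    boolean hasChanges();
    T next();
    void commit(T change);
    void rollback(T change);
}

final class PollingCycle {
    private PollingCycle() { }

    /** Runs one polling cycle and returns the number of changes committed. */
    static <T> int runOnce(PollableSource<T> source, Consumer<T> sender) {
        int committed = 0;
        while (source.hasChanges()) {
            T change = source.next();
            if (change == null) {
                break; // nothing left to process
            }
            try {
                sender.accept(change);
                source.commit(change);
                committed++;
            } catch (RuntimeException ex) {
                // leave the change for the next cycle instead of spinning on it
                source.rollback(change);
                break;
            }
        }
        return committed;
    }
}

/** Simple queue-backed source used to demonstrate the cycle. */
final class QueueSource<T> implements PollableSource<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    final List<T> committed = new ArrayList<>();

    void add(T change) { pending.addLast(change); }
    @Override public boolean hasChanges() { return !pending.isEmpty(); }
    @Override public T next() { return pending.pollFirst(); }
    @Override public void commit(T change) { committed.add(change); }
    @Override public void rollback(T change) { pending.addFirst(change); }
}
```

A scheduler such as java.util.concurrent.ScheduledExecutorService could call runOnce at the configured polling interval.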
LocalIdentifierStore Implementations (optional)
The YOUnite Server maintains a list of all data records at an adaptor, including the local identifier of each data record at the adaptor. Implementing a LocalIdentifierStore allows the adaptor to do the same thing on its side: store a list of all data records at the adaptor that have been "matched" or "linked" to a data record in YOUnite.
This is an optional feature and is not required for typical adaptor operation.
Three default implementations are provided in the SDK:
- In-memory store (good for testing, not for production)
- H2-backed store
- PostgreSQL-backed store
This store is populated when:
- YOUnite sends a new data record to the adaptor.
- The adaptor sends a new data record to YOUnite. YOUnite responds with a MATCH event informing the adaptor of the UUID of the data record at YOUnite so it can be added to the store.
Entries in the store are removed when:
- YOUnite sends a DELETE event to the adaptor and the event succeeds.
- The adaptor sends a DELETE event to YOUnite.
Example:
// create an H2 data source with a connection pool size of 5
DataSource jdbcDataSource = JDBCLocalIdentifierStore.createDataSource("jdbc:h2:/data/local_identifier_store", "sa", "sa", 5);
// create a local identifier store for this H2 data source with a cache size of 1,000
LocalIdentifierStore<StringIdentifier> store = JDBCLocalIdentifierStore.h2(jdbcDataSource, "tableName", 1000);
// create the Datasource implementation
// (explicit type argument: the diamond operator cannot be used with anonymous classes before Java 9)
Datasource<StringIdentifier> datasource = new AbstractDatasource<StringIdentifier>("person", 1, StringIdentifier.getFactory()) {
// AbstractDatasource implementation goes here
};
// set the LocalIdentifierStore
datasource.setLocalIdentifierStore(store);
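To make the populate and remove rules above concrete, the following sketch keeps a two-way mapping between local identifiers and the UUID of the linked record at YOUnite. It is only a conceptual illustration: InMemoryLinkStore and its methods are hypothetical and do not reflect the LocalIdentifierStore interface of the SDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical two-way link store (not the SDK's LocalIdentifierStore). */
final class InMemoryLinkStore {
    private final Map<String, UUID> uuidByLocalId = new HashMap<>();
    private final Map<UUID, String> localIdByUuid = new HashMap<>();

    /** Records a link, e.g. after a new record arrives or a MATCH event is received. */
    synchronized void link(String localId, UUID drUuid) {
        UUID previousUuid = uuidByLocalId.put(localId, drUuid);
        if (previousUuid != null) {
            localIdByUuid.remove(previousUuid);
        }
        String previousLocalId = localIdByUuid.put(drUuid, localId);
        if (previousLocalId != null && !previousLocalId.equals(localId)) {
            uuidByLocalId.remove(previousLocalId);
        }
    }

    /** Removes a link, e.g. after a DELETE event; returns true if a link existed. */
    synchronized boolean unlink(String localId) {
        UUID removed = uuidByLocalId.remove(localId);
        if (removed == null) {
            return false;
        }
        localIdByUuid.remove(removed);
        return true;
    }

    synchronized Optional<UUID> findUuid(String localId) {
        return Optional.ofNullable(uuidByLocalId.get(localId));
    }

    synchronized Optional<String> findLocalId(UUID drUuid) {
        return Optional.ofNullable(localIdByUuid.get(drUuid));
    }

    synchronized int size() {
        return uuidByLocalId.size();
    }
}
```

A lookup that returns an empty Optional identifies a local record that has not yet been linked, which is the information features such as deletion scanning depend on.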
Appendix A: Null values vs missing values
Incoming data events from YOUnite include a JSON string with the data of the data record. For several reasons, this may not include all fields in the domain. Therefore, do not assume that the value of a field is null because the field is not present; a null value is explicitly specified in the JSON string.
Reasons why a field may be excluded:
- The adaptor that originated the data event did not include data for a field.
- Data Governance was applied and a field was excluded.
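The distinction can be sketched with a plain Map standing in for the parsed JSON: a missing key leaves the stored value untouched, while a key that is present with a null value clears it. PartialUpdate is a hypothetical helper and the merge policy is an assumption of this sketch. With Jackson's ObjectNode, which the examples in this guide use, the equivalent checks would be has(field) and get(field).isNull().

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper showing missing vs explicitly null fields. */
final class PartialUpdate {
    enum FieldState { MISSING, EXPLICIT_NULL, VALUE }

    private PartialUpdate() { }

    /** Classifies a field of an incoming record. */
    static FieldState stateOf(Map<String, Object> incoming, String field) {
        if (!incoming.containsKey(field)) {
            return FieldState.MISSING;
        }
        return incoming.get(field) == null ? FieldState.EXPLICIT_NULL : FieldState.VALUE;
    }

    /**
     * Merges an incoming record into an existing one: fields that are missing from
     * the incoming record keep their existing value, explicit nulls overwrite it.
     */
    static Map<String, Object> apply(Map<String, Object> existing, Map<String, Object> incoming) {
        Map<String, Object> merged = new LinkedHashMap<>(existing);
        merged.putAll(incoming);
        return merged;
    }
}
```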
Appendix B: The Infinite Update Issue
Adaptors are required to recognize the changes they receive from other adaptors and apply locally. Failure to do so may result in the adaptor detecting these changes as new local updates and subsequently sending them back to YOUnite. This is called the infinite update issue and more about it can be read here.
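One possible way to recognize such echoes is to remember the last payload synchronized for each record and skip local changes that match it. The sketch below is an assumption about how this could be done, not the approach prescribed by YOUnite. EchoGuard is hypothetical, and comparing payloads as strings only works if records are serialized in a consistent, canonical form.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical echo filter (not part of the Adaptor SDK). */
final class EchoGuard {
    // last payload known to be in sync with YOUnite, keyed by local identifier
    private final Map<String, String> lastSynced = new HashMap<>();

    /** Call after applying an incoming event from YOUnite to the local data source. */
    synchronized void recordIncoming(String localId, String payload) {
        lastSynced.put(localId, payload);
    }

    /**
     * Call when a local change is detected. Returns false when the change merely
     * reflects data that is already in sync, and true when it should be sent.
     */
    synchronized boolean shouldSend(String localId, String payload) {
        if (payload.equals(lastSynced.get(localId))) {
            return false;
        }
        lastSynced.put(localId, payload);
        return true;
    }

    /** Call when a record is deleted so stale state is not kept. */
    synchronized void forget(String localId) {
        lastSynced.remove(localId);
    }
}
```

Because shouldSend records each payload it lets through, a genuine local change that reverts a record to an earlier value is still sent.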
Appendix C: Full example application - QuickStartAdaptorApplication.java
Below is a full example application in one file that starts up two adaptors and has them send messages to YOUnite that will then be routed to each other. This application, other than being an example, can be used as a quick way to test that things are working as expected.
package us.younite.adaptor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.younite.adaptor.sdk.builder.AdaptorServiceBuilder;
import us.younite.adaptor.sdk.exceptions.AdaptorConfigurationException;
import us.younite.adaptor.sdk.exceptions.AdaptorStartupException;
import us.younite.adaptor.sdk.listener.change.Change;
import us.younite.adaptor.sdk.listener.ds.InMemDatabase;
import us.younite.adaptor.sdk.model.id.simple.LongIdentifier;
import us.younite.adaptor.sdk.service.AdaptorService;
import javax.annotation.ParametersAreNonnullByDefault;
import java.io.Closeable;
import java.util.UUID;
/**
* This is a quick start, referenced in the Adaptor Guide for Java Developers.
*
* <p>This application creates two adaptors that use test databases and simple listeners.
* Data inserted, updated or deleted in the database of one adaptor will be synchronized
* with the other adaptor.
*
* @see InMemDatabase
* @author zeke@younite.us
*/
@ParametersAreNonnullByDefault
public class QuickStartAdaptorApplication {
private static final Logger logger = LoggerFactory.getLogger(QuickStartAdaptorApplication.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
//
// BEFORE RUNNING, CREATE THE TEST DOMAIN AND ADAPTORS!!!
//
// 1) Create a domain named person with three fields and the exact matching type:
// a. personId (string) - set as the DR Key property
// b. name (string)
// c. phone (string)
//
// 2) Create two adaptors and give them all capabilities to the person domain.
// Enter the adaptors credentials below.
//
private static final String messageBusUrl = "tcp://localhost:61616";
private static final UUID adaptorUuid_1 = UUID.fromString("46193a67-5243-483f-a129-85b5620c0574");
private static final String clientId_1 = "b84a3379-02e8-4b17-b05e-24ffb3caec18";
private static final String clientSecret_1 = "ea6e038d-dc5e-437d-85c1-ec094003996a";
private static final UUID adaptorUuid_2 = UUID.fromString("a76e67f9-5aeb-47e6-a2d8-cbd60ef4dd32");
private static final String clientId_2 = "a8bb9a19-cb49-4a2f-a730-ee3c2e7d7ef6";
private static final String clientSecret_2 = "35e5fb86-6d97-4aa9-a774-ebd9d1d49984";
//
// UPDATE THE VALUES ABOVE BEFORE RUNNING!
//
private static boolean shutdown = false;
private static final Object lock = new Object();
public static void main(String[] args) throws Exception {
synchronized (lock) {
Runtime.getRuntime().addShutdownHook(new Thread(QuickStartAdaptorApplication::shutdown));
// adaptor #1 uses the polling method to look for changes
TestAdaptor adaptor1 = new TestAdaptor(adaptorUuid_1, clientId_1, clientSecret_1, true);
// adaptor #2 uses an event-driven model of sending changes to YOUnite
TestAdaptor adaptor2 = new TestAdaptor(adaptorUuid_2, clientId_2, clientSecret_2, false);
// create some test records
ObjectNode rylan = testRecord("BCN487", "Rylan Jorgenson", "555-202-4571");
ObjectNode juneau = testRecord("ABA055", "Juneau Momo", "555-202-8644");
ObjectNode jobe = testRecord("VRQ721", "Jobe Fallby", "555-230-9732");
ObjectNode nevel = testRecord("PRX002", "Nevel Andersen", "555-230-8877");
// add the data to adaptor 1
logger.info("Creating test data in adaptor 1...");
adaptor1.database.change(Change.Type.NEW, rylan);
adaptor1.database.change(Change.Type.NEW, juneau);
adaptor1.database.change(Change.Type.NEW, jobe);
adaptor1.database.change(Change.Type.NEW, nevel);
logger.info("Contents of adaptor 1's database {}", adaptor1.database.toMap());
// wait for the data to show up in adaptor 2
while (!shutdown && adaptor2.database.size() < 4) {
logger.info("Waiting for changes to show up in adaptor 2 ...");
lock.wait(1000);
}
logger.info("Contents of adaptor2: {}", adaptor2.database.toMap());
// make sure the values are identical
if (adaptor1.database.size() == adaptor2.database.size() &&
CollectionUtils.containsAll(adaptor1.database.toMap().values(), adaptor2.database.toMap().values())) {
logger.info("SUCCESS! Values are identical in both databases");
} else {
logger.error("ERROR! Values are not identical!");
}
// change data in adaptor 2
logger.info("Changing Rylan phone number in adaptor 2 ...");
rylan.put("phone", "555-233-1144");
adaptor2.database.change(Change.Type.UPDATE, rylan);
// wait for the change to show up in adaptor 1
while (!shutdown) {
LongIdentifier id = adaptor1.database.lookup(rylan);
if (id == null) {
logger.error("ERROR! Unable to determine id for Rylan record in adaptor 1!");
break;
}
ObjectNode updated = adaptor1.database.get(id);
if (updated != null && rylan.get("phone").equals(updated.get("phone"))) {
logger.info("Success, found updated record in adaptor 1");
break;
}
logger.info("Waiting for update to show up in adaptor 1 ...");
lock.wait(1000);
}
// delete all the records in adaptor 1
for (LongIdentifier i : adaptor1.database.toMap().keySet()) {
adaptor1.database.change(Change.Type.DELETE, i, null);
}
// wait for the deletions to show up in adaptor 2
while (!shutdown && adaptor2.database.size() > 0) {
logger.info("Waiting for deletions to show up in adaptor 2 ...");
lock.wait(1000);
}
if (adaptor1.database.size() == 0 && adaptor2.database.size() == 0) {
logger.info("SUCCESS! All records have been deleted in both databases");
} else {
logger.error("ERROR! Not all records have been deleted");
}
logger.info("Shutting down adaptors ...");
adaptor1.close();
adaptor2.close();
}
}
private static void shutdown() {
synchronized (lock) {
shutdown = true;
lock.notifyAll();
}
}
static class TestAdaptor implements Closeable {
private final AdaptorService adaptorService;
private final InMemDatabase database;
TestAdaptor(UUID adaptorUuid, String clientId, String clientSecret, boolean polling)
throws AdaptorConfigurationException, AdaptorStartupException, InterruptedException {
// Create a test database for this domain version which stores data in memory. Records in the
// database are uniquely identified locally by an auto-incrementing long integer and indexed by
// primary key (personId).
database = InMemDatabase.newPrimaryKeyDatabase("person", 1, "personId", polling);
adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(database)
.build();
adaptorService.start(60);
}
@Override
public void close() {
adaptorService.close();
}
}
private static ObjectNode testRecord(String personId, String name, String phone) {
ObjectNode node = objectMapper.createObjectNode();
node.put("personId", personId);
node.put("name", name);
node.put("phone", phone);
return node;
}
}
Appendix C: Adaptor Service Builder Configuration Values
Required Adaptor Service Builder Configuration
Builder Property | Description | Example | Notes |
---|---|---|---|
uuid | Adaptor UUID | aa8b8a53-b49f-47f7-a9cd-bd5e0ff909bc | Obtained when registering the Adaptor with YOUnite |
clientId | Client ID | 6ccb861b-0400-423c-94d6-1df32096de34 | Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
clientSecret | Client Secret | f744544d-d1cf-4561-9d50-27a9fef8d076 | Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
messageBusUrl | URL of the Message Bus | tcp://YOUnite-mb.example.com:61616 | Address of the message bus |
Optional Adaptor Service Builder Configuration
Builder Property | Description | Example | Default Value | Notes |
---|---|---|---|---|
assemblyConcurrency | Message bus concurrency for data virtualization events (i.e. federated GET). | 1 | 1 | Concurrency for incoming assembly messages, i.e. the number of threads that will handle incoming messages. These are federated GET requests. Expressed as a range, e.g. "1-5", or a single value indicating the maximum (with an implied minimum of 1). |
autoStartDataListener | Automatically start the data listener when the connection to YOUnite has been established | true | true | Can be set to false to allow the application to manually start the data listener |
changeListener | Register a ChangeListener | | None | For each domain version register either a |
concurrency | Message bus concurrency for data events | 1 | 1 | Concurrency for incoming data messages, i.e. the number of threads that will handle incoming messages. Note that concurrent handling could lead to messages being processed out of order, though messages are guaranteed to be processed in order within their message grouping (by default the unique ID of the record, i.e. the DR UUID). |
datasource | Register a Datasource | | None | A Datasource must be registered for each domain version that the adaptor is capable of handling or the adaptor will fail to start. |
enableMetrics | Enable metrics | true | true | Metrics log the number of incoming and outgoing messages over various intervals. |
hooks | Hooks to execute code before/after start and stop or on an error | | None* | *When useDefaultHooks=true, default hooks are used for sourceDataListeners (see below) |
messagePostProcessor | Post processor for outgoing JMS messages | | None | This could be used to add headers to messages, or modify the payload before the message is sent to YOUnite. |
metricsLogInterval | Interval in milliseconds to display the metrics in the adaptor logs. | 3,600,000 | 3,600,000 (one hour) | |
opsConcurrency | Message bus concurrency for operational messages | 1 | 1 | Concurrency for incoming ops messages, i.e. the number of threads that will handle incoming messages. These are messages with adaptor status information and metadata, requests for stats, and shutdown requests. Expressed as a range, e.g. "1-5", or a single value indicating the maximum (with an implied minimum of 1). |
sessionCacheSize | JMS Session cache size | 10 | 10 | Specifies the size of the cache of sessions to be used for JMS communication. |
useDefaultHooks | Use the default hooks for the sourceDataListeners (if any) | true | true | The default hooks start each sourceDataListener on AFTER_START and stop each sourceDataListener on BEFORE_STOP and ON_ERROR. |
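The opsConcurrency and assemblyConcurrency values are described above as either a range such as "1-5" or a single value giving the maximum with an implied minimum of 1. The following sketch shows how such a value could be checked before it is passed to the builder. ConcurrencyRange is a hypothetical helper, and the SDK's own parsing and validation rules may differ.

```java
/** Hypothetical parser for concurrency settings such as "1-5" or "3". */
final class ConcurrencyRange {
    final int min;
    final int max;

    private ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    /** Parses "min-max", or "max" alone with an implied minimum of 1. */
    static ConcurrencyRange parse(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        int min = 1;
        int max;
        try {
            if (dash < 0) {
                max = Integer.parseInt(trimmed);
            } else {
                min = Integer.parseInt(trimmed.substring(0, dash).trim());
                max = Integer.parseInt(trimmed.substring(dash + 1).trim());
            }
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("Invalid concurrency value: " + value, ex);
        }
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency range: " + value);
        }
        return new ConcurrencyRange(min, max);
    }

    @Override
    public String toString() {
        return min + "-" + max;
    }
}
```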
Appendix D: Finding Data Errors Logged by an Adaptor
Log into the YOUnite UI and select "LOGS".
To search the logs for data errors:
log-entry-type: "ADAPTOR_ERROR"
To limit results to a single adaptor, use the adaptor’s UUID with the source-adaptor-uuid property, e.g.:
log-entry-type: "ADAPTOR_ERROR" AND source-adaptor-uuid: "8862fdc0-43c9-4ec6-b5b0-a14ec3bdd352"