Read Me First!

Before reading this guide, read the Adaptor SDK Summary, which gives a higher-level view of the Adaptor SDK used to build adaptors.

You might also want to read YOUnite DB Adaptor (off the shelf adaptor), a guide to setting up the off-the-shelf adaptor that covers many concepts relevant to developing adaptors.

Setting up Your Development Environment

Maven setup

Set up a Maven project with this example pom.xml file.

In addition to the Adaptor SDK dependency, this file includes a build configuration that has been tested to build a working Docker image. If you are not deploying to Docker, the Docker-related parts of the build configuration can be omitted.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <groupId>com.example</groupId>
    <artifactId>example-adaptor</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <sdk.version>1.0.30-SNAPSHOT</sdk.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <compilerArgument>-parameters</compilerArgument>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>us.younite.adaptor.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Copy Maven dependencies into target/lib/ -->
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.2</version>
                <executions>
                    <execution>
                        <phase>initialize</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- this is required or the snapshot jar files are not named
                                 the same as the class path included in maven-jar-plugin above -->
                            <useBaseVersion>false</useBaseVersion>
                            <includeScope>runtime</includeScope>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Build docker image -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>docker</executable>
                    <arguments>
                        <argument>build</argument>
                        <argument>--build-arg</argument>
                        <argument>JAR_FILE=${project.build.finalName}.jar</argument>
                        <argument>-t</argument>
                        <argument>example-adaptor</argument>
                        <argument>.</argument>
                    </arguments>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Adaptor SDK -->
        <dependency>
            <groupId>us.younite</groupId>
            <artifactId>adaptor-sdk</artifactId>
            <version>${sdk.version}</version>
        </dependency>
    </dependencies>

</project>

Dockerfile

If deploying to Docker, use this Dockerfile. Together with the pom.xml above, it builds a Docker image named example-adaptor during the Maven package phase.

FROM openjdk:17

ARG JAR_FILE
ADD target/lib /
ADD target/${JAR_FILE} /example-adaptor.jar
ENTRYPOINT exec java $JAVA_OPTS -jar /example-adaptor.jar

Source Code: Starting Point

The following is a starting point for source code that defines a Main class and an ExampleService that does the rest.

The Main class has two purposes: start the ExampleService and wait for a shutdown request so that it can gracefully stop the application.

The ExampleService class creates all the elements necessary to create and start an adaptor, including loading the adaptor’s credentials (UUID, clientId and clientSecret) and the message bus URL from the environment.

This code is a starting point; the rest of this document discusses how to fill in the details.

Main.java

package us.younite.adaptor;

public class Main {

    private static boolean shutdown = false;
    private static final Object lock = new Object();
    // not final: assigned in main() once the service has been created
    private static ExampleService service;

    public static void main(String[] args) throws Exception {
        synchronized (lock) {
            // register the hook before starting the service so a shutdown request is never missed
            Runtime.getRuntime().addShutdownHook(new Thread(Main::shutdown));

            service = new ExampleService();

            // wait for a shutdown request to exit
            while (!shutdown) {
                lock.wait();
            }
        }
    }

    private static void shutdown() {
        synchronized (lock) {
            shutdown = true;
            if (service != null)
                service.close();
            lock.notifyAll();
        }
    }
}

ExampleService.java

package us.younite.adaptor;

import us.younite.adaptor.sdk.builder.AdaptorServiceBuilder;
import us.younite.adaptor.sdk.listener.ds.AbstractDatasource;
import us.younite.adaptor.sdk.listener.ds.Datasource;
import us.younite.adaptor.sdk.model.DataEventResponse;
import us.younite.adaptor.sdk.model.DataRequest;
import us.younite.adaptor.sdk.model.DataResponse;
import us.younite.adaptor.sdk.model.IncomingDataEvent;
import us.younite.adaptor.sdk.model.id.LocalIdentifierFactory;
import us.younite.adaptor.sdk.model.id.simple.LongIdentifier;
import us.younite.adaptor.sdk.service.AdaptorService;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import java.io.Closeable;
import java.util.Optional;
import java.util.UUID;

/**
 * This class is used in the YOUnite-Adaptor-Guide-for-Java-Developers example.
 */
@ParametersAreNonnullByDefault
public class ExampleService implements Closeable {

    private final AdaptorService adaptorService;

    public ExampleService() throws Exception {
        //
        // get configuration from the environment
        //
        String adaptorUuid = System.getenv("ADAPTOR_UUID");
        String clientId = System.getenv("ADAPTOR_CLIENT_ID");
        String clientSecret = System.getenv("ADAPTOR_CLIENT_SECRET");
        String messageBusUrl = System.getenv("MESSAGE_BUS_URL");
        String messageBusConcurrency = System.getenv("MESSAGE_BUS_CONCURRENCY");
        String messageBusOpsConcurrency = System.getenv("MESSAGE_BUS_OPS_CONCURRENCY");
        String messageBusAssemblyConcurrency = System.getenv("MESSAGE_BUS_ASSEMBLY_CONCURRENCY");

        // records in the data source will be identified by a long, for example,
        // a database table with an auto-incrementing primary key.
        LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

        // create an AbstractDatasource implementation
        Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>("person", 1, localIdentifierFactory) {
            @Nonnull
            @Override
            public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
                // TODO - get a record from the data source
                return Optional.empty();
            }

            @Nonnull
            @Override
            public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
                // TODO - add or update a record in the data source and return its local identifier
                return DataEventResponse.failure("TODO");
            }

            @Nonnull
            @Override
            public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
                // TODO - remove a record in the data source
                return DataEventResponse.failure("TODO");
            }
        };

        // build the adaptor service
        adaptorService = new AdaptorServiceBuilder()
                .uuid(UUID.fromString(adaptorUuid))
                .clientId(clientId)
                .clientSecret(clientSecret)
                .messageBusUrl(messageBusUrl)
                .concurrency(Integer.parseInt(messageBusConcurrency))
                .opsConcurrency(messageBusOpsConcurrency)
                .assemblyConcurrency(messageBusAssemblyConcurrency)
                .datasource(datasource)
                .build();

        // start the adaptor service
        adaptorService.start(60);
    }

    @Override
    public void close() {
        adaptorService.close();
    }
}

Source Code: Configuration and Services

The example code above creates skeleton versions of the services required to run the adaptor. This section details how to build those services.

Below is a summary of the tasks to perform and the services to build to create an adaptor using the SDK:

Task / Service | Description

Essential and Optional Configuration | Essential configuration includes the URL of the message bus as well as the adaptor’s UUID, client id and secret that were generated when the adaptor was registered. Optional configuration includes message bus concurrency, metrics logging, etc. This configuration may be loaded from the environment (recommended for Docker), or a properties file, or somewhere else.

Local Identifier Factories | A LocalIdentifierFactory handles creation of local identifiers of records in a domain version.

Datasource implementations | A Datasource implementation handles interactions with a data source for a domain version.

LocalIdentifierStore implementations (optional) | A LocalIdentifierStore stores the linkage between the local identifier of a data record and the unique UUID of a data record (the DR UUID) in YOUnite. This is an optional feature used by adaptors that need to know which records have been linked with YOUnite and which have not. For the YOUnite Off-the-Shelf Database Adaptor it is required for certain features such as deletion scanning.

Essential and Optional Configuration

The Adaptor’s UUID, clientId, clientSecret and the URL of the message bus are required to build an adaptor. This information can be found on the Adaptors page of the YOUnite UI under Adaptor Credentials.

There are also a number of optional configuration items, including message bus concurrency and the metrics logging interval. See Appendix C: Adaptor Service Builder Configuration Values for a full list of configuration values.

Typical sources for this configuration include:

  • Environment variables (recommended for Docker)

  • Properties file

  • A persisted source, such as a database or file share
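The first two sources in this list can be combined. The following is a minimal sketch, assuming a properties file supplies defaults and environment variables override them. LayeredConfig and its methods are hypothetical helpers written for this illustration; they are not part of the Adaptor SDK.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper (not part of the Adaptor SDK) that layers two configuration sources. */
final class LayeredConfig {

    private final Properties fileValues;
    private final Map<String, String> environment;

    LayeredConfig(Properties fileValues, Map<String, String> environment) {
        this.fileValues = fileValues;
        this.environment = environment;
    }

    /** Reads a properties file from the given reader, for example a FileReader. */
    static LayeredConfig load(Reader propertiesFile, Map<String, String> environment) {
        Properties values = new Properties();
        try {
            values.load(propertiesFile);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new LayeredConfig(values, environment);
    }

    /** Environment variables win; the properties file supplies the fallback. */
    String get(String name) {
        String fromEnvironment = environment.get(name);
        return fromEnvironment != null ? fromEnvironment : fileValues.getProperty(name);
    }

    /** Fails fast with a clear message when an essential value is missing or blank. */
    String require(String name) {
        String value = get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required configuration value: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Reader file = new StringReader("MESSAGE_BUS_URL=tcp://localhost:61616\n");
        LayeredConfig config = LayeredConfig.load(file, System.getenv());
        System.out.println("message bus: " + config.require("MESSAGE_BUS_URL"));
    }
}
```

Calling require for the four essential values (UUID, client id, client secret and message bus URL) before building the adaptor reports a missing credential at startup instead of as a later NullPointerException.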

Example using environment variables:

String adaptorUuid = System.getenv("ADAPTOR_UUID");
String clientId = System.getenv("ADAPTOR_CLIENT_ID");
String clientSecret = System.getenv("ADAPTOR_CLIENT_SECRET");
String messageBusUrl = System.getenv("MESSAGE_BUS_URL");
String messageBusConcurrency = System.getenv("MESSAGE_BUS_CONCURRENCY");
String messageBusOpsConcurrency = System.getenv("MESSAGE_BUS_OPS_CONCURRENCY");
String messageBusAssemblyConcurrency = System.getenv("MESSAGE_BUS_ASSEMBLY_CONCURRENCY");

AdaptorServiceBuilder builder = new AdaptorServiceBuilder();

// essential configuration
builder
    .uuid(UUID.fromString(adaptorUuid))
    .clientId(clientId)
    .clientSecret(clientSecret)
    .messageBusUrl(messageBusUrl);

// optional configuration
// note: concurrency for data events is a fixed integer (e.g. 5),
// whereas concurrency for the ops and assembly queues may be a range (e.g. 1-5) or a single value indicating the maximum (e.g. 1)
builder
    .concurrency(Integer.parseInt(messageBusConcurrency))
    .opsConcurrency(messageBusOpsConcurrency)
    .assemblyConcurrency(messageBusAssemblyConcurrency);
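Because the concurrency settings are optional, their environment variables may be unset, and Integer.parseInt(null) throws a NumberFormatException. The sketch below shows one way to validate these values before handing them to the builder. ConcurrencySettings is a hypothetical helper, and the fallback values are assumptions chosen for illustration rather than SDK defaults.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the Adaptor SDK) for checking optional concurrency settings. */
final class ConcurrencySettings {

    // a single positive integer ("5") or a range of two integers ("1-5")
    private static final Pattern RANGE = Pattern.compile("(\\d+)(?:-(\\d+))?");

    private ConcurrencySettings() {
    }

    /** Parses a fixed concurrency such as "5"; returns the fallback when the value is unset. */
    static int fixed(String value, int fallback) {
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        int parsed = Integer.parseInt(value.trim());
        if (parsed < 1) {
            throw new IllegalArgumentException("Concurrency must be at least 1: " + value);
        }
        return parsed;
    }

    /** Accepts "5" (a maximum) or "1-5" (a range); returns the fallback when the value is unset. */
    static String range(String value, String fallback) {
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        String trimmed = value.trim();
        Matcher matcher = RANGE.matcher(trimmed);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Expected N or N-M but got: " + value);
        }
        int low = Integer.parseInt(matcher.group(1));
        int high = matcher.group(2) == null ? low : Integer.parseInt(matcher.group(2));
        if (low < 1 || high < low) {
            throw new IllegalArgumentException("Invalid concurrency range: " + value);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        // the fallbacks 4 and "1" are illustrative assumptions
        System.out.println(fixed(System.getenv("MESSAGE_BUS_CONCURRENCY"), 4));
        System.out.println(range(System.getenv("MESSAGE_BUS_OPS_CONCURRENCY"), "1"));
    }
}
```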

Local Identifier Factories

Local identifier factories create identifiers that uniquely identify a data record locally. For example, if the data source is a database, this would normally be the primary key of the database.

Several factories are provided by the SDK to handle pretty much any situation:

Type | Description | Java Equivalent | Factory

StringIdentifier | Identifies a record by a string. | String | StringIdentifier.getFactory()

LongIdentifier | Identifies a record by a long integer. | Long | LongIdentifier.getFactory()

UuidIdentifier | Identifies a record by a UUID. | UUID | UuidIdentifier.getFactory()

SimpleIdentifier | Identifies a record by a value of a specific type. Must be a Serializable type. | Serializable | SimpleIdentifierFactory.create(type)

SimpleMultiValuedIdentifier | Identifies a record by a collection of key/value pairs. Values must be Serializable. Numeric values are converted to the highest-precision type: Long for integral values or BigDecimal for decimal values. | Map<String, Serializable> | SimpleMultiValuedIdentifier.getFactory()

SimpleListIdentifier | Identifies a record by an ordered list of values. Values must be Serializable. Numeric values are converted to the highest-precision type: Long for integral values or BigDecimal for decimal values. | List<Serializable> | SimpleListIdentifier.getFactory()

JsonLocalIdentifier | Identifies a record by arbitrary key/value pairs. Stored in JSON format. | ObjectNode | JsonLocalIdentifier.getFactory()
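The numeric conversion described for the multi-valued and list identifiers matters because Java treats Integer 42 and Long 42 as unequal, so two keys that look identical could otherwise fail to match. The sketch below illustrates the idea only. IdentifierValues is a hypothetical class, and the SDK's actual conversion logic may differ.

```java
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration (not SDK code) of widening numeric key values. */
final class IdentifierValues {

    private IdentifierValues() {
    }

    /** Widens integral types to Long and floating-point types to BigDecimal; other values pass through. */
    static Serializable normalizeValue(Serializable value) {
        if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
            return Long.valueOf(((Number) value).longValue());
        }
        if (value instanceof Float || value instanceof Double) {
            // goes through the decimal string form; NaN and infinities are not supported here
            return new BigDecimal(value.toString());
        }
        return value;
    }

    /** Normalizes every value of a composite key and sorts the entries by key name. */
    static Map<String, Serializable> normalizeKey(Map<String, Serializable> key) {
        Map<String, Serializable> normalized = new TreeMap<>();
        for (Map.Entry<String, Serializable> entry : key.entrySet()) {
            normalized.put(entry.getKey(), normalizeValue(entry.getValue()));
        }
        return normalized;
    }

    public static void main(String[] args) {
        Map<String, Serializable> fromJdbc = new HashMap<>();
        fromJdbc.put("region", "EU");
        fromJdbc.put("sequence", 42);   // Integer
        Map<String, Serializable> fromJson = new HashMap<>();
        fromJson.put("region", "EU");
        fromJson.put("sequence", 42L);  // Long
        System.out.println(fromJdbc.equals(fromJson));
        System.out.println(normalizeKey(fromJdbc).equals(normalizeKey(fromJson)));
    }
}
```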

Datasource Implementation

A Datasource implementation handles interactions with a data source for a domain version.

A Datasource has three main jobs:

  • Add, update and remove data records in a data source

  • Retrieve a data record from a data source

  • Identify local changes in a data source

A custom Datasource extends AbstractDatasource.

By default, local changes are handled by a DefaultChangeListener, which has a method receive(Change<T> change) that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener for a guide to creating a custom ChangeListener for an event-driven or similar system.

See Polling for Changes for a custom Datasource implementation that has a custom ChangeListener that polls for changes at a given interval.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//

String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode; return Optional.empty() if not found
        return Optional.empty();
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
        return DataEventResponse.failure("TODO");
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
        return DataEventResponse.failure("TODO");
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// send a local change to YOUnite via the DefaultChangeListener
DefaultChangeListener changeListener = (DefaultChangeListener) datasource.getChangeListener();
changeListener.receive(new Change<>(id, data, type));
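To make the TODO bodies of get, put and remove more concrete, here is a minimal sketch of the storage logic they might delegate to, assuming records are kept in memory and identified by an auto-incrementing long. InMemoryPersonTable is a hypothetical stand-in for a real data source. It does not use any SDK types, so wrapping its results in DataResponse and DataEventResponse is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a real data source; not part of the Adaptor SDK. */
final class InMemoryPersonTable {

    private final Map<Long, Map<String, Object>> rows = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    /** Retrieve: an empty Optional signals "not found" rather than null. */
    Optional<Map<String, Object>> get(long id) {
        Map<String, Object> row = rows.get(id);
        if (row == null) {
            return Optional.empty();
        }
        Map<String, Object> copy = new LinkedHashMap<>(row);
        return Optional.of(copy);
    }

    /** Add or update: a null id means the record is new; the local identifier is returned either way. */
    long put(Long id, Map<String, Object> data) {
        long key;
        if (id != null) {
            key = id;
        } else {
            key = nextId.getAndIncrement();
        }
        rows.put(key, new LinkedHashMap<>(data));
        return key;
    }

    /** Remove: returns true only if a record with this id existed. */
    boolean remove(long id) {
        return rows.remove(id) != null;
    }

    public static void main(String[] args) {
        InMemoryPersonTable table = new InMemoryPersonTable();
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "Rylan Jorgenson");
        long id = table.put(null, row);
        System.out.println(id + " -> " + table.get(id).orElse(null));
    }
}
```

The table copies maps on the way in and out so that callers cannot modify stored rows by accident. A database-backed implementation would get the same isolation from its transactions.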

Creating a Custom ChangeListener

A custom ChangeListener extends AbstractChangeListener. This is particularly useful for an event-driven system. Its responsibilities are:

  1. Starting and stopping any process(es) which will be listening for events

  2. Handling events and calling the receive(Change<T> change) method to send changes to the YOUnite Server

If the adaptor should poll for changes instead, see Polling for Changes.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//

String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
    @Override
    public void start() {
        // TODO - add start up code here, for example:
        service.start(this::onEvent);
    }

    @Override
    public void startAsync() {
        // TODO - add asynchronous start up code here, for example:
        service.startAsync(this::onEvent);
    }

    @Override
    public boolean stop(long timeout) {
        // TODO - add stop code here, for example:
        return service.stop(timeout);
    }

    @Override
    public boolean isRunning() {
        // TODO - test whether the service is running, for example:
        return service.isRunning();
    }

    private void onEvent(Event event) {
        try {
            // TODO - convert event to a Change
            Change<LongIdentifier> change = new Change<>( ... );
            receive(change);
        } catch (ChangeListenerException ex) {
            // TODO handle exception
        }
    }
};

Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode; return Optional.empty() if not found
        return Optional.empty();
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
        return DataEventResponse.failure("TODO");
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
        return DataEventResponse.failure("TODO");
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// the change listener should automatically start when the adaptor starts
assert changeListener.isRunning();
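The two responsibilities of an event-driven listener (managing the listening lifecycle and turning events into changes) can be sketched without the SDK as follows. EventForwarder and LocalChange are hypothetical stand-ins for a listener and for the SDK's Change type. The Consumer plays the role of receive(Change<T> change).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical event-driven listener; the types here are stand-ins, not SDK classes. */
final class EventForwarder {

    /** Minimal stand-in for a local change: the record id plus the kind of change. */
    static final class LocalChange {
        final long id;
        final String type;

        LocalChange(long id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Consumer<LocalChange> receiver;

    EventForwarder(Consumer<LocalChange> receiver) {
        this.receiver = receiver;
    }

    /** Responsibility 1: start listening. Returns false if the listener was already running. */
    boolean start() {
        return running.compareAndSet(false, true);
    }

    /** Responsibility 1: stop listening. Returns false if the listener was not running. */
    boolean stop() {
        return running.compareAndSet(true, false);
    }

    boolean isRunning() {
        return running.get();
    }

    /** Responsibility 2: callback for the event source; events are forwarded only while running. */
    boolean onEvent(long recordId, String eventName) {
        if (!running.get()) {
            return false;
        }
        receiver.accept(new LocalChange(recordId, eventName.toUpperCase(Locale.ROOT)));
        return true;
    }

    public static void main(String[] args) {
        List<LocalChange> sent = new ArrayList<>();
        EventForwarder forwarder = new EventForwarder(sent::add);
        forwarder.start();
        forwarder.onEvent(7L, "update");
        forwarder.stop();
        System.out.println(sent.size() + " change(s) forwarded");
    }
}
```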

Polling for Changes

AbstractPollingDatasource extends AbstractDatasource with a custom ChangeListener that polls for changes at a given interval. It also adds methods to check for changes and to commit or roll back each change as it is processed.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener runs at an interval to poll for new changes.
//

String domainName = "person";
long versionNumber = 1L;
int pollingInterval = 10_000;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");

Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
        versionNumber, localIdentifierFactory, pollingInterval) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode; return Optional.empty() if not found
        return Optional.empty();
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
        return DataEventResponse.failure("TODO");
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
        return DataEventResponse.failure("TODO");
    }

    //
    // methods used for polling are below
    //

    @Override
    public boolean hasChanges() {
        // TODO - return true if there are changes waiting
        return false;
    }

    @Nullable
    @Override
    public Change<LongIdentifier> next() {
        // TODO - get the next available change or null if there are no changes
        return null;
    }

    @Override
    public void commit(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as successfully processed
    }

    @Override
    public void rollback(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as unsuccessfully processed
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// the polling change listener should automatically start when the adaptor starts
assert datasource.getChangeListener().isRunning();
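The relationship between hasChanges, next, commit and rollback can be sketched as a single polling pass. This is an illustration under assumptions: ChangeSource mirrors the four polling methods but is a hypothetical interface, and the way the SDK's polling listener actually drives them is not shown in this guide.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical polling sketch; none of these types come from the Adaptor SDK. */
final class PollingSketch {

    /** Stand-in for the four polling methods; T is the change type. */
    interface ChangeSource<T> {
        boolean hasChanges();
        T next();
        void commit(T change);
        void rollback(T change);
    }

    private PollingSketch() {
    }

    /** One polling pass: drain waiting changes, committing successes and rolling back failures. */
    static <T> int pollOnce(ChangeSource<T> source, Predicate<T> sender) {
        int committed = 0;
        while (source.hasChanges()) {
            T change = source.next();
            if (change == null) {
                break; // nothing left even though hasChanges() reported otherwise
            }
            boolean sent;
            try {
                sent = sender.test(change);
            } catch (RuntimeException ex) {
                sent = false;
            }
            if (sent) {
                source.commit(change);
                committed++;
            } else {
                source.rollback(change);
            }
        }
        return committed;
    }

    /** Queue-backed source: rolled-back changes are parked for a later pass, not retried at once. */
    static final class QueueSource implements ChangeSource<String> {
        final Deque<String> pending = new ArrayDeque<>();
        final List<String> retryLater = new ArrayList<>();

        public boolean hasChanges() { return !pending.isEmpty(); }
        public String next() { return pending.poll(); }
        public void commit(String change) { /* already removed from the queue */ }
        public void rollback(String change) { retryLater.add(change); }
    }

    public static void main(String[] args) {
        QueueSource source = new QueueSource();
        source.pending.add("insert:1");
        source.pending.add("update:1");
        int committed = pollOnce(source, change -> true);
        System.out.println(committed + " change(s) committed");
    }
}
```

Parking rolled-back changes in a separate list keeps a persistently failing change from spinning the loop forever within one pass.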

LocalIdentifierStore Implementations (optional)

The YOUnite Server maintains a list of all data records at an adaptor, including the local identifier of each data record at the adaptor. Implementing a LocalIdentifierStore allows the adaptor to do the same thing on its side: store a list of all data records at the adaptor that have been "matched" or "linked" to a data record in YOUnite. This is an optional feature and is not required for typical adaptor operation.

Three default implementations are provided in the SDK:

  1. In-memory store - good for testing, not production!

  2. H2-backed store

  3. PostgreSQL-backed store

This store is populated when:

  1. YOUnite sends a new data record to the adaptor.

  2. The adaptor sends a new data record to YOUnite. YOUnite responds with a MATCH event informing the adaptor of the unique UUID of the data record at YOUnite so it can be added to the store.

Entries in the store are removed when:

  1. YOUnite sends a DELETE event to the adaptor and the event succeeds.

  2. The adaptor sends a DELETE event to YOUnite.

Example:

// create an H2 data source with a connection pool size of 5
DataSource jdbcDataSource = JDBCLocalIdentifierStore.createDataSource("jdbc:h2:/data/local_identifier_store", "sa", "sa", 5);

// create a local identifier store for this H2 data source with a cache size of 1,000
LocalIdentifierStore<StringIdentifier> store = JDBCLocalIdentifierStore.h2(jdbcDataSource, "tableName", 1000);

// create the Datasource implementation
Datasource<StringIdentifier> datasource = new AbstractDatasource<StringIdentifier>("person", 1, StringIdentifier.getFactory()) {
    // AbstractDatasource implementation goes here
};

// set the LocalIdentifierStore
datasource.setLocalIdentifierStore(store);
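Conceptually, a local identifier store is a two-way lookup between local identifiers and DR UUIDs. It gains an entry when a record is linked and loses it when the record is deleted. The sketch below illustrates that bookkeeping in memory. InMemoryLinkStore is a hypothetical class and does not implement the SDK's LocalIdentifierStore interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical two-way link store; illustrates the concept only, not the SDK interface. */
final class InMemoryLinkStore {

    private final Map<String, UUID> drUuidByLocalId = new HashMap<>();
    private final Map<UUID, String> localIdByDrUuid = new HashMap<>();

    /** Called when a record becomes linked, e.g. after a new record arrives or a MATCH event. */
    synchronized void link(String localId, UUID drUuid) {
        UUID previousUuid = drUuidByLocalId.put(localId, drUuid);
        if (previousUuid != null && !previousUuid.equals(drUuid)) {
            localIdByDrUuid.remove(previousUuid);
        }
        String previousLocalId = localIdByDrUuid.put(drUuid, localId);
        if (previousLocalId != null && !previousLocalId.equals(localId)) {
            drUuidByLocalId.remove(previousLocalId);
        }
    }

    /** Called after a successful DELETE; returns false if the record was never linked. */
    synchronized boolean unlink(String localId) {
        UUID drUuid = drUuidByLocalId.remove(localId);
        if (drUuid == null) {
            return false;
        }
        localIdByDrUuid.remove(drUuid);
        return true;
    }

    synchronized Optional<UUID> drUuid(String localId) {
        return Optional.ofNullable(drUuidByLocalId.get(localId));
    }

    synchronized Optional<String> localId(UUID drUuid) {
        return Optional.ofNullable(localIdByDrUuid.get(drUuid));
    }

    public static void main(String[] args) {
        InMemoryLinkStore store = new InMemoryLinkStore();
        store.link("42", UUID.randomUUID());
        System.out.println("linked: " + store.drUuid("42").isPresent());
        store.unlink("42");
        System.out.println("linked: " + store.drUuid("42").isPresent());
    }
}
```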

Appendix A: Null values vs missing values

Incoming data events from YOUnite include a JSON string with the data of the data record. For several reasons, this may or may not include all fields in the domain. Therefore, it should not be assumed that the value of a field is null if it is not present; nulls will be explicitly specified in the JSON string if that is their value.

Reasons why a field may be excluded:

  1. The adaptor that originated the data event did not include data for a field.

  2. Data Governance was applied and a field was excluded.
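One way to honor this distinction when applying an incoming record is to overwrite only the fields that are present, so an explicit null clears a value while a missing field leaves the stored value alone. The sketch below uses plain maps in place of parsed JSON. PartialUpdate is a hypothetical helper, and keeping the stored value for missing fields is an assumed policy, not documented SDK behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper showing "missing" versus "explicitly null" fields; not SDK code. */
final class PartialUpdate {

    private PartialUpdate() {
    }

    /**
     * Returns a copy of the stored record with only the incoming fields overwritten.
     * A field present with a null value is set to null; an absent field is untouched.
     */
    static Map<String, Object> apply(Map<String, Object> stored, Map<String, Object> incoming) {
        Map<String, Object> merged = new LinkedHashMap<>(stored);
        for (Map.Entry<String, Object> field : incoming.entrySet()) {
            merged.put(field.getKey(), field.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new LinkedHashMap<>();
        stored.put("name", "Juneau Momo");
        stored.put("phone", "555-202-8644");

        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("phone", null); // explicit null: clear the phone number

        // "name" is missing from the event, so it keeps its stored value
        System.out.println(apply(stored, incoming));
    }
}
```

The key point is to test for presence (containsKey on a map, or has on a Jackson ObjectNode) rather than comparing the result of get with null, since both cases return null from get.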

Appendix B: The Infinite Update Issue

Adaptors are required to handle the changes they receive and apply from other adaptors. Failure to do so may result in detecting these changes as new updates and subsequently sending them back to YOUnite. This is called the Infinite update issue and more about it can be read here.
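One possible way to avoid echoing a change back is to remember the last data applied from YOUnite for each record and to skip any detected local change whose data is identical. The sketch below shows that idea with plain maps. EchoFilter is a hypothetical class, and this is only one approach; it is not necessarily the mechanism the SDK or the off-the-shelf adaptor uses.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical echo filter for the infinite update issue; not part of the Adaptor SDK. */
final class EchoFilter {

    private final Map<Long, Map<String, Object>> lastApplied = new HashMap<>();

    /** Call after applying a change that was received from YOUnite. */
    synchronized void recordApplied(long id, Map<String, Object> data) {
        lastApplied.put(id, new HashMap<>(data));
    }

    /** Call when a local change is detected; true means the change is new and should be sent. */
    synchronized boolean shouldSend(long id, Map<String, Object> data) {
        return !data.equals(lastApplied.get(id));
    }

    /** Call when a record is deleted so stale entries do not accumulate. */
    synchronized void forget(long id) {
        lastApplied.remove(id);
    }

    public static void main(String[] args) {
        EchoFilter filter = new EchoFilter();
        Map<String, Object> fromYounite = new HashMap<>();
        fromYounite.put("phone", "555-233-1144");

        filter.recordApplied(1L, fromYounite);
        // the change detector now sees the row we just wrote; it must not be sent back
        System.out.println("send echo? " + filter.shouldSend(1L, fromYounite));
    }
}
```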

Appendix C: Full example application - QuickStartAdaptorApplication.java

Below is a full example application in one file that starts up two adaptors and has them send messages to YOUnite that will then be routed to each other. This application, other than being an example, can be used as a quick way to test that things are working as expected.

package us.younite.adaptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.younite.adaptor.sdk.builder.AdaptorServiceBuilder;
import us.younite.adaptor.sdk.exceptions.AdaptorConfigurationException;
import us.younite.adaptor.sdk.exceptions.AdaptorStartupException;
import us.younite.adaptor.sdk.listener.change.Change;
import us.younite.adaptor.sdk.listener.ds.InMemDatabase;
import us.younite.adaptor.sdk.model.id.simple.LongIdentifier;
import us.younite.adaptor.sdk.service.AdaptorService;

import javax.annotation.ParametersAreNonnullByDefault;
import java.io.Closeable;
import java.util.UUID;

/**
 * This is a quick start, referenced in the Adaptor Guide for Java Developers.
 *
 * <p>This application creates two adaptors that use test databases and simple listeners.
 * Data inserted, updated or deleted in the database of one adaptor will be synchronized
 * with the other adaptor.
 *
 * @see InMemDatabase
 * @author zeke@younite.us
 */
@ParametersAreNonnullByDefault
public class QuickStartAdaptorApplication {

    private static final Logger logger = LoggerFactory.getLogger(QuickStartAdaptorApplication.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    //
    // BEFORE RUNNING, CREATE THE TEST DOMAIN AND ADAPTORS!!!
    //
    // 1) Create a domain named person with three fields and the exact matching type:
    // a. personId (string) - set as the DR Key property
    // b. name (string)
    // c. phone (string)
    //
    // 2) Create two adaptors and give them all capabilities to the person domain.
    // Enter the adaptors' credentials below.
    //
    private static final String messageBusUrl = "tcp://localhost:61616";
    private static final UUID adaptorUuid_1 = UUID.fromString("46193a67-5243-483f-a129-85b5620c0574");
    private static final String clientId_1 = "b84a3379-02e8-4b17-b05e-24ffb3caec18";
    private static final String clientSecret_1 = "ea6e038d-dc5e-437d-85c1-ec094003996a";
    private static final UUID adaptorUuid_2 = UUID.fromString("a76e67f9-5aeb-47e6-a2d8-cbd60ef4dd32");
    private static final String clientId_2 = "a8bb9a19-cb49-4a2f-a730-ee3c2e7d7ef6";
    private static final String clientSecret_2 = "35e5fb86-6d97-4aa9-a774-ebd9d1d49984";
    //
    // UPDATE THE VALUES ABOVE BEFORE RUNNING!
    //

    private static boolean shutdown = false;
    private static final Object lock = new Object();

    public static void main(String[] args) throws Exception {
        synchronized (lock) {
            Runtime.getRuntime().addShutdownHook(new Thread(QuickStartAdaptorApplication::shutdown));

            // adaptor #1 uses the polling method to look for changes
            TestAdaptor adaptor1 = new TestAdaptor(adaptorUuid_1, clientId_1, clientSecret_1, true);

            // adaptor #2 uses an event-driven model of sending changes to YOUnite
            TestAdaptor adaptor2 = new TestAdaptor(adaptorUuid_2, clientId_2, clientSecret_2, false);

            // create some test records
            ObjectNode rylan = testRecord("BCN487", "Rylan Jorgenson", "555-202-4571");
            ObjectNode juneau = testRecord("ABA055", "Juneau Momo", "555-202-8644");
            ObjectNode jobe = testRecord("VRQ721", "Jobe Fallby", "555-230-9732");
            ObjectNode nevel = testRecord("PRX002", "Nevel Andersen", "555-230-8877");

            // add the data to adaptor 1
            logger.info("Creating test data in adaptor 1...");
            adaptor1.database.change(Change.Type.NEW, rylan);
            adaptor1.database.change(Change.Type.NEW, juneau);
            adaptor1.database.change(Change.Type.NEW, jobe);
            adaptor1.database.change(Change.Type.NEW, nevel);
            logger.info("Contents of adaptor 1's database {}", adaptor1.database.toMap());

            // wait for the data to show up in adaptor 2
            while (!shutdown && adaptor2.database.size() < 4) {
                logger.info("Waiting for changes to show up in adaptor 2 ...");
                lock.wait(1000);
            }

            logger.info("Contents of adaptor2: {}", adaptor2.database.toMap());

            // make sure the values are identical
            if (adaptor1.database.size() == adaptor2.database.size() &&
                    CollectionUtils.containsAll(adaptor1.database.toMap().values(), adaptor2.database.toMap().values())) {
                logger.info("SUCCESS! Values are identical in both databases");
            } else {
                logger.error("ERROR! Values are not identical!");
            }

            // change data in adaptor 2
            logger.info("Changing Rylan phone number in adaptor 2 ...");
            rylan.put("phone", "555-233-1144");
            adaptor2.database.change(Change.Type.UPDATE, rylan);

            // wait for the change to show up in adaptor 1
            while (!shutdown) {
                LongIdentifier id = adaptor1.database.lookup(rylan);
                if (id == null) {
                    logger.error("ERROR! Unable to determine id for Rylan record in adaptor 1!");
                    break;
                }

                ObjectNode updated = adaptor1.database.get(id);

                if (updated != null && rylan.get("phone").equals(updated.get("phone"))) {
                    logger.info("Success, found updated record in adaptor 1");
                    break;
                }

                logger.info("Waiting for update to show up in adaptor 1 ...");
                lock.wait(1000);
            }

            // delete all the records in adaptor 1
            for (LongIdentifier i : adaptor1.database.toMap().keySet()) {
                adaptor1.database.change(Change.Type.DELETE, i, null);
            }

            // wait for the deletions to show up in adaptor 2
            while (!shutdown && adaptor2.database.size() > 0) {
                logger.info("Waiting for deletions to show up in adaptor 2 ...");
                lock.wait(1000);
            }

            if (adaptor1.database.size() == 0 && adaptor2.database.size() == 0) {
                logger.info("SUCCESS! All records have been deleted in both databases");
            } else {
                logger.error("ERROR! Not all records have been deleted");
            }

            logger.info("Shutting down adaptors ...");

            adaptor1.close();
            adaptor2.close();
        }
    }

    private static void shutdown() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll();
        }
    }

    static class TestAdaptor implements Closeable {
        private final AdaptorService adaptorService;
        private final InMemDatabase database;

        TestAdaptor(UUID adaptorUuid, String clientId, String clientSecret, boolean polling)
                throws AdaptorConfigurationException, AdaptorStartupException, InterruptedException {

            // Create a test database for this domain version which stores data in memory. Records in the
            // database are uniquely identified locally by an auto-incrementing long integer and indexed by
            // primary key (personId).
            database = InMemDatabase.newPrimaryKeyDatabase("person", 1, "personId", polling);

            adaptorService = new AdaptorServiceBuilder()
                    .uuid(adaptorUuid)
                    .clientId(clientId)
                    .clientSecret(clientSecret)
                    .messageBusUrl(messageBusUrl)
                    .concurrency(4)
                    .datasource(database)
                    .build();

            adaptorService.start(60);
        }

        @Override
        public void close() {
            adaptorService.close();
        }
    }

    private static ObjectNode testRecord(String personId, String name, String phone) {
        ObjectNode node = objectMapper.createObjectNode();
        node.put("personId", personId);
        node.put("name", name);
        node.put("phone", phone);
        return node;
    }
}
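
The wait loops in the test above share one shape: hold the monitor on lock, re-check a condition, and call lock.wait(1000) so the thread wakes up either after one second or as soon as shutdown() calls notifyAll(). The stand-alone sketch below isolates that pattern. It is an illustration only: the class name WaitLoopSketch and the helper awaitCondition are hypothetical and not part of the Adaptor SDK, and the overall timeout is an addition that the original loops do not have.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-alone sketch; none of these names come from the Adaptor SDK.
final class WaitLoopSketch {
    private static final Object lock = new Object();
    private static boolean shutdown = false;

    // Wakes any waiting thread and asks it to stop, like shutdown() in the test.
    static void shutdown() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll();
        }
    }

    // Re-checks the condition every pollMillis (must be > 0) until it holds,
    // shutdown is requested, the thread is interrupted, or timeoutMillis elapses.
    // Returns true only if the condition held when the loop ended.
    static boolean awaitCondition(BooleanSupplier condition, long pollMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            try {
                while (!shutdown && !condition.getAsBoolean()) {
                    if (System.nanoTime() - deadline >= 0) {
                        return false; // timed out
                    }
                    lock.wait(pollMillis); // releases the monitor while waiting
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
            return condition.getAsBoolean();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger size = new AtomicInteger(); // stands in for adaptor2.database.size()
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 4; i++) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
                size.incrementAndGet();
            }
        });
        writer.start();
        boolean arrived = awaitCondition(() -> size.get() >= 4, 100, 5_000);
        System.out.println(arrived ? "All 4 records arrived" : "Gave up waiting");
        writer.join();
    }
}
```

Bounding the wait is a design choice for automated test runs: if a change never propagates, the loop reports failure instead of blocking until someone shuts the process down.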

Appendix C: Adaptor Service Builder Configuration Values

Required Adaptor Service Builder Configuration

| Builder Property | Description | Example | Notes |
|---|---|---|---|
| uuid | Adaptor UUID | aa8b8a53-b49f-47f7-a9cd-bd5e0ff909bc | Obtained when registering the Adaptor with YOUnite |
| clientId | Client ID | 6ccb861b-0400-423c-94d6-1df32096de34 | Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
| clientSecret | Client Secret | f744544d-d1cf-4561-9d50-27a9fef8d076 | Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
| messageBusUrl | URL of the Message Bus | tcp://YOUnite-mb.example.com:61616 | Address of the message bus |
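
Because all four required values are typically pasted in from the registration step, it can help to sanity-check them before handing them to the AdaptorServiceBuilder. The sketch below is one possible pre-flight check using only the JDK. The class RequiredConfigCheck and its rules are assumptions made for illustration (for example, that messageBusUrl should parse as a URI with a scheme and a host); the SDK may validate differently or not at all.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper, not part of the Adaptor SDK.
final class RequiredConfigCheck {

    // Returns a list of human-readable problems; an empty list means all checks passed.
    static List<String> validate(String uuid, String clientId, String clientSecret, String messageBusUrl) {
        List<String> problems = new ArrayList<>();
        if (!isUuid(uuid)) {
            problems.add("uuid must be a valid UUID");
        }
        if (isBlank(clientId)) {
            problems.add("clientId must not be empty");
        }
        if (isBlank(clientSecret)) {
            problems.add("clientSecret must not be empty");
        }
        if (!hasSchemeAndHost(messageBusUrl)) {
            problems.add("messageBusUrl must look like scheme://host:port");
        }
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    private static boolean isUuid(String s) {
        if (isBlank(s)) {
            return false;
        }
        try {
            UUID.fromString(s.trim());
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    // Assumption: a usable message bus URL has at least a scheme (e.g. tcp) and a host.
    private static boolean hasSchemeAndHost(String s) {
        if (isBlank(s)) {
            return false;
        }
        try {
            URI uri = new URI(s.trim());
            return uri.getScheme() != null && uri.getHost() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> problems = validate(
                "aa8b8a53-b49f-47f7-a9cd-bd5e0ff909bc",
                "6ccb861b-0400-423c-94d6-1df32096de34",
                "f744544d-d1cf-4561-9d50-27a9fef8d076",
                "tcp://YOUnite-mb.example.com:61616");
        System.out.println(problems.isEmpty() ? "Configuration looks complete" : "Problems: " + problems);
    }
}
```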

Optional Adaptor Service Builder Configuration

| Builder Property | Description | Example | Default Value | Notes |
|---|---|---|---|---|
| assemblyConcurrency | Message bus concurrency for data virtualization events (i.e. federated GET). | 1 | 1 | Concurrency for incoming assembly messages, i.e. the number of threads that will handle incoming messages. These are federated GET requests. Expressed as a range, e.g. "1-5", or a single value indicating the maximum (with an implied minimum of 1). |
| autoStartDataListener | Automatically start the data listener when the connection to YOUnite has been established. | true | true | Can be set to false to allow the application to start the data listener manually. |
| changeListener | Register a ChangeListener for a domain version. | | None | For each domain version, register either a Datasource OR a DomainVersionHandler and ChangeListener. |
| concurrency | Message bus concurrency for data events. | 1 | 1 | Concurrency for incoming data messages, i.e. the number of threads that will handle incoming messages. Note that a concurrency greater than 1 could lead to messages being processed out of order overall, though they are guaranteed to be processed in order within their message grouping (by default the unique ID of the record, i.e. the DR UUID). |
| datasource | Register a Datasource for a domain version. | | None | A Datasource must be registered for each domain version that the adaptor is capable of handling or the adaptor will fail to start. |
| enableMetrics | Enable metrics. | true | true | Metrics log the number of incoming and outgoing messages over various intervals. |
| hooks | Hooks to execute code before/after start and stop or on an error. | | None* | *When useDefaultHooks=true, default hooks are used for sourceDataListeners (see below). |
| messagePostProcessor | Post processor for outgoing JMS messages. | | None | This could be used to add headers to messages, or to modify the payload before the message is sent to YOUnite. |
| metricsLogInterval | Interval in milliseconds to display the metrics in the adaptor logs. | 3,600,000 | 3,600,000 (one hour) | |
| opsConcurrency | Message bus concurrency for operational messages. | 1 | 1 | Concurrency for incoming ops messages, i.e. the number of threads that will handle incoming messages. These include messages with adaptor status information and metadata, requests for stats, and shutdown requests. Expressed as a range, e.g. "1-5", or a single value indicating the maximum (with an implied minimum of 1). |
| sessionCacheSize | JMS Session cache size. | 10 | 10 | Specifies the size of the cache of sessions to be used for JMS communication. |
| useDefaultHooks | Use the default hooks for the sourceDataListeners (if any). | true | true | The default hooks start each sourceDataListener on AFTER_START and stop each sourceDataListener on BEFORE_STOP and ON_ERROR. |
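
The notes for assemblyConcurrency and opsConcurrency describe two accepted forms: a range such as "1-5", or a single value that is treated as the maximum with an implied minimum of 1. The sketch below shows one way to read that rule. It is not the SDK's parser; the class ConcurrencyRange and its validation rules are hypothetical and only illustrate the described semantics.

```java
// Hypothetical illustration of the "min-max" / "max" concurrency notation.
final class ConcurrencyRange {
    final int min;
    final int max;

    private ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // Accepts "1-5" (explicit range) or "5" (maximum only, minimum implied as 1).
    static ConcurrencyRange parse(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        int min = 1;
        int max;
        if (dash < 0) {
            max = Integer.parseInt(trimmed);
        } else {
            min = Integer.parseInt(trimmed.substring(0, dash).trim());
            max = Integer.parseInt(trimmed.substring(dash + 1).trim());
        }
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency: " + value);
        }
        return new ConcurrencyRange(min, max);
    }

    public static void main(String[] args) {
        for (String value : new String[] {"1-5", "4"}) {
            ConcurrencyRange range = parse(value);
            System.out.println(value + " -> min=" + range.min + ", max=" + range.max);
        }
    }
}
```

Under this reading, "4" and "1-4" describe the same setting: between one and four threads handling incoming messages.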

Appendix D: Finding Data Errors Logged by an Adaptor

Log in to the YOUnite UI and select "LOGS".

To search the logs for data errors:

log-entry-type: "ADAPTOR_ERROR"
adaptorErrorLogEntry

To limit results to a single adaptor, add the adaptor’s UUID using the source-adaptor-uuid property, e.g.:

log-entry-type: "ADAPTOR_ERROR" AND source-adaptor-uuid: "8862fdc0-43c9-4ec6-b5b0-a14ec3bdd352"
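
If these searches are run often, for example from a support script, the query text can be assembled from the adaptor UUID rather than typed by hand. The helper below is a minimal sketch that only builds the two query strings shown above; the class AdaptorErrorQuery is hypothetical, and how the string is submitted to the YOUnite UI or any API is outside its scope.

```java
import java.util.UUID;

// Hypothetical helper that reproduces the query strings from this appendix.
final class AdaptorErrorQuery {

    // Query matching data errors logged by any adaptor.
    static String forAllAdaptors() {
        return "log-entry-type: \"ADAPTOR_ERROR\"";
    }

    // Query matching data errors logged by one adaptor, identified by its UUID.
    static String forAdaptor(UUID adaptorUuid) {
        return forAllAdaptors() + " AND source-adaptor-uuid: \"" + adaptorUuid + "\"";
    }

    public static void main(String[] args) {
        UUID adaptorUuid = UUID.fromString("8862fdc0-43c9-4ec6-b5b0-a14ec3bdd352");
        System.out.println(forAllAdaptors());
        System.out.println(forAdaptor(adaptorUuid));
    }
}
```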