Introduction

This document is an introduction to the features and key concepts of the Adaptor SDK and includes a quick start template for getting an adaptor up and running quickly.

For more detailed information about building adaptors, see: * Adaptors * Creating and Managing Adaptors * YOUnite Adaptor Guide for Java Developers

What is the Adaptor SDK?

The Adaptor SDK is a Java library for creating Adaptors. It includes the functionality necessary to communicate with the YOUnite Server via the data fabric’s message bus and handle incoming / outgoing data events and assembly requests as well as error logging.

The main features and capabilities of the Adaptor SDK are:

  • Establishes two-way communication with the Message Bus.

  • Detects communication issues with the Message Bus and attempts to re-connect / recover gracefully.

  • Communicates its status with the YOUnite Server and receives configuration information in return (capabilities and metadata).

  • Listens for incoming data events from the YOUnite Server.

  • Listens for incoming data requests from the YOUnite Data Virtualization Service.

  • Listens for incoming requests for health and metrics from the YOUnite Server.

  • Sends data events to the YOUnite Server.

  • Sends responses to data requests to the YOUnite Data Virtualization Service.

  • Sends error messages to the YOUnite Server (which will be routed to the central logs).

  • Sends informational messages to the YOUnite Server (which will be routed to the central logs).

The core piece of the Adaptor SDK is the AdaptorService. The AdaptorService interface provides the methods necessary for a single adaptor to perform bidirectional communication with the YOUnite Server.

Note
An AdaptorService takes care of all of the above for a single adaptor, however, multiple AdaptorService instances may run in a single application.

The Infinite Update Issue

Adaptors are required to handle the changes they receive and apply from other adaptors. Failure to do so may result in detecting these changes as new updates and subsequently sending them back to YOUnite. This is called the Infinite update issue and more about it can be read here.

Using the Adaptor SDK in Java Projects

Using the Adaptor SDK simply includes adding a maven dependency to a project or including the Adaptor SDK jar files.

Creating an AdaptorService

Adaptors are created and configured using the AdaptorServiceBuilder along with one or more DataSource implementations.

As an example, the SDK comes with a sample DataSource implementation InMemDatabase. This is really only useful for testing, but serves as a simple example for creating an adaptor:

// Create an in-memory database for domain person, version 1 with a "primary key" of personId.
Datasource datasource = InMemDatabase.newPrimaryKeyDatabase("person", 1, "personId", true);

// Use the builder to create an AdaptorService.
// adaptorUuid, clientId, clientSecret and messageBusUrl can all be retrieved from
// the YOUnite UI on the Adaptors page under Adaptor Credentials
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// Start the adaptor service
adaptorService.start(60);

Datasource Overview

For each domain version supported by the adaptor, a custom Datasource must be implemented.

A Datasource is responsible for the following, though not all options need be supported:

  1. Adding, updating and deleting records

  2. Retrieving records

  3. Identifying local changes that will be sent to the YOUnite Server

Getting the LocalIdentifierFactory

Before creating a Datasource, the type of LocalIdentifier that the Datasource will use must be identified. A LocalIdentifier contains the data that identifies a data record locally. For a database table, this might be the primary key value(s). For a resource it might be a URL. For a transactional event it might be a transaction UUID.

The SDK comes with built-in LocalIdentifier types and associated factories to fit pretty much any scenario:

Type Description Java Equivalent Factory

StringIdentifier

Identifies a record by a string

String

StringIdentifier.getFactory()

LongIdentifier

Identifies a record by a long integer

Long

LongIdentifier.getFactory()

UuidIdentifier

Identifies a record by a UUID

UUID

LongIdentifier.getFactory()

SimpleIdentifier

Identifies a record by a value of a specific type.

Must be a Serializable type.

Serializable

SimpleIdentifierFactory.create(type);

SimpleMultiValuedIdentifier

Identifies a record by a collection of key/value pairs.

Values must be Serializable.

Numeric types are converted to the highest precision: Long or BigDecimal respectively.

Map<String, Serializable>

SimpleMultiValuedIdentifier.getFactory()

SimpleListIdentifier

Identifies a record by an ordered list of values.

Values must be Serializable.

Numeric types are converted to the highest precision: Long or BigDecimal respectively.

List<Serializable>

SimpleListIdentifier.getFactory()

JsonLocalIdentifier

Identifies a record by arbitrary key/values pairs. Stored in JSON format.

ObjectNode

JsonLocalIdentifier.getFactory()

Creating a Custom Datasource

A custom Datasource extends AbstractDatasource.

By default, local changes are handled by a DefaultChangeListener which has a method receieve(Change<T> change) that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener for a guide to creating a custom ChangeListener for an event-driven or similar system.

See Polling for Changes for a custom Datasource implementation that has a custom ChangeListener that polls for changes at a given interval.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//

String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// send a local change to YOUnite via the DefaultChangeListener
DefaultChangeListener changeListener = (DefaultChangeListener)datasource1.getChangeListener();
changeListener.receive(new Change<>(id, data, type));

Creating a Custom ChangeListener

A custom Datasource extends AbstractChangeListener. This is particular useful for an event driven system. Its responsibilities are:

  1. Starting and stopping any process(es) which will be listening for events

  2. Handling events and calling the receive(Change<T> change) method to send changes to the YOUnite Server

If the adaptor should poll for changes instead, see Polling for Changes.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//

String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
    @Override
    public void start() {
        // TODO - add start up code here, for example:
        service.start(this::onEvent);
    }

    @Override
    public void startAsync() {
        // TODO - add asynchronous start up code here, for example:
        service.startAsync(this::onEvent);
    }

    @Override
    public boolean stop(long timeout) {
        // TODO - add stop code here, for example:
        return service.stop(timeout);
    }

    @Override
    public boolean isRunning() {
        // TODO - test whether the service is running, for example:
        return service.isRunning();
    }

    private void onEvent(Event event) {
        try {
            // TODO - convert event to a Change
            Change<LongIdentifier> change = new Change<>( ... );
            receive(change);
        } catch (ChangeListenerException ex) {
            // TODO handle exception
        }
    }
};

Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// the change listener should automatically start when the adaptor starts
assert changeListener.isRunning();

Polling for Changes

AbstractPollingDatasource extends AbstractDatasource with a custom ChangeListener that polls for changes at a given interval as well as methods to check for changes and commit or rollback each change as it is processed.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is runs at an interval to poll for new changes.
//

String domainName = "person";
long versionNumber = 1L;
int pollingInterval = 10_000;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");

Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
        versionNumber, localIdentifierFactory, pollingInterval) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }

    //
    // methods used for polling are below
    //

    @Override
    public boolean hasChanges() {
        // TODO - return true if there are changes waiting
    }

    @Nullable
    @Override
    public Change<LongIdentifier> next() {
        // TODO - get the next available change or null if there are no changes
    }

    @Override
    public void commit(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as successfully processed
    }

    @Override
    public void rollback(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as unsuccessfully processed
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
adaptorService.start(60);

// the polling change listener should automatically start when the adaptor starts
assert datasource.getChangeListener().isRunning();

Next Steps

See YOUnite Adaptor Guide for Java Developers for more detailed instructions for building adaptors using the Adaptor SDK.

Appendix B: Adaptor Service Builder Configuration Values

Required Adaptor Service Builder Configuration

Builder Property Description Example Notes

uuid

Adaptor UUID

aa8b8a53-b49f-47f7-a9cd-bd5e0ff909bc

Obtained when registering the Adaptor with YOUnite

clientId

Client ID

6ccb861b-0400-423c-94d6-1df32096de34

Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus.

clientSecret

Client Secret

f744544d-d1cf-4561-9d50-27a9fef8d076

Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus.

messageBusUrl

URL of the Message Bus

tcp://YOUnite-mb.example.com:61616

Address of the message bus

Optional Adaptor Service Builder Configuration

Builder Property Description Example Default Value Notes

assemblyConcurrency

Message bus concurrency for data virtualization events (i.e. federated GET).

1

1

Concurrency for incoming assembly messages, ie, number of threads that will handle incoming messages. These are federated GET requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1).

autoStartDataListener

Automatically start the data listener when the connection to YOUnite has been established

true

true

Can be set to false to allow the application to manually start the data listener

changeListener

Register a ChangeListener for a domain version

None

For each domain version register either a Datasource OR a DomainVersionHandler and ChangeListener

concurrency

Message bus concurrency for data events

1

1

Concurrency for incoming data messages, ie, number of threads that will handle incoming messages. Note that this could lead to messages being processed out of order, though they are guaranteed to be processed in order based on their message grouping (the default is unique ID of the record, ie the DR UUID).

datasource

Register a Datasource for a domain version

None

A Datasource must be registered for each domain version that the adaptor is capable of handling or the adaptor will fail to start.

enableMetrics

Enable metrics

true

true

Metrics log the number of incoming and outgoing messages over various intervals.

hooks

Hooks to execute code before/after start and stop or on an error

None*

*When useDefaultHooks=true, default hooks are used for sourceDataListeners (see below)

messagePostProcessor

Post processor for outgoing JMS messages

None

This could be used to add headers to messages, or modify the payload before the message is sent to YOUnite.

metricsLogInterval

Interval in milliseconds to display the metrics in the adaptor logs.

3,600,000

3,600,000 (one hour)

opsConcurrency

Message bus concurrency for operational messages

1

1

Concurrency for incoming ops messages, ie, number of threads that will handle incoming messages. These are messages with adaptor status information and metadata, request for stats, shutdown requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1).

sessionCacheSize

JMS Session cache size

10

10

Specifies the size of the cache of sessions to be used for JMS communication.

useDefaultHooks

Use the default hooks for the sourceDataListeners (if any)

true

true

The default hooks start each sourceDataListener on AFTER_START and stop each sourceDataListener on BEFORE_STOP and ON_ERROR.