Introduction
This document is an introduction to the features and key concepts of the Adaptor SDK and includes a quick start template for getting an adaptor up and running quickly.
For more detailed information about building adaptors, see: * Adaptors * Creating and Managing Adaptors * YOUnite Adaptor Guide for Java Developers
What is the Adaptor SDK?
The Adaptor SDK is a Java library for creating Adaptors. It includes the functionality necessary to communicate with the YOUnite Server via the data fabric’s message bus and handle incoming / outgoing data events and assembly requests as well as error logging.
The main features and capabilities of the Adaptor SDK are:
-
Establishes two-way communication with the Message Bus.
-
Detects communication issues with the Message Bus and attempts to re-connect / recover gracefully.
-
Communicates its status with the YOUnite Server and receives configuration information in return (capabilities and metadata).
-
Listens for incoming data events from the YOUnite Server.
-
Listens for incoming data requests from the YOUnite Data Virtualization Service.
-
Listens for incoming requests for health and metrics from the YOUnite Server.
-
Sends data events to the YOUnite Server.
-
Sends responses to data requests to the YOUnite Data Virtualization Service.
-
Sends error messages to the YOUnite Server (which will be routed to the central logs).
-
Sends informational messages to the YOUnite Server (which will be routed to the central logs).
The core piece of the Adaptor SDK is the AdaptorService
. The AdaptorService
interface provides the methods necessary
for a single adaptor to perform bidirectional communication with the YOUnite Server.
Note
|
An AdaptorService takes care of all of the above for a single adaptor, however, multiple
AdaptorService instances may run in a single application.
|
The Infinite Update Issue
Adaptors are required to handle the changes they receive and apply from other adaptors. Failure to do so may result in detecting these changes as new updates and subsequently sending them back to YOUnite. This is called the Infinite update issue and more about it can be read here.
Using the Adaptor SDK in Java Projects
Using the Adaptor SDK simply includes adding a maven dependency to a project or including the Adaptor SDK jar files.
Creating an AdaptorService
Adaptors are created and configured using the AdaptorServiceBuilder
along with one or more DataSource
implementations.
As an example, the SDK comes with a sample DataSource
implementation InMemDatabase
. This is really only useful for
testing, but serves as a simple example for creating an adaptor:
// Create an in-memory database for domain person, version 1 with a "primary key" of personId.
Datasource datasource = InMemDatabase.newPrimaryKeyDatabase("person", 1, "personId", true);
// Use the builder to create an AdaptorService.
// adaptorUuid, clientId, clientSecret and messageBusUrl can all be retrieved from
// the YOUnite UI on the Adaptors page under Adaptor Credentials
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// Start the adaptor service
adaptorService.start(60);
Datasource
Overview
For each domain version supported by the adaptor, a custom Datasource
must be implemented.
A Datasource
is responsible for the following, though not all options need be supported:
-
Adding, updating and deleting records
-
Retrieving records
-
Identifying local changes that will be sent to the YOUnite Server
Getting the LocalIdentifierFactory
Before creating a Datasource
, the type of LocalIdentifier
that the Datasource
will use must be identified.
A LocalIdentifier
contains the data that identifies a data record locally. For a database table, this might be the
primary key value(s). For a resource it might be a URL. For a transactional event it might be a transaction UUID.
The SDK comes with built-in LocalIdentifier
types and associated factories to fit pretty much any scenario:
Type | Description | Java Equivalent | Factory |
---|---|---|---|
|
Identifies a record by a string |
|
|
|
Identifies a record by a long integer |
|
|
|
Identifies a record by a UUID |
|
|
|
Identifies a record by a value of a specific type. Must be a |
|
|
|
Identifies a record by a collection of key/value pairs. Values must be Numeric types are converted to the highest precision: |
|
|
|
Identifies a record by an ordered list of values. Values must be Numeric types are converted to the highest precision: |
|
|
|
Identifies a record by arbitrary key/values pairs. Stored in JSON format. |
|
|
Creating a Custom Datasource
A custom Datasource
extends AbstractDatasource
.
By default, local changes are handled by a DefaultChangeListener
which has a method receieve(Change<T> change)
that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener
for a guide to creating
a custom ChangeListener
for an event-driven or similar system.
See Polling for Changes for a custom Datasource
implementation that has a custom
ChangeListener
that polls for changes at a given interval.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
localIdentifierFactory) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source as a JSON ObjectNode or null if not found
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
}
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
// send a local change to YOUnite via the DefaultChangeListener
DefaultChangeListener changeListener = (DefaultChangeListener)datasource1.getChangeListener();
changeListener.receive(new Change<>(id, data, type));
Creating a Custom ChangeListener
A custom Datasource
extends AbstractChangeListener
. This is particular useful for an event driven system.
Its responsibilities are:
-
Starting and stopping any process(es) which will be listening for events
-
Handling events and calling the
receive(Change<T> change)
method to send changes to the YOUnite Server
If the adaptor should poll for changes instead, see Polling for Changes.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
@Override
public void start() {
// TODO - add start up code here, for example:
service.start(this::onEvent);
}
@Override
public void startAsync() {
// TODO - add asynchronous start up code here, for example:
service.startAsync(this::onEvent);
}
@Override
public boolean stop(long timeout) {
// TODO - add stop code here, for example:
return service.stop(timeout);
}
@Override
public boolean isRunning() {
// TODO - test whether the service is running, for example:
return service.isRunning();
}
private void onEvent(Event event) {
try {
// TODO - convert event to a Change
Change<LongIdentifier> change = new Change<>( ... );
receive(change);
} catch (ChangeListenerException ex) {
// TODO handle exception
}
}
};
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
localIdentifierFactory) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source as a JSON ObjectNode or null if not found
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
}
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
// the change listener should automatically start when the adaptor starts
assert changeListener.isRunning();
Polling for Changes
AbstractPollingDatasource
extends AbstractDatasource
with a custom ChangeListener
that polls for
changes at a given interval as well as methods to check for changes and commit or rollback each change
as it is processed.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is runs at an interval to poll for new changes.
//
String domainName = "person";
long versionNumber = 1L;
int pollingInterval = 10_000;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");
Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
versionNumber, localIdentifierFactory, pollingInterval) {
@Nonnull
@Override
public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
// TODO - get a record from the data source as a JSON ObjectNode or null if not found
}
@Nonnull
@Override
public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - add or update a record in the data source and return its local identifier
}
@Override
public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
// TODO - remove a record in the data source
}
//
// methods used for polling are below
//
@Override
public boolean hasChanges() {
// TODO - return true if there are changes waiting
}
@Nullable
@Override
public Change<LongIdentifier> next() {
// TODO - get the next available change or null if there are no changes
}
@Override
public void commit(Change<LongIdentifier> change) {
// TODO - mark a change previously retrieved with next() as successfully processed
}
@Override
public void rollback(Change<LongIdentifier> change) {
// TODO - mark a change previously retrieved with next() as unsuccessfully processed
}
};
// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
.uuid(adaptorUuid)
.clientId(clientId)
.clientSecret(clientSecret)
.messageBusUrl(messageBusUrl)
.concurrency(4)
.datasource(datasource)
.build();
// start the adaptor service
adaptorService.start(60);
// the polling change listener should automatically start when the adaptor starts
assert datasource.getChangeListener().isRunning();
Next Steps
See YOUnite Adaptor Guide for Java Developers for more detailed instructions for building adaptors using the Adaptor SDK.
Appendix B: Adaptor Service Builder Configuration Values
Required Adaptor Service Builder Configuration
Builder Property | Description | Example | Notes |
---|---|---|---|
uuid |
Adaptor UUID |
aa8b8a53-b49f-47f7-a9cd-bd5e0ff909bc |
Obtained when registering the Adaptor with YOUnite |
clientId |
Client ID |
6ccb861b-0400-423c-94d6-1df32096de34 |
Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
clientSecret |
Client Secret |
f744544d-d1cf-4561-9d50-27a9fef8d076 |
Obtained when registering the Adaptor with the YOUnite Server. Used to authenticate with the message bus. |
messageBusUrl |
URL of the Message Bus |
tcp://YOUnite-mb.example.com:61616 |
Address of the message bus |
Optional Adaptor Service Builder Configuration
Builder Property | Description | Example | Default Value | Notes |
---|---|---|---|---|
assemblyConcurrency |
Message bus concurrency for data virtualization events (i.e. federated GET). |
1 |
1 |
Concurrency for incoming assembly messages, ie, number of threads that will handle incoming messages. These are federated GET requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1). |
autoStartDataListener |
Automatically start the data listener when the connection to YOUnite has been established |
true |
true |
Can be set to false to allow the application to manually start the data listener |
changeListener |
Register a |
None |
For each domain version register either a |
|
concurrency |
Message bus concurrency for data events |
1 |
1 |
Concurrency for incoming data messages, ie, number of threads that will handle incoming messages. Note that this could lead to messages being processed out of order, though they are guaranteed to be processed in order based on their message grouping (the default is unique ID of the record, ie the DR UUID). |
datasource |
Register a |
None |
A Datasource must be registered for each domain version that the adaptor is capable of handling or the adaptor will fail to start. |
|
enableMetrics |
Enable metrics |
true |
true |
Metrics log the number of incoming and outgoing messages over various intervals. |
hooks |
Hooks to execute code before/after start and stop or on an error |
None* |
*When useDefaultHooks=true, default hooks are used for sourceDataListeners (see below) |
|
messagePostProcessor |
Post processor for outgoing JMS messages |
None |
This could be used to add headers to messages, or modify the payload before the message is sent to YOUnite. |
|
metricsLogInterval |
Interval in milliseconds to display the metrics in the adaptor logs. |
3,600,000 |
3,600,000 (one hour) |
|
opsConcurrency |
Message bus concurrency for operational messages |
1 |
1 |
Concurrency for incoming ops messages, ie, number of threads that will handle incoming messages. These are messages with adaptor status information and metadata, request for stats, shutdown requests. Expressed as a range, ie "1-5" or a single value indicating the maximum (with an implied minimum of 1). |
sessionCacheSize |
JMS Session cache size |
10 |
10 |
Specifies the size of the cache of sessions to be used for JMS communication. |
useDefaultHooks |
Use the default hooks for the sourceDataListeners (if any) |
true |
true |
The default hooks start each sourceDataListener on AFTER_START and stop each sourceDataListener on BEFORE_STOP and ON_ERROR. |