A custom Datasource extends AbstractDatasource.
By default, local changes are handled by a DefaultChangeListener, which has a method receive(Change<T> change) that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener for a guide to creating a custom ChangeListener for an event-driven or similar system.
See Polling for Changes for a custom Datasource implementation that has a custom ChangeListener that polls for changes at a given interval.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();
// start the adaptor service
adaptorService.start(60);
// send a local change to YOUnite via the DefaultChangeListener
DefaultChangeListener changeListener = (DefaultChangeListener) datasource.getChangeListener();
changeListener.receive(new Change<>(id, data, type));
Creating a Custom ChangeListener
A custom ChangeListener extends AbstractChangeListener. This is particularly useful for an event-driven system.
Its responsibilities are:
- Starting and stopping any process(es) which will be listening for events
- Handling events and calling the receive(Change<T> change) method to send changes to the YOUnite Server
If the adaptor should poll for changes instead, see Polling for Changes.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//
String domainName = "person";
long versionNumber = 1L;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();
//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
    @Override
    public void start() {
        // TODO - add start up code here, for example:
        service.start(this::onEvent);
    }

    @Override
    public void startAsync() {
        // TODO - add asynchronous start up code here, for example:
        service.startAsync(this::onEvent);
    }

    @Override
    public boolean stop(long timeout) {
        // TODO - add stop code here, for example:
        return service.stop(timeout);
    }

    @Override
    public boolean isRunning() {
        // TODO - test whether the service is running, for example:
        return service.isRunning();
    }

    private void onEvent(Event event) {
        try {
            // TODO - convert event to a Change
            Change<LongIdentifier> change = new Change<>( ... );
            receive(change);
        } catch (ChangeListenerException ex) {
            // TODO handle exception
        }
    }
};

Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();
// start the adaptor service
adaptorService.start(60);
// the change listener should automatically start when the adaptor starts
assert changeListener.isRunning();
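
The two responsibilities listed for a custom ChangeListener (controlling the listening process and forwarding each event as a change) can be tried out without the SDK. The following is a minimal, self-contained sketch and not the SDK's implementation: EventBridge and all of its methods are hypothetical names, the converter function stands in for the "convert event to a Change" step, and the receiver stands in for receive(Change<T> change).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical bridge between an event source and a change receiver.
 * It is NOT part of the YOUnite SDK; it only illustrates the lifecycle.
 */
final class EventBridge<E, C> {
    private final Function<E, C> converter;   // stand-in for "convert event to a Change"
    private final Consumer<C> receiver;       // stand-in for receive(Change<T> change)
    private final AtomicBoolean running = new AtomicBoolean(false);

    EventBridge(Function<E, C> converter, Consumer<C> receiver) {
        this.converter = converter;
        this.receiver = receiver;
    }

    /** Begin accepting events. */
    void start() {
        running.set(true);
    }

    /** Stop accepting events; returns true if the bridge was running. */
    boolean stop() {
        return running.getAndSet(false);
    }

    boolean isRunning() {
        return running.get();
    }

    /** Callback handed to the event source; returns true if the change was forwarded. */
    boolean onEvent(E event) {
        if (!running.get()) {
            return false; // events that arrive while stopped are ignored
        }
        try {
            receiver.accept(converter.apply(event));
            return true;
        } catch (RuntimeException ex) {
            // a real listener would log or retry here
            return false;
        }
    }
}
```

In this sketch, events that arrive before start() or after stop() are dropped. Whether a real listener should buffer them instead depends on the event source and is left open here.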
Polling for Changes
AbstractPollingDatasource extends AbstractDatasource with a custom ChangeListener that polls for changes at a given interval. It also adds methods to check for changes and to commit or roll back each change as it is processed.
Example:
//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener runs at an interval to poll for new changes.
//
String domainName = "person";
long versionNumber = 1L;
int pollingInterval = 10_000;
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");
Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
        versionNumber, localIdentifierFactory, pollingInterval) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode or null if not found
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }

    //
    // methods used for polling are below
    //
    @Override
    public boolean hasChanges() {
        // TODO - return true if there are changes waiting
    }

    @Nullable
    @Override
    public Change<LongIdentifier> next() {
        // TODO - get the next available change or null if there are no changes
    }

    @Override
    public void commit(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as successfully processed
    }

    @Override
    public void rollback(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as unsuccessfully processed
    }
};

// create the AdaptorService
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();
// start the adaptor service
adaptorService.start(60);
// the polling change listener should automatically start when the adaptor starts
assert datasource.getChangeListener().isRunning();
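
This guide does not show how the polling ChangeListener calls hasChanges(), next(), commit() and rollback(). The sketch below shows one plausible cycle, assuming each poll drains the pending changes, commits those that were sent successfully and rolls back the rest. PollSource, PollCycle and QueueSource are hypothetical stand-ins written for this illustration and are not SDK types. The `send` predicate stands in for delivery to the YOUnite Server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical mirror of the four polling methods; not the SDK's own interface. */
interface PollSource<C> {
    boolean hasChanges();
    C next();                  // may return null when nothing is pending
    void commit(C change);
    void rollback(C change);
}

/** One polling cycle: take each pending change, try to send it, then commit or roll back. */
final class PollCycle {
    private PollCycle() {}

    /** Returns the number of changes committed during this cycle. */
    static <C> int drain(PollSource<C> source, Predicate<C> send) {
        int committed = 0;
        while (source.hasChanges()) {
            C change = source.next();
            if (change == null) {
                break; // nothing left despite hasChanges(); end this cycle
            }
            boolean ok;
            try {
                ok = send.test(change);
            } catch (RuntimeException ex) {
                ok = false; // treat a failed send as unsuccessful processing
            }
            if (ok) {
                source.commit(change);
                committed++;
            } else {
                source.rollback(change);
            }
        }
        return committed;
    }
}

/** In-memory source used only to exercise the sketch. */
final class QueueSource implements PollSource<String> {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> committed = new ArrayList<>();
    final List<String> rolledBack = new ArrayList<>();

    QueueSource(List<String> initial) {
        pending.addAll(initial);
    }

    @Override public boolean hasChanges() { return !pending.isEmpty(); }
    @Override public String next() { return pending.poll(); }
    @Override public void commit(String change) { committed.add(change); }
    @Override public void rollback(String change) { rolledBack.add(change); }
}
```

QueueSource records rolled-back changes in a separate list rather than re-queuing them, so a change that keeps failing cannot loop forever within one cycle. A real data source might instead mark such a change for retry on a later poll.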