A custom Datasource extends AbstractDatasource.

By default, local changes are handled by a DefaultChangeListener, which has a method receive(Change<T> change) that can be called to send local changes to the YOUnite Server. See Creating a Custom ChangeListener for a guide to creating a custom ChangeListener for an event-driven or similar system.

See Polling for Changes for a custom Datasource implementation that has a custom ChangeListener that polls for changes at a given interval.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
//

// Domain name and version served by this data source.
String domainName = "person";
long versionNumber = 1L;
// Factory for the LongIdentifier values that identify records locally.
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

// get, put and remove are overridden inline in an anonymous subclass of AbstractDatasource.
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode and return it in a DataResponse;
        // return Optional.empty() if not found (the method is @Nonnull, so never return null)
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    // NOTE(review): unlike get and put, remove is not annotated @Nonnull - confirm whether a null
    // return is acceptable here.
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
// adaptorUuid, clientId, clientSecret and messageBusUrl are not declared in this example;
// the application is expected to supply them.
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
// NOTE(review): the meaning and unit of the argument 60 are not shown here
// (presumably a start-up timeout) - confirm against the AdaptorService API.
adaptorService.start(60);

// send a local change to YOUnite via the DefaultChangeListener
// The listener is obtained from the datasource created above and cast to its default
// implementation so that receive(...) can be called on it.
// id, data and type are placeholders that the application must supply.
DefaultChangeListener<LongIdentifier> changeListener =
        (DefaultChangeListener<LongIdentifier>) datasource.getChangeListener();
changeListener.receive(new Change<>(id, data, type));

Creating a Custom ChangeListener

A custom ChangeListener extends AbstractChangeListener. This is particularly useful for an event-driven system. Its responsibilities are:

  1. Starting and stopping any process(es) which will be listening for events

  2. Handling events and calling the receive(Change<T> change) method to send changes to the YOUnite Server

If the adaptor should poll for changes instead, see Polling for Changes.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener is implemented to listen for changes locally.
//

// Domain name and version served by this data source.
String domainName = "person";
long versionNumber = 1L;
// Factory for the LongIdentifier values that identify records locally.
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory();

//
// this example assumes a service has a start method that takes a callback
// to process events as they occur.
// ("service" and "Event" are not declared in this example; they stand for the
// application's own event source and event type.)
//
ChangeListener<LongIdentifier> changeListener = new AbstractChangeListener<LongIdentifier>() {
    // Starts listening: registers onEvent as the callback that receives each event.
    @Override
    public void start() {
        // TODO - add start up code here, for example:
        service.start(this::onEvent);
    }

    // Asynchronous variant of start(); registers the same onEvent callback.
    @Override
    public void startAsync() {
        // TODO - add asynchronous start up code here, for example:
        service.startAsync(this::onEvent);
    }

    // Stops listening and returns the result reported by the underlying service.
    // NOTE(review): the unit of timeout is not shown here - confirm against ChangeListener.
    @Override
    public boolean stop(long timeout) {
        // TODO - add stop code here, for example:
        return service.stop(timeout);
    }

    // Reports whether the underlying service is currently running.
    @Override
    public boolean isRunning() {
        // TODO - test whether the service is running, for example:
        return service.isRunning();
    }

    // Callback invoked for each event: converts the event to a Change and passes it to receive(...).
    private void onEvent(Event event) {
        try {
            // TODO - convert event to a Change
            Change<LongIdentifier> change = new Change<>( ... );
            receive(change);
        } catch (ChangeListenerException ex) {
            // TODO - handle the exception (for example, log it); do not leave this block empty,
            // otherwise failed changes are silently dropped
        }
    }
};

// get, put and remove are overridden inline in an anonymous subclass of AbstractDatasource.
// NOTE(review): the changeListener defined earlier in this example is never visibly attached to
// this datasource - confirm how AbstractDatasource is given a custom ChangeListener and add that
// step here.
Datasource<LongIdentifier> datasource = new AbstractDatasource<LongIdentifier>(domainName, versionNumber,
        localIdentifierFactory) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode and return it in a DataResponse;
        // return Optional.empty() if not found (the method is @Nonnull, so never return null)
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    // NOTE(review): unlike get and put, remove is not annotated @Nonnull - confirm whether a null
    // return is acceptable here.
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }
};

// create the AdaptorService
// adaptorUuid, clientId, clientSecret and messageBusUrl are not declared in this example;
// the application is expected to supply them.
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
// NOTE(review): the meaning and unit of the argument 60 are not shown here
// (presumably a start-up timeout) - confirm against the AdaptorService API.
adaptorService.start(60);

// the change listener should automatically start when the adaptor starts
// (the assert is only evaluated when the JVM runs with assertions enabled, i.e. -ea)
assert changeListener.isRunning();

Polling for Changes

AbstractPollingDatasource extends AbstractDatasource with a custom ChangeListener that polls for changes at a given interval. It also declares methods to check for changes and to commit or roll back each change as it is processed.

Example:

//
// this example creates a data source for domain "person" version 1.
// A LongIdentifier is used to identify records locally.
// A ChangeListener runs at an interval to poll for new changes.
//

// Domain name and version served by this data source.
String domainName = "person";
long versionNumber = 1L;
// Interval between polls. NOTE(review): the unit is not shown here (presumably milliseconds) - confirm.
int pollingInterval = 10_000;
// NOTE(review): the other examples call LongIdentifier.getFactory() with no argument; confirm
// what the "id" argument means and whether the examples should agree.
LocalIdentifierFactory<LongIdentifier> localIdentifierFactory = LongIdentifier.getFactory("id");

// Anonymous subclass overriding get/put/remove plus the polling hooks
// hasChanges/next/commit/rollback.
Datasource<LongIdentifier> datasource = new AbstractPollingDatasource<LongIdentifier>(domainName,
        versionNumber, localIdentifierFactory, pollingInterval) {
    @Nonnull
    @Override
    public Optional<DataResponse<LongIdentifier>> get(DataRequest<LongIdentifier> request, @Nullable IncomingDataEvent<LongIdentifier> dataEvent, @Nullable DataEventResponse<LongIdentifier> response) {
        // TODO - get a record from the data source as a JSON ObjectNode and return it in a DataResponse;
        // return Optional.empty() if not found (the method is @Nonnull, so never return null)
    }

    @Nonnull
    @Override
    public DataEventResponse<LongIdentifier> put(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - add or update a record in the data source and return its local identifier
    }

    // NOTE(review): unlike get and put, remove is not annotated @Nonnull - confirm whether a null
    // return is acceptable here.
    @Override
    public DataEventResponse<LongIdentifier> remove(IncomingDataEvent<LongIdentifier> incoming) {
        // TODO - remove a record in the data source
    }

    //
    // methods used for polling are below
    //

    @Override
    public boolean hasChanges() {
        // TODO - return true if there are changes waiting
    }

    @Nullable
    @Override
    public Change<LongIdentifier> next() {
        // TODO - get the next available change or null if there are no changes
    }

    @Override
    public void commit(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as successfully processed
    }

    @Override
    public void rollback(Change<LongIdentifier> change) {
        // TODO - mark a change previously retrieved with next() as unsuccessfully processed
    }
};

// create the AdaptorService
// adaptorUuid, clientId, clientSecret and messageBusUrl are not declared in this example;
// the application is expected to supply them.
AdaptorService adaptorService = new AdaptorServiceBuilder()
        .uuid(adaptorUuid)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .messageBusUrl(messageBusUrl)
        .concurrency(4)
        .datasource(datasource)
        .build();

// start the adaptor service
// NOTE(review): the meaning and unit of the argument 60 are not shown here
// (presumably a start-up timeout) - confirm against the AdaptorService API.
adaptorService.start(60);

// the polling change listener should automatically start when the adaptor starts
// (the assert is only evaluated when the JVM runs with assertions enabled, i.e. -ea)
assert datasource.getChangeListener().isRunning();