Introduction

This document shows how to quickly get a YOUnite DB Adaptor up and running against a sample PostgreSQL database. MongoDB will also be used to handle processing. This example uses Docker to get all the services up and running.

This document assumes you have access to the younite-db-adaptor software and the complete YOUnite stack and can run them locally in Docker.

Topology

Adaptor

An adaptor is software located within a system that participates in sharing data through the YOUnite Data Fabric Platform. The adaptor acts as a connection point between one or more data sources (e.g. a database, stream, etc.) and the YOUnite Server, which is the core service inside the YOUnite Data Fabric Platform. The adaptor communicates with the YOUnite Server using the YOUnite Data Fabric message bus.

The YOUnite Off-the-Shelf Database Adaptor includes a processing service that keeps track of:

  • Scanning information (what range of timestamps or keys has been scanned)

  • Incoming records from YOUnite (to prevent infinite updates) and batches of changes identified by a scan.

In addition to providing a place to store this information, the processing service ensures that in the event of a crash, the Adaptor can pick up where it left off and no transactions will be lost.

The processing service is implemented using MongoDB.

More can be found in the YOUnite Off-the-Shelf Database Adaptor guide.

Pre-requisites

  1. Before beginning, make sure the YOUnite Server API and Message Bus are running locally or on a machine that you can access.

  2. Setting up this demo requires using the YOUnite UI or running API requests against the YOUnite Server API. A tool like Postman is recommended when making direct calls against the YOUnite Server API.

  3. Docker and Docker Compose plugin

  4. psql to execute commands against PostgreSQL (could also be done in a database tool)

  5. Access to the Docker image for the younite-db-adaptor or a copy in the local repository

  6. When running the full YOUnite stack locally, Docker must have at least 8 GB of RAM; 16 GB is recommended.
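
The tool prerequisites above can be checked from a terminal before starting. The following is a minimal sketch rather than part of the YOUnite tooling; the function name missing_tools is hypothetical, and it only checks that a command is on the PATH, not its version.

```shell
# Print the name of every listed command that is not on PATH.
# Prints nothing when all of them are available.
missing_tools() {
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      printf '%s\n' "$tool"
    fi
  done
}

# Example (not executed here): report what still needs installing.
#   missing_tools docker psql
#   docker compose version   # confirms the Compose plugin is installed
```

An empty result from missing_tools docker psql means both commands were found.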

Setup

Adaptor configuration is covered in more detail in the YOUnite DB Adaptor, Configuration Overview.

PostgreSQL and MongoDB Configuration

Important
By default, YOUnite uses MongoDB 4.4.6 because MongoDB 5.0+ requires a CPU with AVX support. To use MongoDB 5.0+, first determine whether your CPU supports AVX; an online search helps, as operating systems and CPUs indicate AVX compatibility in different ways. If your CPU supports AVX, you can use the latest version of MongoDB by making the following change to the Docker Compose file before deploying the processing service:

Change:

image: mongo:4.4.6

To:

image: mongo
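
On Linux, one way to check for AVX is to look for the avx flag in /proc/cpuinfo. The sketch below assumes a Linux host; the function name has_avx is hypothetical, and other operating systems report CPU features differently, so this does not replace the check described above.

```shell
# Succeed (exit status 0) when the given cpuinfo file lists the "avx" flag.
# Defaults to /proc/cpuinfo, which exists on Linux only.
has_avx() {
  cpuinfo_file=${1:-/proc/cpuinfo}
  grep -qw avx "$cpuinfo_file" 2>/dev/null
}

# Example (not executed here):
#   if has_avx; then echo "AVX found"; else echo "AVX not found, keep mongo:4.4.6"; fi
```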

Copy the following docker-compose.yml to your local machine and run docker compose up -d to start the services.

docker-compose.yml
version: "3.7"
services:

  mongo:
    image: mongo:4.4.6
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
    networks:
      - younite

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: admin
    networks:
      - younite

  postgres-db:
    image: postgres:12-alpine
    restart: always
    ports:
      # note the different port to avoid conflict with YOUnite Server's database in Docker
      - 15432:5432
    environment:
      POSTGRES_PASSWORD: mysecretpassword
    networks:
      - younite

networks:
  # Assumption: the YOUnite stack already runs on a Docker network named "younite".
  # Change the name if your stack uses a different network.
  younite:
    external: true

Once the database has started, copy the following SQL to a file named create-db.sql and run the following command to execute it against the PostgreSQL database: PGPASSWORD=mysecretpassword psql -U postgres -d postgres -h localhost -p 15432 < create-db.sql

create-db.sql
CREATE DATABASE customers;

\c customers;

CREATE TABLE public.customer (
  id bigserial primary key not null,
  customer_id varchar(10) unique not null,
  first_name varchar(100),
  last_name varchar(100) not null,
  email varchar(255),
  gender varchar(1),
  birth_date date,
  phone varchar(25),
  address varchar(255),
  city varchar(255),
  state varchar(2),
  zip varchar(10),
  service_rep varchar(50),
  last_visit date,
  account_balance numeric(12, 2) not null default 0,
  created timestamptz not null default now(),
  updated timestamptz not null default now()
);

CREATE INDEX customer_updated_idx ON public.customer (updated);

CREATE OR REPLACE FUNCTION public.update_updated_column()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
BEGIN
NEW.updated = NOW();
RETURN NEW;
END;
$function$;

CREATE TRIGGER customer_updated BEFORE UPDATE OR INSERT ON public.customer
FOR EACH ROW EXECUTE PROCEDURE update_updated_column();
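
The psql command above fails if PostgreSQL is not yet accepting connections, which can happen for a few seconds after docker compose up -d returns. The following sketch retries a command until it succeeds. The function name wait_for is hypothetical, and the usage comment assumes pg_isready (shipped with the PostgreSQL client tools) is installed alongside psql.

```shell
# wait_for ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Run COMMAND until it succeeds, at most ATTEMPTS times, pausing
# DELAY_SECONDS between tries. Returns 0 on success, 1 otherwise.
wait_for() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}

# Example (not executed here): wait up to about 30 seconds, then load the schema.
#   wait_for 30 1 pg_isready -h localhost -p 15432 &&
#     PGPASSWORD=mysecretpassword psql -U postgres -d postgres -h localhost -p 15432 < create-db.sql
```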

YOUnite Server Configuration

Either use the YOUnite UI or the YOUnite Server API to create a zone named customer, a domain named customer and an adaptor in the customer zone that is capable of read/write operations on the customer domain.

After the adaptor has been created, make note of the Adaptor’s UUID, Client ID and Client Secret, as those will be needed in the next step.

If using the UI, use the following for the domain version and set the Dr Key to customerId:

{
	"customerId": {
		"type": "string",
		"required": true
	},
	"firstName": {
		"type": "string"
	},
	"lastName": {
		"type": "string"
	},
	"email": {
		"type": "string"
	},
	"gender": {
		"type": "enum",
		"enumType": "string",
		"data": [
			"M",
			"F",
			"X"
		]
	},
	"birthDate": {
		"type": "string",
		"min": 8,
		"max": 10
	},
	"phone": {
		"type": "string",
		"min": 12,
		"max": 12
	},
	"address": {
		"type": "string"
	},
	"city": {
		"type": "string"
	},
	"state": {
		"type": "string",
		"min": 2,
		"max": 2
	},
	"zip": {
		"type": "string",
		"min": 5,
		"max": 10
	},
	"serviceRep": {
		"type": "string"
	},
	"lastVisit": {
		"type": "string",
		"min": 8,
		"max": 10
	},
	"accountBalance": {
		"type": "number"
	},
	"created": {
		"type": "string"
	}
}

If using the UI, post this in the metadata for the adaptor:

{
	"databases": {
		"mongo": {
			"type": "MONGO",
			"properties": {
				"connectionString": "mongodb://admin:admin@mongo:27017/admin"
			}
		},
		"postgres": {
			"type": "JDBC",
			"properties": {
				"jdbcUrl": "jdbc:postgresql://postgres-db:5432/customers",
				"dataSource.password": "mysecretpassword",
				"dataSource.user": "postgres",
				"maximumPoolSize": "10"
			}
		}
	},
	"tables": [
		{
			"domain": "customer",
			"version": 1,
			"primaryKeySpecs": [
				{
					"type": "DOMAIN",
					"customer_id": {
						"columnType": "STRING",
						"required": true
					}
				},
				{
					"type": "LOCAL_AUTO",
					"id": {
						"columnType": "LONG",
						"required": true
					}
				}
			],
			"timestampSpec": {
				"columnName": "updated",
				"type": "TIMESTAMP",
				"precision": "MICROSECONDS",
				"scanStartTime": "2020-01-09T20:42:47.936Z",
				"lagTimeMilliseconds": 0,
				"overlapMilliseconds": 60000
			},
			"dataColumns": {
				"customer_id": {
					"source": "customerId",
					"columnType": "STRING",
					"updatable": false
				},
				"first_name": {
					"source": "firstName",
					"columnType": "STRING"
				},
				"last_name": {
					"source": "lastName",
					"columnType": "STRING"
				},
				"email": {
					"source": "email",
					"columnType": "STRING"
				},
				"gender": {
					"source": "gender",
					"columnType": "STRING"
				},
				"birth_date": {
					"source": "birthDate",
					"columnType": "LOCAL_DATE"
				},
				"phone": {
					"source": "phone",
					"columnType": "STRING"
				},
				"address": {
					"source": "address",
					"columnType": "STRING"
				},
				"city": {
					"source": "city",
					"columnType": "STRING"
				},
				"state": {
					"source": "state",
					"columnType": "STRING"
				},
				"zip": {
					"source": "zip",
					"columnType": "STRING"
				},
				"service_rep": {
					"source": "serviceRep",
					"columnType": "STRING"
				},
				"last_visit": {
					"source": "lastVisit",
					"columnType": "LOCAL_DATE"
				},
				"account_balance": {
					"source": "accountBalance",
					"columnType": "DOUBLE"
				},
				"created": {
					"columnType": "TIMESTAMP",
					"insertable": false,
					"updatable": false
				}
			},
			"dataSource": {
				"name": "postgres",
				"schema": "public",
				"tableName": "customer"
			},
			"processingDataSource": {
				"name": "mongo",
				"databaseName": "processing"
			},
			"localIdentifierStoreDataSource": {
				"name": "mongo",
				"databaseName": "local-id-store"
			},
			"scanMethod": "TIMESTAMP",
			"scanOptions": {
				"processorConcurrency": 4,
				"maxBatchSize": 1000,
				"enableIncoming": true,
				"enableModificationScan": true,
				"enableDeletionScan": true,
				"enableProcessor": true
			}
		}
	]
}
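
In the dataColumns section above, each snake_case column name is paired with a camelCase domain property through its source field. When drafting a mapping for a wider table, a small helper can generate the property names. This is only a sketch: the function name snake_to_camel is hypothetical and is not part of the adaptor, which uses whatever source value the metadata specifies.

```shell
# Read snake_case names on standard input, one per line,
# and print the camelCase equivalent of each.
snake_to_camel() {
  awk -F_ '{
    out = $1
    for (i = 2; i <= NF; i++) {
      out = out toupper(substr($i, 1, 1)) substr($i, 2)
    }
    print out
  }'
}

# Example (not executed here):
#   printf '%s\n' first_name account_balance | snake_to_camel
```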

If using the API, run the following commands.

// the following command should be run as the admin

POST /zones
{
    "name": "Customer Zone",
    "parentZoneUuid": "6c5a754b-6ce0-4871-8dec-d39e255eccc3"
}

// the rest should be run as the data steward

POST /domains

{
  "name": "customer",
  "domainType": "FEDERATED"
}

POST /domains/<UUID of customer domain>/versions

{
	"modelSchema": {
		"properties": {
			"customerId": {
				"type": "string",
				"required": true
			},
			"firstName": {
				"type": "string"
			},
			"lastName": {
				"type": "string"
			},
			"email": {
				"type": "string"
			},
			"gender": {
				"type": "enum",
				"enumType": "string",
				"data": [
					"M",
					"F",
					"X"
				]
			},
			"birthDate": {
				"type": "string",
				"min": 8,
				"max": 10
			},
			"phone": {
				"type": "string",
				"min": 12,
				"max": 12
			},
			"address": {
				"type": "string"
			},
			"city": {
				"type": "string"
			},
			"state": {
				"type": "string",
				"min": 2,
				"max": 2
			},
			"zip": {
				"type": "string",
				"min": 5,
				"max": 10
			},
			"serviceRep": {
				"type": "string"
			},
			"lastVisit": {
				"type": "string",
				"min": 8,
				"max": 10
			},
			"accountBalance": {
				"type": "number"
			},
			"created": {
				"type": "string"
			}
		}
	},
    "drKeyProperties": [
        "customerId"
    ]
}

POST /zones/<UUID of customer zone>/adaptors

{
	"name": "Customer Adaptor",
	"description": "Test Customer Adaptor",
	"zoneUuid": "<UUID of customer zone>",
	"capabilities": "[{ \"domainName\": \"customer\", \"versionNumber\": 1, \"action\": \"PUT\", \"direction\": \"IN\"},{ \"domainName\": \"customer\", \"versionNumber\": 1, \"action\": \"POST\", \"direction\": \"IN\"},{ \"domainName\": \"customer\", \"versionNumber\": 1, \"action\": \"DELETE\", \"direction\": \"IN\"},{ \"domainName\": \"customer\", \"versionNumber\": 1, \"action\": \"GET\", \"direction\": \"OUT\"}]",
	"metadata": "{ \"databases\": { \"mongo\": { \"type\": \"MONGO\", \"properties\": { \"connectionString\": \"mongodb://admin:admin@mongo:27017/admin\" } }, \"postgres\": { \"type\": \"JDBC\", \"properties\": { \"jdbcUrl\": \"jdbc:postgresql://postgres-db:5432/customers\", \"dataSource.password\": \"mysecretpassword\", \"dataSource.user\": \"postgres\", \"maximumPoolSize\": \"10\" } } }, \"tables\": [ { \"domain\": \"customer\", \"version\": 1, \"primaryKeySpecs\": [ { \"type\": \"DOMAIN\", \"customer_id\": { \"columnType\": \"STRING\", \"required\": true } }, { \"type\": \"LOCAL_AUTO\", \"id\": { \"columnType\": \"LONG\", \"required\": true } } ], \"timestampSpec\": { \"columnName\": \"updated\", \"type\": \"TIMESTAMP\", \"precision\": \"MICROSECONDS\", \"scanStartTime\": \"2020-01-09T20:42:47.936Z\", \"lagTimeMilliseconds\": 0, \"overlapMilliseconds\": 60000 }, \"dataColumns\": { \"customer_id\": { \"source\": \"customerId\", \"columnType\": \"STRING\", \"updatable\": false }, \"first_name\": { \"source\": \"firstName\", \"columnType\": \"STRING\" }, \"last_name\": { \"source\": \"lastName\", \"columnType\": \"STRING\" }, \"email\": { \"source\": \"email\", \"columnType\": \"STRING\" }, \"gender\": { \"source\": \"gender\", \"columnType\": \"STRING\" }, \"birth_date\": { \"source\": \"birthDate\", \"columnType\": \"LOCAL_DATE\" }, \"phone\": { \"source\": \"phone\", \"columnType\": \"STRING\" }, \"address\": { \"source\": \"address\", \"columnType\": \"STRING\" }, \"city\": { \"source\": \"city\", \"columnType\": \"STRING\" }, \"state\": { \"source\": \"state\", \"columnType\": \"STRING\" }, \"zip\": { \"source\": \"zip\", \"columnType\": \"STRING\" }, \"service_rep\": { \"source\": \"serviceRep\", \"columnType\": \"STRING\" }, \"last_visit\": { \"source\": \"lastVisit\", \"columnType\": \"LOCAL_DATE\" }, \"account_balance\": { \"source\": \"accountBalance\", \"columnType\": \"DOUBLE\" }, \"created\": { \"columnType\": \"TIMESTAMP\", \"insertable\": false, \"updatable\": false } }, \"dataSource\": { \"name\": \"postgres\", \"schema\": \"public\", \"tableName\": \"customer\" }, \"processingDataSource\": { \"name\": \"mongo\", \"databaseName\": \"processing\" }, \"localIdentifierStoreDataSource\": { \"name\": \"mongo\", \"databaseName\": \"local-id-store\" }, \"scanMethod\": \"TIMESTAMP\", \"scanOptions\": { \"processorConcurrency\": 4, \"maxBatchSize\": 1000, \"enableIncoming\": true, \"enableModificationScan\": true, \"enableDeletionScan\": true, \"enableProcessor\": true } } ] }"
}
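
In the request above, the capabilities and metadata values are JSON documents encoded as JSON strings, so every inner double quote is escaped and the value sits on a single line. Escaping a long document by hand is error-prone. The following sketch shows one way to produce such a string from a readable JSON file. The function name json_to_string and the file name metadata.json are hypothetical, and the approach assumes the input is valid JSON (so its string values contain no raw tabs or newlines).

```shell
# Read a JSON document on standard input and print it as a single-line,
# double-quoted JSON string: drop newlines and tabs, escape backslashes
# and double quotes, then wrap the result in quotes.
json_to_string() {
  tr -d '\n\t' |
    sed -e 's/\\/\\\\/g' -e 's/"/\\"/g' -e 's/^/"/' -e 's/$/"/'
}

# Example (not executed here): print the value for the "metadata" field.
#   json_to_string < metadata.json
```

Backslashes are escaped before quotes so that the backslashes added for the quotes are not doubled.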

Running the Adaptor

Once all of the above has been done and the adaptor’s UUID, client id and client secret have been gathered, the Adaptor can be started with the following command:

Note: the --network younite option must match the name of the network that the YOUnite stack is running on so that the services can talk to each other.

#!/bin/bash

ADAPTOR_UUID="<adaptor UUID>"
CLIENT_ID="<client id>"
CLIENT_SECRET="<client secret>"
MESSAGE_BUS_URL="tcp://<name of message bus's container in Docker>:61616"

# concurrency of message bus for incoming messages
CONCURRENCY=4
OPS_CONCURRENCY=1
ASSEMBLY_CONCURRENCY=2

docker run --name customer_adaptor --network younite \
 -e ADAPTOR_UUID="${ADAPTOR_UUID}" -e CLIENT_ID="${CLIENT_ID}" -e CLIENT_SECRET="${CLIENT_SECRET}" \
 -e MESSAGE_BUS_URL="${MESSAGE_BUS_URL}" -e CONCURRENCY="${CONCURRENCY}" -e OPS_CONCURRENCY="${OPS_CONCURRENCY}" \
 -e ASSEMBLY_CONCURRENCY="${ASSEMBLY_CONCURRENCY}" -e LOG_LEVEL=DEBUG \
 -d 160909222919.dkr.ecr.us-west-2.amazonaws.com/younite/younite-db-adaptor:latest
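
Because the script above depends on values copied by hand, a short check before docker run can catch a placeholder that was never filled in. This is a sketch under stated assumptions: the function names is_uuid and check_adaptor_env are hypothetical, and the UUID pattern assumes the adaptor UUID has the same 8-4-4-4-12 hexadecimal form as the zone UUID shown earlier.

```shell
# Succeed when the argument looks like an 8-4-4-4-12 hexadecimal UUID.
is_uuid() {
  printf '%s\n' "$1" |
    grep -Eq '^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
}

# Check the variables used by the run script; report problems on stderr
# and return a non-zero status if any of them looks wrong.
check_adaptor_env() {
  status=0
  if ! is_uuid "${ADAPTOR_UUID:-}"; then
    echo "ADAPTOR_UUID is not a well-formed UUID" >&2
    status=1
  fi
  if [ -z "${CLIENT_ID:-}" ]; then
    echo "CLIENT_ID is empty" >&2
    status=1
  fi
  if [ -z "${CLIENT_SECRET:-}" ]; then
    echo "CLIENT_SECRET is empty" >&2
    status=1
  fi
  return "$status"
}

# Example (not executed here): only start the container when the check passes.
#   check_adaptor_env && docker run ...
```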