This repo is part of the Data Platform project. The Data Platform provides tools for managing the data captured in an experimental research facility, such as a particle accelerator. The data are used within control systems and analytics applications, and facilitate the creation of machine learning models for those applications. The data-platform repo provides a project overview and links to the various project components, as well as an installer for running the latest version.
This repo contains Java implementations of the Data Platform Ingestion, Query, and Annotation Service APIs defined in the dp-grpc repo. The Ingestion Service provides a variety of methods for use in capturing data to the archive with a focus on the performance required to handle the data rates in an accelerator facility. The Query Service provides methods for retrieving raw time-series data for use in machine learning applications, and higher-level APIs for retrieving tabular time-series data as well as for querying metadata and annotations in the archive. The Annotation Service provides APIs for annotating the data in the archive.
The main objective of this document is to give an overview of the dp-service repo focusing on code organization, navigation and conventions in order to help a developer find the relevant code for handling a particular API method or adding a new API method. This document contains the following sections:
- Organization of the dp-service repo
- Generic component structure of Data Platform services
- Concrete structure of each specific service
- Detailed code walkthrough illustrating handling for an incoming API method request
- MongoDB document collection schema and relationship to API methods
- Detailed developer notes with UML design diagrams
- Documentation for running and configuring service applications
The top-level package for the classes defined in the repo is com.ospreydcs.dp.service. The table below describes the packages contained in the top-level package.
| Package | Description |
|---|---|
| annotation | Contains the Annotation Service implementation. |
| common | Contains utilities of use across all the service implementations. |
| ingest | Contains the Ingestion Service implementation. |
| query | Contains the Query Service implementation. |
The Data Platform utilizes the gRPC framework for API communication. The executable application for each service, therefore, is a gRPC server. The generic structure of each service is shown in the table below.
| Component | Responsibility |
|---|---|
| gRPC Server | Each service extends common.server.GrpcServerBase to start the gRPC server framework and register a Service Implementation for handling incoming API method requests. This class includes the executable main() method. |
| Service Implementation | Each service provides an implementation of the gRPC service API methods defined in the respective proto files by extending the stub class generated by the gRPC protoc compiler. This implementation receives and dispatches incoming API method requests to the Handler. An implementation of each method defined by the gRPC service is found in this class. |
| Handler | The service handler framework manages a task queue and a pool of workers, and adds Jobs to the queue for incoming requests that are serviced by the workers. Each Handler extends the base class common.handler.QueueHandlerBase and implements a service-specific handler interface. |
| Jobs | A Job class is defined for each of the API methods. It contains the logic for fulfilling the API method request, typically by using the Database Interface Client to insert a document or execute a query, and sending responses to the API client via a Dispatcher. Concrete implementations extend common.handler.HandlerJob. Where possible, the name of a Job class begins with the name of the corresponding API method. |
| Database Interface Client | Each service provides a database interface client implementation that uses MongoDB collections to manage data as needed to fulfill API method requests. Each client extends the base class common.mongo.MongoSyncClient (or AsyncMongoClient) and implements a service-specific interface. |
| Dispatchers | A Dispatcher class is defined for each of the API methods to send messages to the client in the API method response stream. Concrete implementations extend the base class common.handler.Dispatcher. Where possible, the name of a Dispatcher class begins with the name of the corresponding API method. |
| Integration Tests | Integration test coverage is provided for the handling of each service API method via one or more test classes. Where possible, the name of an Integration Test class begins with the name of the corresponding API method. |
The table below shows the concrete classes and packages corresponding to the generic service components shown above for each Data Platform service.
| Component | Ingestion Service | Query Service | Annotation Service |
|---|---|---|---|
| gRPC Server | ingest.server.IngestionGrpcServer | query.server.QueryGrpcServer | annotation.server.AnnotationGrpcServer |
| Service Implementation | ingest.service.IngestionServiceImpl | query.service.QueryServiceImpl | annotation.service.AnnotationServiceImpl |
| Handler | ingest.handler.mongo.MongoIngestionHandler | query.handler.mongo.MongoQueryHandler | annotation.handler.mongo.MongoAnnotationHandler |
| Database Interface Client | ingest.handler.mongo.client.MongoSyncIngestionClient | query.handler.mongo.client.MongoSyncQueryClient | annotation.handler.mongo.client.MongoSyncAnnotationClient |
| Jobs package | ingest.handler.mongo.job | query.handler.mongo.job | annotation.handler.mongo.job |
| Dispatchers package | ingest.handler.mongo.dispatch | query.handler.mongo.dispatch | annotation.handler.mongo.dispatch |
| Integration Tests package | integration.ingest | integration.query | integration.annotation |
This section shows some of the code involved in handling an incoming queryProviderMetadata() API method request by the Query Service. This example was chosen for its simplicity. Handling for all API methods follows a similar pattern, using the class and method names appropriate to the specific service and API method. See the notes at each step for more details.
The method name in the Service Implementation matches the name of the method as defined in the gRPC proto file corresponding to the service.
@Override
public void queryProviderMetadata(
QueryProviderMetadataRequest request, StreamObserver<QueryProviderMetadataResponse> responseObserver
) {
// check that request contains non-empty providerId
if (request.getProviderId().isBlank()) {
final String errorMsg = "QueryProviderMetadataRequest.providerId must be specified";
sendQueryProviderMetadataResponseReject(errorMsg, responseObserver);
return;
}
handler.handleQueryProviderMetadata(request, responseObserver);
}
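The reject helper called above is a static method in QueryServiceImpl that builds an error response and closes the response stream. Its body is not part of this walkthrough; the sketch below shows the general shape, assuming (hypothetically) that QueryProviderMetadataResponse carries an exceptionalResult field with a reject status. The actual message layout is defined in the dp-grpc proto files.

```java
// Hypothetical sketch of the reject helper; exceptionalResult and the
// status enum are illustrative assumptions, not confirmed dp-grpc names.
public static void sendQueryProviderMetadataResponseReject(
        String errorMsg, StreamObserver<QueryProviderMetadataResponse> responseObserver
) {
    final QueryProviderMetadataResponse response = QueryProviderMetadataResponse.newBuilder()
            .setExceptionalResult(
                    ExceptionalResult.newBuilder()
                            .setExceptionalResultStatus(
                                    ExceptionalResult.ExceptionalResultStatus.RESULT_STATUS_REJECT)
                            .setMessage(errorMsg)
                            .build())
            .build();
    responseObserver.onNext(response);  // single error response message
    responseObserver.onCompleted();     // close the response stream
}
```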
Handler method naming convention is "handleXXX()" where "XXX" is the name of the API method being handled, within the appropriate Handler concrete class for the service defining that method.
@Override
public void handleQueryProviderMetadata(
QueryProviderMetadataRequest request,
StreamObserver<QueryProviderMetadataResponse> responseObserver
) {
final QueryProviderMetadataJob job =
new QueryProviderMetadataJob(request, responseObserver, mongoQueryClient);
try {
requestQueue.put(job);
} catch (InterruptedException e) {
logger.error("InterruptedException waiting for requestQueue.put");
Thread.currentThread().interrupt();
}
}
QueueHandlerBase is a common base class for each of the service concrete Handler implementations that provides a Job queue and worker pool for handling incoming API method requests.
private class QueueWorker implements Runnable {

    private final BlockingQueue<HandlerJob> queue;

    public QueueWorker(BlockingQueue<HandlerJob> q) {
        this.queue = q;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted() && !shutdownRequested.get()) {
                // poll for next queue item with a timeout
                final HandlerJob job = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (job != null) {
                    try {
                        job.execute();
                    } catch (Exception ex) {
                        logger.error("QueueWorker.run encountered exception: {}", ex.getMessage());
                        ex.printStackTrace(System.err);
                    }
                }
            }
        } catch (InterruptedException ex) {
            logger.error("InterruptedException in QueueWorker.run");
            Thread.currentThread().interrupt();
        }
    }
}
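The queue and worker pool owned by QueueHandlerBase are initialized outside the code shown above. Below is a minimal sketch of how that startup might look, assuming illustrative names (NUM_WORKERS, QUEUE_CAPACITY) rather than the repo's actual configuration constants:

```java
// Illustrative sketch of QueueHandlerBase queue/worker-pool startup;
// NUM_WORKERS and QUEUE_CAPACITY are assumed names, not actual constants.
protected final BlockingQueue<HandlerJob> requestQueue =
        new LinkedBlockingQueue<>(QUEUE_CAPACITY);
protected final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private ExecutorService workerPool;

protected boolean start() {
    workerPool = Executors.newFixedThreadPool(NUM_WORKERS);
    for (int i = 0; i < NUM_WORKERS; i++) {
        // each QueueWorker polls requestQueue and executes jobs (shown above)
        workerPool.execute(new QueueWorker(requestQueue));
    }
    return true;
}

protected boolean shutdown() {
    shutdownRequested.set(true); // workers exit their poll loop
    workerPool.shutdownNow();    // interrupt workers blocked in poll()
    return true;
}
```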
The Job (QueryProviderMetadataJob) executes the query for the request via the Database Interface Client and sends the response via a Dispatcher.
For the most part, Job classes are named using the convention "XXXJob" where "XXX" is the API method name, and are contained in the handler.mongo.job package under the appropriate service package.
@Override
public void execute() {
final MongoCursor<ProviderMetadataQueryResultDocument> cursor =
this.mongoClient.executeQueryProviderMetadata(this.request);
dispatcher.handleResult(cursor);
}
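Only execute() is shown above; the rest of a Job class is mostly state captured at construction time. The sketch below shows the overall shape, with the constructor details and the client interface name (MongoQueryClientInterface) assumed rather than copied from the repo:

```java
// Sketch of the overall Job class shape; constructor details and the
// MongoQueryClientInterface name are assumptions for illustration.
public class QueryProviderMetadataJob extends HandlerJob {

    // state captured when the Handler enqueues the job
    private final QueryProviderMetadataRequest request;
    private final StreamObserver<QueryProviderMetadataResponse> responseObserver;
    private final MongoQueryClientInterface mongoClient;
    private final QueryProviderMetadataDispatcher dispatcher;

    public QueryProviderMetadataJob(
            QueryProviderMetadataRequest request,
            StreamObserver<QueryProviderMetadataResponse> responseObserver,
            MongoQueryClientInterface mongoClient
    ) {
        this.request = request;
        this.responseObserver = responseObserver;
        this.mongoClient = mongoClient;
        // the job owns the dispatcher used to send responses to the client
        this.dispatcher = new QueryProviderMetadataDispatcher(responseObserver, request);
    }

    @Override
    public void execute() {
        final MongoCursor<ProviderMetadataQueryResultDocument> cursor =
                this.mongoClient.executeQueryProviderMetadata(this.request);
        dispatcher.handleResult(cursor);
    }
}
```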
Where possible, the method names in the concrete Database Interface classes reflect the name of the corresponding API method. The best place to find the appropriate Database Interface method name for an API method is to look at the execute() method for the concrete Job class that handles that API method.
@Override
public MongoCursor<ProviderMetadataQueryResultDocument> executeQueryProviderMetadata(
QueryProviderMetadataRequest request
) {
final Bson providerIdFilter = eq(BsonConstants.BSON_KEY_BUCKET_PROVIDER_ID, request.getProviderId());
Bson bucketFieldProjection = Projections.fields(Projections.include(
BsonConstants.BSON_KEY_BUCKET_PROVIDER_ID,
BsonConstants.BSON_KEY_PV_NAME,
BsonConstants.BSON_KEY_BUCKET_FIRST_TIME
));
Bson bucketSort = ascending(BsonConstants.BSON_KEY_BUCKET_FIRST_TIME);
Bson metadataSort = ascending(BsonConstants.BSON_KEY_BUCKET_PROVIDER_ID);
var aggregateIterable = mongoCollectionBuckets.withDocumentClass(ProviderMetadataQueryResultDocument.class)
.aggregate(
Arrays.asList(
Aggregates.match(providerIdFilter),
Aggregates.project(bucketFieldProjection),
Aggregates.sort(bucketSort),
// Bucket fields for grouping must appear in projection!!
Aggregates.group(
"$" + BsonConstants.BSON_KEY_BUCKET_PROVIDER_ID,
Accumulators.addToSet(
// collect a set of unique PV names for this provider
BsonConstants.BSON_KEY_PROVIDER_METADATA_PV_NAMES,
"$" + BsonConstants.BSON_KEY_PV_NAME),
Accumulators.first(
// save the first time of the first bucket document for this provider
BsonConstants.BSON_KEY_PROVIDER_METADATA_FIRST_BUCKET_TIMESTAMP,
"$" + BsonConstants.BSON_KEY_BUCKET_FIRST_TIME),
Accumulators.last(
// save the first time of the last bucket document for this provider
BsonConstants.BSON_KEY_PROVIDER_METADATA_LAST_BUCKET_TIMESTAMP,
"$" + BsonConstants.BSON_KEY_BUCKET_FIRST_TIME),
Accumulators.sum(
// count number of bucket documents in group for this provider
BsonConstants.BSON_KEY_PROVIDER_METADATA_NUM_BUCKETS,
1)
),
Aggregates.sort(metadataSort) // sort metadata documents so result is sorted
));
return aggregateIterable.cursor();
}
The Dispatcher (QueryProviderMetadataDispatcher) sends the results from the database cursor to the client in the response stream.
Concrete Dispatcher class names use the convention "XXXDispatcher", where "XXX" is the API method name. They are contained in the handler.mongo.dispatch package of the package for the service that defines that API method.
public void handleResult(MongoCursor<ProviderMetadataQueryResultDocument> cursor) {
// validate cursor
if (cursor == null) {
// send error response and close response stream if cursor is null
final String msg = "providerMetadata query returned null cursor";
QueryServiceImpl.sendQueryProviderMetadataResponseError(msg, this.responseObserver);
return;
} else if (!cursor.hasNext()) {
            // send empty result and close response stream if query matched no data
QueryServiceImpl.sendQueryProviderMetadataResponseEmpty(this.responseObserver);
return;
}
QueryProviderMetadataResponse.MetadataResult.Builder providerMetadataResultBuilder =
QueryProviderMetadataResponse.MetadataResult.newBuilder();
while (cursor.hasNext()) {
// add protobuf object for each document in cursor
final ProviderMetadataQueryResultDocument providerMetadataDocument = cursor.next();
final QueryProviderMetadataResponse.MetadataResult.ProviderMetadata.Builder providerMetadataBuilder =
QueryProviderMetadataResponse.MetadataResult.ProviderMetadata.newBuilder();
providerMetadataBuilder.setId(providerMetadataDocument.getId());
providerMetadataBuilder.addAllPvNames(providerMetadataDocument.getPvNames());
final Instant firstTimeInstant = providerMetadataDocument.getFirstBucketTimestamp().toInstant();
providerMetadataBuilder.setFirstBucketTime(
TimestampUtility.timestampFromSeconds(
firstTimeInstant.getEpochSecond(), firstTimeInstant.getNano()));
final Instant lastTimeInstant = providerMetadataDocument.getLastBucketTimestamp().toInstant();
providerMetadataBuilder.setLastBucketTime(
TimestampUtility.timestampFromSeconds(
lastTimeInstant.getEpochSecond(), lastTimeInstant.getNano()));
providerMetadataBuilder.setNumBuckets(providerMetadataDocument.getNumBuckets());
providerMetadataResultBuilder.addProviderMetadatas(providerMetadataBuilder.build());
}
// send response and close response stream
final QueryProviderMetadataResponse.MetadataResult metadataResult = providerMetadataResultBuilder.build();
QueryServiceImpl.sendQueryProviderMetadataResponse(metadataResult, this.responseObserver);
}
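The static send helpers in QueryServiceImpl wrap the StreamObserver calls that deliver the result and close the stream. Here is a minimal sketch of the positive-path helper, assuming the metadataResult field name implied by the dispatcher code above:

```java
// Illustrative sketch of the positive-path send helper; setMetadataResult
// is an assumed accessor based on the message types used above.
public static void sendQueryProviderMetadataResponse(
        QueryProviderMetadataResponse.MetadataResult metadataResult,
        StreamObserver<QueryProviderMetadataResponse> responseObserver
) {
    final QueryProviderMetadataResponse response = QueryProviderMetadataResponse.newBuilder()
            .setMetadataResult(metadataResult)
            .build();
    responseObserver.onNext(response);  // single response message
    responseObserver.onCompleted();     // close the response stream
}
```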
Most test coverage for API methods is contained in the test package com.ospreydcs.dp.service.integration. That package contains packages "annotation", "ingest", and "query" corresponding to the respective service names. The test coverage for the API methods of a particular service is contained in the integration test package for that service. E.g., QueryProviderMetadataTest is in the test package integration.query. Where possible, test class names begin with the corresponding API method name.
@Test
public void testQueryProviderMetadata() {
// ingest some data
IngestionScenarioResult ingestionScenarioResult;
{
ingestionScenarioResult = simpleIngestionScenario();
}
// queryProviderMetadata() positive test for GCC_INGESTION_PROVIDER using result of simpleIngestionScenario.
{
final IngestionProviderInfo gccProviderInfo =
ingestionScenarioResult.providerInfoMap.get(GCC_INGESTION_PROVIDER);
sendAndVerifyQueryProviderMetadata(
gccProviderInfo.providerId,
gccProviderInfo,
false,
null);
}
}
The Data Platform services utilize MongoDB for persistence. The default database is called "dp". Regression tests use a database called "dp-test" whose contents are removed at the start of each test.
The diagram below shows the entity-relationship model for the Data Platform MongoDB schema.
Each MongoDB collection is described below, with details about the Data Platform API methods that utilize that collection.
The "providers" collection contains a ProviderDocument for each data provider registered with the Data Platform via the "registerProvider()" API method.
The Query Service's queryProviders() method matches documents from the providers collection against the criteria specified in the query.
The "buckets" collection contains bucketed time-series data for PVs and is populated by the Ingestion Service's ingestData*() methods. A BucketDocument is created for each DataColumn in the IngestDataRequest. Each BucketDocument contains a vector of PV measurements for a specified time range.
The domain of the Query Service methods is the "buckets" collection. A time-series data query specifies a list of PV names and time range and the result contains those buckets matching the query parameters. queryPvMetadata() returns results for the buckets matching the PV name(s) in the query. queryProviderMetadata() returns statistics about BucketDocuments created with a particular providerId.
BucketDocuments contain an embedded DataTimestampsDocument with details about the begin / end times, sample period, and number of samples for the bucket (or an explicit list of Timestamps). The protobuf DataTimestamps object is serialized to the "bytes" field of the DataTimestampsDocument.
BucketDocuments also contain an embedded DataColumnDocument that contains the vector of values for the bucket. The protobuf DataColumn object is serialized to the bytes field of the DataColumnDocument.
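Serialization of these embedded protobuf payloads uses the standard protobuf byte-array API. The sketch below shows the round trip for a DataColumn; the setBytes()/getBytes() accessors on DataColumnDocument and the setName() field are illustrative assumptions:

```java
// Round trip between a protobuf DataColumn and the embedded document's
// bytes field; document accessor names here are illustrative assumptions.
final DataColumn dataColumn = DataColumn.newBuilder()
        .setName("pv-01") // field name assumed for illustration
        .build();

// serialize the protobuf object into the embedded document's bytes field
final DataColumnDocument columnDocument = new DataColumnDocument();
columnDocument.setBytes(dataColumn.toByteArray());

// deserialize when reading the document back from MongoDB
try {
    final DataColumn restored = DataColumn.parseFrom(columnDocument.getBytes());
} catch (InvalidProtocolBufferException e) {
    // handle a corrupt or incompatible bytes field
}
```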
An optional embedded EventMetadataDocument is used to associate BucketDocuments with an experiment or event.
The "requestStatus" collection contains RequestStatusDocuments. The Ingestion Service manages the "requestStatus" collection, creating a new document for each "IngestDataRequest" that it receives with the disposition of that request.
The service performs validation on each request, and responds with either a rejection or acknowledgment. The request is then handled asynchronously, with no further reporting back to the client making the request. The request status document indicates whether the request succeeded or failed, and provides further information for failures.
Given the asynchronous nature of the Ingestion Service, it is anticipated that a monitoring tool is needed to detect problems handling ingestion requests. Such a tool could use the "queryRequestStatus()" API method to query over the "requestStatus" collection.
The "dataSets" collection contains documents whose Java type is "DataSetDocument". As mentioned above, the Annotation data model uses datasets to specify the relevant data for Annotations.
The Annotation Service manages the "dataSets" collection. The API method "createDataSet()" creates a new document for each successful request. This collection is the domain for the method "queryDataSets()", which returns the documents in the collection matching the query criteria.
Each DataSetDocument contains a list of embedded DataBlockDocuments, each of which specifies a list of PVs and time range for a region of interest in the archive.
The time range and PV names specified in a DataBlockDocument correspond to the domain of the "buckets" collection, and can therefore be used directly in the parameters for any of the Query Service time-series and metadata query methods.
The "annotations" collection contains documents whose Java type is "AnnotationDocument". The Annotation Service manages the "annotations" collection. The API method "createAnnotation()" creates a new document for each successful request. The handler for that method determines the appropriate concrete Java document class to create from the request parameters. This collection is the domain for the "queryAnnotations()" API method, which matches annotation documents against the search criteria and returns the matching documents.
An AnnotationDocument contains lists of unique ids for associated DataSetDocuments and other Annotations. One or more Data Sets is the target of an Annotation. The lists of associated Data Sets and Annotations are also used for tracking data provenance.
An AnnotationDocument optionally contains the unique id of a CalculationsDocument if user-defined Calculations were included with the Annotation.
The calculations collection contains CalculationsDocuments. There is a one-to-one relationship with the annotations collection: a single Annotation can have at most one associated CalculationsDocument, and a CalculationsDocument is associated with exactly one Annotation.
The CalculationsDocument is created when the corresponding AnnotationDocument is saved by the handling for the Annotation Service's createAnnotation() method. The content of the CalculationsDocument corresponding to an AnnotationDocument is included in the queryAnnotations() result for that Annotation.
Each CalculationsDocument contains a list of embedded CalculationsDataFrameDocuments. Each of these contains an embedded DataTimestampsDocument specifying the timestamps for the list of embedded DataColumnDocuments (these are the same embedded documents used in BucketDocuments as described above).
It might be helpful to think of an analogy between the CalculationsDocument and an Excel workbook. The CalculationsDocument is the workbook, and each CalculationsDataFrameDocument is a worksheet with a column of timestamps and a list of data columns containing values for each timestamp.
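Expressed in terms of the document classes, the nesting described above looks roughly like the following simplified sketch; field names are illustrative and the actual classes carry additional fields:

```java
// Simplified sketch of the calculations document nesting; field names are
// illustrative, and the actual classes carry additional fields and methods.
public class CalculationsDocument {
    // the "workbook": one entry per data frame ("worksheet")
    private List<CalculationsDataFrameDocument> dataFrames;
}

public class CalculationsDataFrameDocument {
    // timestamps shared by every column in this frame
    private DataTimestampsDocument dataTimestamps;
    // one column of calculated values per data frame
    private List<DataColumnDocument> dataColumns;
}
```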
TimestampDocuments are not saved directly to a collection. They are embedded wherever a protobuf Timestamp API object must be saved to a MongoDB document, and contain fields for seconds and nanoseconds, plus a Date field derived from them as a convenience.
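A sketch of what that conversion might look like; the accessor names on Timestamp and TimestampDocument are assumptions for illustration:

```java
// Convert a protobuf Timestamp to a TimestampDocument's seconds, nanos,
// and convenience Date fields; accessor names are illustrative assumptions.
final TimestampDocument timestampDocument = new TimestampDocument();
timestampDocument.setSeconds(timestamp.getEpochSeconds());
timestampDocument.setNanos(timestamp.getNanoseconds());
// derive the convenience Date field from the protobuf seconds/nanos values
timestampDocument.setDateTime(Date.from(
        Instant.ofEpochSecond(timestamp.getEpochSeconds(), timestamp.getNanoseconds())));
```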
EventMetadataDocuments are also only embedded within other documents, where an association with an event or experiment is desired for that document (e.g., BucketDocument and AnnotationDocument).
The links below provide details for running and configuring the Services and other applications. The dp-support repo provides a full set of utilities for managing the Data Platform ecosystem, including running the dp-service applications.
Notes for running the Data Platform server and client applications are linked here.
Options for configuring Data Platform services are described in more detail in the configuration documentation.
- overview of key classes
- gRPC server
- service request handling framework
- handling for bidirectional streaming API methods
- MongoDB interface
- serialization of protobuf objects to MongoDB documents
- using SerializedDataColumns for improved API performance
- data subscription framework
- data event monitoring framework
- export framework
- configuration framework
- performance benchmarking
- generating sample data
- regression testing
- integration testing
