Context
HDIM's data source connectors (EHR connector, CDR processor, CMS connector) and evaluation engine (CQL engine, care gap service) were implemented as independent services. Data could be pulled from external systems but had no automated path to FHIR persistence or downstream quality measure evaluation. The result: manual orchestration was required to move data from source to care gap detection.
Specifically:
The gap: zero automated data flow from source systems to care gap detection.
Decision
Implement an event-driven pipeline using Kafka topics to chain data flow end-to-end:
Data Source → FHIR Bundle POST → fhir-service persistence → Kafka event → CQL auto-evaluation → care gap detection
Key Design Choices
Pipeline components are gated by @ConditionalOnProperty with matchIfMissing = false, so each one stays disabled unless its property is set. Operators enable pipeline components per deployment:
- cql.auto-evaluation.enabled: CQL triggers on FHIR data events
- cql.lazy-fetch.enabled: Redis stale-check with on-demand EHR sync
- ehr.sync.consumer.enabled: EHR sync request consumer
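The opt-in behaviour of these flags can be modelled in plain Java without Spring. The sketch below is not Spring's implementation. It only illustrates the effect of matchIfMissing = false, and it assumes the components also require the value "true" (as with havingValue = "true"), which this ADR does not state. The class name PipelineFlags and the map-based property source are hypothetical.

```java
import java.util.Map;

/** Illustrative model of the opt-in flags; not the HDIM or Spring implementation. */
final class PipelineFlags {
    static final String AUTO_EVALUATION = "cql.auto-evaluation.enabled";
    static final String LAZY_FETCH = "cql.lazy-fetch.enabled";
    static final String EHR_SYNC_CONSUMER = "ehr.sync.consumer.enabled";

    private PipelineFlags() {}

    /**
     * Assumed gate: a component is active only when its property is present
     * and equals "true". A missing property means "off", which mirrors
     * matchIfMissing = false.
     */
    static boolean isEnabled(Map<String, String> properties, String flag) {
        String value = properties.get(flag);
        return value != null && value.trim().equalsIgnoreCase("true");
    }

    public static void main(String[] args) {
        // Hypothetical deployment that only turns on auto-evaluation.
        Map<String, String> deployment = Map.of(AUTO_EVALUATION, "true");
        for (String flag : new String[] {AUTO_EVALUATION, LAZY_FETCH, EHR_SYNC_CONSUMER}) {
            System.out.println(flag + " -> " + isEnabled(deployment, flag));
        }
    }
}
```

Under this model a deployment that sets none of the three properties runs with the pipeline fully off, which is the conservative default the flags are meant to provide.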
POST /Bundle endpoint accepts FHIR transaction/batch bundles. It routes entries to the 19 existing resource services and adds no new persistence logic: pure orchestration.
Bulk import is asynchronous: POST /$import → 202 + Content-Location → GET /$import-poll-status/{jobId}.
Lazy fetch publishes an ehr.sync.requested Kafka event rather than blocking on a synchronous EHR pull. Trade-off: slightly stale data vs. guaranteed low-latency CQL evaluation.
Alternatives Considered
Alternative 1: Synchronous REST chaining
Each service calls the next via REST: EHR → FHIR service → CQL engine → care gap service.
Rejected because:
Alternative 2: Central orchestrator service
A new pipeline-orchestrator service coordinates all steps via a state machine.
Rejected because:
Alternative 3: Database polling
Each service polls a shared pipeline_events table for new work.
Rejected because:
Consequences
Positive
Negative
Implementation
Components Delivered
| Component | Service | Purpose |
|-----------|---------|---------|
| BundleController | fhir-service | POST /Bundle transaction/batch endpoint |
| BundleTransactionService | fhir-service | Routes entries to 19 resource services |
| BulkImportController | fhir-service | POST /$import async NDJSON endpoint |
| BulkImportService | fhir-service | Streaming NDJSON processing with batching |
| EhrFhirPersistenceService | ehr-connector-service | WebClient to fhir-service Bundle endpoint |
| EhrConnectionConfigEntity | ehr-connector-service | JPA entity for connection persistence |
| CdrFhirPersistenceService | cdr-processor-service | WebClient to fhir-service |
| CmsBulkIngestService | cms-connector-service | Batch NDJSON ingest via fhir-service |
| FhirDataAvailableConsumer | cql-engine-service | 5 Kafka listeners triggering auto-evaluation |
| CqlDataProviderService | cql-engine-service | Redis stale-check + lazy EHR fetch |
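The "Redis stale-check + lazy EHR fetch" behaviour listed for CqlDataProviderService can be sketched as follows. This is a minimal in-memory model, not the delivered code: a HashMap stands in for Redis, a list stands in for the Kafka producer, and the per-patient key, the maxAge threshold, and the class name LazyFetchSketch are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stale-check that requests a sync instead of blocking on one. */
final class LazyFetchSketch {
    static final String SYNC_REQUESTED_TOPIC = "ehr.sync.requested";

    // Stand-in for Redis: patient id -> time of the last successful EHR sync.
    private final Map<String, Instant> lastSynced = new HashMap<>();
    // Stand-in for a Kafka producer: one "topic:patientId" entry per published event.
    private final List<String> published = new ArrayList<>();
    // Hypothetical staleness threshold; the ADR does not specify a value.
    private final Duration maxAge;

    LazyFetchSketch(Duration maxAge) {
        this.maxAge = maxAge;
    }

    void recordSync(String patientId, Instant when) {
        lastSynced.put(patientId, when);
    }

    /**
     * Returns true when the patient's data is fresh. When it is stale or has
     * never been synced, queues a sync request and returns false immediately,
     * so the caller can evaluate against existing data without waiting.
     */
    boolean checkFreshness(String patientId, Instant now) {
        Instant last = lastSynced.get(patientId);
        boolean fresh = last != null && Duration.between(last, now).compareTo(maxAge) <= 0;
        if (!fresh) {
            published.add(SYNC_REQUESTED_TOPIC + ":" + patientId);
        }
        return fresh;
    }

    List<String> published() {
        return List.copyOf(published);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        LazyFetchSketch sketch = new LazyFetchSketch(Duration.ofHours(1));
        sketch.recordSync("patient-1", now.minus(Duration.ofHours(3)));
        System.out.println("fresh: " + sketch.checkFreshness("patient-1", now));
        System.out.println("published: " + sketch.published());
    }
}
```

The method never waits for the EHR. That is the trade-off this ADR accepts: evaluation may run on slightly stale data, but its latency does not depend on the source system.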
Kafka Topics (Pipeline)
| Topic | Producer | Consumer |
|-------|----------|----------|
| ehr.sync.fhir-persisted | ehr-connector-service | cql-engine-service |
| cdr.hl7v2.fhir-persisted | cdr-processor-service | cql-engine-service |
| cdr.cda.fhir-persisted | cdr-processor-service | cql-engine-service |
| fhir.bulk-import.completed | fhir-service | cql-engine-service |
| cms.bcda.ingest-completed | cms-connector-service | cql-engine-service |
| ehr.sync.requested | cql-engine-service | ehr-connector-service |
| cql.pipeline.auto-evaluation-completed | cql-engine-service | care-gap-service |
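The topic table implies a simple routing rule inside cql-engine-service: five upstream topics trigger auto-evaluation, and each evaluation ends with an event on cql.pipeline.auto-evaluation-completed. The sketch below models that rule in memory. The topic names come from the table. The class name AutoEvaluationRouter, the patient-id payload, and the list that stands in for a Kafka producer are assumptions, and the actual CQL evaluation is left as a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative routing of "data available" events to auto-evaluation. */
final class AutoEvaluationRouter {
    // The five upstream topics consumed by cql-engine-service, per the table above.
    static final Set<String> TRIGGER_TOPICS = Set.of(
            "ehr.sync.fhir-persisted",
            "cdr.hl7v2.fhir-persisted",
            "cdr.cda.fhir-persisted",
            "fhir.bulk-import.completed",
            "cms.bcda.ingest-completed");

    static final String COMPLETED_TOPIC = "cql.pipeline.auto-evaluation-completed";

    // Stand-in for a Kafka producer: one "topic:patientId" entry per emitted event.
    private final List<String> emitted = new ArrayList<>();

    /**
     * Handles one incoming event. Returns true and emits a completion event
     * when the topic is one of the five triggers; ignores anything else.
     */
    boolean onEvent(String topic, String patientId) {
        if (!TRIGGER_TOPICS.contains(topic)) {
            return false;
        }
        // Quality measure evaluation would run here in a real service.
        emitted.add(COMPLETED_TOPIC + ":" + patientId);
        return true;
    }

    List<String> emitted() {
        return List.copyOf(emitted);
    }

    public static void main(String[] args) {
        AutoEvaluationRouter router = new AutoEvaluationRouter();
        router.onEvent("fhir.bulk-import.completed", "patient-1");
        router.onEvent("ehr.sync.requested", "patient-2"); // not a trigger topic
        System.out.println(router.emitted());
    }
}
```

Note that ehr.sync.requested is deliberately absent from the trigger set: it flows in the opposite direction, from cql-engine-service to ehr-connector-service.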
Related Decisions
References
*ADR-013 | Version 1.0 | Last Updated: 2026-03-02*