Ingestion batch jobs with no structured output metrics are black boxes. You cannot tell whether a 30-minute job processed 10,000 records successfully or crashed after 100. NIST CSF 2.0 DE.AE-3 requires that event data be aggregated to support detection of anomalies. Without per-run throughput metrics — records processed, records quarantined, duration — you have no baseline against which to detect degradation, and no data to drive SLA decisions about ingestion pipeline capacity.
Severity is low: the gap is a monitoring weakness rather than a data-correctness issue, but without throughput metrics it is impossible to detect performance degradation or failure-rate trends across ingestion runs.
Emit a structured summary log at the end of every ingestion batch run. Include at minimum: source_id, processed, failed, quarantined, and duration_ms.
// src/lib/ingestion/batch.ts
// Assumes ingestContact, quarantine, and logger are defined elsewhere in the codebase.
export async function runIngestionBatch(sourceId: string, records: unknown[]) {
  const start = Date.now()
  let processed = 0, failed = 0, quarantined = 0
  for (const record of records) {
    try {
      await ingestContact(record)
      processed++
    } catch (err) {
      await quarantine(record, String(err), sourceId)
      quarantined++ // every quarantined record also counts as a failure
      failed++
    }
  }
  logger.info('ingestion_batch_complete', {
    source_id: sourceId,
    processed,
    failed,
    quarantined,
    duration_ms: Date.now() - start,
  })
}
If you use a job queue (BullMQ, pg-boss), write these metrics to the job result payload as well so they are queryable from the queue dashboard.
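The job-result variant can be sketched as a small helper that builds the same payload the summary log emits. With BullMQ, whatever a Worker processor returns is persisted as job.returnvalue; pg-boss similarly stores a work handler's output with the job record. Returning this object therefore makes the metrics queryable from the queue dashboard. BatchMetrics and buildBatchMetrics are illustrative names, not part of either library.

```typescript
// Shape shared between the summary log and the job result payload.
interface BatchMetrics {
  source_id: string
  processed: number
  failed: number
  quarantined: number
  duration_ms: number
}

// Build the metrics object once; log it AND return it from the
// queue processor so both the log pipeline and the queue dashboard
// see identical numbers. `now` is injectable for testing.
function buildBatchMetrics(
  sourceId: string,
  counts: { processed: number; failed: number; quarantined: number },
  startedAt: number,
  now: number = Date.now(),
): BatchMetrics {
  return {
    source_id: sourceId,
    processed: counts.processed,
    failed: counts.failed,
    quarantined: counts.quarantined,
    duration_ms: now - startedAt,
  }
}

// Inside a BullMQ processor this would look like (sketch, not run here):
//   const worker = new Worker('ingestion', async (job) => {
//     const metrics = buildBatchMetrics(job.data.sourceId, counts, start)
//     logger.info('ingestion_batch_complete', metrics)
//     return metrics // stored as job.returnvalue
//   })
```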
ID: data-sourcing-provenance.ingestion-pipeline.ingestion-throughput-monitoring
Severity: low
What to look for: Metrics or logging that tracks how many records are processed per ingestion run and how long each run takes; count the structured metric fields emitted at the end of each run. This might be: structured log lines emitted at the end of each batch with records_processed, records_failed, and duration_ms; a metrics counter incremented per record; or a job execution record in a table.
Pass criteria: Ingestion jobs emit at least 3 structured metric fields at completion: records processed, records quarantined/failed, and job duration. This data is queryable or observable, enabling trend analysis.
Fail criteria: Ingestion jobs produce no structured output beyond ad-hoc log lines. It is not possible to determine throughput or failure rate trends without manually parsing logs.
Cross-reference: Check data-sourcing-provenance.source-management.source-level-metrics — throughput monitoring should include source_id as a dimension for per-source analysis.
Skip (N/A) when: The system has only form-based ingestion with real-time inserts (no batch jobs to monitor).
Detail on fail: "Ingestion job logs individual record errors but emits no summary metrics — throughput and failure rate cannot be trended".
Remediation: Add a summary log at the end of each ingestion run:
async function runIngestionBatch(sourceId: string, records: unknown[]) {
  const start = Date.now()
  let processed = 0, failed = 0, quarantined = 0
  for (const record of records) {
    try {
      await ingestContact(record)
      processed++
    } catch {
      await quarantine(record, 'insert_error', sourceId)
      quarantined++
      failed++
    }
  }
  logger.info('ingestion_batch_complete', {
    source_id: sourceId,
    processed,
    failed,
    quarantined,
    duration_ms: Date.now() - start,
  })
}