An ingestion pipeline with no backpressure mechanism is a cost bomb waiting for a large import. A bulk upload of 500,000 contacts from a newly purchased list can exhaust database connection pools, max out queue worker memory, and cause cascading failures in unrelated services sharing the same infrastructure. CWE-400 (Uncontrolled Resource Consumption) applies when there is no ceiling on how fast upstream producers can push work into the pipeline. ISO 25010:2011 §4.2.6 (resource utilization) requires that systems manage resource consumption under load. The fix cost is low; the blast radius of omitting it is high.
Low because the risk materializes only during atypical high-volume imports, but when it does, an unbounded queue can exhaust memory and crash the ingestion pipeline and adjacent services.
Configure a concurrency ceiling on queue workers and add a queue-depth check in the producer before enqueuing large batches.
import { Queue, Worker } from 'bullmq'

// `connection`, `processJob`, and `ContactIngestionJob` are defined elsewhere in the application.

// BullMQ worker: cap concurrent jobs
const worker = new Worker('contact-ingestion', processJob, {
  connection,
  concurrency: 5, // max 5 jobs in flight per worker instance
})

// Producer: check queue depth before bulk enqueue
const queue = new Queue('contact-ingestion', { connection })

export async function enqueueIfNotSaturated(job: ContactIngestionJob) {
  const waiting = await queue.getWaitingCount()
  if (waiting > 1000) {
    throw new Error(
      `Ingestion queue depth is ${waiting}; pipeline is saturated. Retry after the current backlog clears.`
    )
  }
  await queue.add('ingest', job)
}
For synchronous ingestion endpoints (no queue), add a rate limit middleware that returns HTTP 429 when request volume exceeds the database's sustainable write throughput.
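As a rough illustration of the synchronous case, the sketch below uses a token bucket to decide when to answer 429. The `TokenBucket` class, the `handleIngest` function, the 202 success status, and the one-second retry hint are all hypothetical names and values chosen for this example; the capacity and refill rate would need to be tuned to the write throughput the database can actually sustain.

```typescript
// Hypothetical token-bucket limiter for a synchronous ingestion endpoint.
// All names and numbers here are illustrative assumptions.
class TokenBucket {
  private tokens: number
  private lastRefillMs: number
  private readonly capacity: number
  private readonly refillPerSecond: number

  constructor(capacity: number, refillPerSecond: number, nowMs: number = Date.now()) {
    this.capacity = capacity // largest burst allowed
    this.refillPerSecond = refillPerSecond // sustained requests per second
    this.tokens = capacity
    this.lastRefillMs = nowMs
  }

  // Refill based on elapsed time, then consume one token if available.
  tryRemove(nowMs: number = Date.now()): boolean {
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSecond)
    this.lastRefillMs = nowMs
    if (this.tokens < 1) return false
    this.tokens -= 1
    return true
  }
}

interface IngestResponse {
  status: number
  retryAfterSeconds?: number
}

// Framework-agnostic handler: returns 429 when the bucket is empty.
function handleIngest(bucket: TokenBucket, nowMs: number = Date.now()): IngestResponse {
  if (!bucket.tryRemove(nowMs)) {
    return { status: 429, retryAfterSeconds: 1 } // assumed retry hint
  }
  // ... write the contact to the database here ...
  return { status: 202 }
}
```

In a real service the same check would sit in middleware ahead of the route handler, and `retryAfterSeconds` would be sent as a `Retry-After` header so that well-behaved clients slow down on their own.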
ID: data-sourcing-provenance.ingestion-pipeline.backpressure-on-saturation
Severity: low
What to look for: Count all backpressure mechanisms in the ingestion pipeline and examine how the pipeline handles load spikes. If the system uses a queue (Bull, BullMQ, pg-boss, Inngest), check whether: workers have a concurrency setting of no more than 20, workers have a maximum job rate configured, and upstream producers check queue depth before enqueuing more work. If there is no queue (direct synchronous ingestion), check for circuit breaker patterns or request rate limiting on the ingestion endpoint.
Pass criteria: At least 1 backpressure mechanism exists: a queue with bounded concurrency, a max queue depth, a backpressure signal from the queue to the producer, or an ingestion endpoint that returns 429/503 when the pipeline is overloaded.
Fail criteria: The ingestion pipeline has no backpressure mechanism. Upstream producers can enqueue unlimited work, leading to queue depth growth, memory exhaustion, or cascading failures during spikes.
Skip (N/A) when: The system processes only form submissions one at a time with no batch ingestion.
Detail on fail: "BullMQ queue has no max concurrency or queue depth limit — a large import could saturate the pipeline" or "No queue used — synchronous ingestion endpoint has no rate limiting or overload protection".
Remediation: Configure concurrency limits and optionally check queue depth before enqueuing:
import { Queue, Worker } from 'bullmq'

// `connection`, `processJob`, and `ContactIngestionJob` come from the application.

// BullMQ worker with concurrency limit
const worker = new Worker('contact-ingestion', processJob, {
  connection,
  concurrency: 5, // max 5 concurrent jobs per worker instance
})

// Producer checks queue depth before adding
const queue = new Queue('contact-ingestion', { connection })

async function enqueueIfNotSaturated(job: ContactIngestionJob) {
  const waiting = await queue.getWaitingCount()
  if (waiting > 1000) {
    throw new Error('Ingestion queue is saturated; try again later')
  }
  await queue.add('ingest', job)
}
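For pipelines without a queue, the circuit breaker pattern mentioned under "What to look for" can be sketched as a small state machine. This is a minimal illustration under stated assumptions, not a production implementation: the `CircuitBreaker` class, its method names, and the threshold and cooldown values are hypothetical and do not come from any particular library.

```typescript
// Hypothetical circuit breaker for a synchronous ingestion endpoint.
// Class name, method names, and values are illustrative assumptions.
type BreakerState = 'closed' | 'open' | 'half-open'

class CircuitBreaker {
  private state: BreakerState = 'closed'
  private failures = 0
  private openedAtMs = 0
  private readonly failureThreshold: number
  private readonly cooldownMs: number

  constructor(failureThreshold: number, cooldownMs: number) {
    this.failureThreshold = failureThreshold // consecutive failures before opening
    this.cooldownMs = cooldownMs // how long to shed load before probing again
  }

  // Call before each write; when false, the endpoint should answer 503.
  allowRequest(nowMs: number = Date.now()): boolean {
    if (this.state === 'open' && nowMs - this.openedAtMs >= this.cooldownMs) {
      this.state = 'half-open' // cooldown elapsed: let requests probe the backend
    }
    return this.state !== 'open'
  }

  // A successful write closes the breaker and resets the failure count.
  recordSuccess(): void {
    this.failures = 0
    this.state = 'closed'
  }

  // A failed probe, or too many consecutive failures, opens the breaker.
  recordFailure(nowMs: number = Date.now()): void {
    this.failures += 1
    if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
      this.state = 'open'
      this.openedAtMs = nowMs
    }
  }

  currentState(): BreakerState {
    return this.state
  }
}
```

An ingestion handler would call `allowRequest()` first and return 503 when it yields `false`, then report the outcome of each database write through `recordSuccess()` or `recordFailure()`. That satisfies the pass criterion of an endpoint that returns 429/503 when the pipeline is overloaded.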