Architectural role
The media worker is a stateless, CPU-bound service:
- Pulls optimize-request events from Kafka.
- Uses local disk as ephemeral scratch space.
- Reads/writes object storage.
- Publishes optimize-response events (success or structured failure).
It should never hold authoritative user state — Postgres remains source of truth on the API side.
Concurrency model
Two-level parallelism
Kafka ReadMessage loop (single goroutine typical)
  │
  └──► ProcessMediaEvent() ── spawns ──► processJobWithRetry() goroutine per message
                                            │
                                            ├── acquire workerPool slot (buffered channel, cap = N)
                                            ├── processJob()   // FFmpeg / libvips
                                            └── release slot
| Mechanism | Purpose |
|---|---|
| Goroutine per event | Kafka handler returns fast; offset commit not blocked by 60s transcode |
| chan struct{} semaphore | Cap simultaneous FFmpeg jobs ≈ CPU cores |
| activeJobs map + mutex | Introspection for health/metrics |
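The two mechanisms above can be sketched in a few lines of Go. This is a minimal illustration, not the worker's actual code: `runBounded` and its peak-concurrency counter are hypothetical names added for the demonstration, and the `work` callback stands in for `processJob()`.

```go
package main

import (
	"fmt"
	"sync"
)

// runBounded starts one goroutine per job (mirroring "goroutine per event")
// but lets at most poolSize of them run work concurrently.
// It returns the highest concurrency observed, for demonstration only.
func runBounded(jobs, poolSize int, work func(id int)) int {
	workerPool := make(chan struct{}, poolSize) // semaphore: capacity = slots
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for id := 0; id < jobs; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			workerPool <- struct{}{}        // acquire: blocks while the pool is full
			defer func() { <-workerPool }() // release on every exit path

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			work(id) // stand-in for processJob()

			mu.Lock()
			running--
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runBounded(8, 3, func(id int) {})
	fmt.Println("peak concurrency within pool size:", peak <= 3)
}
```

Note that all eight goroutines exist at once; only the work inside the acquire/release pair is capped. That is the property the backpressure note further down depends on.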
Config knobs (illustrative)
| Env | Meaning | Tuning guide |
|---|---|---|
| WORKER_POOL_SIZE | Max parallel transcodes | Start at CPU cores - 1 |
| RETRY_COUNT | Per-job attempts after first failure | 2–3 for transient S3 blips |
| TMP_DIR | Scratch path | Fast local SSD; size ≥ largest expected upload |
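One way to load these knobs with safe defaults is sketched below. The `Config` struct, `loadConfig`, and `intOr` are hypothetical names. The fallback values (cores minus one, two retries, the OS temp directory) are assumptions that follow the tuning guide, not values taken from the worker itself.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"strconv"
)

// Config mirrors the three knobs in the table above.
type Config struct {
	WorkerPoolSize int
	RetryCount     int
	TmpDir         string
}

// intOr parses raw as an int and falls back when it is missing,
// malformed, or below the lowest acceptable value.
func intOr(raw string, fallback, lowest int) int {
	v, err := strconv.Atoi(raw)
	if err != nil || v < lowest {
		return fallback
	}
	return v
}

// loadConfig takes the lookup function as a parameter so it can be
// exercised without mutating the process environment.
func loadConfig(getenv func(string) string) Config {
	defaultPool := runtime.NumCPU() - 1 // tuning guide: start at cores - 1
	if defaultPool < 1 {
		defaultPool = 1
	}
	cfg := Config{
		WorkerPoolSize: intOr(getenv("WORKER_POOL_SIZE"), defaultPool, 1),
		RetryCount:     intOr(getenv("RETRY_COUNT"), 2, 0), // assumed default
		TmpDir:         getenv("TMP_DIR"),
	}
	if cfg.TmpDir == "" {
		cfg.TmpDir = os.TempDir() // assumed default
	}
	return cfg
}

func main() {
	cfg := loadConfig(os.Getenv)
	fmt.Printf("%+v\n", cfg)
}
```

Rejecting a pool size below 1 matters because a zero-capacity channel would block every acquire forever.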
Backpressure: When the pool is saturated, new goroutines block on the workerPool send inside the retry loop. The Kafka consumer may still read messages and spawn further goroutines unless flow control is added upstream, so a full pool does not by itself slow the reader. Monitor consumer lag as the real backpressure signal.
Per-job lifecycle
job := { id, event, status=PENDING, retryCount=0 }
register activeJobs[job.id]
loop retryCount <= RETRY_COUNT:
    acquire pool
    err = processJob(job)   // download → transform → upload
    release pool
    if err == nil: status=COMPLETED; break
    retryCount++
    sleep(retryCount * 1 second)   // linear backoff
if still failed: status=FAILED
sendProcessingResult(job)   // always
unregister activeJobs[job.id]
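The retry loop in this lifecycle could look roughly like the Go sketch below. The `Job` type, the `errTransient` value, and the `backoffUnit` parameter are illustrative assumptions. One deliberate difference from the pseudocode is that the sketch skips the sleep after the final failed attempt; registration in `activeJobs` and result publishing are left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Job is a pared-down, hypothetical job record.
type Job struct {
	ID         string
	Status     string
	RetryCount int
}

// errTransient stands in for an S3 timeout or FFmpeg flake.
var errTransient = errors.New("transient failure")

// processJobWithRetry runs processJob up to 1+maxRetries times, holding a
// pool slot only while an attempt is executing, with linear backoff between
// attempts (retryCount * backoffUnit).
func processJobWithRetry(job *Job, maxRetries int, pool chan struct{},
	backoffUnit time.Duration, processJob func(*Job) error) error {
	var err error
	for {
		pool <- struct{}{} // acquire pool slot
		err = processJob(job)
		<-pool // release before sleeping so other jobs can run
		if err == nil {
			job.Status = "COMPLETED"
			return nil
		}
		if job.RetryCount >= maxRetries {
			break // attempts exhausted; no point sleeping again
		}
		job.RetryCount++
		time.Sleep(time.Duration(job.RetryCount) * backoffUnit)
	}
	job.Status = "FAILED"
	return err
}

func main() {
	pool := make(chan struct{}, 1)
	attempts := 0
	job := &Job{ID: "job-1", Status: "PENDING"}
	err := processJobWithRetry(job, 2, pool, 10*time.Millisecond, func(*Job) error {
		attempts++
		if attempts < 3 {
			return errTransient // fail twice, then succeed
		}
		return nil
	})
	fmt.Println(job.Status, attempts, err)
}
```

Releasing the slot before the backoff sleep keeps a failing job from occupying transcode capacity while it waits.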
processJob internals
- MkdirAll(/tmp/{jobId})
- DownloadFile(bucket, key) → input path
- detectMediaType: image branch vs video branch
- Upload N outputs with deterministic keys
- defer RemoveAll(/tmp/{jobId})
Disk exhaustion: Large concurrent jobs × file size can fill /tmp. Mitigate: pool size cap, separate volume, job input size limits enforced at API gate.
Result publishing semantics
Always publish to response topic — even on failure:
{ "success": false, "error": "failed to download file from S3: ..." }
API consumer must handle success: false without throwing (avoid poison retry loops on API side).
If SendProcessingResult fails (broker down):
- Job marked complete locally.
- Object variants may exist in bucket without DB update — orphan variants until reconciliation.
Metric: kafka_publish_failures_total.
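A sketch of both sides of this contract follows, assuming a payload with only the `success` and `error` fields shown above plus a hypothetical `mediaId`. The action names `mark_failed` and `mark_ready` are placeholders for whatever the API does with the result.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ProcessingResult models the response event. Only success and error appear
// in the document; mediaId is an assumed correlation field.
type ProcessingResult struct {
	MediaID string `json:"mediaId"`
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

var errDownload = errors.New("failed to download file from S3")

// resultFor builds the event the worker publishes for every job outcome.
func resultFor(mediaID string, err error) ProcessingResult {
	if err != nil {
		return ProcessingResult{MediaID: mediaID, Success: false, Error: err.Error()}
	}
	return ProcessingResult{MediaID: mediaID, Success: true}
}

func encodeResult(r ProcessingResult) ([]byte, error) {
	return json.Marshal(r)
}

// handleResult is the API-side consumer. A well-formed failure event is a
// normal outcome and returns no error, so the message is not retried.
// Only an undecodable payload is reported as an error.
func handleResult(payload []byte) (string, error) {
	var r ProcessingResult
	if err := json.Unmarshal(payload, &r); err != nil {
		return "", fmt.Errorf("decode processing result: %w", err)
	}
	if !r.Success {
		return "mark_failed", nil
	}
	return "mark_ready", nil
}

func main() {
	payload, _ := encodeResult(resultFor("media-1", errDownload))
	action, err := handleResult(payload)
	fmt.Println(string(payload), action, err)
}
```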
Retry vs Kafka redelivery
| Layer | Triggers | Scope |
|---|---|---|
| In-process retry | S3 timeout, FFmpeg flake | Same message, same consumer instance |
| Kafka redelivery | Process crash after partial upload | Another consumer may re-run full job |
Idempotency: Deterministic output keys ({userId}/videos/{mediaId}/mp4/...) make retries safe (overwrite). Non-deterministic UUID suffixes in keys would create garbage on retry.
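The difference is easy to demonstrate with an in-memory map standing in for the bucket. In this sketch the key prefix follows the pattern above, while the file name `720p.mp4`, the helper names, and the random-suffix variant are made up for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"path"
)

// variantKey derives the object key purely from job inputs, so a retry
// writes to the same key. The name argument is a hypothetical file name.
func variantKey(userID, mediaID, name string) string {
	return path.Join(userID, "videos", mediaID, "mp4", name)
}

// randomizedKey shows the anti-pattern: a fresh random suffix per attempt.
func randomizedKey(userID, mediaID, name string) string {
	suffix := make([]byte, 8)
	if _, err := rand.Read(suffix); err != nil {
		panic(err)
	}
	return path.Join(userID, "videos", mediaID, "mp4", hex.EncodeToString(suffix)+"-"+name)
}

// uploadWithRetry simulates the same variant being uploaded on each attempt.
func uploadWithRetry(bucket map[string][]byte, keyFn func() string, attempts int) {
	for i := 0; i < attempts; i++ {
		bucket[keyFn()] = []byte("variant bytes")
	}
}

func main() {
	stable := map[string][]byte{}
	uploadWithRetry(stable, func() string { return variantKey("u1", "m1", "720p.mp4") }, 2)

	garbage := map[string][]byte{}
	uploadWithRetry(garbage, func() string { return randomizedKey("u1", "m1", "720p.mp4") }, 2)

	fmt.Println("deterministic keys:", len(stable), "randomized keys:", len(garbage))
}
```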
Video-specific partial failure
Thumbnail generation failure is often logged as warning while MP4 upload proceeds:
if thumbnailErr != nil:
    log.Warn(...)
    // continue
Document this for support teams — "video plays but no poster" is a known degraded state, not always a failed Kafka message.
HTTP colocation in same binary
Typical routes in the same process:
| Route | Resource profile |
|---|---|
| GET /health/live | Cheap |
| GET /health/ready | Checks S3 + Kafka connectivity |
| GET /metrics | Prometheus scrape |
| POST /transcode | Sync; competes with pool unless separate limit |
| POST /scan | IO + ClamAV CPU |
Isolation option: Split sync transcode to separate deployment so Kafka workers stay saturated without HTTP starvation.
Graceful shutdown sequence
On SIGTERM:
- http.Server.Shutdown(30s): stop accepting new HTTP requests.
- Cancel Kafka consumer context: stop reading new messages.
- Poll activeJobs == 0 with sleep loop: drain in-flight transcodes.
- Exit.
Kubernetes: Ensure terminationGracePeriodSeconds > worst-case transcode duration + 30s HTTP drain, or in-flight work is killed mid-FFmpeg.
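The drain step can be sketched with a mutex-guarded job set and a bounded polling loop. `jobTracker`, `drain`, and `drainWithin` are hypothetical names; the HTTP shutdown and Kafka context cancellation steps are omitted. The deadline plays the role of the grace period: when it expires, the caller exits even if jobs are still running.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// jobTracker is a minimal stand-in for the activeJobs map + mutex.
type jobTracker struct {
	mu     sync.Mutex
	active map[string]struct{}
}

func newJobTracker() *jobTracker {
	return &jobTracker{active: make(map[string]struct{})}
}

func (t *jobTracker) add(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.active[id] = struct{}{}
}

func (t *jobTracker) remove(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.active, id)
}

func (t *jobTracker) count() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.active)
}

// drain polls until no jobs are in flight or ctx expires, whichever is first.
func drain(ctx context.Context, t *jobTracker, poll time.Duration) error {
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for t.count() > 0 {
		select {
		case <-ctx.Done():
			return fmt.Errorf("drain aborted with %d jobs in flight: %w", t.count(), ctx.Err())
		case <-ticker.C:
		}
	}
	return nil
}

// drainWithin bounds the drain by a timeout, which should sit below the
// pod's termination grace period.
func drainWithin(t *jobTracker, timeout, poll time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return drain(ctx, t, poll)
}

func main() {
	tracker := newJobTracker()
	tracker.add("job-1")
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated in-flight transcode
		tracker.remove("job-1")
	}()
	fmt.Println("drain error:", drainWithin(tracker, time.Second, 10*time.Millisecond))
}
```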
Observability (OpenTelemetry pattern)
Span hierarchy example:
media.process_event
└── media.process_job_with_retry
    └── media.process_job
        ├── image.compress | media.process_video
        └── kafka.send_processing_result
Attributes to rely on in dashboards: event.s3_key, media.type, job.retry_count, processing.success.
Failure mode matrix
| Symptom | Likely cause | Investigation |
|---|---|---|
| Lag grows linearly | WORKER_POOL_SIZE too low vs ingress | Increase replicas or pool |
| Single partition stuck | Handler returns error, consumer exited | Restart pod; fix poison payload |
| High success: false | Bad uploads passing API gate | Tighten probe rules |
| Disk pressure | Pool × video size | Smaller pool or bigger volume |
| DB never updates | Response topic mismatch | Config diff API vs worker |