Topic naming (pattern, not prescription)
Deployments typically use two primary topics plus one derived:
| Role | Placeholder | Producer | Consumer |
|---|---|---|---|
| Request | {app}.media.optimize.request | API service | Media worker |
| Response | {app}.media.optimize.response | Media worker | API service |
| Virus alert | {app}.media.optimize.response.virus | Media worker (scan path) | API service |
Configuration drift warning: Docker Compose samples often ship defaults like media.raw / media.optimized, while production Spring/YAML uses event-media-optimization style names. The topic names must match in the environment of both binaries (API and worker); a mismatch produces a silent "upload works, never optimizes" incident.
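One way to catch this drift early is to derive the names from a single place and compare the two services' settings at deploy time. The following is a minimal sketch, not the project's actual tooling; the environment variable names (MEDIA_REQUEST_TOPIC and so on) are hypothetical placeholders.

```python
from typing import Dict, List


def topic_names(app: str) -> Dict[str, str]:
    """Expand the {app} placeholder from the naming table."""
    base = f"{app}.media.optimize"
    return {
        "request": f"{base}.request",
        "response": f"{base}.response",
        "virus": f"{base}.response.virus",
    }


# Hypothetical env var names; substitute whatever your services actually read.
TOPIC_ENV_VARS = ("MEDIA_REQUEST_TOPIC", "MEDIA_RESPONSE_TOPIC", "MEDIA_VIRUS_TOPIC")


def wiring_mismatches(api_env: Dict[str, str], worker_env: Dict[str, str]) -> List[str]:
    """Return the env vars that are unset or differ between the two services."""
    return [
        name
        for name in TOPIC_ENV_VARS
        if api_env.get(name) is None or api_env.get(name) != worker_env.get(name)
    ]
```

Running such a check in CI against both rendered configurations turns the silent failure into a failed build.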
Request event (API → worker)
JSON schema (conceptual)
{
  "type": "object",
  "required": ["s3Key", "s3Bucket", "mediaId", "mediaUrl"],
  "properties": {
    "s3Key": {
      "type": "string",
      "description": "Object key, often {userId}/{fileName}"
    },
    "s3Bucket": { "type": "string" },
    "mediaId": { "type": "string", "format": "uuid" },
    "mediaUrl": {
      "type": "string",
      "format": "uri",
      "description": "API indirection URL stored in DB at enqueue time"
    }
  }
}
Field semantics
| Field | Purpose |
|---|---|
| s3Key | Worker download location; must match completed multipart key |
| s3Bucket | Allows multi-bucket staging (dev/prod) or migration |
| mediaId | Correlation id for DB UPDATE on response |
| mediaUrl | Echoed as originalUrl in response; audit trail if variants fail |
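As an illustration of the four fields, a producer-side helper could assemble the request event as below. This is a sketch under assumptions: the function name and parameters are hypothetical, and the key layout follows the "{userId}/{fileName}" convention that the schema describes as common rather than mandatory.

```python
import json
import uuid


def build_request_event(user_id: str, file_name: str, bucket: str,
                        media_id: str, media_url: str) -> bytes:
    """Serialize a request event as UTF-8 JSON (hypothetical helper)."""
    uuid.UUID(media_id)  # raises ValueError if mediaId is not a well-formed UUID
    event = {
        "s3Key": f"{user_id}/{file_name}",  # assumed key layout, see lead-in
        "s3Bucket": bucket,
        "mediaId": media_id,
        "mediaUrl": media_url,
    }
    return json.dumps(event).encode("utf-8")
```

Validating mediaId before enqueueing keeps malformed correlation ids out of the topic, where they would otherwise only surface when the response can no longer be matched to a row.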
Producer settings (typical)
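- Key: optional mediaId string for partition stickiness (all events for one media item stay on one partition). Keying by user id instead would co-locate a user's jobs; the tradeoff is hot partitions for heavy users.
- Value: JSON, UTF-8.
- Acks: all on the worker producer; the API producer should match its durability needs.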
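The effect of the key choice on partition load can be sketched as follows. Note the assumption: zlib.crc32 is a stand-in hash for illustration only. Kafka's Java client uses murmur2 for keyed records, and other clients may differ, so actual partition numbers will not match this sketch.

```python
import zlib
from collections import Counter
from typing import Dict, Iterable


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition: same key, same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def load_by_partition(keys: Iterable[str], num_partitions: int) -> Dict[int, int]:
    """Count how many records each partition would receive."""
    return dict(Counter(partition_for(k, num_partitions) for k in keys))


# Keying by user id: one heavy user sends 8 of 10 jobs to a single partition.
by_user = load_by_partition(["heavy-user"] * 8 + ["u2", "u3"], num_partitions=6)

# Keying by media id: the same 10 jobs have 10 distinct keys.
by_media = load_by_partition([f"media-{i}" for i in range(10)], num_partitions=6)
```

With distinct media ids the records are spread by the hash, while a user-id key pins all of one user's traffic to one partition.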
Strict parser behavior (Go worker example class)
Missing field → log error, return handler error. Implication: poison messages can stall consumption if the client library aborts the loop on the first error. Prefer routing to a DLQ, or skipping with a metric, after N failures.
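The strict-parse-plus-DLQ pattern can be sketched without any Kafka client. Everything here is illustrative: PoisonMessage, consume, and the dead_letter callback are hypothetical names, and a real worker would publish to a dead-letter topic and emit a metric instead of calling a plain function. The sketch dead-letters unparseable messages immediately, since retrying a deterministic parse failure cannot help, and retries handler errors up to N times.

```python
import json
from typing import Callable, Iterable

REQUIRED = ("s3Key", "s3Bucket", "mediaId", "mediaUrl")


class PoisonMessage(Exception):
    """The payload can never be processed, no matter how often it is retried."""


def parse_request(raw: bytes) -> dict:
    try:
        event = json.loads(raw)
    except ValueError as exc:  # covers JSON and UTF-8 decoding errors
        raise PoisonMessage(f"invalid JSON: {exc}") from exc
    if not isinstance(event, dict):
        raise PoisonMessage("payload is not a JSON object")
    missing = [f for f in REQUIRED if not isinstance(event.get(f), str) or not event[f]]
    if missing:
        raise PoisonMessage(f"missing or empty fields: {missing}")
    return event


def consume(messages: Iterable[bytes], handle: Callable[[dict], None],
            dead_letter: Callable[[bytes, str], None], max_attempts: int = 3) -> None:
    for raw in messages:
        try:
            event = parse_request(raw)
        except PoisonMessage as exc:
            dead_letter(raw, str(exc))  # do not abort the loop
            continue
        for attempt in range(1, max_attempts + 1):
            try:
                handle(event)
                break
            except Exception as exc:  # transient handler failure: retry up to N
                if attempt == max_attempts:
                    dead_letter(raw, f"gave up after {attempt} attempts: {exc}")
```

The important property is that the loop always advances: a bad record costs at most N attempts and one dead-letter write.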
Response event (worker → API)
JSON schema (conceptual)
{
  "type": "object",
  "required": ["mediaId", "originalUrl", "success"],
  "properties": {
    "mediaId": { "type": "string", "format": "uuid" },
    "originalUrl": { "type": "string", "format": "uri" },
    "success": { "type": "boolean" },
    "error": { "type": "string" },
    "processed": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["quality", "format", "url", "size"],
        "properties": {
          "quality": {
            "type": "string",
            "enum": ["high", "medium", "low", "original", "thumbnail"]
          },
          "format": {
            "type": "string",
            "enum": ["webp", "mp4", "jpg", "m3u8"]
          },
          "url": { "type": "string", "format": "uri" },
          "size": { "type": "integer", "minimum": 0 }
        }
      }
    }
  }
}
Example — image success
{
  "mediaId": "550e8400-e29b-41d4-a716-446655440000",
  "originalUrl": "https://api.example/v1/media/user%2Fphoto.jpg",
  "success": true,
  "processed": [
    {
      "quality": "high",
      "format": "webp",
      "url": "https://cdn.example/u1/images/high/photo_high.webp",
      "size": 245000
    },
    {
      "quality": "medium",
      "format": "webp",
      "url": "https://cdn.example/u1/images/medium/photo_medium.webp",
      "size": 120000
    },
    {
      "quality": "low",
      "format": "webp",
      "url": "https://cdn.example/u1/images/low/photo_low.webp",
      "size": 45000
    }
  ]
}
Example — video success
{
  "mediaId": "550e8400-e29b-41d4-a716-446655440000",
  "originalUrl": "https://api.example/v1/media/user%2Fclip.mov",
  "success": true,
  "processed": [
    {
      "quality": "original",
      "format": "mp4",
      "url": "https://cdn.example/u1/videos/{mediaId}/mp4/clip.mp4",
      "size": 8200000
    },
    {
      "quality": "thumbnail",
      "format": "jpg",
      "url": "https://cdn.example/u1/thumbnail/{mediaId}/poster.jpg",
      "size": 85000
    }
  ]
}
Example — failure
{
  "mediaId": "550e8400-e29b-41d4-a716-446655440000",
  "originalUrl": "https://api.example/v1/media/user%2Fcorrupt.mp4",
  "success": false,
  "error": "failed to detect media type"
}
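A hand-rolled check against the conceptual response schema might look like the sketch below. It is not a full JSON Schema validator. The last two rules (a success carries at least one variant, a failure carries an error string) are assumptions drawn from the examples and are not stated in the schema itself.

```python
from typing import List

QUALITIES = {"high", "medium", "low", "original", "thumbnail"}
FORMATS = {"webp", "mp4", "jpg", "m3u8"}


def response_problems(event: dict) -> List[str]:
    """Return a list of human-readable problems; an empty list means the event looks valid."""
    problems = []
    for name, kind in (("mediaId", str), ("originalUrl", str), ("success", bool)):
        if not isinstance(event.get(name), kind):
            problems.append(f"{name}: missing or not {kind.__name__}")
    for i, variant in enumerate(event.get("processed", [])):
        if variant.get("quality") not in QUALITIES:
            problems.append(f"processed[{i}].quality: {variant.get('quality')!r}")
        if variant.get("format") not in FORMATS:
            problems.append(f"processed[{i}].format: {variant.get('format')!r}")
        if not isinstance(variant.get("url"), str):
            problems.append(f"processed[{i}].url: missing")
        size = variant.get("size")
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(size, bool) or not isinstance(size, int) or size < 0:
            problems.append(f"processed[{i}].size: {size!r}")
    # Assumptions beyond the schema, see lead-in:
    if event.get("success") is True and not event.get("processed"):
        problems.append("success without processed variants")
    if event.get("success") is False and not event.get("error"):
        problems.append("failure without error message")
    return problems
```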
API consumer mapping rules
Pseudocode for the transactional listener:
on message(response):
  if not response.success:
    log.warn(mediaId)
    return
  row = repo.findById(response.mediaId)
  if row missing: return
  if row.type == IMAGE:
    row.mediaUrl = first(processed, quality == "medium").url
    row.lowUrl = first(processed, quality == "low").url
    row.mediumUrl = ...
    row.highUrl = ...
  if row.type == VIDEO:
    row.mediaUrl = first(processed, quality == "original").url
    row.thumbnailUrl = first(processed, quality == "thumbnail" && format == "jpg").url
  repo.save(row)
Ordering: Responses for the same mediaId arriving out of order are rare but possible under retries; last-write-wins is usually acceptable for profile media.
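The mapping rules can be sketched as a pure function, independent of Spring and the database. The MediaRow shape and its field names are hypothetical. Two choices in this sketch are assumptions rather than documented behavior: mediumUrl and highUrl are taken from the "medium" and "high" variants, and a missing variant leaves the existing column value untouched instead of raising.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MediaRow:  # hypothetical stand-in for the persisted entity
    id: str
    type: str  # "IMAGE" or "VIDEO"
    media_url: Optional[str] = None
    low_url: Optional[str] = None
    medium_url: Optional[str] = None
    high_url: Optional[str] = None
    thumbnail_url: Optional[str] = None


def first_url(processed: List[dict], quality: str, fmt: Optional[str] = None) -> Optional[str]:
    """URL of the first variant matching quality (and format, if given)."""
    for variant in processed:
        if variant.get("quality") == quality and (fmt is None or variant.get("format") == fmt):
            return variant.get("url")
    return None


def apply_response(row: Optional[MediaRow], response: dict) -> bool:
    """Mutate row from a response event; return True if the row should be saved."""
    if not response.get("success") or row is None:
        return False
    processed = response.get("processed") or []
    if row.type == "IMAGE":
        medium = first_url(processed, "medium")
        row.media_url = medium or row.media_url
        row.low_url = first_url(processed, "low") or row.low_url
        row.medium_url = medium or row.medium_url  # assumption, see lead-in
        row.high_url = first_url(processed, "high") or row.high_url  # assumption
    elif row.type == "VIDEO":
        row.media_url = first_url(processed, "original") or row.media_url
        row.thumbnail_url = first_url(processed, "thumbnail", "jpg") or row.thumbnail_url
    return True
```

Because the function only overwrites columns with values present in the event, replaying the same response is harmless, which fits the last-write-wins stance described above.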
Virus event (derived topic)
{
  "mediaId": "uuid",
  "s3Key": "string",
  "s3Bucket": "string",
  "mediaUrl": "string",
  "virusName": "string",
  "detectedAt": "ISO-8601",
  "actionTaken": "deleted_from_object_store"
}
API listener responsibilities:
- Load row by mediaId.
- Extract keys from all URL columns (original + derivatives).
- DeleteObject each distinct key.
- DELETE row (hard delete) or soft-delete per product policy.
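The key-extraction step of the list above can be sketched as follows. The URL-to-key mapping is an assumption inferred from the example URLs in this note: API indirection URLs carry the percent-encoded key after /v1/media/, and CDN URLs use the key as their path. Real deployments should use whatever mapping their URL builder actually applies.

```python
from typing import Iterable, List, Optional
from urllib.parse import unquote, urlparse

API_MEDIA_PREFIX = "/v1/media/"  # assumption, taken from the example originalUrl values


def object_key(url: str) -> str:
    """Derive an object-store key from a stored URL (assumed mapping, see lead-in)."""
    path = urlparse(url).path
    if path.startswith(API_MEDIA_PREFIX):
        return unquote(path[len(API_MEDIA_PREFIX):])
    return unquote(path.lstrip("/"))


def keys_to_delete(urls: Iterable[Optional[str]]) -> List[str]:
    """Distinct, non-empty keys in first-seen order; null columns are skipped."""
    keys = (object_key(u) for u in urls if u)
    return list(dict.fromkeys(k for k in keys if k))
```

The caller would then issue one DeleteObject per returned key before deleting or soft-deleting the row. Deduplicating first matters for images, where mediaUrl and mediumUrl may point at the same object.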
Delivery and ordering guarantees
| Guarantee | Reality in typical stack |
|---|---|
| Exactly-once end-to-end | No — without transactional outbox + idempotent consumer |
| At-least-once | Yes — Kafka + consumer retries |
| Per-mediaId ordering | Only if keyed partition + single consumer per partition |
| Cross-media ordering | Not required |
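Since delivery is at-least-once, the consumer side needs to tolerate duplicates. A minimal idempotent-consumer sketch follows. The in-memory dictionary is a stand-in for what would be a database column or table in practice, and the class name is hypothetical.

```python
import hashlib
import json
from typing import Callable, Dict


class IdempotentApplier:
    """Skip events whose payload was already applied for the same mediaId."""

    def __init__(self, apply: Callable[[dict], None]) -> None:
        self._apply = apply
        self._seen: Dict[str, str] = {}  # mediaId -> digest of last applied payload

    def handle(self, event: dict) -> bool:
        # sort_keys makes the digest independent of JSON key order
        payload = json.dumps(event, sort_keys=True).encode("utf-8")
        digest = hashlib.sha256(payload).hexdigest()
        if self._seen.get(event["mediaId"]) == digest:
            return False  # duplicate delivery, nothing to do
        self._apply(event)
        self._seen[event["mediaId"]] = digest
        return True
```

This only dedupes exact redeliveries. It does not provide exactly-once semantics, which would additionally require the digest update and the row update to commit in the same transaction.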
Consumer offset commit
Some Go kafka-go readers commit on interval (e.g. 30s) while processing synchronously in the handler goroutine. Understand commit-after-process vs process-after-commit in your library to reason about duplicate delivery on crash.
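The difference between the two orderings can be seen in a small simulation that involves no Kafka client at all. The simulate function is a hypothetical model: one consumer, one crash that happens between the two steps for a given offset, and a restart from the last committed offset.

```python
from typing import List


def simulate(messages: List[str], commit_first: bool, crash_index: int) -> List[str]:
    """Return the messages processed when the consumer crashes once at crash_index."""
    processed: List[str] = []
    committed = 0  # next offset to read after a restart
    crashed = False
    offset = 0
    while offset < len(messages):
        # Step 1: either commit or process, depending on the strategy.
        if commit_first:
            committed = offset + 1
        else:
            processed.append(messages[offset])
        # Crash between step 1 and step 2, then restart from the committed offset.
        if offset == crash_index and not crashed:
            crashed = True
            offset = committed
            continue
        # Step 2: the other half.
        if commit_first:
            processed.append(messages[offset])
        else:
            committed = offset + 1
        offset += 1
    return processed


duplicate = simulate(["a", "b", "c"], commit_first=False, crash_index=1)
lost = simulate(["a", "b", "c"], commit_first=True, crash_index=1)
```

Commit-after-process yields a duplicate of the in-flight message (at-least-once), while process-after-commit drops it (at-most-once). Interval-based commits sit between the two, depending on where the crash falls relative to the last flush.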
JVM consumer
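Spring @KafkaListener containers run with an AckMode such as RECORD or BATCH (a listener container property, for example spring.kafka.listener.ack-mode in Spring Boot). Document your choice in the private runbook; this reference note only flags it as a review item.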
Schema evolution strategy
| Change | Safe approach |
|---|---|
| Add optional field | Old consumers ignore; new producers send |
| Rename field | New topic version *.v2 or dual-write period |
| Change quality enum | Coordinate API mapping + mobile rendering |
| Switch video HLS → MP4 | Mobile must ship before worker flips |
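On the consumer side, the first three rows of the table amount to a tolerant reader. The sketch below is illustrative only; the renamed field sourceUrl is a hypothetical example and is not part of the schema in this note.

```python
from typing import Any, List

KNOWN_QUALITIES = {"high", "medium", "low", "original", "thumbnail"}


def read_renamed(event: dict, new_name: str, old_name: str) -> Any:
    """During a dual-write period accept either field name, preferring the new one."""
    if new_name in event:
        return event[new_name]
    return event.get(old_name)


def known_variants(event: dict) -> List[dict]:
    """Keep only variants whose quality this consumer build understands."""
    return [v for v in event.get("processed", []) if v.get("quality") in KNOWN_QUALITIES]


# A newer producer adds an optional field and a quality this build does not know.
event = {
    "mediaId": "m1",
    "originalUrl": "https://api.example/v1/media/u1%2Fa.jpg",
    "sourceUrl": "https://api.example/v2/media/u1%2Fa.jpg",  # hypothetical renamed field
    "traceId": "abc",  # unknown optional field, simply ignored
    "success": True,
    "processed": [
        {"quality": "low", "format": "webp", "url": "L", "size": 1},
        {"quality": "ultra", "format": "webp", "url": "U", "size": 9},
    ],
}
```

Reading fields by name and ignoring the rest is what makes "add optional field" safe. Dropping unknown enum values avoids a crash but still hides the new variant from users, which is why the table calls for coordinating the enum change with API mapping and mobile rendering.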