Inbox Protocol
Domain: Communications (Shared Core Domain)
Trigger: Inbound messages (CPaaS webhooks), agent actions (Hasura Actions)
Output: Real-time omnichannel inbox via Hasura subscriptions
Sources: communications.md
L3 DoD: ✅ Schema | ✅ API | ✅ Edge States (Ready to Code)
§1 Overview
The omnichannel inbox displays all customer conversations across WhatsApp, Email, and SMS in a unified dispatcher UI. Real-time updates push via Hasura GraphQL subscriptions (WebSocket). Agents claim conversations manually (Phase 1).
Architecture: CPaaS webhook (inbound) → NestJS InboundMessageHandler → Hasura INSERT → subscription push → Vue inbox UI. Agent replies use sendAgentMessage Hasura Action → NestJS → BullMQ → CPaaS (rejoins the notification pipeline at the DispatchMessageJob level).
§2 Hasura Subscription Contracts
Conversation List (Inbox Sidebar)
subscription InboxConversations($tenant_id: uuid!, $status_filter: [String!], $assigned_filter: uuid) {
conversations(
where: {
tenant_id: { _eq: $tenant_id }
status: { _in: $status_filter }
_or: [
{ assigned_to: { _eq: $assigned_filter } }
{ assigned_to: { _is_null: true } }
]
}
order_by: { last_message_at: desc }
limit: 50
) {
id status assigned_to last_message_at snoozed_until
contact { id name identifiers passenger_profile_id }
messages(order_by: { sent_at: desc }, limit: 1) {
rendered_content direction status sent_at
}
unread_count
booking_id trip_id invoice_id
}
}
Filter tabs: The frontend switches between distinct subscription instances per tab:
- "My Conversations": assigned_to = $current_user_id, status = OPEN
- "Unassigned": assigned_to IS NULL, status = OPEN
- "All Open": No assigned_to filter, status = OPEN
- "Resolved": status = RESOLVED (standard query with pagination, no subscription)
Message Thread (Conversation Detail)
subscription ConversationMessages($conversation_id: uuid!, $tenant_id: uuid!) {
messages(
where: { conversation_id: { _eq: $conversation_id }, tenant_id: { _eq: $tenant_id } }
order_by: { sent_at: asc }
limit: 100
) {
id direction content_type rendered_content status
sent_at delivered_at read_at failed_reason
channel_account { channel_type }
template_id
}
}
Global Unread Badge
subscription UnreadConversationCount($tenant_id: uuid!, $user_id: uuid!) {
conversations_aggregate(
where: {
tenant_id: { _eq: $tenant_id }
status: { _eq: "OPEN" }
_or: [
{ assigned_to: { _eq: $user_id } }
{ assigned_to: { _is_null: true } }
]
has_unread_messages: { _eq: true }
}
) { aggregate { count } }
}
The has_unread_messages computed field must exist because Hasura's where clause cannot compare two columns. See §6 for implementation.
§3 Message Thread Load Pattern
1. User clicks conversation → query last 50 messages (descending by sent_at)
2. Render messages, scroll to bottom
3. Activate subscription (ascending by sent_at) for live updates
4. New message → append + auto-scroll if at bottom + update read cursor
5. Scroll up → paginated query with cursor (sent_at < :oldest_loaded)
Preview data (sidebar): Contact name, last message snippet (80 chars), timestamp, channel icon, unread badge, context chip (Booking/Trip/Invoice).
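The load pattern in this section combines three sources of rows (the initial page, live subscription pushes, and older pages fetched on scroll-up). A minimal sketch of how the client might merge them follows; the `ThreadMessage` shape and the `mergeThread` helper are hypothetical names used for illustration, not part of the protocol.

```typescript
// Hypothetical client-side row shape; only the fields needed for merging are shown.
interface ThreadMessage {
  id: string;
  sent_at: string; // ISO 8601 timestamp
  rendered_content: string;
}

// Merge already-loaded rows with newly arrived rows.
// Rows are de-duplicated by id (the incoming copy wins, e.g. after a status change)
// and returned in ascending sent_at order, matching the subscription ordering.
function mergeThread(loaded: ThreadMessage[], incoming: ThreadMessage[]): ThreadMessage[] {
  const byId = new Map<string, ThreadMessage>();
  for (const m of loaded) byId.set(m.id, m);
  for (const m of incoming) byId.set(m.id, m);
  return Array.from(byId.values()).sort(
    (a, b) => Date.parse(a.sent_at) - Date.parse(b.sent_at),
  );
}
```

The same helper can serve both the subscription callback and the scroll-up pagination result, because de-duplication by id makes the merge order-independent.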
§4 sendAgentMessage Hasura Action
interface SendAgentMessageInput {
conversation_id: string;
channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
content: string;
content_type: 'TEXT' | 'MEDIA';
media_url?: string;
}
interface SendAgentMessageOutput {
message_id: string; // Created Message (status = QUEUED)
}
interface SendAgentMessageError {
error_code: 'CHANNEL_SUSPENDED' | 'WHATSAPP_SESSION_EXPIRED' | 'CONTACT_NOT_FOUND' | 'INVALID_CONVERSATION';
error_message: string;
}
Handler:
- Validate that conversation_id belongs to the caller's tenant_id.
- Resolve the ChannelAccount for the specified channel. Guard: If status is 'SUSPENDED' or 'REVOKED', return error CHANNEL_SUSPENDED.
- WhatsApp 24-Hour Session Window Check (if channel = WHATSAPP):
  - Query messages WHERE conversation_id = :id AND direction = 'INBOUND' ORDER BY sent_at DESC LIMIT 1.
  - If the last inbound message's sent_at < now() - 24 hours, the conversation is outside the 24h session window.
  - Return error WHATSAPP_SESSION_EXPIRED. The client must either: (a) use a template message (not agent-authored text), or (b) display a warning to the dispatcher explaining the limitation.
  - Note: This check does NOT apply to Email or SMS channels.
- Resolve the Contact and extract the recipient phone/email from identifiers. Guard: If the contact is missing, return error CONTACT_NOT_FOUND.
- INSERT messages with direction = OUTBOUND, status = QUEUED, template_id = NULL. The trg_messages_update_last_message_at trigger updates conversations.last_message_at automatically.
- Enqueue DispatchMessageJob on the message-dispatch BullMQ queue (see notification-pipeline-protocol.md §4 for job shape).
- Record change_event (scope: GENERAL).
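The WhatsApp session window check in the handler reduces to a single timestamp comparison. The sketch below is illustrative only: `isWhatsAppSessionOpen` is a hypothetical helper, and treating a conversation with no inbound message as "outside the window" is an assumption the handler steps do not state explicitly.

```typescript
const SESSION_WINDOW_MS = 24 * 60 * 60 * 1000;

// Returns true when free-form agent text may be sent on WhatsApp.
// lastInboundSentAt is the sent_at of the most recent INBOUND message, or null if none exists.
function isWhatsAppSessionOpen(lastInboundSentAt: Date | null, now: Date): boolean {
  // Assumption: without any inbound message there is no open session.
  if (lastInboundSentAt === null) return false;
  // Expired when sent_at < now - 24h, so the window is open when the age is <= 24h.
  return now.getTime() - lastInboundSentAt.getTime() <= SESSION_WINDOW_MS;
}
```

A handler would return WHATSAPP_SESSION_EXPIRED when this function yields false and the channel is WHATSAPP.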
Optimistic UI: The Vue frontend renders the sent message immediately with a "sending" indicator. The subscription delivers the confirmed row within milliseconds.
§5 Agent Routing
Phase 1: Manual Claim Only. New conversations enter the "Unassigned" queue (assigned_to = NULL). Any dispatcher can claim them.
claimConversation Hasura Action
interface ClaimConversationInput {
conversation_id: string;
}
interface ClaimConversationOutput {
conversation_id: string;
assigned_to: string; // UUID of the claiming (or already-claiming) user
status: 'CLAIMED' | 'ALREADY_CLAIMED';
}
Handler:
- Execute SELECT * FROM conversations WHERE id = :conversation_id FOR UPDATE (row-level lock, prevents concurrent claims).
- Guard: If assigned_to is not null, return { conversation_id, assigned_to: current_assignee, status: 'ALREADY_CLAIMED' }.
- Update conversations setting assigned_to = x-hasura-user-id where id = :conversation_id.
- Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
- Return { conversation_id, assigned_to: x-hasura-user-id, status: 'CLAIMED' }.
Concurrent claims: The second dispatcher's transaction blocks on the SELECT ... FOR UPDATE lock until the first commits. After the lock releases, the second dispatcher reads the non-null assigned_to and returns ALREADY_CLAIMED. This prevents the race condition where two dispatchers simultaneously claim the same conversation.
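Once the row lock serializes the two transactions, the claim logic itself is a small decision on the locked row. The following sketch models only that decision, applied to an in-memory row; `ConversationRow` and `applyClaim` are hypothetical names, and the database locking is assumed to happen outside this function.

```typescript
interface ConversationRow {
  id: string;
  assigned_to: string | null;
}

interface ClaimResult {
  conversation_id: string;
  assigned_to: string;
  status: 'CLAIMED' | 'ALREADY_CLAIMED';
}

// Applies the claim guard to a row that the caller has already locked (FOR UPDATE).
// Mutates the row only when it is still unassigned.
function applyClaim(row: ConversationRow, userId: string): ClaimResult {
  if (row.assigned_to !== null) {
    return { conversation_id: row.id, assigned_to: row.assigned_to, status: 'ALREADY_CLAIMED' };
  }
  row.assigned_to = userId;
  return { conversation_id: row.id, assigned_to: userId, status: 'CLAIMED' };
}
```

Calling it twice on the same row, as two serialized transactions would, yields CLAIMED for the first caller and ALREADY_CLAIMED (with the first caller's id) for the second.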
reassignConversation Hasura Action
interface ReassignConversationInput {
conversation_id: string;
new_assignee_id: string;
}
interface ReassignConversationOutput {
conversation_id: string;
assigned_to: string; // The new assignee's UUID
status: 'REASSIGNED';
}
Handler:
- Guard: Enforce that the caller has the MANAGER role via Hasura permission rules. Return error INSUFFICIENT_PERMISSIONS if not.
- Validate that new_assignee_id is a valid user with DISPATCHER or MANAGER role in the same tenant. Query auth.users WHERE id = :new_assignee_id AND tenant_id = :tenant_id AND role IN ('DISPATCHER', 'MANAGER'). Return error INVALID_ASSIGNEE if no match.
- Update conversations setting assigned_to = :new_assignee_id where id = :conversation_id.
- Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
- Return { conversation_id, assigned_to: new_assignee_id, status: 'REASSIGNED' }.
§6 Unread Tracking
Read Cursor Mutation & Debouncing Strategy
mutation MarkConversationRead($conversation_id: uuid!, $now: timestamptz!) {
insert_conversation_read_cursors_one(
object: { conversation_id: $conversation_id, last_read_at: $now }
on_conflict: { constraint: conversation_read_cursors_pkey, update_columns: [last_read_at] }
) { conversation_id last_read_at }
}
Hasura injects user_id via column preset (x-hasura-user-id); the frontend never sends it.
Debouncing Strategy (Client-Side):
The frontend must debounce MarkConversationRead calls to prevent thrashing the database with high-frequency UPSERTs:
- Throttle on scroll: When the user scrolls to a new message in the thread, debounce the mutation call with a 1-second delay. Multiple scroll events within 1 second collapse into a single UPSERT.
- Fire on blur: When the conversation detail panel loses focus (user switches to another conversation tab), immediately fire the mutation with the latest now() timestamp.
- Server-side dedup (optional): If the same (conversation_id, user_id) pair receives identical last_read_at values within 5 seconds, the server may skip the UPSERT. This is a performance optimization; the semantics remain unchanged.
This prevents the mutation from firing more than once per second per conversation, acceptable for SMB scale (1–5 dispatchers).
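A minimal sketch of the scroll and blur rules above is shown here. `ReadCursorDebouncer`, `Scheduler`, and `browserScheduler` are hypothetical names; the `send` callback stands in for whatever issues the MarkConversationRead mutation. The scheduler is injected so the behaviour can be exercised without real timers.

```typescript
type Cancel = () => void;
type Scheduler = (fn: () => void, delayMs: number) => Cancel;

// Default scheduler backed by setTimeout.
const browserScheduler: Scheduler = (fn, delayMs) => {
  const handle = setTimeout(fn, delayMs);
  return () => clearTimeout(handle);
};

class ReadCursorDebouncer {
  private pending = new Map<string, Cancel>();
  private send: (conversationId: string) => void;
  private schedule: Scheduler;
  private delayMs: number;

  constructor(send: (conversationId: string) => void, schedule: Scheduler = browserScheduler, delayMs = 1000) {
    this.send = send;
    this.schedule = schedule;
    this.delayMs = delayMs;
  }

  // Scroll events restart the timer, so a burst collapses into one mutation.
  onScroll(conversationId: string): void {
    this.pending.get(conversationId)?.();
    const cancel = this.schedule(() => {
      this.pending.delete(conversationId);
      this.send(conversationId);
    }, this.delayMs);
    this.pending.set(conversationId, cancel);
  }

  // Blur cancels any pending timer and sends immediately.
  onBlur(conversationId: string): void {
    this.pending.get(conversationId)?.();
    this.pending.delete(conversationId);
    this.send(conversationId);
  }
}
```

Keeping one pending timer per conversation id means switching between conversations never delays or drops the cursor update of the conversation being left.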
conversation_unread_count Computed Field
CREATE OR REPLACE FUNCTION communications.conversation_unread_count(
conv communications.conversations,
hasura_session JSON
) RETURNS INTEGER AS $$
SELECT COUNT(*)::INTEGER
FROM communications.messages m
WHERE m.conversation_id = conv.id
AND m.direction = 'INBOUND'
AND m.sent_at > COALESCE(
(SELECT last_read_at FROM communications.conversation_read_cursors
WHERE conversation_id = conv.id
AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
'1970-01-01'::TIMESTAMPTZ
);
$$ LANGUAGE SQL STABLE;
IMPORTANT
Only INBOUND messages count as "unread." Outbound messages do not increment the counter.
WARNING
Performance: 50 correlated subqueries per subscription poll. Acceptable at SMB scale (<500 active conversations, <5 dispatchers). If performance degrades: (a) introduce a materialized unread_count column on conversations maintained by triggers, or (b) create a partial index (conversation_id) WHERE direction = 'INBOUND' on messages to accelerate the subquery.
NOTE
Stale Read Cursor Handling: If a frontend's local clock drifts ahead of the server (e.g., browser clock +5 min), the MarkConversationRead mutation sends a future timestamp. The conversation_unread_count function will treat all messages with sent_at <= future timestamp as read, potentially hiding unread messages. To mitigate: (a) the frontend should use server_time from the last API response instead of client_time, or (b) the server should cap last_read_at at the current now() in the mutation handler. Phase 1 implementation should use approach (b) for simplicity.
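The unread rule and the clock-drift mitigation (b) can be expressed compactly outside SQL. This is an illustrative sketch, not the server implementation: `InboxMessage`, `countUnread`, and `capLastReadAt` are hypothetical names, and the functions operate on in-memory values rather than database rows.

```typescript
interface InboxMessage {
  direction: 'INBOUND' | 'OUTBOUND';
  sent_at: Date;
}

// Mitigation (b): never store a read cursor that lies in the server's future.
function capLastReadAt(requested: Date, serverNow: Date): Date {
  return requested.getTime() > serverNow.getTime() ? serverNow : requested;
}

// Mirrors conversation_unread_count: only INBOUND messages newer than the cursor count.
// A missing cursor behaves like the epoch, as in COALESCE(..., '1970-01-01').
function countUnread(messages: InboxMessage[], lastReadAt: Date | null): number {
  const cursor = lastReadAt ? lastReadAt.getTime() : 0;
  return messages.filter(
    (m) => m.direction === 'INBOUND' && m.sent_at.getTime() > cursor,
  ).length;
}
```

With the cap in place, a browser clock running five minutes ahead can no longer mark messages that arrive during those five minutes as already read.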
has_unread_messages Computed Field
The Global Unread Badge subscription (§2) filters on a boolean computed field because Hasura's where clause cannot compare two columns directly.
CREATE OR REPLACE FUNCTION communications.conversation_has_unread_messages(
conv communications.conversations,
hasura_session JSON
) RETURNS BOOLEAN AS $$
SELECT EXISTS (
SELECT 1 FROM communications.messages m
WHERE m.conversation_id = conv.id
AND m.direction = 'INBOUND'
AND m.sent_at > COALESCE(
(SELECT last_read_at FROM communications.conversation_read_cursors
WHERE conversation_id = conv.id
AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
'1970-01-01'::TIMESTAMPTZ
)
);
$$ LANGUAGE SQL STABLE;
§7 Inbound Message Ingestion Pipeline
CPaaS Webhook → NestJS InboundMessageHandler → Hasura INSERT → Subscription Push → Inbox UI
Steps:
1. Receive webhook (Meta: POST /api/webhooks/meta/{tenant_id}; SES inbound: POST /api/webhooks/ses; SNS SMS: POST /api/webhooks/sns-sms).
2. Validate provider signature (see §7a).
3. Extract: sender identifier, content, content type, timestamp, provider message ID, channel type.
4. Resolve ChannelAccount: Query channel_accounts WHERE tenant_id = :tenant_id AND channel_type = :channel_type. Guard: If status is 'SUSPENDED' or 'REVOKED', log a warning and still process the inbound message (the contact and conversation are created, but no outbound reply can be sent until the channel is re-activated). This prevents losing customer messages due to channel suspension.
5. Resolve or create Contact (see §8).
6. Resolve or create Conversation (see §8).
7. INSERT messages with direction = INBOUND, status = DELIVERED (already arrived), channel_account_id resolved from step 4, external_message_id = provider's message ID (for idempotency).
8. UPDATE conversations SET last_message_at = now(), status = 'OPEN' (reopen if RESOLVED/SNOOZED).
9. Hasura subscription fires → inbox UI updates.
Inbound messages skip the BullMQ dispatch pipeline entirely — direct webhook-to-database path.
NOTE
Per-tenant webhook URLs: All Meta webhooks (inbound messages + delivery status) use per-tenant URLs (/api/webhooks/meta/{tenant_id}) registered via WABA-level override during channel provisioning (see channel-provisioning-protocol.md §7). SES/SNS webhooks are account-level — see notification-pipeline-protocol.md §8.
Provider Webhook Payload Differences:
| Provider | Message Type | Payload Fields | Content Format |
|---|---|---|---|
| Meta WhatsApp | Inbound text | messages[].from, messages[].text, messages[].timestamp, messages[].id | Plain text or template response |
| Meta WhatsApp | Inbound media | messages[].from, messages[].image.link, messages[].id, messages[].timestamp | HTTPS URL (temporary, expires ~24h) |
| SES Email | Inbound email | SNS Message JSON: source, destination, messageId, mail.timestamp | mail.commonHeaders.subject, mail.commonHeaders.from, email body in S3 reference |
| SNS SMS | Delivery receipt | SNS Message: MessageId, TopicArn | MessageStatus enum (SUCCESS/FAILURE/DELIVERED/UNDELIVERED) |
| SNS SMS | Subscription Confirm | SNS Message: Type = SubscriptionConfirmation, SubscribeURL | Special message — requires callback to confirm subscription before SMS delivery starts |
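As an illustration of the extraction step for one provider, the sketch below maps a single Meta WhatsApp message entry to the common fields listed in the ingestion steps. Everything here is an assumption made for the example: the `MetaInboundMessage` shape is a simplification of the table row above (the real webhook body nests messages deeper), the text is assumed to sit under `text.body`, the timestamp is assumed to be Unix seconds as a string, and `normalizeMetaMessage` is a hypothetical helper.

```typescript
// Simplified, assumed shape of one entry in messages[].
interface MetaInboundMessage {
  from: string;
  id: string;
  timestamp: string; // assumed: Unix seconds encoded as a string
  text?: { body: string };
  image?: { link: string };
}

interface NormalizedInbound {
  senderIdentifier: string;
  identifierType: 'phone';
  content: string;
  contentType: 'TEXT' | 'MEDIA';
  sentAt: Date;
  externalMessageId: string;
  channelType: 'WHATSAPP';
}

function normalizeMetaMessage(msg: MetaInboundMessage): NormalizedInbound {
  const isMedia = msg.image !== undefined;
  return {
    // Assumption: prefix "+" so the value matches the E.164 form used for contact lookup.
    senderIdentifier: msg.from.startsWith('+') ? msg.from : `+${msg.from}`,
    identifierType: 'phone',
    content: isMedia ? msg.image!.link : msg.text?.body ?? '',
    contentType: isMedia ? 'MEDIA' : 'TEXT',
    sentAt: new Date(Number(msg.timestamp) * 1000),
    externalMessageId: msg.id,
    channelType: 'WHATSAPP',
  };
}
```

Equivalent normalizers for SES and SNS would produce the same `NormalizedInbound` shape, so the contact and conversation resolution steps stay provider-agnostic.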
§7a Inbound Signature Validation
Meta: Same as delivery callbacks — HMAC-SHA256 with X-Hub-Signature-256 header.
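A minimal sketch of the Meta signature check, using Node's built-in crypto module. It assumes the header value has the form `sha256=<hex digest>` computed over the raw request body with the app secret as key; `isValidMetaSignature` and its parameter names are hypothetical, and how the raw body and secret are obtained is left to the surrounding handler.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Verifies X-Hub-Signature-256 against the raw (unparsed) request body.
function isValidMetaSignature(
  rawBody: string,
  signatureHeader: string | undefined,
  appSecret: string,
): boolean {
  const prefix = 'sha256=';
  if (!signatureHeader || !signatureHeader.startsWith(prefix)) return false;
  const expected = createHmac('sha256', appSecret).update(rawBody, 'utf8').digest('hex');
  const received = signatureHeader.slice(prefix.length);
  const a = Buffer.from(expected, 'utf8');
  const b = Buffer.from(received, 'utf8');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return a.length === b.length && timingSafeEqual(a, b);
}
```

The comparison runs on the raw body because re-serializing parsed JSON can change byte order or whitespace and would invalidate the digest.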
SES/SNS: SNS SubscriptionConfirmation messages require special handling:
if (message.Type === 'SubscriptionConfirmation') {
// Verify certificate + signature
// Call message.SubscribeURL via HTTP GET to confirm subscription
// Do NOT process as a normal message
await confirmSubscription(message.SubscribeURL);
return { statusCode: 200 };
}
§7b Idempotency & Concurrent Delivery
Dedup key: For inbound messages, use external_message_id (provider's message ID) as the idempotency key, identical to outbound messages.
// Check for duplicate inbound message
const existingMessage = await messages.findOne({
where: { external_message_id: webhook.messageId }
});
if (existingMessage) {
return { success: true }; // Already processed
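}
Race between concurrent inbound webhooks: If the same inbound message arrives twice (provider retry), the second webhook finds existingMessage and returns immediately, so no second row is inserted and the Hasura subscription does not fire twice. Note that this check-then-insert only covers retries that arrive after the first insert has committed; two deliveries that overlap in time can both pass the check unless the INSERT itself is also protected, for example by a unique constraint on external_message_id.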
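The dedup behaviour can be sketched with an in-memory stand-in for the messages table. This is illustrative only: `InboundMessageStore` and `handleInbound` are hypothetical names, and the single `insertIfAbsent` step models an insert guarded by a unique key on external_message_id, which is an assumption rather than something the schema above confirms.

```typescript
interface InboundRecord {
  external_message_id: string;
  content: string;
}

// In-memory stand-in for the messages table, keyed by external_message_id.
class InboundMessageStore {
  private byExternalId = new Map<string, InboundRecord>();

  // Check and insert in one step; returns false when the id was already stored.
  insertIfAbsent(record: InboundRecord): boolean {
    if (this.byExternalId.has(record.external_message_id)) return false;
    this.byExternalId.set(record.external_message_id, record);
    return true;
  }

  size(): number {
    return this.byExternalId.size;
  }
}

// Always acknowledges the webhook; `inserted` tells the caller whether downstream
// work (conversation update, subscription push) should happen.
function handleInbound(
  store: InboundMessageStore,
  record: InboundRecord,
): { success: true; inserted: boolean } {
  return { success: true, inserted: store.insertIfAbsent(record) };
}
```

Returning success for duplicates matters because providers typically keep retrying until they receive a successful response.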
§8 Contact & Conversation Resolution
Contact Resolution Algorithm
Exact Query:
SELECT id FROM communications.contacts
WHERE tenant_id = :tenant_id
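AND identifiers @> jsonb_build_array(jsonb_build_object('type', CAST(:identifier_type AS text), 'value', CAST(:identifier_value AS text)))
LIMIT 1;
Where:
- :identifier_type = 'phone' (for WhatsApp/SMS) or 'email' (for SES inbound email)
- :identifier_value = E.164 phone (+1234567890) or email address
The containment value is built with jsonb_build_object from bound parameters rather than by concatenating strings into a JSON literal, which keeps sender-controlled values out of the JSON syntax.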
Steps:
- Extract sender identifier from webhook (phone number or email).
- Run query above. If found, return existing Contact ID.
- If not found, INSERT new Contact:
  INSERT INTO communications.contacts (tenant_id, name, identifiers, created_at) VALUES (:tenant_id, :placeholder_name, :identifiers_array, now()) RETURNING id;
  Where placeholder_name = the sender's identifier (phone/email) or a generic "Unknown Contact" label.
  Where identifiers_array = [{"type": "phone", "value": "+1234567890"}] or [{"type": "email", "value": "customer@example.com"}]
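The contact resolution steps can be sketched against an in-memory list to make the matching rule concrete. `Contact`, `Identifier`, `identifierTypeFor`, and `resolveOrCreateContact` are hypothetical names for this example; the `newId` callback stands in for the database-generated id, and the sender's identifier is used as the placeholder name.

```typescript
type IdentifierType = 'phone' | 'email';

interface Identifier {
  type: IdentifierType;
  value: string;
}

interface Contact {
  id: string;
  tenant_id: string;
  name: string;
  identifiers: Identifier[];
}

// WhatsApp and SMS identify senders by phone, email by address.
function identifierTypeFor(channel: 'WHATSAPP' | 'SMS' | 'EMAIL'): IdentifierType {
  return channel === 'EMAIL' ? 'email' : 'phone';
}

// Equivalent of the containment query followed by the INSERT fallback.
function resolveOrCreateContact(
  contacts: Contact[],
  tenantId: string,
  identifier: Identifier,
  newId: () => string,
): Contact {
  const existing = contacts.find(
    (c) =>
      c.tenant_id === tenantId &&
      c.identifiers.some((i) => i.type === identifier.type && i.value === identifier.value),
  );
  if (existing) return existing;
  const created: Contact = {
    id: newId(),
    tenant_id: tenantId,
    name: identifier.value, // placeholder name: the sender's identifier
    identifiers: [identifier],
  };
  contacts.push(created);
  return created;
}
```

Matching is scoped by tenant, so the same phone number contacting two tenants resolves to two separate contacts.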
Conversation Resolution Algorithm
Query Priority: Always prefer the most recently active OPEN or SNOOZED conversation for the Contact.
SELECT id FROM communications.conversations
WHERE tenant_id = :tenant_id
AND contact_id = :contact_id
AND status IN ('OPEN', 'SNOOZED')
ORDER BY last_message_at DESC
LIMIT 1;
Steps:
- Resolve Contact (above).
- Run query above. If found, reuse conversation and transition to OPEN if SNOOZED.
- If not found, INSERT new Conversation:
  INSERT INTO communications.conversations (tenant_id, contact_id, status, assigned_to, last_message_at, created_at) VALUES (:tenant_id, :contact_id, 'OPEN', NULL, now(), now()) RETURNING id;
Important: Resolution does NOT filter by channel_type: a Contact with both phone and email shares one Conversation regardless of channel. The dispatcher sees a unified thread per contact. If a customer messages via WhatsApp, then later replies via email, both messages appear in the same conversation thread (ordered by sent_at).
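The selection rule for an existing conversation (most recently active OPEN or SNOOZED, regardless of channel) is sketched below on in-memory rows. `Conversation` and `pickActiveConversation` are hypothetical names for the example; a return value of undefined corresponds to the INSERT branch.

```typescript
type ConversationStatus = 'OPEN' | 'SNOOZED' | 'RESOLVED';

interface Conversation {
  id: string;
  tenant_id: string;
  contact_id: string;
  status: ConversationStatus;
  last_message_at: string; // ISO 8601 timestamp
}

// Mirrors the resolution query: filter by tenant, contact, and active status,
// then take the row with the latest last_message_at. No channel filter is applied.
function pickActiveConversation(
  conversations: Conversation[],
  tenantId: string,
  contactId: string,
): Conversation | undefined {
  return conversations
    .filter(
      (c) =>
        c.tenant_id === tenantId &&
        c.contact_id === contactId &&
        (c.status === 'OPEN' || c.status === 'SNOOZED'),
    )
    .sort((a, b) => Date.parse(b.last_message_at) - Date.parse(a.last_message_at))[0];
}
```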
Concurrent Resolution Safety
If two inbound webhooks arrive simultaneously for the same Contact (e.g., Email + WhatsApp from same customer at the same time):
- Both queries run in parallel.
- Contact INSERT may experience a unique constraint race (if both try to create simultaneously).
- Conversation INSERT may experience the same race.
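- Guard: Use INSERT ... ON CONFLICT (tenant_id, contact_id, status) DO UPDATE for Conversation to handle concurrent inserts safely. PostgreSQL provides atomic conflict resolution. ON CONFLICT requires a unique index on those columns to act as the arbiter; since a contact can accumulate several RESOLVED conversations over time, that index presumably has to be a partial one covering only active statuses.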
§9 Conversation Lifecycle State Machine
| From | To | Actor | Side Effects |
|---|---|---|---|
| — | OPEN | System (inbound message or automated creation) | last_message_at = now() |
| OPEN | RESOLVED | Dispatcher (resolveConversation) | Clears assigned_to = NULL. Records change_event. |
| OPEN | SNOOZED | Dispatcher (snoozeConversation) | Sets snoozed_until = now() + duration. Records change_event. |
| RESOLVED | OPEN | System (new inbound message) | Resets assigned_to = NULL (returns to unassigned). |
| SNOOZED | OPEN | System (cron: snoozed_until <= now()) or inbound message | Clears snoozed_until = NULL. |
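The transition table above can be encoded as a lookup so handlers share one guard. This is a sketch under the assumption that the table lists every permitted transition; `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names.

```typescript
type ConversationStatus = 'OPEN' | 'RESOLVED' | 'SNOOZED';

// One entry per "From" state in the lifecycle table.
const ALLOWED_TRANSITIONS: Record<ConversationStatus, ConversationStatus[]> = {
  OPEN: ['RESOLVED', 'SNOOZED'],
  RESOLVED: ['OPEN'],
  SNOOZED: ['OPEN'],
};

function canTransition(from: ConversationStatus, to: ConversationStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

For example, `canTransition('SNOOZED', 'RESOLVED')` is false, which matches the guards in resolveConversation and snoozeConversation that both require status = 'OPEN'.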
resolveConversation Hasura Action
interface ResolveConversationInput {
conversation_id: string;
}
interface ResolveConversationOutput {
conversation_id: string;
status: 'RESOLVED' | 'ALREADY_RESOLVED';
}
Handler:
- Guard: status = 'OPEN'. If already RESOLVED → return { conversation_id, status: 'ALREADY_RESOLVED' }.
- UPDATE conversations SET status = 'RESOLVED', assigned_to = NULL WHERE id = :conversation_id.
- Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
- Return { conversation_id, status: 'RESOLVED' }.
Rationale for clearing assigned_to: Marking a conversation RESOLVED signals it requires no further dispatcher action. Clearing the assignment allows the conversation to be reassigned or reopened without an explicit "unassign" step. When a new inbound message arrives on a RESOLVED conversation, the system reopens it (status = OPEN) and sets assigned_to = NULL, returning it to the unassigned queue for any dispatcher to claim. This ensures no conversation gets "stuck" on a dispatcher who is unavailable.
snoozeConversation Hasura Action
interface SnoozeConversationInput {
conversation_id: string;
/** Snooze duration preset: '1h', '3h', '24h', 'tomorrow_9am', 'next_monday_9am' */
snooze_preset: string;
}
interface SnoozeConversationOutput {
conversation_id: string;
snoozed_until: string; // ISO 8601 timestamp
status: 'SNOOZED';
}
Handler:
- Guard: status = 'OPEN'.
- Resolve snoozed_until from snooze_preset:
  - '1h' → now() + interval '1 hour'
  - '3h' → now() + interval '3 hours'
  - '24h' → now() + interval '24 hours'
  - 'tomorrow_9am' → Next 09:00 in the tenant's timezone (query backoffice.operators.timezone_offset for the tenant, or default to Europe/Berlin for DACH region)
  - 'next_monday_9am' → Next Monday at 09:00 in the tenant's timezone
- UPDATE conversations SET status = 'SNOOZED', snoozed_until = :resolved_time WHERE id = :conversation_id.
- Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
- Return { conversation_id, snoozed_until, status: 'SNOOZED' }.
Timezone source: The system reads the tenant's timezone from backoffice.operators.timezone_offset (or equivalent). If not set, it defaults to Europe/Berlin (DACH region default).
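A sketch of preset resolution follows. Several assumptions are baked in and should be checked against the real schema: the tenant timezone is modelled as a fixed UTC offset in minutes (so daylight-saving changes are ignored), 'tomorrow_9am' is read as 09:00 on the next calendar day, and 'next_monday_9am' always means a Monday strictly after today. `resolveSnoozedUntil` and `localNineAm` are hypothetical helpers.

```typescript
type SnoozePreset = '1h' | '3h' | '24h' | 'tomorrow_9am' | 'next_monday_9am';

const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;

// 09:00 local time, daysAhead calendar days after the local date of `now`.
function localNineAm(now: Date, utcOffsetMinutes: number, daysAhead: number): Date {
  // Shift into "local time expressed as UTC fields" to read the local calendar date.
  const shifted = new Date(now.getTime() + utcOffsetMinutes * MINUTE_MS);
  const localNineAsUtc = Date.UTC(
    shifted.getUTCFullYear(),
    shifted.getUTCMonth(),
    shifted.getUTCDate() + daysAhead,
    9, 0, 0,
  );
  // Shift back to a real UTC instant.
  return new Date(localNineAsUtc - utcOffsetMinutes * MINUTE_MS);
}

function resolveSnoozedUntil(preset: SnoozePreset, now: Date, utcOffsetMinutes: number): Date {
  switch (preset) {
    case '1h':
      return new Date(now.getTime() + HOUR_MS);
    case '3h':
      return new Date(now.getTime() + 3 * HOUR_MS);
    case '24h':
      return new Date(now.getTime() + 24 * HOUR_MS);
    case 'tomorrow_9am':
      return localNineAm(now, utcOffsetMinutes, 1);
    case 'next_monday_9am': {
      const localDay = new Date(now.getTime() + utcOffsetMinutes * MINUTE_MS).getUTCDay(); // 0 = Sunday
      const daysAhead = (8 - localDay) % 7 || 7; // Monday itself maps to 7 days ahead
      return localNineAm(now, utcOffsetMinutes, daysAhead);
    }
  }
}
```

A production version would more likely resolve an IANA zone such as Europe/Berlin with a timezone-aware library, since a fixed offset drifts by an hour across daylight-saving transitions.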
Snooze Wakeup Cron Trigger
Name: conversation_snooze_wakeup
Schedule: Every 60 seconds
Handler:
UPDATE communications.conversations
SET status = 'OPEN', snoozed_until = NULL
WHERE status = 'SNOOZED' AND snoozed_until <= now()
AND tenant_id IN (SELECT id FROM backoffice.operators WHERE status = 'ACTIVE');
Concurrent Race Condition: If the cron fires at the same instant an inbound message arrives, both may attempt to update the same conversation's status. Postgres row-level locking serializes the two updates: one executes first, setting status to OPEN and clearing snoozed_until. The second then re-evaluates its WHERE clause against the updated row and takes no action (the guard status = 'SNOOZED' no longer matches). The inbound message handler also executes UPDATE conversations SET status = 'OPEN' where status IN ('SNOOZED', 'RESOLVED'), so both paths converge on the same final state.
snoozeConversation presets: 1h, 3h, 24h, tomorrow_9am, next_monday_9am.
§10 Push Notifications (Phase 1 Decision)
Phase 1 Status: NOT IMPLEMENTED
In Phase 1, agents receive real-time updates only via Hasura GraphQL subscriptions — no push notifications outside the browser tab. This simplifies the MVP:
- No service worker registration required
- No POST /api/notifications endpoint
- No browser Notification API (Notification.requestPermission())
- Agents must keep the Busflow tab open in their browser to see live inbox updates
Phase 2 Candidate: Browser push notifications (via the Notification API) for the following events:
- New conversation assigned to the agent (via claimConversation or reassignConversation)
- New inbound message in an assigned conversation while the Busflow tab is not focused
- Stale unread count (message sent but agent out of focus for >30 seconds)
Not in scope Phase 1: Mobile push notifications, SMS/email alerts to agents, or integration with native notification systems (macOS Notification Center, Windows Toast, etc.).
§11 Cross-Context Sidebar
The conversation detail panel resolves cross-schema data via Hasura manual object relationships (no DB-level FK constraints — all schemas share one Postgres instance).
| Conversation Column | Target | Relationship | Exposed Fields |
|---|---|---|---|
| booking_id | commerce.bookings | booking | status, total_amount, departure_date, tour_name, passenger count |
| trip_id | operations.service_legs | service_leg | status, leg_type, actual_start, scheduled_departure, vehicle info |
| invoice_id | commerce.invoices | invoice | status, total_amount, issued_at, due_date |
IMPORTANT
These are read-side coupling only (permitted per domain-driven-design.md §7.2). The inbox never mutates cross-context entities. Hasura permission rules enforce tenant_id filtering on target tables.