Busflow Docs

Internal documentation portal

Inbox Protocol

Domain: Communications (Shared Core Domain)
Trigger: Inbound messages (CPaaS webhooks), agent actions (Hasura Actions)
Output: Real-time omnichannel inbox via Hasura subscriptions
Sources: communications.md L3
DoD: ✅ Schema | ✅ API | ✅ Edge States (Ready to Code)


§1 Overview

The omnichannel inbox displays all customer conversations across WhatsApp, Email, and SMS in a unified dispatcher UI. Real-time updates push via Hasura GraphQL subscriptions (WebSocket). Agents claim conversations manually (Phase 1).

Architecture: CPaaS webhook (inbound) → NestJS InboundMessageHandler → Hasura INSERT → subscription push → Vue inbox UI. Agent replies use sendAgentMessage Hasura Action → NestJS → BullMQ → CPaaS (rejoins the notification pipeline at the DispatchMessageJob level).


§2 Hasura Subscription Contracts

Conversation List (Inbox Sidebar)

graphql
subscription InboxConversations($tenant_id: uuid!, $status_filter: [String!], $assigned_filter: uuid) {
  conversations(
    where: {
      tenant_id: { _eq: $tenant_id }
      status: { _in: $status_filter }
      _or: [
        { assigned_to: { _eq: $assigned_filter } }
        { assigned_to: { _is_null: true } }
      ]
    }
    order_by: { last_message_at: desc }
    limit: 50
  ) {
    id status assigned_to last_message_at snoozed_until
    contact { id name identifiers passenger_profile_id }
    messages(order_by: { sent_at: desc }, limit: 1) {
      rendered_content direction status sent_at
    }
    unread_count
    booking_id trip_id invoice_id
  }
}

Filter tabs: The frontend switches between distinct subscription instances per tab:

  • "My Conversations": assigned_to = $current_user_id, status = OPEN
  • "Unassigned": assigned_to IS NULL, status = OPEN
  • "All Open": No assigned_to filter, status = OPEN
  • "Resolved": status = RESOLVED (standard query with pagination, no subscription)
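The tab rules above can be sketched as a small mapping from tab to Hasura `where` object. This is only an illustration: `InboxTab`, `buildInboxWhere`, and `usesSubscription` are hypothetical names, and it assumes each tab builds its own filter rather than reusing a single shared one.

```typescript
// Hypothetical helper names; only the filter rules come from the tab list above.
type InboxTab = "MY_CONVERSATIONS" | "UNASSIGNED" | "ALL_OPEN" | "RESOLVED";

interface ConversationWhere {
  tenant_id: { _eq: string };
  status: { _eq: "OPEN" | "RESOLVED" };
  assigned_to?: { _eq: string } | { _is_null: true };
}

function buildInboxWhere(tab: InboxTab, tenantId: string, currentUserId: string): ConversationWhere {
  const base = { tenant_id: { _eq: tenantId } };
  switch (tab) {
    case "MY_CONVERSATIONS":
      return { ...base, status: { _eq: "OPEN" }, assigned_to: { _eq: currentUserId } };
    case "UNASSIGNED":
      return { ...base, status: { _eq: "OPEN" }, assigned_to: { _is_null: true } };
    case "ALL_OPEN":
      // No assigned_to filter at all.
      return { ...base, status: { _eq: "OPEN" } };
    case "RESOLVED":
      return { ...base, status: { _eq: "RESOLVED" } };
  }
}

// "Resolved" is served by a paginated query; the other tabs use a subscription.
function usesSubscription(tab: InboxTab): boolean {
  return tab !== "RESOLVED";
}
```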

Message Thread (Conversation Detail)

graphql
subscription ConversationMessages($conversation_id: uuid!, $tenant_id: uuid!) {
  messages(
    where: { conversation_id: { _eq: $conversation_id }, tenant_id: { _eq: $tenant_id } }
    order_by: { sent_at: asc }
    limit: 100
  ) {
    id direction content_type rendered_content status
    sent_at delivered_at read_at failed_reason
    channel_account { channel_type }
    template_id
  }
}

Global Unread Badge

graphql
subscription UnreadConversationCount($tenant_id: uuid!, $user_id: uuid!) {
  conversations_aggregate(
    where: {
      tenant_id: { _eq: $tenant_id }
      status: { _eq: "OPEN" }
      _or: [
        { assigned_to: { _eq: $user_id } }
        { assigned_to: { _is_null: true } }
      ]
      has_unread_messages: { _eq: true }
    }
  ) { aggregate { count } }
}

The has_unread_messages computed field must exist because Hasura's where clause cannot compare two columns. See §6 for implementation.


§3 Message Thread Load Pattern

1. User clicks conversation → query last 50 messages (descending by sent_at)
2. Render messages, scroll to bottom
3. Activate subscription (ascending by sent_at) for live updates
4. New message → append + auto-scroll if at bottom + update read cursor
5. Scroll up → paginated query with cursor (sent_at < :oldest_loaded)
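One way to keep the thread consistent across the initial query, the live subscription, and scroll-up pages is to funnel every batch through a single merge step. The sketch below assumes messages are identified by `id` and carry an ISO 8601 `sent_at`; `mergeMessages` and `oldestLoadedCursor` are hypothetical helper names.

```typescript
interface ThreadMessage {
  id: string;
  sent_at: string; // ISO 8601 timestamp
}

// Merge any batch (history page or subscription payload) into the loaded thread:
// later copies of the same id win, and the result is ascending by sent_at.
function mergeMessages(loaded: ThreadMessage[], batch: ThreadMessage[]): ThreadMessage[] {
  const byId = new Map<string, ThreadMessage>();
  for (const message of loaded.concat(batch)) {
    byId.set(message.id, message);
  }
  return Array.from(byId.values()).sort(
    (a, b) => Date.parse(a.sent_at) - Date.parse(b.sent_at),
  );
}

// Cursor for step 5: the sent_at of the oldest message currently loaded.
function oldestLoadedCursor(loaded: ThreadMessage[]): string | undefined {
  return loaded[0]?.sent_at;
}
```

Because the merge deduplicates by `id`, overlap between the initial query and the first subscription payload does not produce duplicate bubbles.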

Preview data (sidebar): Contact name, last message snippet (80 chars), timestamp, channel icon, unread badge, context chip (Booking/Trip/Invoice).


§4 sendAgentMessage Hasura Action

typescript
interface SendAgentMessageInput {
  conversation_id: string;
  channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
  content: string;
  content_type: 'TEXT' | 'MEDIA';
  media_url?: string;
}

interface SendAgentMessageOutput {
  message_id: string;   // Created Message (status = QUEUED)
}

interface SendAgentMessageError {
  error_code: 'CHANNEL_SUSPENDED' | 'WHATSAPP_SESSION_EXPIRED' | 'CONTACT_NOT_FOUND' | 'INVALID_CONVERSATION';
  error_message: string;
}

Handler:

  1. Validate that conversation_id belongs to the caller's tenant_id.
  2. Resolve the ChannelAccount for the specified channel. Guard: If status is 'SUSPENDED' or 'REVOKED', return error CHANNEL_SUSPENDED.
  3. WhatsApp 24-Hour Session Window Check (if channel = WHATSAPP):
    • Query messages WHERE conversation_id = :id AND direction = 'INBOUND' ORDER BY sent_at DESC LIMIT 1.
    • If the last inbound message's sent_at < now() - 24 hours, the conversation is outside the 24h session window.
    • Return error WHATSAPP_SESSION_EXPIRED. The client must either: (a) use a template message (not agent-authored text), or (b) display a warning to the dispatcher explaining the limitation.
    • Note: This check does NOT apply to Email or SMS channels.
  4. Resolve the Contact and extract the recipient phone/email from identifiers. Guard: If the contact is missing, return error CONTACT_NOT_FOUND.
  5. INSERT messages with direction = OUTBOUND, status = QUEUED, template_id = NULL. The trg_messages_update_last_message_at trigger updates conversations.last_message_at automatically.
  6. Enqueue DispatchMessageJob on the message-dispatch BullMQ queue (see notification-pipeline-protocol.md §4 for job shape).
  7. Record change_event (scope: GENERAL).
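The session-window guard in step 3 can be isolated as a pure function, which makes it easy to unit test. This is a sketch, not the handler itself: `checkSessionWindow` is a hypothetical name, and treating a conversation with no inbound message at all as outside the window is an assumption the steps above do not spell out.

```typescript
type Channel = "WHATSAPP" | "EMAIL" | "SMS";
type SessionCheck = { ok: true } | { ok: false; error_code: "WHATSAPP_SESSION_EXPIRED" };

const SESSION_WINDOW_MS = 24 * 60 * 60 * 1000;

function checkSessionWindow(
  channel: Channel,
  lastInboundSentAt: Date | null, // sent_at of the newest INBOUND message, if any
  now: Date,
): SessionCheck {
  // The window only applies to WhatsApp.
  if (channel !== "WHATSAPP") return { ok: true };
  // Assumption: no inbound message yet means no open session window.
  const expired =
    lastInboundSentAt === null ||
    now.getTime() - lastInboundSentAt.getTime() > SESSION_WINDOW_MS;
  return expired ? { ok: false, error_code: "WHATSAPP_SESSION_EXPIRED" } : { ok: true };
}
```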

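Optimistic UI: The Vue frontend renders the sent message immediately with a "sending" indicator. The subscription then delivers the confirmed row, typically within about a second (Hasura re-polls live queries on a refetch interval, 1 second by default), and the client replaces the optimistic entry with it.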


§5 Agent Routing

Phase 1: Manual Claim Only. New conversations enter the "Unassigned" queue (assigned_to = NULL). Any dispatcher can claim them.

claimConversation Hasura Action

typescript
interface ClaimConversationInput {
  conversation_id: string;
}

interface ClaimConversationOutput {
  conversation_id: string;
  assigned_to: string;   // UUID of the claiming (or already-claiming) user
  status: 'CLAIMED' | 'ALREADY_CLAIMED';
}

Handler:

  1. Execute SELECT * FROM conversations WHERE id = :conversation_id FOR UPDATE (row-level lock, prevents concurrent claims).
  2. Guard: If assigned_to is not null, return { conversation_id, assigned_to: current_assignee, status: 'ALREADY_CLAIMED' }.
  3. Update conversations setting assigned_to = x-hasura-user-id where id = :conversation_id.
  4. Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
  5. Return { conversation_id, assigned_to: x-hasura-user-id, status: 'CLAIMED' }.

Concurrent claims: The second dispatcher's transaction blocks on the SELECT ... FOR UPDATE lock until the first commits. After the lock releases, the second dispatcher reads the non-null assigned_to and returns ALREADY_CLAIMED. This prevents the race condition where two dispatchers simultaneously claim the same conversation.
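Since the row lock serializes the two transactions, the outcome is the same as applying the guard to the row twice in sequence. The in-memory sketch below illustrates that outcome only; it does not model the lock itself, and `applyClaim` is a hypothetical name.

```typescript
interface ConversationRow {
  id: string;
  assigned_to: string | null;
}

interface ClaimResult {
  conversation_id: string;
  assigned_to: string;
  status: "CLAIMED" | "ALREADY_CLAIMED";
}

// Guard + update as they would run inside the transaction that holds the row lock.
function applyClaim(row: ConversationRow, userId: string): ClaimResult {
  if (row.assigned_to !== null) {
    return { conversation_id: row.id, assigned_to: row.assigned_to, status: "ALREADY_CLAIMED" };
  }
  row.assigned_to = userId;
  return { conversation_id: row.id, assigned_to: userId, status: "CLAIMED" };
}

// Two dispatchers racing for the same conversation, serialized by the lock.
const contested: ConversationRow = { id: "conv-1", assigned_to: null };
const firstClaim = applyClaim(contested, "dispatcher-a");
const secondClaim = applyClaim(contested, "dispatcher-b");
```

Here `firstClaim.status` is `CLAIMED`, and `secondClaim` reports `ALREADY_CLAIMED` with `dispatcher-a` as the assignee.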

reassignConversation Hasura Action

typescript
interface ReassignConversationInput {
  conversation_id: string;
  new_assignee_id: string;
}

interface ReassignConversationOutput {
  conversation_id: string;
  assigned_to: string;  // The new assignee's UUID
  status: 'REASSIGNED';
}

Handler:

  1. Guard: Enforce that the caller has the MANAGER role via Hasura permission rules. Return error INSUFFICIENT_PERMISSIONS if not.
  2. Validate that new_assignee_id is a valid user with DISPATCHER or MANAGER role in the same tenant. Query auth.users WHERE id = :new_assignee_id AND tenant_id = :tenant_id AND role IN ('DISPATCHER', 'MANAGER'). Return error INVALID_ASSIGNEE if no match.
  3. Update conversations setting assigned_to = :new_assignee_id where id = :conversation_id.
  4. Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
  5. Return { conversation_id, assigned_to: new_assignee_id, status: 'REASSIGNED' }.

§6 Unread Tracking

Read Cursor Mutation & Debouncing Strategy

graphql
mutation MarkConversationRead($conversation_id: uuid!, $now: timestamptz!) {
  insert_conversation_read_cursors_one(
    object: { conversation_id: $conversation_id, last_read_at: $now }
    on_conflict: { constraint: conversation_read_cursors_pkey, update_columns: [last_read_at] }
  ) { conversation_id last_read_at }
}

Hasura injects user_id via column preset (x-hasura-user-id) — the frontend never sends it.

Debouncing Strategy (Client-Side):

The frontend must debounce MarkConversationRead calls to prevent thrashing the database with high-frequency UPSERTs:

  1. Debounce on scroll: When the user scrolls to a new message in the thread, delay the mutation call by 1 second and restart the delay on each further scroll event. Multiple scroll events within 1 second collapse into a single UPSERT.
  2. Fire on blur: When the conversation detail panel loses focus (user switches to another conversation tab), immediately fire the mutation with the latest now() timestamp.
  3. Server-side dedup (optional): If the same (conversation_id, user_id) pair receives identical last_read_at values within 5 seconds, the server may skip the UPSERT. This is a performance optimization — the semantics remain unchanged.

Together these rules keep the mutation to at most roughly one call per second per conversation while the user scrolls, which is acceptable at SMB scale (1–5 dispatchers).
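A minimal client-side sketch of rules 1 and 2 follows. `ReadCursorDebouncer` is a hypothetical class, and the `send` callback stands in for whatever function actually issues the MarkConversationRead mutation.

```typescript
class ReadCursorDebouncer {
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private pendingConversationId: string | undefined = undefined;
  private readonly send: (conversationId: string) => void;
  private readonly delayMs: number;

  constructor(send: (conversationId: string) => void, delayMs: number = 1000) {
    this.send = send;
    this.delayMs = delayMs;
  }

  // Rule 1: each scroll event restarts the delay, so bursts collapse into one call.
  onScroll(conversationId: string): void {
    if (this.pendingConversationId !== undefined && this.pendingConversationId !== conversationId) {
      this.flush(); // do not lose the pending mark for the previous conversation
    }
    this.pendingConversationId = conversationId;
    if (this.timer !== undefined) clearTimeout(this.timer);
    this.timer = setTimeout(() => this.flush(), this.delayMs);
  }

  // Rule 2: call on blur to send any pending mark immediately.
  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pendingConversationId !== undefined) {
      const conversationId = this.pendingConversationId;
      this.pendingConversationId = undefined;
      this.send(conversationId);
    }
  }
}
```

A component would call `onScroll` from its scroll handler and `flush` from its blur or unmount hook.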

conversation_unread_count Computed Field

sql
CREATE OR REPLACE FUNCTION communications.conversation_unread_count(
  conv communications.conversations,
  hasura_session JSON
) RETURNS INTEGER AS $$
  SELECT COUNT(*)::INTEGER
  FROM communications.messages m
  WHERE m.conversation_id = conv.id
    AND m.direction = 'INBOUND'
    AND m.sent_at > COALESCE(
      (SELECT last_read_at FROM communications.conversation_read_cursors
       WHERE conversation_id = conv.id
         AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
      '1970-01-01'::TIMESTAMPTZ
    );
$$ LANGUAGE SQL STABLE;

IMPORTANT

Only INBOUND messages count as "unread." Outbound messages do not increment the counter.
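For readers who prefer to see the counting rule outside SQL, here is the same logic as an in-memory TypeScript sketch. It mirrors the function above (including the 1970-01-01 fallback when no cursor exists) but is only an illustration; `countUnread` is a hypothetical name.

```typescript
interface CountableMessage {
  direction: "INBOUND" | "OUTBOUND";
  sent_at: string; // ISO 8601 timestamp
}

function countUnread(messages: CountableMessage[], lastReadAt: string | null): number {
  // No read cursor yet: fall back to the epoch, like the COALESCE in the SQL function.
  const cursorMs = lastReadAt === null ? 0 : Date.parse(lastReadAt);
  return messages.filter(
    (m) => m.direction === "INBOUND" && Date.parse(m.sent_at) > cursorMs,
  ).length;
}
```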

WARNING

Performance: 50 correlated subqueries per subscription poll. Acceptable at SMB scale (<500 active conversations, <5 dispatchers). If performance degrades: (a) introduce a materialized unread_count column on conversations maintained by triggers, or (b) create a partial index (conversation_id) WHERE direction = 'INBOUND' on messages to accelerate the subquery.

NOTE

Stale Read Cursor Handling: If a frontend's local clock drifts ahead of the server (e.g., browser clock +5 min), the MarkConversationRead mutation sends a future timestamp. The conversation_unread_count function will treat all messages with sent_at <= future timestamp as read, potentially hiding unread messages. To mitigate: (a) the frontend should use server_time from the last API response instead of client_time, or (b) the server should cap last_read_at at the current now() in the mutation handler. Phase 1 implementation should use approach (b) for simplicity.
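Approach (b) amounts to clamping the client-supplied timestamp to the server clock before it is stored. A minimal sketch, with `capLastReadAt` as a hypothetical helper name:

```typescript
// Never store a read cursor that lies in the server's future.
function capLastReadAt(clientTimestamp: Date, serverNow: Date): Date {
  return clientTimestamp.getTime() > serverNow.getTime() ? serverNow : clientTimestamp;
}
```

A browser clock running five minutes ahead therefore produces a cursor equal to the server's current time, and messages arriving afterwards still count as unread.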

has_unread_messages Computed Field

The Global Unread Badge subscription (§2) filters on a boolean computed field because Hasura's where clause cannot compare two columns directly.

sql
CREATE OR REPLACE FUNCTION communications.conversation_has_unread_messages(
  conv communications.conversations,
  hasura_session JSON
) RETURNS BOOLEAN AS $$
  SELECT EXISTS (
    SELECT 1 FROM communications.messages m
    WHERE m.conversation_id = conv.id
      AND m.direction = 'INBOUND'
      AND m.sent_at > COALESCE(
        (SELECT last_read_at FROM communications.conversation_read_cursors
         WHERE conversation_id = conv.id
           AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
        '1970-01-01'::TIMESTAMPTZ
      )
  );
$$ LANGUAGE SQL STABLE;

§7 Inbound Message Ingestion Pipeline

CPaaS Webhook → NestJS InboundMessageHandler → Hasura INSERT → Subscription Push → Inbox UI

Steps:

  1. Receive webhook (Meta: POST /api/webhooks/meta/{tenant_id}; SES inbound: POST /api/webhooks/ses; SNS SMS: POST /api/webhooks/sns-sms).
  2. Validate provider signature (see §7a).
  3. Extract: sender identifier, content, content type, timestamp, provider message ID, channel type.
  4. Resolve ChannelAccount: Query channel_accounts WHERE tenant_id = :tenant_id AND channel_type = :channel_type. Guard: If status is 'SUSPENDED' or 'REVOKED', log a warning and still process the inbound message (the contact and conversation are created, but no outbound reply can be sent until the channel is re-activated). This prevents losing customer messages due to channel suspension.
  5. Resolve or create Contact (see §8).
  6. Resolve or create Conversation (see §8).
  7. INSERT messages with direction = INBOUND, status = DELIVERED (already arrived), channel_account_id resolved from step 4, external_message_id = provider's message ID (for idempotency).
  8. UPDATE conversations SET last_message_at = now(), status = 'OPEN' (reopen if RESOLVED/SNOOZED).
  9. Hasura subscription fires → inbox UI updates.

Inbound messages skip the BullMQ dispatch pipeline entirely — direct webhook-to-database path.

NOTE

Per-tenant webhook URLs: All Meta webhooks (inbound messages + delivery status) use per-tenant URLs (/api/webhooks/meta/{tenant_id}) registered via WABA-level override during channel provisioning (see channel-provisioning-protocol.md §7). SES/SNS webhooks are account-level — see notification-pipeline-protocol.md §8.

Provider Webhook Payload Differences:

| Provider | Message Type | Payload Fields | Content Format |
| --- | --- | --- | --- |
| Meta WhatsApp | Inbound text | messages[].from, messages[].text, messages[].timestamp, messages[].id | Plain text or template response |
| Meta WhatsApp | Inbound media | messages[].from, messages[].image.link, messages[].id, messages[].timestamp | HTTPS URL (temporary, expires ~24h) |
| SES Email | Inbound email | SNS Message JSON: source, destination, messageId, mail.timestamp | mail.commonHeaders.subject, mail.commonHeaders.from, email body in S3 reference |
| SNS SMS | Delivery receipt | SNS Message: MessageId, TopicArn | MessageStatus enum (SUCCESS/FAILURE/DELIVERED/UNDELIVERED) |
| SNS SMS | Subscription Confirm | SNS Message: Type = SubscriptionConfirmation, SubscribeURL | Special message: requires callback to confirm subscription before SMS delivery starts |
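Because each provider delivers a different shape, step 3 of the pipeline is easiest to reason about as a per-provider normalizer that emits one common record. The sketch below covers only the two Meta WhatsApp rows. The interface and function names are hypothetical, and it assumes that `text` is an object with a `body` string, that `timestamp` is Unix seconds encoded as a string, and that `from` arrives without a leading `+`; verify these against the actual webhook payloads before relying on them.

```typescript
// Assumed subset of one entry in the Meta webhook's messages[] array.
interface MetaInboundMessage {
  from: string;
  id: string;
  timestamp: string; // assumed: Unix seconds as a string
  text?: { body: string };
  image?: { link: string };
}

interface NormalizedInbound {
  channelType: "WHATSAPP";
  sender: string; // E.164 phone number
  externalMessageId: string;
  contentType: "TEXT" | "MEDIA";
  content: string;
  sentAt: Date;
}

function normalizeMetaMessage(msg: MetaInboundMessage): NormalizedInbound {
  const isMedia = msg.image !== undefined;
  return {
    channelType: "WHATSAPP",
    // Contacts store E.164 values, so add the "+" if the provider omits it.
    sender: msg.from.startsWith("+") ? msg.from : `+${msg.from}`,
    externalMessageId: msg.id,
    contentType: isMedia ? "MEDIA" : "TEXT",
    content: msg.image?.link ?? msg.text?.body ?? "",
    sentAt: new Date(Number(msg.timestamp) * 1000),
  };
}
```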

§7a Inbound Signature Validation

Meta: Same as delivery callbacks — HMAC-SHA256 with X-Hub-Signature-256 header.
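As a reference point, the Meta check can be sketched with Node's built-in crypto module. The function name and the `appSecret` parameter are hypothetical; the sketch assumes the header value has the form `sha256=<hex digest>` and that the HMAC is computed over the raw, unparsed request body.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

function isValidMetaSignature(
  rawBody: string,
  signatureHeader: string | undefined, // value of X-Hub-Signature-256
  appSecret: string,
): boolean {
  const prefix = "sha256=";
  if (signatureHeader === undefined || !signatureHeader.startsWith(prefix)) return false;
  const expected = Buffer.from(
    createHmac("sha256", appSecret).update(rawBody, "utf8").digest("hex"),
    "utf8",
  );
  const received = Buffer.from(signatureHeader.slice(prefix.length), "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

The body must be captured before any JSON parsing middleware rewrites it, since re-serialized JSON may differ byte-for-byte from what the provider signed.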

SES/SNS: SNS SubscriptionConfirmation messages require special handling:

typescript
if (message.Type === 'SubscriptionConfirmation') {
  // Verify certificate + signature
  // Call message.SubscribeURL via HTTP GET to confirm subscription
  // Do NOT process as a normal message
  await confirmSubscription(message.SubscribeURL);
  return { statusCode: 200 };
}

§7b Idempotency & Concurrent Delivery

Dedup key: For inbound messages, use external_message_id (provider's message ID) as the idempotency key, identical to outbound messages.

typescript
// Check for duplicate inbound message
const existingMessage = await messages.findOne({
  where: { external_message_id: webhook.messageId }
});
if (existingMessage) {
  return { success: true }; // Already processed
}

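Race between concurrent inbound webhooks: If the same inbound message arrives twice (provider retry), the second webhook finds existingMessage and returns immediately, so no second row is inserted and the Hasura subscription does not fire twice. This holds for retries that arrive one after the other; two deliveries processed at the same instant can both pass the lookup unless external_message_id is also protected by a unique constraint.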
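One way to close the gap between the lookup and the INSERT is to make the insert itself the dedup check, in the spirit of `INSERT ... ON CONFLICT DO NOTHING`. The in-memory sketch below only illustrates that insert-if-absent behaviour. It assumes a unique key on external_message_id, which the schema may or may not define, and `InMemoryMessages` is a hypothetical stand-in for the real table.

```typescript
interface StoredMessage {
  id: number;
  external_message_id: string;
  content: string;
}

class InMemoryMessages {
  private readonly byExternalId = new Map<string, StoredMessage>();

  // Returns inserted = false when the provider message ID was already stored.
  insertIfAbsent(externalMessageId: string, content: string): { inserted: boolean; message: StoredMessage } {
    const existing = this.byExternalId.get(externalMessageId);
    if (existing !== undefined) return { inserted: false, message: existing };
    const message: StoredMessage = {
      id: this.byExternalId.size + 1,
      external_message_id: externalMessageId,
      content,
    };
    this.byExternalId.set(externalMessageId, message);
    return { inserted: true, message };
  }
}
```

The webhook handler would acknowledge the provider in both cases and run the follow-up steps only when `inserted` is true.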


§8 Contact & Conversation Resolution

Contact Resolution Algorithm

Exact Query:

sql
SELECT id FROM communications.contacts
  WHERE tenant_id = :tenant_id
    AND identifiers @> jsonb_build_array(jsonb_build_object('type', CAST(:identifier_type AS text), 'value', CAST(:identifier_value AS text)))
  LIMIT 1;

Where:

  • :identifier_type = 'phone' (for WhatsApp/SMS) or 'email' (for SES inbound email)
  • :identifier_value = E.164 phone (+1234567890) or email address

Steps:

  1. Extract sender identifier from webhook (phone number or email).
  2. Run query above. If found, return existing Contact ID.
  3. If not found, INSERT new Contact:
    sql
    INSERT INTO communications.contacts (tenant_id, name, identifiers, created_at)
      VALUES (:tenant_id, :placeholder_name, :identifiers_array, now())
      RETURNING id;
    Where placeholder_name = the sender's identifier (phone/email) or a generic "Unknown Contact" label.
    Where identifiers_array = [{"type": "phone", "value": "+1234567890"}] or [{"type": "email", "value": "customer@example.com"}].
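The identifier used in both the lookup and the INSERT can be derived from the channel in one place. This is a sketch with hypothetical helper names; trimming the sender and lower-casing email addresses are assumptions about normalization that the algorithm above does not state.

```typescript
type ChannelType = "WHATSAPP" | "SMS" | "EMAIL";

interface ContactIdentifier {
  type: "phone" | "email";
  value: string;
}

function identifierFor(channel: ChannelType, sender: string): ContactIdentifier {
  return channel === "EMAIL"
    ? { type: "email", value: sender.trim().toLowerCase() } // assumption: case-insensitive match
    : { type: "phone", value: sender.trim() };
}

// JSON text for the jsonb containment lookup and for identifiers_array on insert.
function identifiersJson(identifier: ContactIdentifier): string {
  return JSON.stringify([identifier]);
}
```

For example, `identifiersJson(identifierFor("WHATSAPP", "+1234567890"))` yields `[{"type":"phone","value":"+1234567890"}]`.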

Conversation Resolution Algorithm

Query Priority: Always prefer the most recently active OPEN or SNOOZED conversation for the Contact.

sql
SELECT id FROM communications.conversations
  WHERE tenant_id = :tenant_id
    AND contact_id = :contact_id
    AND status IN ('OPEN', 'SNOOZED')
  ORDER BY last_message_at DESC
  LIMIT 1;

Steps:

  1. Resolve Contact (above).
  2. Run query above. If found, reuse conversation and transition to OPEN if SNOOZED.
  3. If not found, INSERT new Conversation:
    sql
    INSERT INTO communications.conversations
      (tenant_id, contact_id, status, assigned_to, last_message_at, created_at)
      VALUES (:tenant_id, :contact_id, 'OPEN', NULL, now(), now())
      RETURNING id;

Important: Resolution does NOT filter by channel_type — a Contact with both phone and email shares one Conversation regardless of channel. The dispatcher sees a unified thread per contact. If a customer messages via WhatsApp, then later replies via email, both messages appear in the same conversation thread (ordered by sent_at).

Concurrent Resolution Safety

If two inbound webhooks arrive simultaneously for the same Contact (e.g., Email + WhatsApp from same customer at the same time):

  • Both queries run in parallel.
  • Contact INSERT may experience a unique constraint race (if both try to create simultaneously).
  • Conversation INSERT may experience the same race.
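  • Guard: Use INSERT ... ON CONFLICT (tenant_id, contact_id, status) DO UPDATE for Conversation to handle concurrent inserts safely; PostgreSQL resolves the conflict atomically. ON CONFLICT only works against a matching unique index, and a plain unique constraint on these three columns would also limit each contact to a single RESOLVED conversation, so the index likely needs to be partial (active statuses only).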

§9 Conversation Lifecycle State Machine

| From | To | Actor | Side Effects |
| --- | --- | --- | --- |
| (none) | OPEN | System (inbound message or automated creation) | last_message_at = now() |
| OPEN | RESOLVED | Dispatcher (resolveConversation) | Clears assigned_to = NULL. Records change_event. |
| OPEN | SNOOZED | Dispatcher (snoozeConversation) | Sets snoozed_until = now() + duration. Records change_event. |
| RESOLVED | OPEN | System (new inbound message) | Resets assigned_to = NULL (returns to unassigned). |
| SNOOZED | OPEN | System (cron: snoozed_until <= now()) or inbound message | Clears snoozed_until = NULL. |
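The transitions in the table can be encoded as a lookup so that handlers share one guard. A minimal sketch, with `ALLOWED_TRANSITIONS` and `canTransition` as hypothetical names:

```typescript
type ConversationStatus = "OPEN" | "RESOLVED" | "SNOOZED";

// Allowed target states per current state, taken from the lifecycle table.
const ALLOWED_TRANSITIONS: Record<ConversationStatus, ConversationStatus[]> = {
  OPEN: ["RESOLVED", "SNOOZED"],
  RESOLVED: ["OPEN"],
  SNOOZED: ["OPEN"],
};

function canTransition(from: ConversationStatus, to: ConversationStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

Note that the table defines no direct SNOOZED to RESOLVED or RESOLVED to SNOOZED transition, so the guard rejects both.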

resolveConversation Hasura Action

typescript
interface ResolveConversationInput {
  conversation_id: string;
}

interface ResolveConversationOutput {
  conversation_id: string;
  status: 'RESOLVED' | 'ALREADY_RESOLVED';
}

Handler:

  1. Guard: status = 'OPEN'. If already RESOLVED → return { conversation_id, status: 'ALREADY_RESOLVED' }.
  2. UPDATE conversations SET status = 'RESOLVED', assigned_to = NULL WHERE id = :conversation_id.
  3. Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
  4. Return { conversation_id, status: 'RESOLVED' }.

Rationale for clearing assigned_to: Marking a conversation RESOLVED signals it requires no further dispatcher action. Clearing the assignment allows the conversation to be reassigned or reopened without an explicit "unassign" step. When a new inbound message arrives on a RESOLVED conversation, the system reopens it (status = OPEN) and sets assigned_to = NULL, returning it to the unassigned queue for any dispatcher to claim. This ensures no conversation gets "stuck" on a dispatcher who is unavailable.

snoozeConversation Hasura Action

typescript
interface SnoozeConversationInput {
  conversation_id: string;
  /** Snooze duration preset: '1h', '3h', '24h', 'tomorrow_9am', 'next_monday_9am' */
  snooze_preset: string;
}

interface SnoozeConversationOutput {
  conversation_id: string;
  snoozed_until: string;  // ISO 8601 timestamp
  status: 'SNOOZED';
}

Handler:

  1. Guard: status = 'OPEN'.
  2. Resolve snoozed_until from snooze_preset:
    • '1h' → now() + interval '1 hour'
    • '3h' → now() + interval '3 hours'
    • '24h' → now() + interval '24 hours'
    • 'tomorrow_9am' → Next 09:00 in the tenant's timezone (query backoffice.operators.timezone_offset for the tenant, or default to Europe/Berlin for DACH region)
    • 'next_monday_9am' → Next Monday at 09:00 in the tenant's timezone
  3. UPDATE conversations SET status = 'SNOOZED', snoozed_until = :resolved_time WHERE id = :conversation_id.
  4. Record change_event (scope: GENERAL, entity_type: conversation, action: UPDATE).
  5. Return { conversation_id, snoozed_until, status: 'SNOOZED' }.

Timezone source: The system reads the tenant's timezone from backoffice.operators.timezone_offset (or equivalent). If not set, it defaults to Europe/Berlin (DACH region default).
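Preset resolution can be sketched as a pure function of the preset, the current time, and the tenant's offset. The sketch assumes the tenant timezone is available as a fixed UTC offset in minutes (as the column name timezone_offset suggests), so it ignores daylight-saving changes; a named zone such as Europe/Berlin would need a timezone-aware library instead. It also reads 'next_monday_9am' as the first Monday strictly after today. `resolveSnoozedUntil` is a hypothetical name.

```typescript
type SnoozePreset = "1h" | "3h" | "24h" | "tomorrow_9am" | "next_monday_9am";

const HOUR_MS = 60 * 60 * 1000;

function resolveSnoozedUntil(preset: SnoozePreset, now: Date, utcOffsetMinutes: number): Date {
  switch (preset) {
    case "1h":
      return new Date(now.getTime() + HOUR_MS);
    case "3h":
      return new Date(now.getTime() + 3 * HOUR_MS);
    case "24h":
      return new Date(now.getTime() + 24 * HOUR_MS);
  }
  // Shift "now" so that the UTC getters return the tenant's local wall-clock fields.
  const offsetMs = utcOffsetMinutes * 60 * 1000;
  const local = new Date(now.getTime() + offsetMs);
  const weekday = local.getUTCDay(); // 0 = Sunday, 1 = Monday
  const daysAhead = preset === "tomorrow_9am" ? 1 : (8 - weekday) % 7 || 7;
  const localTargetMs = Date.UTC(
    local.getUTCFullYear(),
    local.getUTCMonth(),
    local.getUTCDate() + daysAhead, // Date.UTC normalizes month overflow
    9, 0, 0,
  );
  // Convert the local wall-clock target back to a real UTC instant.
  return new Date(localTargetMs - offsetMs);
}
```

For a Wednesday at 10:00 UTC with an offset of +60 minutes, 'tomorrow_9am' resolves to Thursday 08:00 UTC and 'next_monday_9am' to the following Monday 08:00 UTC.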

Snooze Wakeup Cron Trigger

Name: conversation_snooze_wakeup
Schedule: Every 60 seconds
Handler:

sql
UPDATE communications.conversations
  SET status = 'OPEN', snoozed_until = NULL
  WHERE status = 'SNOOZED' AND snoozed_until <= now()
  AND tenant_id IN (SELECT id FROM backoffice.operators WHERE status = 'ACTIVE');

Concurrent Race Condition: If the cron fires at the same instant an inbound message arrives, both may attempt to update the same conversation's status. Postgres guarantees transactional isolation — one will execute first, updating status to OPEN + clearing snoozed_until. The second transaction will read the updated status and take no action (guard: status = 'SNOOZED' will fail). The inbound message handler also executes UPDATE conversations SET status = 'OPEN' where status IN ('SNOOZED', 'RESOLVED'), so both paths converge on the same final state.

snoozeConversation presets: 1h, 3h, 24h, tomorrow_9am, next_monday_9am.


§10 Push Notifications (Phase 1 Decision)

Phase 1 Status: NOT IMPLEMENTED

In Phase 1, agents receive real-time updates only via Hasura GraphQL subscriptions — no push notifications outside the browser tab. This simplifies the MVP:

  • No service worker registration required
  • No POST /api/notifications endpoint
  • No browser Notification API (Notification.requestPermission())
  • Agents must keep the Busflow tab open in their browser to see live inbox updates

Phase 2 Candidate: Browser push notifications (via the Notification API) for the following events:

  1. New conversation assigned to the agent (via claimConversation or reassignConversation)
  2. New inbound message in an assigned conversation while the Busflow tab is not focused
  3. Stale unread count (message sent but agent out of focus for >30 seconds)

Not in scope Phase 1: Mobile push notifications, SMS/email alerts to agents, or integration with native notification systems (macOS Notification Center, Windows Toast, etc.).


§11 Cross-Context Sidebar

The conversation detail panel resolves cross-schema data via Hasura manual object relationships (no DB-level FK constraints — all schemas share one Postgres instance).

| Conversation Column | Target | Relationship | Exposed Fields |
| --- | --- | --- | --- |
| booking_id | commerce.bookings | booking | status, total_amount, departure_date, tour_name, passenger count |
| trip_id | operations.service_legs | service_leg | status, leg_type, actual_start, scheduled_departure, vehicle info |
| invoice_id | commerce.invoices | invoice | status, total_amount, issued_at, due_date |

IMPORTANT

These are read-side coupling only (permitted per domain-driven-design.md §7.2). The inbox never mutates cross-context entities. Hasura permission rules enforce tenant_id filtering on target tables.
