Busflow Docs

Internal documentation portal

Skip to content

Communications Database Schema โ€‹

This document details the concrete physical database schema for the Communications Bounded Context (schema: communications). The Communications schema is an independent Shared Core Domain providing omnichannel inbox capability and trigger-based automated messaging to Backoffice, Commerce, and Operations.

Physical Entity Relationship Diagram โ€‹

mermaid
erDiagram
    %% ==========================================
    %% COMMUNICATIONS SCHEMA
    %% ==========================================

    CHANNEL_ACCOUNT {
        uuid id PK
        uuid tenant_id "Soft reference to Backoffice"
        string channel_type "Enum: WHATSAPP, EMAIL, WEBCHAT, SMS"
        string sender_identity "Canonical sender: E.164 phone, email, domain"
        string display_name "Operator-editable label"
        jsonb provider_config "Encrypted CPaaS credentials"
        string status "Enum: PENDING_VERIFICATION, ACTIVE, SUSPENDED, REVOKED"
        timestamp created_at
        timestamp updated_at
    }

    CONTACT {
        uuid id PK
        uuid tenant_id "Soft reference to Backoffice"
        uuid passenger_profile_id "Soft reference to Backoffice, Nullable"
        string name
        jsonb identifiers "Array of emails/phone numbers"
        timestamp created_at
    }

    CONVERSATION {
        uuid id PK
        uuid tenant_id "Soft reference to Backoffice"
        uuid contact_id FK "References CONTACT(id)"
        uuid booking_id "Soft reference to Commerce, Nullable"
        uuid trip_id "Soft reference to Operations, Nullable"
        uuid invoice_id "Soft reference to Commerce, Nullable"
        string status "Enum: OPEN, RESOLVED, SNOOZED"
        uuid assigned_to "Soft reference to auth.users, Nullable"
        timestamptz last_message_at "Denormalized sort key"
        timestamptz snoozed_until "Snooze wakeup, Nullable"
        timestamp created_at
    }

    MESSAGE {
        uuid id PK
        uuid tenant_id "Soft reference to Backoffice"
        uuid conversation_id FK "References CONVERSATION(id)"
        uuid channel_account_id FK "References CHANNEL_ACCOUNT(id)"
        uuid template_id "Soft reference to Backoffice, Nullable"
        string direction "Enum: INBOUND, OUTBOUND"
        string content_type "Enum: TEXT, TEMPLATE, MEDIA"
        text rendered_content
        string status "Enum: QUEUED, SENT, DELIVERED, READ, FAILED"
        uuid correlation_id "Hasura event_id for idempotency dedup, Nullable"
        string external_message_id "Provider message ID (Meta/SES/SNS), Nullable"
        text failed_reason "Failure details when status=FAILED, Nullable"
        timestamp sent_at
        timestamptz delivered_at "Set by CPaaS webhook handler, Nullable"
        timestamptz read_at "Set by CPaaS webhook handler, Nullable"
    }

    CONVERSATION_READ_CURSOR {
        uuid conversation_id PK "References CONVERSATION(id)"
        uuid user_id PK "Soft reference to auth.users"
        timestamptz last_read_at
    }

    CHANGE_EVENT {
        uuid id PK
        uuid tenant_id "Soft reference to Backoffice"
        uuid user_id "Soft reference to Backoffice, Nullable"
        string entity_type "e.g. channel_account, contact, conversation"
        uuid entity_id "References target entity"
        string action "Enum: CREATE, UPDATE, DELETE"
        string scope "Enum: GOBD, COMPLIANCE, DSGVO, CONFIG, GENERAL"
        uuid correlation_id "Cross-context saga grouping, Nullable"
        jsonb old_values
        jsonb new_values
        timestamp created_at
    }

    %% Relationships
    CONTACT ||--o{ CONVERSATION : "participates in"
    CONVERSATION ||--o{ MESSAGE : "contains"
    CONVERSATION ||--o{ CONVERSATION_READ_CURSOR : "tracks reads"
    CHANNEL_ACCOUNT ||--o{ MESSAGE : "sent via"

Table Definitions โ€‹

Note on Cross-Schema References: The Communications schema makes heavy use of soft references (UUIDs) to other bounded contexts. Real foreign keys only exist internally (e.g., between conversations and contacts).

channel_accounts โ€‹

Configured communication endpoints provisioned per tenant via CPaaS providers. Each operator registers their own Meta WhatsApp Business Account, Amazon SES domain identity, or SMS sender โ€” see channel-provisioning-protocol.md for registration flows, lifecycle management, and Hasura Action contracts.

ColumnTypeConstraintsDescription
idUUIDPrimary KeyUnique account identifier
tenant_idUUIDNot NullSoft FK to backoffice.operators
channel_typeVARCHARNot NullWHATSAPP, EMAIL, WEBCHAT, SMS
sender_identityVARCHARNot NullCanonical sender identifier: phone number (E.164) for WhatsApp/SMS, verified sender email for Email, domain for Webchat. Populated during channel registration.
display_nameVARCHARNot NullOperator-editable label for the channel account (e.g., "Busflow Reisen WhatsApp"). Displayed in the inbox UI.
provider_configJSONBNot NullCPaaS-specific configuration. Shape varies by channel_type โ€” see typed interfaces below. Sensitive fields (API keys, access tokens) are encrypted at rest via pgsodium.
statusVARCHARNot Null, Default: PENDING_VERIFICATIONPENDING_VERIFICATION (provider verification in progress), ACTIVE (operational), SUSPENDED (manager action or provider issue), REVOKED (terminal โ€” provider permanently banned the account)
created_atTIMESTAMPTZDefault: now()Record creation
updated_atTIMESTAMPTZDefault: now()Last update

IMPORTANT

Unique constraint: UNIQUE(tenant_id, channel_type) โ€” one channel account per type per tenant in Phase 1.

WARNING

Subscription Tier Gating: Provisioning and modifying CPaaS channels (channel_accounts) is restricted to PRO and ENTERPRISE plans (X-Hasura-Plan). CORE tenants can only use the platform's default transactional pipelines. See L3-6.2.2 Schema Matrix Enforcement for rules.

Known provider_config Shapes โ€‹

WHATSAPP:

typescript
interface WhatsAppProviderConfig {
  waba_id: string;                    // Meta WABA (WhatsApp Business Account) ID
  phone_number_id: string;            // Phone Number ID โ€” used in Meta Cloud API calls
  app_id: string;                     // Meta App ID (platform constant, duplicated for convenience)
  app_secret: string;                 // ๐Ÿ” ENCRYPTED โ€” Meta App Secret for HMAC-SHA256 webhook verification
  access_token: string;               // ๐Ÿ” ENCRYPTED โ€” long-lived System User Access Token
  webhook_verify_token: string;       // Random string for webhook GET verification handshake
  business_verification_status: 'NOT_VERIFIED' | 'PENDING' | 'VERIFIED';
  messaging_tier: 'TIER_1' | 'TIER_2' | 'TIER_3' | 'TIER_4' | 'UNLIMITED';
  quality_rating?: 'GREEN' | 'YELLOW' | 'RED';
}

EMAIL:

typescript
interface EmailProviderConfig {
  ses_identity_arn: string;           // AWS SES Identity ARN (verified domain)
  aws_region: string;                 // AWS region for the SES identity
  sns_delivery_topic_arn: string;     // SNS Topic ARN for delivery status notifications
  sns_inbound_topic_arn: string;      // SNS Topic ARN for inbound email
  configuration_set_name: string;     // SES Configuration Set for event publishing
  sender_email: string;               // Verified sender email
  reply_to_email?: string;            // Optional reply-to override
  dkim_status: 'PENDING' | 'SUCCESS' | 'FAILED';
  domain_verification_status: 'PENDING' | 'SUCCESS' | 'FAILED' | 'TEMPORARY_FAILURE';
}

SMS:

typescript
interface SmsProviderConfig {
  origination_identity: string;       // AWS SNS Sender ID or phone number
  origination_type: 'SENDER_ID' | 'PHONE_NUMBER';
  aws_region: string;
  sandbox_mode: boolean;              // true for testing
  monthly_spend_limit_eur: number;    // Operator-configurable safety cap (default: 50)
}

IMPORTANT

Secret storage: Fields marked ๐Ÿ” ENCRYPTED use database-level encryption (pgsodium), not plaintext. Sensitive values are encrypted transparently at the database boundary before persistence and decrypted at runtime when the dispatch pipeline or webhook handler reads them. Note: While n8n encrypts secrets using application-level Swarm Secrets (see workflow-orchestration.md ยงCredential & Secret Management), Busflow tenant data relies on the more robust database-level pgsodium AEAD approach.

NOTE

WEBCHAT: The provider_config shape for WEBCHAT channel type defers to Phase 2. The webchat widget embeds directly in the operator's customer-facing website and does not require external CPaaS credentials.

contacts โ€‹

Unified identity for an external person (customer, driver, vendor) merging cross-channel communication threads.

ColumnTypeConstraintsDescription
idUUIDPrimary KeyUnique contact identifier
tenant_idUUIDNot NullSoft FK to backoffice.operators
passenger_profile_idUUIDSoft FK to backoffice.passenger_profiles
nameVARCHARNot NullDisplay name for the inbox
identifiersJSONBNot NullArray of {type, value} objects for de-duplication. See ยง6a.
created_atTIMESTAMPDefault: now()

ยง6a contacts.identifiers JSONB Structure โ€‹

The identifiers column stores an array of contact channels. Structure:

json
[
  {
    "type": "phone",
    "value": "+1234567890"
  },
  {
    "type": "email",
    "value": "customer@example.com"
  }
]

Type values: "phone" (E.164 format), "email" (RFC 5321 format).

Inbound message handler: When resolving a Contact by phone, query:

sql
identifiers @> '[{"type": "phone", "value": "+1234567890"}]'::jsonb

When adding a new channel to an existing Contact, append to the array:

sql
UPDATE contacts SET identifiers = identifiers || '[{"type": "email", "value": "new@example.com"}]'
  WHERE id = :id AND NOT (identifiers @> '[{"type": "email", "value": "new@example.com"}]'::jsonb)

Index: CREATE INDEX ON contacts USING GIN (identifiers) โ€” accelerates JSONB containment queries.

conversations โ€‹

The centralized container for a communication thread.

Constraint Strategy: Uses exclusive nullable soft references instead of polymorphic associations to maintain Hasura GraphQL compatibility. A Postgres check constraint enforces: CHECK (num_nonnulls(booking_id, trip_id, invoice_id) <= 1)

WARNING

Subscription Tier Gating: Access to the unified agent inbox (creating, updating, and interacting with conversations and messages) is an Omnichannel Premium feature restricted to PRO and ENTERPRISE plans. CORE dispatchers cannot open 2-way threads. See L3-6.2.2.

WARNING

Anti-pattern: This table uses the same wide-nullable-FK pattern that ADR-019 eliminated from change_events. Consider migrating to context_type VARCHAR + context_id UUID (polymorphic) with a Hasura computed field for entity resolution. The set of context types (booking, trip, invoice) is small and stable, but adding future context types (e.g., incident, claim) would require ALTER TABLE ADD COLUMN under the current pattern.

NOTE

Incident broadcast context: For incident-triggered broadcast conversations (see incident-broadcast-protocol.md), trip_id stores the incident's service_leg_id as the conversation context. Despite the column name trip_id, this is correct โ€” the incident targets a specific leg, and no higher-level Trip entity exists in the schema. If the team introduces a Trip aggregate in the future, re-evaluate this usage. The all-clear handler locates prior broadcast conversations via messages.template_id matching the INCIDENT_BROADCAST notification template, not via a dedicated incident_id column. If conversations migrates to polymorphic context (per the WARNING above), the team should add incident as a context_type.

ColumnTypeConstraintsDescription
idUUIDPrimary KeyUnique conversation identifier
tenant_idUUIDNot NullSoft FK to backoffice.operators
contact_idUUIDForeign Key (contacts.id), Not NullPrimary contact
booking_idUUIDContext constraint (Commerce)
trip_idUUIDContext constraint (Operations)
invoice_idUUIDContext constraint (Commerce)
statusVARCHARDefault: OPENOPEN, RESOLVED, SNOOZED
assigned_toUUIDNullableSoft FK to auth.users(id). The agent currently handling this conversation. Null = unassigned (visible to all agents). Set via claimConversation / reassignConversation Hasura Actions โ€” see inbox-protocol.md.
last_message_atTIMESTAMPTZNot Null, Default: now()Denormalized timestamp of the most recent message. Updated by a Postgres trigger on messages INSERT. Primary sort key for the conversation list query.
snoozed_untilTIMESTAMPTZNullableWhen non-null, the conversation is in SNOOZED status and will auto-transition to OPEN when snoozed_until <= now(). A Hasura Cron Trigger (conversation_snooze_wakeup) checks for expired snoozes.
created_atTIMESTAMPDefault: now()

NOTE

Indexes: (tenant_id, status, last_message_at DESC) โ€” covers the primary conversation list query (all OPEN conversations for a tenant, sorted by recency). (assigned_to) WHERE assigned_to IS NOT NULL โ€” covers "my conversations" filter.

last_message_at Trigger โ€‹

A Postgres trigger maintains the denormalized last_message_at column on every message INSERT. The WHERE guard handles out-of-order inserts (e.g., batch import of historical messages).

sql
CREATE OR REPLACE FUNCTION communications.update_conversation_last_message_at()
RETURNS TRIGGER AS $$
BEGIN
  UPDATE communications.conversations
    SET last_message_at = NEW.sent_at
    WHERE id = NEW.conversation_id
      AND last_message_at < NEW.sent_at;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_messages_update_last_message_at
  AFTER INSERT ON communications.messages
  FOR EACH ROW
  EXECUTE FUNCTION communications.update_conversation_last_message_at();

messages โ€‹

Individual communication payloads linked to a Conversation, a Channel Account, and a Contact.

ColumnTypeConstraintsDescription
idUUIDPrimary KeyUnique message identifier
tenant_idUUIDNot NullSoft FK to backoffice.operators
conversation_idUUIDForeign Key (conversations.id)Thread this message belongs to
channel_account_idUUIDForeign Key (channel_accounts.id)Physical connection the system uses for delivery
template_idUUIDSoft FK to backoffice.notification_templates if this was a template dispatch
directionVARCHARNot NullINBOUND, OUTBOUND
content_typeVARCHARNot NullTEXT, TEMPLATE, MEDIA
rendered_contentTEXTNot NullFinal dispatched payload
statusVARCHARNot NullQUEUED, SENT, DELIVERED, READ, FAILED
correlation_idUUIDNullableHasura event_id for at-least-once dedup. The NestJS handler queries messages WHERE correlation_id = :event_id before forwarding to n8n โ€” if a match exists, the event is a duplicate and the handler skips processing.
external_message_idVARCHAR(255)NullableProvider-assigned message identifier (Meta: messages[].id, SES: MessageId, SNS: MessageId). Set by the BullMQ worker after successful CPaaS API call. Used by delivery webhook handlers to look up the Message row for status updates. Max length 255 characters to accommodate all provider formats (Meta IDs can be large; SES/SNS ~21 chars). Indexed for fast webhook lookups.
failed_reasonTEXTNullableHuman-readable failure reason. Populated by BullMQ worker on permanent dispatch failure or by CPaaS webhook handler on delivery failure (e.g., recipient_not_on_whatsapp, permanent_bounce, rate_limited). Stores the provider error type (e.g., INVALID_RECIPIENT, THROTTLED, SPENDING_LIMIT) for categorization.
sent_atTIMESTAMPDefault: now()
delivered_atTIMESTAMPTZNullableTimestamp when the provider confirmed delivery. Set by CPaaS webhook handler on DELIVERED status callback.
read_atTIMESTAMPTZNullableTimestamp when the recipient read the message. Set by CPaaS webhook handler on READ status callback. Only applicable for WhatsApp (Meta sends read receipts). Email/SMS do not support read tracking.

IMPORTANT

Unique constraint: UNIQUE(correlation_id) WHERE correlation_id IS NOT NULL โ€” enforces idempotency. If two NestJS handlers receive the same Hasura event_id (at-least-once delivery race), only one INSERT succeeds. The second handler catches the UNIQUE violation, logs a warning, and returns 200 (idempotent). See notification-pipeline-protocol.md ยง10.

NOTE

Indexes: (external_message_id) WHERE external_message_id IS NOT NULL โ€” covers delivery webhook lookups. (correlation_id) WHERE correlation_id IS NOT NULL โ€” covers idempotency dedup queries and enforces uniqueness. (conversation_id, sent_at DESC) โ€” covers message thread loading.

conversation_read_cursors โ€‹

Per-agent read position within a conversation. Tracks the last time each agent viewed a conversation's messages. The conversation_unread_count Hasura computed field uses this to calculate unread counts โ€” see inbox-protocol.md.

ColumnTypeConstraintsDescription
conversation_idUUIDForeign Key (conversations.id), PK componentThe conversation this cursor tracks
user_idUUIDNot Null, PK componentSoft FK to auth.users(id). The agent this read position tracks.
last_read_atTIMESTAMPTZNot NullTimestamp of the last read. The system treats messages with sent_at > last_read_at as unread for this agent.

NOTE

Primary key: Composite (conversation_id, user_id). UPSERT via ON CONFLICT (conversation_id, user_id) DO UPDATE SET last_read_at = EXCLUDED.last_read_at โ€” the frontend fires this when the agent opens a conversation.

IMPORTANT

Debouncing Strategy: The frontend must debounce UPSERT calls to prevent high-frequency database thrashing. See inbox-protocol.md ยง6 for the debouncing pattern (1-second throttle on scroll, immediate fire on blur). This table can see 50+ UPSERTs per second during active inbox use without debouncing; debouncing reduces that to <5 per second per agent.

Hasura Computed Fields โ€‹

Two session-aware computed fields on conversations power the inbox's unread tracking. Both accept the Hasura session object to resolve the current agent's user_id.

sql
-- Returns the count of unread INBOUND messages for the session user
CREATE OR REPLACE FUNCTION communications.conversation_unread_count(
  conv communications.conversations,
  hasura_session JSON
) RETURNS INTEGER AS $$
  SELECT COUNT(*)::INTEGER
  FROM communications.messages m
  WHERE m.conversation_id = conv.id
    AND m.direction = 'INBOUND'
    AND m.sent_at > COALESCE(
      (SELECT last_read_at FROM communications.conversation_read_cursors
       WHERE conversation_id = conv.id
         AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
      '1970-01-01'::TIMESTAMPTZ
    );
$$ LANGUAGE SQL STABLE;

-- Returns true if any unread INBOUND messages exist (used for global badge filtering)
CREATE OR REPLACE FUNCTION communications.conversation_has_unread_messages(
  conv communications.conversations,
  hasura_session JSON
) RETURNS BOOLEAN AS $$
  SELECT EXISTS (
    SELECT 1 FROM communications.messages m
    WHERE m.conversation_id = conv.id
      AND m.direction = 'INBOUND'
      AND m.sent_at > COALESCE(
        (SELECT last_read_at FROM communications.conversation_read_cursors
         WHERE conversation_id = conv.id
           AND user_id = (hasura_session ->> 'x-hasura-user-id')::UUID),
        '1970-01-01'::TIMESTAMPTZ
      )
  );
$$ LANGUAGE SQL STABLE;

NOTE

Only INBOUND messages count as unread. Outbound agent/system messages do not increment the counter. See inbox-protocol.md ยง6 for the subscription queries and read cursor mutation that consume these fields.

Conversation Lifecycle State Machine โ€‹

The three-state lifecycle governs conversation visibility in the inbox. Transitions occur via Hasura Actions (agent-initiated) or system automation (inbound messages, cron triggers).

FromToActorSide Effects
โ€”OPENSystem (on inbound message or automated conversation creation)Sets last_message_at = now()
OPENRESOLVEDDispatcher (via resolveConversation Hasura Action)Clears assigned_to = NULL. Records change_event.
OPENSNOOZEDDispatcher (via snoozeConversation Hasura Action)Sets snoozed_until = now() + :duration. Records change_event.
RESOLVEDOPENSystem (on new inbound message from the same Contact)Sets assigned_to = NULL (returns to unassigned queue).
SNOOZEDOPENSystem (Hasura Cron Trigger: snoozed_until <= now()) or System (new inbound message)Clears snoozed_until = NULL.

NOTE

No CLOSED/ARCHIVED state in Phase 1. RESOLVED conversations remain queryable in the "Resolved" tab. Automatic archival/deletion is a Phase 2 concern (linked to GDPR retention policies).

NOTE

Snooze wakeup: Hasura Cron Trigger conversation_snooze_wakeup runs every 60 seconds: UPDATE communications.conversations SET status = 'OPEN', snoozed_until = NULL WHERE status = 'SNOOZED' AND snoozed_until <= now(). This is a batch update โ€” no per-conversation Action call needed.

change_events โ€‹

Local audit trail for entities within the Communications schema. Uses a polymorphic entity reference (entity_type + entity_id) โ€” see ADR-019. The shared AuditTrailService (NestJS) writes all entries.

ColumnTypeConstraintsDescription
idUUIDPrimary KeyUnique event
tenant_idUUIDNot NullSoft FK to backoffice.operators
user_idUUIDNullableSoft FK to auth.users(id). The actor โ€” null for system/automation.
entity_typeVARCHARNot NullTarget entity type: channel_account, contact, conversation
entity_idUUIDNot NullID of the mutated entity. Soft FK โ€” entity table varies by entity_type.
actionVARCHARNot NullINSERT, UPDATE, DELETE
scopeVARCHARNot Null, Default: GENERALGOBD, COMPLIANCE, DSGVO, CONFIG, GENERAL โ€” canonical enum per ADR-019.
correlation_idUUIDNullableGroups related change_events across bounded contexts.
old_valuesJSONBFull row snapshot before mutation. Captured via SELECT ... FOR UPDATE in the AuditTrailService. Null for CREATE.
new_valuesJSONBState after mutation. Null for DELETE.
created_atTIMESTAMPTZDefault: now()

NOTE

Index: (tenant_id, entity_type, entity_id) โ€” covers all audit trail queries. Additional partial index: (correlation_id) WHERE correlation_id IS NOT NULL.

NOTE

Scope mapping for this schema: DSGVO for contact (personal data). CONFIG for channel_account. GENERAL for conversation.

Internal documentation โ€” Busflow