Busflow Docs

Internal documentation portal

Notification Pipeline Protocol

Domain: Communications (Shared Core Domain)
Trigger: Domain events from all bounded contexts (Hasura Event Triggers → NestJS handlers)
Output: Rendered messages dispatched via Meta Cloud API (WhatsApp), Amazon SES (Email), Amazon SNS (SMS)
Sources: communications.md, workflow-orchestration.md L3
DoD: ✅ Schema | ✅ API | ✅ Edge States (Ready to Code)


§1 Overview

Flow: Domain Event → Hasura Event Trigger → NestJS handler (idempotency check, routing, enrichment) → n8n webhook (NotificationRequest) → template resolution → Message row creation → BullMQ DispatchMessageJob → CPaaS adapter (Meta/SES/SNS) → delivery webhook → status update.

The pipeline follows the four-layer workflow orchestration stack:

  • Hasura detects DB changes and fires webhooks to NestJS (at-least-once delivery semantics)
  • NestJS checks idempotency via correlation_id dedup, enriches the payload (resolves recipients, builds context variables, gates on ChannelAccount status), and forwards it to n8n through a circuit-breaker-wrapped HTTP call
  • n8n resolves templates against tenant+trigger_event+channel+locale, creates Message rows with status=QUEUED, and enqueues BullMQ jobs
  • BullMQ handles durable dispatch with retries, rate limiting per ChannelAccount tier, and fallback chains

Events requiring specialized flows (e.g., Incident Broadcast with dispatcher approval) bypass the generic pipeline and use dedicated n8n workflows, but rejoin at the DispatchMessageJob level for actual sending.


§2 NotificationRequest Interface

The generic payload the NestJS event handler forwards to n8n. NestJS owns idempotency checking, routing, and payload enrichment — n8n owns template resolution, Message row creation, and BullMQ enqueue. See §2.1 for NestJS handler integration details and error handling.

typescript
interface NotificationRequest {
  /** Hasura event_id — used for idempotency dedup */
  event_id: string;

  /** The originating tenant */
  tenant_id: string;

  /** Maps to notification_templates.trigger_event */
  trigger_event: string;

  /** Target recipient(s). At least one required. */
  recipients: NotificationRecipient[];

  /**
   * Variables for template interpolation.
   * Shape varies per trigger_event — see §7 for the registry.
   */
  context: Record<string, unknown>;

  /**
   * Optional conversation context — creates or reuses a
   * Conversation linked to this domain entity.
   * If omitted, n8n does not create a Conversation row.
   */
  conversation_context?: {
    type: 'booking' | 'trip' | 'invoice';
    id: string;
  };

  /**
   * Override default channel selection.
   * If omitted, the pipeline dispatches on every channel that has a
   * matching NotificationTemplate AND an ACTIVE ChannelAccount (§5.1).
   */
  channels?: ('WHATSAPP' | 'EMAIL' | 'SMS')[];
}

interface NotificationRecipient {
  /** Passenger profile ID; matched against communications.contacts.passenger_profile_id (§10) */
  passenger_profile_id?: string;
  /** Display name (always required) */
  name: string;
  /** Direct contact info, used when no Contact matches the profile */
  phone?: string;
  email?: string;
}
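Before forwarding, the handler can reject malformed payloads early. A minimal sketch follows. The reachability rule for recipients and the "channels must not be empty" rule are assumptions; they are not stated in the interface. `validateNotificationRequest` is a hypothetical helper. The interfaces are redeclared so the block compiles on its own.

```typescript
type Channel = 'WHATSAPP' | 'EMAIL' | 'SMS';

// Local copies of the §2 shapes so this sketch is self-contained.
interface NotificationRecipient {
  passenger_profile_id?: string;
  name: string;
  phone?: string;
  email?: string;
}

interface NotificationRequest {
  event_id: string;
  tenant_id: string;
  trigger_event: string;
  recipients: NotificationRecipient[];
  context: Record<string, unknown>;
  conversation_context?: { type: 'booking' | 'trip' | 'invoice'; id: string };
  channels?: Channel[];
}

const KNOWN_CHANNELS: readonly Channel[] = ['WHATSAPP', 'EMAIL', 'SMS'];

/** Returns human-readable problems; an empty array means the request is acceptable. */
export function validateNotificationRequest(req: NotificationRequest): string[] {
  const problems: string[] = [];
  if (!req.event_id) problems.push('event_id is required (idempotency key)');
  if (!req.tenant_id) problems.push('tenant_id is required');
  if (req.recipients.length === 0) problems.push('at least one recipient is required');

  req.recipients.forEach((r, i) => {
    // Assumption: a recipient is reachable only via a profile or direct contact info.
    if (!r.passenger_profile_id && !r.phone && !r.email) {
      problems.push(`recipients[${i}] needs passenger_profile_id, phone, or email`);
    }
  });

  if (req.channels !== undefined) {
    // Assumption: an explicit empty override is a caller bug, not "no channels".
    if (req.channels.length === 0) problems.push('channels, if given, must not be empty');
    // Runtime check: JSON payloads are not protected by the TypeScript union.
    for (const c of req.channels) {
      if (!KNOWN_CHANNELS.includes(c)) problems.push(`unknown channel: ${c}`);
    }
  }
  return problems;
}
```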

§2.1 NestJS Handler Integration

Entry Point:

typescript
@Injectable()
class NotificationPipelineService {
  constructor(
    private readonly hasuraClient: HasuraClient,
    private readonly httpClient: HttpClient,
    private readonly circuitBreaker: CircuitBreaker,
    private readonly bullQueue: BullQueue,
    private readonly logger: Logger
  ) {}

  async processEvent(req: NotificationRequest): Promise<void> {
    // 1. Dedup: SELECT 1 FROM communications.messages WHERE correlation_id = :event_id LIMIT 1.
    //    If a row exists, this is a redelivered Hasura event: return early
    //    (the controller still answers 200).

    // 2. Validate: at least one ChannelAccount is ACTIVE for this tenant.

    // 3. Call the n8n webhook through the circuit breaker:
    //    POST /webhook/notifications/{trigger_event}
    //    On failure: catch, log, and enqueue req to the notifications_recovery BullMQ queue.
    //    Either way the controller returns 200 immediately (fire-and-forget).
  }
}

Idempotency: See §12.

Error Handling: See §11.
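The three steps of `processEvent` can be sketched with the collaborators hidden behind a small port interface. `PipelinePorts`, its method names, and the `Outcome` values are hypothetical. The real service uses the Hasura client, the opossum-wrapped HTTP call, and BullMQ. The document does not specify what happens when no channel is ACTIVE, so skipping the n8n call in that case is also an assumption.

```typescript
interface NotificationRequest {
  event_id: string;
  tenant_id: string;
  trigger_event: string;
}

// Hypothetical ports standing in for the Hasura client, the circuit-breaker call, and BullMQ.
interface PipelinePorts {
  messageExistsForCorrelation(eventId: string): Promise<boolean>;
  hasActiveChannel(tenantId: string): Promise<boolean>;
  forwardToN8n(path: string, req: NotificationRequest): Promise<void>;
  enqueueRecovery(req: NotificationRequest): Promise<void>;
  log(level: 'info' | 'warn' | 'error', msg: string): void;
}

type Outcome = 'duplicate' | 'no_active_channel' | 'forwarded' | 'recovery_enqueued';

export async function processEvent(req: NotificationRequest, ports: PipelinePorts): Promise<Outcome> {
  // 1. Dedup: an existing Message with correlation_id = event_id means Hasura redelivered.
  if (await ports.messageExistsForCorrelation(req.event_id)) {
    ports.log('info', `duplicate event ${req.event_id}, skipping`);
    return 'duplicate';
  }

  // 2. Gate: assumption, skip forwarding when the tenant has no ACTIVE ChannelAccount.
  if (!(await ports.hasActiveChannel(req.tenant_id))) {
    ports.log('warn', `tenant ${req.tenant_id} has no ACTIVE channel`);
    return 'no_active_channel';
  }

  // 3. Forward; any failure (including an open circuit) parks the request for RecoveryWorker.
  const path = `/webhook/notifications/${encodeURIComponent(req.trigger_event)}`;
  try {
    await ports.forwardToN8n(path, req);
    return 'forwarded';
  } catch (err) {
    ports.log('error', `n8n call failed for ${req.event_id}: ${String(err)}`);
    await ports.enqueueRecovery(req);
    return 'recovery_enqueued';
  }
}
```

In the controller, every outcome still maps to an HTTP 200 response, so Hasura does not redeliver.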


§3 Message.status Transition Ownership

| From | To | Actor | Mechanism |
|---|---|---|---|
| (new row) | QUEUED | n8n workflow | Hasura mutation insert_communications_messages after template rendering |
| QUEUED | SENT | BullMQ worker | After the CPaaS API returns 2xx. Worker writes external_message_id. |
| SENT | DELIVERED | CPaaS webhook handler | Meta: statuses[].status = 'delivered'. SES: SNS eventType = 'Delivery'. SNS (SMS): SUCCESS. Handler updates delivered_at. |
| SENT/DELIVERED | READ | CPaaS webhook handler | Meta: statuses[].status = 'read'. SES and SMS: no read tracking. Handler updates read_at. |
| QUEUED | FAILED | BullMQ worker | Permanent dispatch failure (invalid recipient, template rejected). Sets failed_reason. |
| SENT | FAILED | CPaaS webhook handler | Meta: statuses[].status = 'failed'. SES: SNS eventType = 'Bounce' (permanent). SNS (SMS): FAILURE. Sets failed_reason. |

If a Message stays in SENT for more than 24h without a delivery callback, flag it as stale. Extend the existing 5-minute scan for stuck QUEUED messages so that it also covers stale SENT messages.
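The table above defines a small state machine. The sketch below shows a transition guard that a webhook handler could use to ignore late or out-of-order callbacks, such as a 'delivered' callback arriving after 'read'. It also includes the 24h stale check. The `sentAt` timestamp is an assumed column, since the Message schema is not shown here. The function names are illustrative.

```typescript
type MessageStatus = 'QUEUED' | 'SENT' | 'DELIVERED' | 'READ' | 'FAILED';

// Allowed transitions, transcribed from the ownership table above.
const ALLOWED: Record<MessageStatus, readonly MessageStatus[]> = {
  QUEUED: ['SENT', 'FAILED'],
  SENT: ['DELIVERED', 'READ', 'FAILED'],
  DELIVERED: ['READ'],
  READ: [],
  FAILED: [],
};

/** False for regressions such as READ -> DELIVERED; the caller can then ignore the callback. */
export function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return ALLOWED[from].includes(to);
}

const STALE_SENT_MS = 24 * 60 * 60 * 1000;

/** True when a Message has sat in SENT for more than 24h (sentAt is an assumed column). */
export function isStaleSent(status: MessageStatus, sentAt: Date, now: Date): boolean {
  return status === 'SENT' && now.getTime() - sentAt.getTime() > STALE_SENT_MS;
}
```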


§4 DispatchMessageJob BullMQ Job

typescript
/** Queue name: 'message-dispatch' */
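interface DispatchMessageJob {
  message_id: string;
  /** Read by DispatchWorker for secret decryption and per-tenant rate limiting */
  tenant_id: string;
  channel_account_id: string;
  channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
  rendered_content: string;
  meta_template?: {
    name: string;
    namespace: string;
    /** Meta template components: objects such as { type: 'body', parameters: [...] } */
    components: Record<string, unknown>[];
  };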
  recipient: {
    phone?: string;
    email?: string;
  };
  fallback_chain?: ('WHATSAPP' | 'SMS' | 'EMAIL')[];
  provider_config: Record<string, unknown>;
}

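Default queue and worker config (in BullMQ, job defaults are set on the Queue, while concurrency and the rate limiter are set on the Worker):

typescript
// Queue options
{
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: 'exponential', delay: 5000 },
    removeOnComplete: 1000, // keep the last 1000 completed jobs
    removeOnFail: false,    // keep failed jobs for inspection
  },
}

// Worker options
{
  concurrency: 10,
  limiter: {
    max: 50,
    duration: 1000, // 50 msgs/sec across all workers on this queue; conservative for Meta Tier 1
  },
}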
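With attempts: 5, a job is retried four times. This sketch assumes BullMQ's built-in exponential strategy waits delay × 2^(n-1) before retry n and that no jitter is configured. Under those assumptions the schedule works out as shown below. The helper is illustrative and not part of BullMQ.

```typescript
/**
 * Delay before each retry under exponential backoff: retry n (1-based)
 * waits baseDelayMs * 2^(n - 1). `attempts` includes the first try,
 * so there are attempts - 1 retries.
 */
export function exponentialRetryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

const delays = exponentialRetryDelays(5, 5000); // [5000, 10000, 20000, 40000]
const totalMs = delays.reduce((a, b) => a + b, 0); // 75000 ms of waiting in total
```

A transient outage longer than roughly 75 seconds, plus processing time, therefore exhausts all retries.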

Fallback chain handling: On permanent failure from the primary channel, the worker creates a new Message row for the next channel in fallback_chain[] (status = QUEUED) and links it via correlation_id. It then enqueues a new job. The original Message transitions to FAILED with failed_reason = 'fallback_triggered:<channel>'.
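The fallback step is a pure function of the chain. The sketch below uses the failed_reason format given above. `nextFallback` and `FallbackStep` are hypothetical names.

```typescript
type Channel = 'WHATSAPP' | 'EMAIL' | 'SMS';

interface FallbackStep {
  nextChannel: Channel;      // channel for the new QUEUED Message
  remainingChain: Channel[]; // becomes fallback_chain on the new job
  failedReason: string;      // written to the original Message
}

/** Returns the next fallback hop, or null when the chain is missing or exhausted. */
export function nextFallback(chain: readonly Channel[] | undefined): FallbackStep | null {
  if (!chain || chain.length === 0) return null;
  const [nextChannel, ...remainingChain] = chain;
  return { nextChannel, remainingChain, failedReason: `fallback_triggered:${nextChannel}` };
}
```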


§4.1 DispatchWorker — BullMQ Processor

The DispatchWorker registers as a BullMQ processor on the message-dispatch queue. It consumes jobs, selects the appropriate adapter, invokes the provider API, parses the response, and transitions Message status.

typescript
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job, Queue, UnrecoverableError } from 'bullmq';

// Worker options: BullMQ applies concurrency and the rate limiter on the Worker, not the Queue.
@Processor('message-dispatch', {
  concurrency: 10, // tunable based on CPU/memory
  limiter: { max: 50, duration: 1000 },
})
export class DispatchWorker extends WorkerHost {
  private readonly logger = new Logger(DispatchWorker.name);

  constructor(
    @InjectQueue('message-dispatch')
    private readonly queue: Queue<DispatchMessageJob>,
    private readonly factory: CpaasAdapterFactory,
    private readonly messageService: MessageService,
    private readonly encryptionService: EncryptionService,
    private readonly rateLimiter: RateLimiterService,
  ) {
    super();
  }

  /** Returns external_message_id on success; throws to let BullMQ retry or fail the job. */
  async process(job: Job<DispatchMessageJob>): Promise<string> {
    const { channel, channel_account_id, provider_config, message_id, tenant_id } = job.data;

    let external_message_id: string;
    try {
      // Decrypt provider secrets (API keys, access tokens)
      const decrypted = await this.encryptionService.decrypt(provider_config, {
        tenant_id,
        channel_account_id,
      });

      // Select adapter by channel type
      const adapter = this.factory.getAdapter(channel);

      // Acquire rate limit token (per ChannelAccount, tier-aware for WhatsApp)
      await this.rateLimiter.acquire(tenant_id, channel_account_id, channel, decrypted);

      // Dispatch via adapter (construct API request, execute, parse response)
      external_message_id = await adapter.dispatch(job.data, decrypted);
    } catch (err) {
      // Unexpected (non-CpaasError) errors are treated as transient
      const error =
        err instanceof CpaasError
          ? err
          : new CpaasError('UNKNOWN', err instanceof Error ? err.message : String(err), true, 'UNEXPECTED_ERROR');

      // attemptsMade = attempts that already failed (0 on the first try)
      const isLastAttempt = job.attemptsMade + 1 >= (job.opts.attempts ?? 1);
      if (error.isTransient && !isLastAttempt) {
        throw error; // BullMQ retries with exponential backoff
      }

      // Permanent failure, or transient failure with retries exhausted:
      // mark FAILED or trigger the fallback chain, then stop BullMQ from retrying.
      // (A plain throw would be retried even for permanent errors.)
      await this.handlePermanentFailure(job, error);
      throw new UnrecoverableError(`${error.errorType}: ${error.message}`);
    }

    try {
      // Update Message.external_message_id, status → SENT
      await this.messageService.updateMessageAfterDispatch(message_id, external_message_id);
    } catch {
      // The provider already accepted the message; a retry would send it twice.
      // The row stays QUEUED and surfaces in the 5-minute QUEUED scan.
      this.logger.error(`Message ${message_id} sent (${external_message_id}) but status update failed`);
      throw new UnrecoverableError('status_update_failed_after_send');
    }

    this.logger.debug(`Message ${message_id} sent via ${channel}, external_id: ${external_message_id}`);
    return external_message_id;
  }

  private async handlePermanentFailure(
    job: Job<DispatchMessageJob>,
    error: CpaasError,
  ): Promise<void> {
    const { message_id, fallback_chain } = job.data;

    this.logger.warn(
      `Permanent failure for message ${message_id}: ${error.errorType} (${error.code})`
    );

    if (!fallback_chain || fallback_chain.length === 0) {
      // No fallback; mark original message as FAILED
      await this.messageService.updateMessageFailed(message_id, error.errorType, error.message);
      return;
    }

    // Pop first channel from fallback_chain
    const [nextChannel, ...remainingChain] = fallback_chain;

    // Create new Message row (status = QUEUED) for the fallback channel
    const fallbackMessage = await this.messageService.createFallbackMessage(message_id, nextChannel);

    // Enqueue a job for the fallback channel. BullMQ's Queue.add takes (name, data, opts);
    // the job name 'dispatch' is illustrative.
    // NOTE: channel_account_id, provider_config, recipient and rendered content are still
    // the primary channel's values here and must be replaced with the fallback channel's.
    await this.queue.add('dispatch', {
      ...job.data,
      message_id: fallbackMessage.id,
      channel: nextChannel,
      fallback_chain: remainingChain,
    });

    // Mark original message as FAILED with fallback_triggered reason
    await this.messageService.updateMessageFailed(
      message_id,
      `fallback_triggered:${nextChannel}`,
      `Falling back to ${nextChannel} after ${error.errorType}`,
    );

    this.logger.log(`Fallback triggered: message ${message_id} → ${nextChannel}`);
  }
}
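BullMQ retries any error thrown from a processor until the job's attempts are used up. The worker therefore has to keep permanent CpaasErrors out of the retry loop. It must also mark the Message FAILED when a transient error hits the last attempt. The sketch below expresses that decision as a pure function. It assumes BullMQ's `attemptsMade` counts attempts that have already failed, so it is 0 during the first try. Treating exhausted retries like a permanent failure is our assumption. In BullMQ, 'fail_permanently' corresponds to handling the failure and then throwing `UnrecoverableError`, which moves the job to the failed set without further attempts.

```typescript
type FailureDecision = 'retry' | 'fail_permanently';

/**
 * attemptsMade: attempts that already failed before the current one.
 * maxAttempts: the job's `attempts` option (5 in the default config).
 */
export function decideOnFailure(
  isTransient: boolean,
  attemptsMade: number,
  maxAttempts: number,
): FailureDecision {
  const isLastAttempt = attemptsMade + 1 >= maxAttempts;
  return isTransient && !isLastAttempt ? 'retry' : 'fail_permanently';
}
```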

§4.2 CpaasAdapterFactory

Factory pattern to select the correct adapter by channel type.

typescript
@Injectable()
export class CpaasAdapterFactory {
  constructor(
    private readonly whatsapp: WhatsAppAdapter,
    private readonly email: EmailAdapter,
    private readonly sms: SmsAdapter,
  ) {}

  getAdapter(channel: 'WHATSAPP' | 'EMAIL' | 'SMS'): CpaasAdapter {
    switch (channel) {
      case 'WHATSAPP':
        return this.whatsapp;
      case 'EMAIL':
        return this.email;
      case 'SMS':
        return this.sms;
      default:
        throw new Error(`Unsupported channel: ${channel}`);
    }
  }
}

§4.3 CpaasAdapter Base Interface

All provider-specific adapters implement this contract.

typescript
export interface CpaasAdapter {
  /**
   * Dispatch a message via the provider's API.
   * @param job The DispatchMessageJob payload
   * @param decrypted The decrypted provider_config (WhatsAppProviderConfig | EmailProviderConfig | SmsProviderConfig)
   * @returns The provider's external_message_id
   * @throws CpaasError (with code, message, isTransient flag, errorType)
   */
  dispatch(
    job: DispatchMessageJob,
    decrypted: Record<string, unknown>
  ): Promise<string>;
}

export class CpaasError extends Error {
  constructor(
    public readonly code: string | number,
    message: string,
    public readonly isTransient: boolean,
    public readonly errorType: string
  ) {
    super(message);
    this.name = 'CpaasError';
  }
}

§4.4 WhatsAppAdapter — Meta Cloud API

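Dispatches messages via Meta Cloud API (v18.0). Two message types are supported:

  • Template messages: pre-approved templates, required for any business-initiated message outside the 24-hour customer service window, including transactional notifications.
  • Session messages: free-form text, allowed only within 24 hours of the recipient's last inbound message.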

typescript
import { HttpService } from '@nestjs/axios';
import { Injectable } from '@nestjs/common';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class WhatsAppAdapter implements CpaasAdapter {
  constructor(private readonly httpClient: HttpService) {}

  async dispatch(
    job: DispatchMessageJob,
    decrypted: WhatsAppProviderConfig
  ): Promise<string> {
    // Template message: pre-approved meta_template (required outside the 24h window)
    // Session message: free-form rendered_content (only within 24h of the last inbound message)
    const template = job.meta_template;

    const payload: Record<string, unknown> = template
      ? {
          messaging_product: 'whatsapp',
          to: job.recipient.phone,
          type: 'template',
          template: {
            name: template.name,
            namespace: template.namespace,
            language: { code: 'de' }, // Phase 1: German only
            components: template.components,
          },
        }
      : {
          messaging_product: 'whatsapp',
          to: job.recipient.phone,
          type: 'text',
          text: { body: job.rendered_content },
        };

    let external_message_id: string | undefined;
    try {
      const url = `https://graph.facebook.com/v18.0/${decrypted.phone_number_id}/messages`;
      const response = await firstValueFrom(
        this.httpClient.post(url, payload, {
          headers: {
            Authorization: `Bearer ${decrypted.access_token}`,
            'Content-Type': 'application/json',
          },
        }),
      );
      // Extract external_message_id from response
      external_message_id = response.data?.messages?.[0]?.id;
    } catch (error) {
      throw this.mapMetaError(error);
    }

    if (!external_message_id) {
      // Meta returned 2xx without an ID. Classified as permanent so BullMQ does not
      // re-send a message that Meta may already have accepted.
      throw new CpaasError('NO_MESSAGE_ID', 'Meta Cloud API response missing message ID', false, 'MISSING_MESSAGE_ID');
    }
    return external_message_id;
  }

  private mapMetaError(error: any): CpaasError {
    const httpStatus: number | undefined = error.response?.status;
    const code: string | number = error.response?.data?.error?.code ?? httpStatus ?? 'NETWORK_ERROR';
    const message: string = error.response?.data?.error?.message ?? error.message;

    // Rate limiting: HTTP 429, or Meta codes 130429 (throughput limit) / 131056 (pair rate limit)
    if (httpStatus === 429 || code === 429 || code === 130429 || code === 131056) {
      return new CpaasError(code, message, true, 'RATE_LIMITED');
    }

    // Classify by Meta error code (see §4.7)
    switch (code) {
      case 131026:
        // Recipient not on WhatsApp: permanent
        return new CpaasError(code, message, false, 'RECIPIENT_NOT_ON_WHATSAPP');
      case 131047:
        // More than 24h since the recipient's last message; a template is required: permanent
        return new CpaasError(code, message, false, 'OUTSIDE_SESSION_WINDOW');
      case 131000:
        // Unable to create message: transient (network/overload)
        return new CpaasError(code, message, true, 'UNABLE_TO_CREATE_MESSAGE');
      case 100:
        // Invalid request parameters: permanent
        return new CpaasError(code, message, false, 'INVALID_PARAMETERS');
      default:
        // Unknown error (including network errors without a response): assume transient
        return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
    }
  }
}
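Whether a WhatsApp notification can go out as free-form text depends on the 24-hour customer service window. The sketch below makes that decision before dispatch. It assumes a hypothetical `lastInboundAt` timestamp, for example the time of the Contact's most recent inbound message, which this protocol does not define. As we understand Meta's error reference, a free-form message sent outside the window is rejected with error 131047. Resolving the message type up front avoids a guaranteed permanent failure.

```typescript
const SESSION_WINDOW_MS = 24 * 60 * 60 * 1000;

/**
 * Templates can always be sent; free-form session text only inside the window.
 * lastInboundAt is a hypothetical field, null if the recipient never wrote in.
 */
export function chooseWhatsAppMessageType(
  hasMetaTemplate: boolean,
  lastInboundAt: Date | null,
  now: Date,
): 'template' | 'session' | 'not_sendable' {
  if (hasMetaTemplate) return 'template';
  const insideWindow =
    lastInboundAt !== null && now.getTime() - lastInboundAt.getTime() < SESSION_WINDOW_MS;
  return insideWindow ? 'session' : 'not_sendable';
}
```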

§4.5 EmailAdapter — Amazon SES

Dispatches emails via Amazon SES. Automatically detects HTML vs. plain-text content.

typescript
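import { Injectable } from '@nestjs/common';
import { SESClient, SendEmailCommand } from '@aws-sdk/client-ses';

@Injectable()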
export class EmailAdapter implements CpaasAdapter {
  private readonly ses = new SESClient({ region: 'eu-central-1' });

  constructor(
    private readonly logger: Logger,
  ) {}

  async dispatch(
    job: DispatchMessageJob,
    decrypted: EmailProviderConfig
  ): Promise<string> {
    // Determine content type: HTML or plain text?
    const isHtml = /<[a-z]/i.test(job.rendered_content);

    const params = new SendEmailCommand({
      Source: decrypted.sender_email,
      Destination: { ToAddresses: [job.recipient.email] },
      Message: {
        Subject: {
          Data: 'Notification from Busflow', // fixed subject: DispatchMessageJob carries no per-template subject
          Charset: 'UTF-8',
        },
        Body: isHtml
          ? {
              Html: { Data: job.rendered_content, Charset: 'UTF-8' },
            }
          : {
              Text: { Data: job.rendered_content, Charset: 'UTF-8' },
            },
      },
      ConfigurationSetName: decrypted.configuration_set_name,
      ...(decrypted.reply_to_email ? { ReplyToAddresses: [decrypted.reply_to_email] } : {}),
    });

    try {
      const response = await this.ses.send(params);
      return response.MessageId; // SES MessageId is external_message_id
    } catch (error) {
      throw this.mapSesError(error);
    }
  }

  private mapSesError(error: any): CpaasError {
    const code: string = error.name || error.Code || 'UNKNOWN';
    const message: string = error.message ?? String(error);

    // Classify by SES error type (see §4.7). SESClient calls the SES v1 API, which reports
    // AccountSendingPausedException / ConfigurationSetSendingPausedException and 'Throttling';
    // the other names are kept as aliases.
    switch (code) {
      case 'MessageRejected':
        // Permanent: recipient address rejected, suppression list
        return new CpaasError(code, message, false, 'INVALID_RECIPIENT');
      case 'AccountSendingPausedException':
      case 'ConfigurationSetSendingPausedException':
      case 'SendingPausedException':
        // Transient: sending paused (by AWS or an administrator, usually temporary)
        return new CpaasError(code, message, true, 'SENDING_PAUSED');
      case 'Throttling':
      case 'ThrottlingException':
        // Transient: maximum sending rate exceeded
        return new CpaasError(code, message, true, 'THROTTLED');
      case 'ConfigurationSetDoesNotExistException':
        // Permanent: configuration set misconfigured
        return new CpaasError(code, message, false, 'CONFIG_ERROR');
      default:
        // Unknown: assume transient
        return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
    }
  }
}

§4.6 SmsAdapter — Amazon SNS

Dispatches SMS via Amazon SNS. Injects Sender ID for DACH carriers.

typescript
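import { Injectable } from '@nestjs/common';
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns';

@Injectable()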
export class SmsAdapter implements CpaasAdapter {
  private readonly sns = new SNSClient({ region: 'eu-central-1' });

  constructor(
    private readonly logger: Logger,
  ) {}

  async dispatch(
    job: DispatchMessageJob,
    decrypted: SmsProviderConfig
  ): Promise<string> {
    const senderId = decrypted.origination_type === 'SENDER_ID'
      ? decrypted.origination_identity
      : undefined;

    const params = new PublishCommand({
      Message: job.rendered_content,
      PhoneNumber: job.recipient.phone,
      MessageAttributes: {
        // Attribute names contain dots, so they must be quoted string keys
        ...(senderId
          ? {
              'AWS.SNS.SMS.SenderID': {
                DataType: 'String',
                StringValue: senderId,
              },
            }
          : {}),
        'AWS.SNS.SMS.SMSType': {
          DataType: 'String',
          StringValue: 'Transactional',
        },
      },
    });

    try {
      const response = await this.sns.send(params);
      return response.MessageId; // SNS MessageId is external_message_id
    } catch (error) {
      throw this.mapSnsError(error);
    }
  }

  private mapSnsError(error: any): CpaasError {
    const code: string = error.name || error.Code || 'UNKNOWN';
    const message: string = error.message ?? '';

    // Classify by SNS error type (see §4.7)
    switch (code) {
      case 'InvalidParameterException':
        // Permanent: invalid phone number
        return new CpaasError(code, message, false, 'INVALID_RECIPIENT');
      case 'ThrottledException':
        // Transient: SNS request rate exceeded
        return new CpaasError(code, message, true, 'THROTTLED');
      case 'KMSDisabledException':
        // Transient: KMS key issue (usually resolves)
        return new CpaasError(code, message, true, 'KMS_ERROR');
      case 'UserError':
        // May be transient or permanent; inspect message
        if (message.includes('spending limit')) {
          // Spending limit: transient (recovers when limit increases or after 24h)
          return new CpaasError(code, message, true, 'SPENDING_LIMIT');
        }
        return new CpaasError(code, message, false, 'USER_ERROR');
      default:
        // Unknown — assume transient
        return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
    }
  }
}

§4.7 Error Classification Reference

| Provider | Error Code / Exception | Description | Transient? | Action |
|---|---|---|---|---|
| Meta (WhatsApp) | 429 | Rate limit exceeded | ✅ Yes | Backoff: exponential (5s base from BullMQ config). Per-tenant rate limiting via RateLimiterService (§4.8). Respect messaging_tier from provider_config. |
| Meta | 131026 | Recipient not on WhatsApp | ❌ No | Permanent. Trigger fallback chain to SMS/Email. Do not retry. |
| Meta | 131000 | Unable to create message | ✅ Yes | Transient; BullMQ retries. May indicate a network hiccup or Meta overload. |
| Meta | 100 | Invalid request parameters | ❌ No | Permanent. Log error; surface to dispatcher. Do not retry. |
| SES (Email) | MessageRejected | Recipient rejected (bounce list, invalid address) | ❌ No | Permanent. Do not retry. Trigger fallback chain if configured. |
| SES | SendingPausedException | Account sending paused | ✅ Yes | Transient; BullMQ retries. Dispatcher should investigate account status. |
| SES | ThrottlingException | Sending rate exceeded (SendingRateExceeded) | ✅ Yes | Transient; BullMQ retries with exponential backoff. Check account sending rate limits. |
| SES | ConfigurationSetDoesNotExistException | Configuration set misconfigured | ❌ No | Permanent. Validate provider_config.configuration_set_name against the SES account. |
| SNS (SMS) | InvalidParameterException | Invalid phone number | ❌ No | Permanent. Validate E.164 format. Trigger fallback if configured. |
| SNS | KMSDisabledException | KMS key disabled (encryption issue) | ✅ Yes | Transient; BullMQ retries. Contact AWS support if it persists. |
| SNS | UserError (spending limit) | Monthly spending limit reached | ✅ Yes | Transient (recovers after 24h or when the operator raises the limit). Queue for retry. Surface a warning to the dispatcher. |
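The SNS row above asks for E.164 validation. The sketch below shows a normaliser and validator. It only strips common separators and does not convert national numbers such as 0151 2345678 into +49 form. That conversion would need country context, which this protocol does not carry. `toE164OrNull` is a hypothetical helper.

```typescript
// E.164: '+', a non-zero leading digit, at most 15 digits in total.
const E164 = /^\+[1-9]\d{1,14}$/;

/** Strips spaces, dashes and parentheses, then applies a strict E.164 check. */
export function toE164OrNull(raw: string): string | null {
  const compact = raw.replace(/[\s\-()]/g, '');
  return E164.test(compact) ? compact : null;
}
```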

§4.8 Rate Limiter Configuration

Per-tenant rate limiting with tier-aware adjustment for WhatsApp messaging tiers. Each (tenant_id, channel_account_id) pair holds its own isolated token bucket.

typescript
@Injectable()
export class RateLimiterService {
  private buckets = new Map<string, TokenBucket>(); // Key: `${tenant_id}:${channel_account_id}`

  async acquire(
    tenant_id: string,
    channel_account_id: string,
    channel: string,
    decrypted: WhatsAppProviderConfig | EmailProviderConfig | SmsProviderConfig
  ): Promise<void> {
    const key = `${tenant_id}:${channel_account_id}`;

    let bucket = this.buckets.get(key);
    if (!bucket) {
      const config = this.getConfig(channel, decrypted);
      bucket = new TokenBucket(config.capacity, config.refillRate);
      this.buckets.set(key, bucket);
    }

    await bucket.acquire(1);
  }

  private getConfig(
    channel: string,
    decrypted: any
  ): { capacity: number; refillRate: number } {
    // Default: 50 msg/sec, matching the queue-level limiter
    let refillRate = 50;

    if (channel === 'WHATSAPP' && decrypted.messaging_tier) {
      // Per-second send rates by WhatsApp tier (used directly, not as multipliers of 50)
      const ratesPerSecond: Record<string, number> = {
        TIER_1: 1,     // 60 msg/min
        TIER_2: 8.3,   // 500 msg/min
        TIER_3: 33.3,  // 2000 msg/min
        TIER_4: 50,    // 3000 msg/min
        UNLIMITED: 100,
      };
      refillRate = ratesPerSecond[decrypted.messaging_tier as string] ?? 1;
    }

    // Allow bursts of at most one second's worth of tokens (at least 1)
    const capacity = Math.max(1, Math.ceil(refillRate));
    return { capacity, refillRate };
  }
}

class TokenBucket {
  private tokens: number;
  private lastRefill: number = Date.now();

  constructor(
    private readonly capacity: number,
    private readonly refillRate: number // tokens per second
  ) {
    this.tokens = capacity;
  }

  async acquire(tokensNeeded: number): Promise<void> {
    // Refill for the time elapsed since the last call
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate
    );
    this.lastRefill = now;

    // Reserve synchronously (the balance may go negative) so concurrent callers
    // queue up behind each other instead of all waking at the same moment.
    this.tokens -= tokensNeeded;
    if (this.tokens < 0) {
      const waitMs = (-this.tokens / this.refillRate) * 1000;
      await new Promise((resolve) => setTimeout(resolve, Math.ceil(waitMs)));
    }
  }
}

NOTE

Phase 2 upgrade path: This in-memory TokenBucket works for single-instance Phase 1 deployments. For multi-instance scaling, migrate to a Redis-backed token bucket using an atomic Lua script. Key structure: communications:limiter:{tenant_id}:{channel_type}. Note that the in-memory buckets above are keyed by channel_account_id instead, so settle on one key before migrating. The RateLimiterService interface stays the same; only the backing store changes.


§5 Template Resolution Algorithm

typescript
/**
 * Lookup chain (per channel):
 * 1. (tenant_id, trigger_event, channel, passenger_locale) — exact match
 * 2. (tenant_id, trigger_event, channel, operator_default_locale) — locale fallback
 * 3. (tenant_id, trigger_event, channel, 'de-DE') — ultimate fallback (DACH)
 *
 * If no template exists for a channel, that channel is skipped.
 * The resolved template list only includes channels with both:
 *   - A matching NotificationTemplate (above lookup chain)
 *   - status = ACTIVE on the corresponding ChannelAccount
 */
declare function resolveTemplates(
  tenant_id: string,
  trigger_event: string,
  /** Ordered fallback list: [passenger_locale, operator_default_locale, 'de-DE'] */
  locales: string[],
  channels?: string[]
): Promise<{ channel: string; template: NotificationTemplate }[]>;

The n8n workflow receives $locales as [passenger_locale, operator_default_locale, 'de-DE'] and selects the first match per channel.
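The first-match-per-channel rule can be written as a pure function over templates that have already been filtered by tenant_id and trigger_event. `selectTemplates` and the trimmed `NotificationTemplate` shape are illustrative. The ACTIVE ChannelAccount check is deliberately left out of this sketch.

```typescript
type Channel = 'WHATSAPP' | 'EMAIL' | 'SMS';

interface NotificationTemplate {
  id: string;
  channel: Channel;
  locale: string;
}

/**
 * For each channel, picks the template whose locale appears earliest in `locales`
 * ([passenger_locale, operator_default_locale, 'de-DE']). Channels without a match are skipped.
 */
export function selectTemplates(
  templates: readonly NotificationTemplate[],
  locales: readonly string[],
  channels?: readonly Channel[],
): Map<Channel, NotificationTemplate> {
  const wanted = channels ?? (['WHATSAPP', 'EMAIL', 'SMS'] as const);
  const result = new Map<Channel, NotificationTemplate>();
  for (const channel of wanted) {
    for (const locale of locales) {
      const hit = templates.find((t) => t.channel === channel && t.locale === locale);
      if (hit) {
        result.set(channel, hit);
        break; // first match in locale order wins
      }
    }
  }
  return result;
}
```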

§5.1 ChannelAccount ACTIVE Gate

Before dispatching on a channel, verify the ChannelAccount exists and is ACTIVE:

sql
SELECT ca.id, ca.provider_config
FROM communications.channel_accounts ca
WHERE ca.tenant_id = :tenant_id
  AND ca.channel_type = :channel
  AND ca.status = 'ACTIVE'
LIMIT 1;

Location in pipeline: The n8n workflow performs this check after resolving the template. If no ACTIVE account exists for the channel:

  • Log warning: template found but no ACTIVE ChannelAccount for {channel}
  • Exclude that channel from the dispatch list

If ALL channels are either missing templates or inactive:

  • n8n creates a Message row with status = FAILED, failed_reason = 'no_active_channels'
  • No BullMQ job is enqueued
  • Dispatcher sees FAILED message in Inbox; can retry once ChannelAccount is ACTIVE
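The gate above amounts to splitting the template-bearing channels by ChannelAccount status. In the sketch below, `applyActiveGate` and `DispatchPlan` are hypothetical names. The caller is assumed to have loaded the set of channel types that have an ACTIVE account.

```typescript
type Channel = 'WHATSAPP' | 'EMAIL' | 'SMS';

interface DispatchPlan {
  dispatch: Channel[];               // each gets a QUEUED Message and a BullMQ job
  skipped: Channel[];                // template found but no ACTIVE ChannelAccount: log a warning
  failedReason?: 'no_active_channels'; // set when nothing can be dispatched
}

export function applyActiveGate(
  channelsWithTemplate: readonly Channel[],
  activeAccounts: ReadonlySet<Channel>,
): DispatchPlan {
  const dispatch = channelsWithTemplate.filter((c) => activeAccounts.has(c));
  const skipped = channelsWithTemplate.filter((c) => !activeAccounts.has(c));
  return dispatch.length > 0
    ? { dispatch, skipped }
    : { dispatch, skipped, failedReason: 'no_active_channels' };
}
```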

§6 Multi-Channel vs. Fallback Chain

| Pattern | When to Use | Example |
|---|---|---|
| Simultaneous Multi-Channel | Transactional notifications where each channel adds distinct value | BookingFullyPaid, InvoiceIssued |
| Fallback Chain | Operational notifications where delivery speed matters | IncidentBroadcast (WhatsApp → SMS → Email) |
| Single Channel | Channel-specific notifications | CheckoutAbandoned (Email only) |

NotificationRequest.channels controls the pattern:

  • Omitted: dispatch on all channels with a matching template and an ACTIVE ChannelAccount (simultaneous).
  • Specified: dispatch only on the listed channels.

Fallback is controlled separately, per dispatched message, by DispatchMessageJob.fallback_chain.

§7 trigger_event Registry

| trigger_event | Domain Event Source | Channels | Context Variables | Approval |
|---|---|---|---|---|
| BOOKING_CONFIRMED | BookingConfirmed ⚠️ | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, deposit_amount | No |
| BOOKING_FULLY_PAID | BookingFullyPaid | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, total_amount, invoice_url | No |
| BOOKING_CANCELLED | BookingCancelled | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, cancellation_fee | No |
| BOOKING_REFUNDED | BookingRefunded | EMAIL | passenger_name, booking_reference, refund_amount | No |
| BOOKING_COMPLETED | BookingCompleted | EMAIL | passenger_name, tour_name, operator_name, feedback_url | No |
| BOOKING_NOSHOW | BookingNoShow | EMAIL | passenger_name, booking_reference, tour_name | No |
| CHECKOUT_ABANDONED | CheckoutAbandoned | EMAIL | passenger_name, tour_name, checkout_url, expiry_reason | No |
| CHECKOUT_ABANDONED_WHATSAPP | CheckoutAbandoned (+24h) | WHATSAPP | passenger_name, tour_name, checkout_url | No |
| FINAL_PAYMENT_DUE | FinalPaymentDue | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, remaining_amount, payment_link | No |
| FINAL_PAYMENT_OVERDUE | FinalPaymentOverdue | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, remaining_amount, payment_link, days_overdue | No |
| PASSENGER_CANCELLED | PassengerCancelled | EMAIL | passenger_name, tour_name, cancellation_fee, refund_amount | No |
| ANCILLARY_CANCELLED | AncillaryCancelled | EMAIL | passenger_name, ancillary_label, refund_amount | No |
| PASSENGER_ADDED | PassengerAdded | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, boarding_point_name | No |
| INVOICE_ISSUED | InvoiceIssued | EMAIL | passenger_name, booking_reference, invoice_number, total_gross, invoice_pdf_url | No |
| SERVICE_LEG_STARTED | ServiceLegStarted | WHATSAPP | passenger_name, boarding_point_name, tracking_url | No |
| SERVICE_LEG_CANCELLED | ServiceLegCancelled | WHATSAPP, EMAIL | passenger_name, tour_name, boarding_point_name, operator_phone | No |
| INCIDENT_BROADCAST | IncidentCreated (CRITICAL) | WHATSAPP | See incident-broadcast-protocol.md §8. Specialized flow. | Yes |
| INCIDENT_ALLCLEAR | IncidentResolved (CRITICAL) | WHATSAPP | See incident-broadcast-protocol.md §8. Specialized flow. | No |
| VEHICLE_SWAP_NOTIFICATION | VehicleSwapped | WHATSAPP | passenger_name, old_vehicle, new_vehicle, seat_assignment | No |
| QUALIFICATION_EXPIRING | QualificationExpiring | EMAIL | crew_member_name, qualification_type, expiry_date, days_remaining | No |
| VEHICLE_INSPECTION_OVERDUE | VehicleInspectionOverdue | EMAIL | vehicle_license_plate, inspection_type, due_date, days_overdue | No |
| PRE_TRIP_REMINDER | Hasura Cron (T-24h) | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, boarding_point_name, boarding_time, passenger_instructions | No |

Events without external channels (e.g., FINANCIAL_LEDGER_CLOSED, SERVICE_LEG_COMPLETED, BOARDING_CONFLICT) target internal systems (Dispatch Board Hasura subscriptions) and do not generate Messages.

Events marked "Specialized flow" bypass the generic NotificationPipelineService and use dedicated NestJS handlers + n8n workflows. They rejoin at the DispatchMessageJob level for CPaaS sending.

NOTE

BOOKING_CONFIRMED — event catalog update. event-catalog.md now lists Communications as a consumer of BookingConfirmed (deposit confirmation EMAIL + WHATSAPP).

IMPORTANT

Seeding requirement: ProvisionTenant (ADR-003) must seed all templates above. Phase 1 seeds de-DE locale only. Operators can edit body content, but system-provided defaults must exist.
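Template interpolation silently depends on the context variables listed in the registry. The NestJS handler could therefore check for missing keys before forwarding. The sketch below covers three registry rows. `REQUIRED_CONTEXT` and `missingContextVars` are hypothetical, and treating `null` as missing is an assumption.

```typescript
// Required context variables for a few trigger_events, copied from the registry.
const REQUIRED_CONTEXT: Record<string, readonly string[]> = {
  BOOKING_CONFIRMED: ['passenger_name', 'tour_name', 'departure_date', 'booking_reference', 'deposit_amount'],
  INVOICE_ISSUED: ['passenger_name', 'booking_reference', 'invoice_number', 'total_gross', 'invoice_pdf_url'],
  SERVICE_LEG_STARTED: ['passenger_name', 'boarding_point_name', 'tracking_url'],
};

/** Lists registry variables absent from `context`; throws for trigger_events not in the map. */
export function missingContextVars(triggerEvent: string, context: Record<string, unknown>): string[] {
  const required = REQUIRED_CONTEXT[triggerEvent];
  if (!required) throw new Error(`Unknown trigger_event: ${triggerEvent}`);
  return required.filter((key) => context[key] === undefined || context[key] === null);
}
```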


§8 CPaaS Delivery Webhook Specification

NOTE

Webhook routing: Meta supports WABA-level override callbacks via POST /<WABA_ID>/subscribed_apps. During channel provisioning, Busflow registers a per-tenant callback URL (/api/webhooks/meta/{tenant_id}) for each WABA. Both inbound messages and delivery status callbacks arrive at this per-tenant URL. The lookup keys below apply when resolving individual messages within the handler. See channel-provisioning-protocol.md §7 for webhook registration and inbox-protocol.md §7 for inbound message handling.

| Provider | Webhook Endpoint | Lookup Key | Status Mapping |
|---|---|---|---|
| Meta Cloud API (WhatsApp), delivery status | POST /api/webhooks/meta/{tenant_id} (per-WABA override) | external_message_id = Meta messages[].id | sent → SENT, delivered → DELIVERED, read → READ, failed → FAILED |
| Amazon SES (Email) | POST /api/webhooks/ses (SNS subscription) | external_message_id = SES MessageId | Delivery → DELIVERED, Bounce (permanent) → FAILED, Complaint → FAILED |
| Amazon SNS (SMS) | POST /api/webhooks/sns-sms (SNS subscription) | external_message_id = SNS MessageId | SUCCESS → DELIVERED, FAILURE → FAILED |
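For Meta, the status mapping in the table can be applied directly to the webhook body, which nests callbacks under entry[].changes[].value.statuses[]. The sketch below reads only the id and status fields. The real payload carries more, such as timestamps and error details. The function and type names are illustrative. Inbound messages arrive at the same URL without a statuses array, which is why that field is optional here.

```typescript
type MessageStatus = 'SENT' | 'DELIVERED' | 'READ' | 'FAILED';

// Minimal slice of the Meta webhook payload: only the fields this sketch reads.
interface MetaStatusWebhook {
  entry: { changes: { value: { statuses?: { id: string; status: string }[] } }[] }[];
}

const META_STATUS_MAP: Record<string, MessageStatus> = {
  sent: 'SENT',
  delivered: 'DELIVERED',
  read: 'READ',
  failed: 'FAILED',
};

/** Flattens all status callbacks into (external_message_id, status) pairs; unknown statuses are dropped. */
export function extractMetaStatusUpdates(
  body: MetaStatusWebhook,
): { externalMessageId: string; status: MessageStatus }[] {
  const updates: { externalMessageId: string; status: MessageStatus }[] = [];
  for (const entry of body.entry) {
    for (const change of entry.changes) {
      for (const s of change.value.statuses ?? []) {
        const status = META_STATUS_MAP[s.status];
        if (status) updates.push({ externalMessageId: s.id, status });
      }
    }
  }
  return updates;
}
```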

Handler: NestJS first validates the webhook signature. For Meta, this is an HMAC-SHA256 of the raw request body, keyed with the app secret and sent in X-Hub-Signature-256. For SES/SNS, it is the SNS message signature. For Meta, the handler resolves tenant_id from the URL path parameter. It then extracts external_message_id, queries messages WHERE external_message_id = :id, and updates status plus delivered_at, read_at, or failed_reason. For SES/SNS (account-level webhooks), tenant_id is resolved from the matched Message row.
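Below is a minimal sketch of the Meta signature check using Node's crypto module. It assumes access to the raw request bytes. In NestJS, for example, this means creating the app with `rawBody: true` and reading `req.rawBody`. `verifyMetaSignature` is a hypothetical helper name. The comparison is constant-time so it does not leak how many leading bytes matched.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

/**
 * Verifies an X-Hub-Signature-256 header ("sha256=<hex digest>") against an
 * HMAC-SHA256 of the raw body keyed with the app secret. The body must be the
 * exact bytes received, not re-serialised JSON.
 */
export function verifyMetaSignature(rawBody: Buffer, header: string | undefined, appSecret: string): boolean {
  if (!header || !header.startsWith('sha256=')) return false;
  const expected = createHmac('sha256', appSecret).update(rawBody).digest();
  const received = Buffer.from(header.slice('sha256='.length), 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```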


§9 BatchDispatchJob Interface

For high-volume notifications (pre-trip reminders, mass cancellation):

typescript
/** Queue name: 'message-batch-dispatch' */
interface BatchDispatchJob {
  batch_id: string;
  /** Needed for secret decryption and per-tenant rate limiting (§4.8) */
  tenant_id: string;
  message_ids: string[];
  channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
  channel_account_id: string;
  provider_config: Record<string, unknown>;
  fallback_chain?: ('WHATSAPP' | 'SMS' | 'EMAIL')[];
}

The processor iterates message_ids[], fetches each Message's rendered_content and recipient from the database, and dispatches sequentially with rate limiting. Failed individual messages do not abort the batch.
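The per-message isolation can be sketched as a sequential loop that records failures instead of rethrowing them. `processBatch`, `acquireToken`, and `dispatchOne` are hypothetical. We assume that failed IDs are afterwards handled by the single-message rules in §4.1 (marking FAILED, fallback).

```typescript
interface BatchResult {
  sent: string[];
  failed: { messageId: string; error: string }[];
}

/**
 * Dispatches each message in order. A failure is recorded and the loop continues,
 * so one bad recipient never aborts the rest of the batch.
 * acquireToken stands in for RateLimiterService.acquire; dispatchOne for the
 * load-from-DB + CpaasAdapter.dispatch path.
 */
export async function processBatch(
  messageIds: readonly string[],
  acquireToken: () => Promise<void>,
  dispatchOne: (messageId: string) => Promise<void>,
): Promise<BatchResult> {
  const result: BatchResult = { sent: [], failed: [] };
  for (const messageId of messageIds) {
    try {
      await acquireToken();
      await dispatchOne(messageId);
      result.sent.push(messageId);
    } catch (err) {
      result.failed.push({ messageId, error: err instanceof Error ? err.message : String(err) });
    }
  }
  return result;
}
```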


§10 Contact & Conversation Handling

Contact Resolution

The n8n workflow resolves each recipient to a Contact record. It uses the same base logic as the inbound message ingestion handler (see inbox-protocol.md §8), but first tries to match on the passenger profile:

  1. If passenger_profile_id is provided: Query contacts WHERE tenant_id = :tenant_id AND passenger_profile_id = :profile_id. If a Contact matches, use it.
  2. Fallback: If no passenger_profile_id is provided, or no Contact matches it, use the standard Contact Resolution Algorithm.
  3. If no Contact exists: Create a new Contact with identifiers = [{type: 'phone', value: :phone}, ...].

This lookup/upsert logic ensures a unified identity across both inbound and outbound channels.
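
The three resolution steps can be sketched against an abstract store. `ContactStore`, `resolveContact`, and the synchronous in-memory interface are hypothetical simplifications; `resolveStandard` stands in for the standard Contact Resolution Algorithm from inbox-protocol.md §8. Falling through to step 2 when a passenger_profile_id matches no Contact is an assumption.

```typescript
interface Contact {
  id: string;
  tenant_id: string;
  passenger_profile_id?: string;
  identifiers: { type: string; value: string }[];
}

interface ContactStore {
  findByProfile(tenantId: string, profileId: string): Contact | undefined;
  // Stand-in for the standard Contact Resolution Algorithm.
  resolveStandard(tenantId: string, phone: string): Contact | undefined;
  create(contact: Omit<Contact, 'id'>): Contact;
}

function resolveContact(store: ContactStore, tenantId: string, phone: string, profileId?: string): Contact {
  // Step 1: prefer an explicit passenger profile link.
  if (profileId) {
    const byProfile = store.findByProfile(tenantId, profileId);
    if (byProfile) return byProfile;
  }
  // Step 2: fall back to the standard resolution algorithm.
  const existing = store.resolveStandard(tenantId, phone);
  if (existing) return existing;
  // Step 3: nothing matched, so create a new Contact keyed by phone.
  return store.create({ tenant_id: tenantId, identifiers: [{ type: 'phone', value: phone }] });
}
```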

Conversation Handling

If the NotificationRequest provides a conversation_context:

  1. Look up: Find an existing OPEN or SNOOZED conversation linked to the context.
    sql
    SELECT id FROM communications.conversations
    WHERE tenant_id = :tenant_id
      AND contact_id = :contact_id
      -- Parenthesized so the OR branches stay inside the tenant/contact/status filters
      AND (
            (:type = 'booking' AND booking_id = :id)
         OR (:type = 'trip' AND trip_id = :id)
         OR (:type = 'invoice' AND invoice_id = :id)
      )
      AND status IN ('OPEN', 'SNOOZED');
  2. Action: Reuse the found Conversation or create a new one with status = OPEN and the appropriate FK (booking_id, trip_id, or invoice_id) set.

Postgres schema note: Phase 1 uses nullable FKs (booking_id, trip_id, invoice_id) on the conversations table. A CHECK constraint ensures the system populates at most one context FK.

If the request omits conversation_context, n8n does not create or link a Conversation. The Message exists as a standalone outbound record.
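
The lookup-or-create step, including the single-FK rule, can be sketched in memory. `findOrCreateConversation`, `CONTEXT_FK`, and the `newId` generator are hypothetical; in practice the lookup is the SQL query above and the insert runs against communications.conversations.

```typescript
type ContextType = 'booking' | 'trip' | 'invoice';
type ConversationStatus = 'OPEN' | 'SNOOZED' | 'RESOLVED';

interface Conversation {
  id: string;
  tenant_id: string;
  contact_id: string;
  status: ConversationStatus;
  booking_id: string | null;
  trip_id: string | null;
  invoice_id: string | null;
}

// Each context type populates exactly one nullable FK column.
const CONTEXT_FK = { booking: 'booking_id', trip: 'trip_id', invoice: 'invoice_id' } as const;

function findOrCreateConversation(
  rows: Conversation[], // in-memory stand-in for communications.conversations
  tenantId: string,
  contactId: string,
  ctx: { type: ContextType; id: string },
  newId: () => string,
): Conversation {
  const fk = CONTEXT_FK[ctx.type];
  const existing = rows.find(
    (c) =>
      c.tenant_id === tenantId &&
      c.contact_id === contactId &&
      c[fk] === ctx.id &&
      (c.status === 'OPEN' || c.status === 'SNOOZED'),
  );
  if (existing) return existing;

  // New conversation: OPEN, with only the matching context FK set,
  // which satisfies the "at most one context FK" CHECK constraint.
  const created: Conversation = {
    id: newId(),
    tenant_id: tenantId,
    contact_id: contactId,
    status: 'OPEN',
    booking_id: null,
    trip_id: null,
    invoice_id: null,
  };
  created[fk] = ctx.id;
  rows.push(created);
  return created;
}
```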


§11 Error Handling & Resilience

n8n Webhook Failure (Circuit Breaker)

The NestJS handler wraps the n8n webhook call in a circuit breaker (opossum library, per workflow-orchestration.md §Error Handling):

Failure modes:

  • Timeout (>30s): Transient; circuit breaker opens after 5 consecutive timeouts within 60s.
  • 5xx response: Transient; same open threshold.
  • 4xx response (e.g., 422 payload validation error): Likely permanent; log, open circuit, manual review required.

On circuit open:

  • NestJS catches the rejected call (opossum rejects immediately while the circuit is open) and logs at ERROR severity.
  • Enqueues the original NotificationRequest to a notifications_recovery BullMQ queue (separate from message-dispatch).
  • Returns 200 to Hasura (idempotent fire-and-forget).
  • A monitoring job (RecoveryWorker) periodically retries items in the recovery queue.
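
The "On circuit open" steps can be sketched as a wrapper around the breaker call. Assumptions: `callN8n` stands for the breaker's `fire()` call (opossum's `CircuitBreaker#fire` rejects without calling n8n while the circuit is open), `enqueueRecovery` stands for adding a job to the notifications_recovery queue, and `ForwardDeps` and `forwardToN8n` are hypothetical names. The sketch parks the request on any rejection and starts `attempt` at 0; the document does not pin down either detail, and the 4xx manual-review path is not modeled.

```typescript
interface RecoveryJob {
  request: unknown; // the original NotificationRequest
  attempt: number;
  lastChangedAt: string;
}

interface ForwardDeps {
  callN8n(request: unknown): Promise<void>; // e.g. circuitBreaker.fire(request)
  enqueueRecovery(job: RecoveryJob): Promise<void>; // add to notifications_recovery
  logError(message: string, err: unknown): void;
}

async function forwardToN8n(request: unknown, deps: ForwardDeps): Promise<200> {
  try {
    await deps.callN8n(request);
  } catch (err) {
    // Circuit open (or the call failed): park the request for the RecoveryWorker.
    deps.logError('n8n forward failed; queued for recovery', err);
    await deps.enqueueRecovery({ request, attempt: 0, lastChangedAt: new Date().toISOString() });
  }
  // Hasura gets 200 whether the request was handed off or parked.
  return 200;
}
```

If `enqueueRecovery` itself throws, the error propagates and Hasura sees a non-2xx response, so Hasura's own event retry still applies.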

§11.1 RecoveryWorker Contract

Queue Name: notifications_recovery

typescript
interface RecoveryJob {
  request: NotificationRequest;
  attempt: number;
  lastChangedAt: string;
}

Retry Strategy:

  • Attempts 1-3: Retry with linear backoff (1 minute).
  • Attempt 4: Stop retrying and mark the job as failed (BullMQ keeps it in the queue's failed set). Trigger an ERROR alert for the operator (Phase 1: log only; Phase 2: dispatcher alert).
  • Manual Intervention: A developer can replay failed recovery jobs via the BullMQ dashboard when the n8n workflow needs a manual fix first.
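
The retry schedule can be expressed as a pure function of the attempt number. This sketch assumes `attempt` is 1-based and reads "linear backoff (1 minute)" as a delay that grows by one minute per attempt (1, 2, 3 minutes); if a fixed 1-minute delay is intended, return `ONE_MINUTE_MS` instead. `nextRecoveryStep` and `RecoveryDecision` are hypothetical names, and wiring the result into BullMQ's retry settings is left out.

```typescript
type RecoveryDecision =
  | { action: 'retry'; delayMs: number }
  | { action: 'fail'; alert: 'ERROR' };

const ONE_MINUTE_MS = 60000;
const MAX_RETRIES = 3;

/** Decides what to do with a recovery job for the given 1-based attempt. */
function nextRecoveryStep(attempt: number): RecoveryDecision {
  if (attempt <= MAX_RETRIES) {
    // Assumed linear growth: attempt 1 waits 1 min, attempt 2 waits 2 min, ...
    return { action: 'retry', delayMs: attempt * ONE_MINUTE_MS };
  }
  // Attempt 4 and beyond: give up and raise an ERROR alert.
  return { action: 'fail', alert: 'ERROR' };
}
```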

Circuit breaker config:

typescript
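import CircuitBreaker from 'opossum';
import { firstValueFrom } from 'rxjs';

// Inside the NestJS handler service; this.httpClient is the injected HttpService.
const circuitBreaker = new CircuitBreaker(
  (req: NotificationRequest) => firstValueFrom(this.httpClient.post(n8nWebhookUrl, req)),
  {
    timeout: 30000,               // Calls slower than 30s count as failures
    errorThresholdPercentage: 50, // Failure-rate threshold (%) for opening
    volumeThreshold: 5,           // Need at least 5 calls in the window before opening
    resetTimeout: 60000,          // Half-open after 60s
    rollingCountTimeout: 60000,   // 60s rolling statistics window
  },
);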

Orphaned QUEUED Messages (Stuck Message Detection)

If n8n creates a Message (status = QUEUED) but fails to enqueue the BullMQ job, the Message is orphaned. Detection:

Hasura Cron Trigger: message_stuck_queue_scan (every 5 minutes)

sql
SELECT id, tenant_id FROM communications.messages
WHERE status = 'QUEUED'
  AND created_at < now() - interval '5 minutes'  -- age since the row was created
ORDER BY created_at ASC
LIMIT 100;

Action: Log WARN per tenant. Dispatcher alerts are future work (Phase 2).
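
Since the cron action is one WARN per tenant, the scan results can be grouped before logging. A minimal sketch; `StuckRow`, `logStuckMessages`, and the log line format are hypothetical, and `warn` stands in for the service logger.

```typescript
interface StuckRow {
  id: string;
  tenant_id: string;
}

/** Groups stuck QUEUED message IDs by tenant, preserving scan order. */
function groupStuckByTenant(rows: StuckRow[]): Map<string, string[]> {
  const byTenant = new Map<string, string[]>();
  for (const row of rows) {
    const ids = byTenant.get(row.tenant_id) ?? [];
    ids.push(row.id);
    byTenant.set(row.tenant_id, ids);
  }
  return byTenant;
}

/** Emits one WARN line per tenant with a count and a few sample IDs. */
function logStuckMessages(rows: StuckRow[], warn: (line: string) => void): void {
  groupStuckByTenant(rows).forEach((ids, tenantId) => {
    warn(`message_stuck_queue_scan tenant=${tenantId} stuck_queued=${ids.length} sample=${ids.slice(0, 5).join(',')}`);
  });
}
```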

ChannelAccount SUSPENDED During Dispatch

If a ChannelAccount transitions from ACTIVE to SUSPENDED (due to provider revocation or manager action) while messages are QUEUED, the BullMQ worker must detect this and handle it gracefully:

  1. Before dispatch, the worker fetches the ChannelAccount status: SELECT status FROM channel_accounts WHERE id = :channel_account_id.
  2. If status != ACTIVE, the worker permanently fails the Message: UPDATE messages SET status = 'FAILED', failed_reason = 'channel_suspended' WHERE id = :message_id.
  3. No retry on this channel: this is a permanent failure. Unless a fallback_chain applies (see below), the dispatcher must reactivate the ChannelAccount or manually route the message to another channel.

Fallback behavior: If fallback_chain specifies additional channels and the primary channel fails due to suspension (as opposed to a permanent recipient error), the worker creates a new Message and job for the next channel in the chain. See §4 Fallback Chain Handling.
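
The pre-dispatch gate and the fallback decision can be sketched as a pure function. Assumptions: the fallback chain may or may not list the current channel (if it does, the entry after it is used, otherwise the first entry), and `checkChannelAccount` and `PreDispatchOutcome` are hypothetical names. The caller would run the UPDATE from step 2 and, when `fallbackTo` is set, create the new Message and job.

```typescript
type Channel = 'WHATSAPP' | 'SMS' | 'EMAIL';

type PreDispatchOutcome =
  | { kind: 'dispatch' }
  | { kind: 'failed'; failed_reason: 'channel_suspended'; fallbackTo: Channel | null };

/**
 * A non-ACTIVE ChannelAccount permanently fails the Message (no retry on the
 * same channel). If the chain offers another channel, it is returned.
 */
function checkChannelAccount(
  accountStatus: string,
  channel: Channel,
  fallbackChain: Channel[] = [],
): PreDispatchOutcome {
  if (accountStatus === 'ACTIVE') return { kind: 'dispatch' };
  const idx = fallbackChain.indexOf(channel);
  // Assumption: take the entry after the current channel, or the first entry.
  const next: Channel | undefined = idx >= 0 ? fallbackChain[idx + 1] : fallbackChain[0];
  return { kind: 'failed', failed_reason: 'channel_suspended', fallbackTo: next ?? null };
}
```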

Template Not Found (All Channels)

If n8n resolves zero templates (either no matching template records or all matching channels are INACTIVE):

  1. n8n creates a Message with status = FAILED and failed_reason = 'no_templates_found'.
  2. No BullMQ job is enqueued.
  3. The dispatcher sees the FAILED message in the Inbox.
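
The decision can be sketched as a small function, for example as logic in an n8n Code node. `TemplateMatch`, its `active` flag (meaning the matched template and its channel are usable), and `planFromTemplates` are hypothetical; the real resolution runs against tenant + trigger_event + channel + locale.

```typescript
type Channel = 'WHATSAPP' | 'EMAIL' | 'SMS';

interface TemplateMatch {
  template_id: string;
  channel: Channel;
  active: boolean; // hypothetical: template and channel are both usable
}

type TemplateOutcome =
  | { kind: 'enqueue'; matches: TemplateMatch[] }
  | { kind: 'failed'; status: 'FAILED'; failed_reason: 'no_templates_found' };

function planFromTemplates(matches: TemplateMatch[]): TemplateOutcome {
  const usable = matches.filter((m) => m.active);
  if (usable.length === 0) {
    // One FAILED Message, no BullMQ job; it surfaces in the dispatcher Inbox.
    return { kind: 'failed', status: 'FAILED', failed_reason: 'no_templates_found' };
  }
  return { kind: 'enqueue', matches: usable };
}
```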

§12 Pipeline Idempotency

The correlation_id column on messages stores the Hasura event_id. Before forwarding to n8n, the NestJS handler queries:

sql
SELECT id FROM communications.messages WHERE correlation_id = :event_id LIMIT 1;

If a row exists, the event is a duplicate (Hasura at-least-once delivery) and the handler returns 200 without processing.

Schema enforcement: The messages table has a partial unique index on correlation_id (WHERE correlation_id IS NOT NULL). This prevents race conditions if two NestJS handlers process the same event_id simultaneously. The first INSERT succeeds; the second fails with a UNIQUE violation, which the handler catches to return an idempotent 200 result. See schema-communications.md §messages for the index definition.
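
The check-then-catch pattern can be sketched as below. Assumptions: the database driver surfaces Postgres errors with the SQLSTATE in `err.code` (node-postgres does this; `23505` is `unique_violation`), and the step that performs the INSERT is hidden behind a hypothetical `process` callback; `handleOnce` and `IdempotencyDeps` are likewise illustrative names. Both outcomes map to HTTP 200 for Hasura.

```typescript
// Postgres SQLSTATE for unique_violation.
const PG_UNIQUE_VIOLATION = '23505';

function isUniqueViolation(err: unknown): boolean {
  return typeof err === 'object' && err !== null && (err as { code?: unknown }).code === PG_UNIQUE_VIOLATION;
}

interface IdempotencyDeps {
  findByCorrelationId(eventId: string): Promise<{ id: string } | null>;
  process(eventId: string): Promise<void>; // routing, enrichment, Message INSERT path
}

async function handleOnce(eventId: string, deps: IdempotencyDeps): Promise<'duplicate' | 'processed'> {
  // Fast path: a Message already carries this event_id as correlation_id.
  if (await deps.findByCorrelationId(eventId)) return 'duplicate';
  try {
    await deps.process(eventId);
    return 'processed';
  } catch (err) {
    // Race: a concurrent delivery of the same event inserted first.
    if (isUniqueViolation(err)) return 'duplicate';
    throw err; // anything else is a real failure
  }
}
```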

Scope: This dedup protects against Hasura Event Trigger at-least-once delivery. It does NOT protect against:

  • n8n webhook being called twice for the same event_id: if the NestJS call to n8n hangs and times out, the handler (or the RecoveryWorker) may call n8n again. n8n must therefore be idempotent itself, e.g., by checking its own deduplication log or by using NotificationRequest.event_id as its idempotency key.
  • BullMQ worker processing the same job twice: BullMQ's job-ID deduplication prevents the same job from being enqueued twice (see §4 for job configuration). It does not stop a stalled job (e.g., after a worker crash mid-send) from being processed again, so dispatch should tolerate re-execution.

Internal documentation — Busflow