Notification Pipeline Protocol
Domain: Communications (Shared Core Domain)
Trigger: Domain events from all bounded contexts (Hasura Event Triggers → NestJS handlers)
Output: Rendered messages dispatched via Meta Cloud API (WhatsApp), Amazon SES (Email), Amazon SNS (SMS)
Sources: communications.md, workflow-orchestration.md
L3 DoD: ✅ Schema | ✅ API | ✅ Edge States — Ready to Code
§1 Overview
Flow: Domain Event → Hasura Event Trigger → NestJS handler (idempotency check, routing, enrichment) → n8n webhook (NotificationRequest) → template resolution → Message row creation → BullMQ DispatchMessageJob → CPaaS adapter (Meta/SES/SNS) → delivery webhook → status update.
The pipeline follows the four-layer workflow orchestration stack:
- Hasura detects DB changes and fires webhooks to NestJS (at-least-once delivery semantics)
- NestJS checks idempotency via correlation_id dedup, enriches the payload (resolves recipients, builds context variables, gates on ChannelAccount status), and forwards to n8n via a circuit-breaker-wrapped HTTP call
- n8n resolves templates against tenant+trigger_event+channel+locale, creates Message rows with status=QUEUED, and enqueues BullMQ jobs
- BullMQ handles durable dispatch with retries, rate limiting per ChannelAccount tier, and fallback chains
Events requiring specialized flows (e.g., Incident Broadcast with dispatcher approval) bypass the generic pipeline and use dedicated n8n workflows, but rejoin at the DispatchMessageJob level for actual sending.
§2 NotificationRequest Interface
The generic payload the NestJS event handler forwards to n8n. NestJS owns idempotency checking, routing, and payload enrichment — n8n owns template resolution, Message row creation, and BullMQ enqueue. See §2.1 for NestJS handler integration details and error handling.
interface NotificationRequest {
/** Hasura event_id — used for idempotency dedup */
event_id: string;
/** The originating tenant */
tenant_id: string;
/** Maps to notification_templates.trigger_event */
trigger_event: string;
/** Target recipient(s). At least one required. */
recipients: NotificationRecipient[];
/**
* Variables for template interpolation.
* Shape varies per trigger_event — see §7 for the registry.
*/
context: Record<string, unknown>;
/**
* Optional conversation context — creates or reuses a
* Conversation linked to this domain entity.
* If omitted, n8n does not create a Conversation row.
*/
conversation_context?: {
type: 'booking' | 'trip' | 'invoice';
id: string;
};
/**
* Override default channel selection.
* If omitted, the pipeline dispatches on all channels
* that have a matching NotificationTemplate AND status=ACTIVE.
*/
channels?: ('WHATSAPP' | 'EMAIL' | 'SMS')[];
}
interface NotificationRecipient {
/** Soft FK to communications.contacts — resolved or created by the pipeline */
passenger_profile_id?: string;
/** Direct contact info (used when no profile exists) */
name: string;
phone?: string;
email?: string;
}
§2.1 NestJS Handler Integration
Entry Point:
@Injectable()
class NotificationPipelineService {
constructor(
private readonly hasuraClient: HasuraClient,
private readonly httpClient: HttpClient,
private readonly circuitBreaker: CircuitBreaker,
private readonly bullQueue: BullQueue,
private readonly logger: Logger
) {}
async processEvent(req: NotificationRequest): Promise<void> {
// 1. Dedup: SELECT messages WHERE correlation_id = :event_id LIMIT 1
// If exists, return 200 (duplicate Hasura event).
// 2. Validate: At least one channel ACTIVE for this tenant.
// 3. Call n8n webhook with circuit breaker.
// POST /webhook/notifications/{trigger_event}
// On failure → catch, log, enqueue to notifications_recovery BullMQ queue.
// Return 200 immediately (fire-and-forget).
}
}
Idempotency: See §12.
Error Handling: See §11.
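The commented steps in processEvent above can be sketched as a small function with its dependencies passed in. This is a minimal illustration, not the service itself: `PipelineDeps`, `processEventSketch`, and the `Outcome` labels are hypothetical names, and the behaviour when no channel is ACTIVE (returning without calling n8n) is an assumption the source does not spell out.

```typescript
interface NotificationRequestLike {
  event_id: string;
  tenant_id: string;
  trigger_event: string;
}

// Hypothetical seams standing in for HasuraClient, the circuit breaker,
// and the notifications_recovery queue.
interface PipelineDeps {
  isDuplicate(eventId: string): Promise<boolean>;
  hasActiveChannel(tenantId: string): Promise<boolean>;
  callN8n(path: string, req: NotificationRequestLike): Promise<void>;
  enqueueRecovery(req: NotificationRequestLike): Promise<void>;
}

type Outcome = 'duplicate' | 'no_active_channel' | 'forwarded' | 'recovery_queued';

async function processEventSketch(
  req: NotificationRequestLike,
  deps: PipelineDeps,
): Promise<Outcome> {
  // 1. Dedup on the Hasura event_id
  if (await deps.isDuplicate(req.event_id)) return 'duplicate';
  // 2. Require at least one ACTIVE channel (assumed: skip the event otherwise)
  if (!(await deps.hasActiveChannel(req.tenant_id))) return 'no_active_channel';
  // 3. Forward to n8n; on any failure hand the request to the recovery queue
  try {
    await deps.callN8n(`/webhook/notifications/${req.trigger_event}`, req);
    return 'forwarded';
  } catch {
    await deps.enqueueRecovery(req);
    return 'recovery_queued';
  }
}
```

Every outcome maps to a 200 response towards Hasura, so the caller never needs to distinguish them for the HTTP reply; the labels exist only for logging and tests.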
§3 Message.status Transition Ownership
| From | To | Actor | Mechanism |
|---|---|---|---|
| — | QUEUED | n8n workflow | Hasura mutation insert_communications_messages after template rendering |
| QUEUED | SENT | BullMQ worker | After CPaaS API returns 2xx. Worker writes external_message_id. |
| SENT | DELIVERED | CPaaS webhook handler | Meta: statuses[].status = 'delivered'. SES: SNS eventType = 'Delivery'. Handler updates delivered_at. |
| SENT/DELIVERED | READ | CPaaS webhook handler | Meta: statuses[].status = 'read'. SES: no read tracking. Handler updates read_at. |
| QUEUED | FAILED | BullMQ worker | Permanent dispatch failure (invalid recipient, template rejected). Sets failed_reason. |
| SENT | FAILED | CPaaS webhook handler | Meta: statuses[].status = 'failed'. SES: SNS eventType = 'Bounce' (permanent). Sets failed_reason. |
If a Message sits in SENT for >24h without a delivery callback, flag it as stale. Observable via the 5-minute QUEUED scan (extend to cover stale SENT).
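The transition table can also be expressed as data, which lets a handler reject an out-of-order callback (for example a late 'sent' arriving after 'delivered'). The sketch below is illustrative only; `ALLOWED`, `canTransition`, and `isStaleSent` are hypothetical names, and treating READ and FAILED as terminal is inferred from the table rather than stated in it.

```typescript
type MessageStatus = 'QUEUED' | 'SENT' | 'DELIVERED' | 'READ' | 'FAILED';

// Allowed targets per source status, copied from the table above.
const ALLOWED: Record<MessageStatus, MessageStatus[]> = {
  QUEUED: ['SENT', 'FAILED'],
  SENT: ['DELIVERED', 'READ', 'FAILED'],
  DELIVERED: ['READ'],
  READ: [],   // assumed terminal
  FAILED: [], // assumed terminal
};

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return ALLOWED[from].indexOf(to) !== -1;
}

// Stale check for the >24h rule: SENT with no delivery callback.
function isStaleSent(status: MessageStatus, sentAt: Date, now: Date): boolean {
  const DAY_MS = 24 * 60 * 60 * 1000;
  return status === 'SENT' && now.getTime() - sentAt.getTime() > DAY_MS;
}
```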
§4 DispatchMessageJob BullMQ Job
/** Queue name: 'message-dispatch' */
interface DispatchMessageJob {
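message_id: string;
/** Owning tenant; DispatchWorker reads it for the decryption context and rate limiting */
tenant_id: string;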
channel_account_id: string;
channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
rendered_content: string;
meta_template?: {
name: string;
namespace: string;
components: Record<string, string>[];
};
recipient: {
phone?: string;
email?: string;
};
fallback_chain?: ('WHATSAPP' | 'SMS' | 'EMAIL')[];
provider_config: Record<string, unknown>;
}
Default queue config:
{
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: 1000,
removeOnFail: false,
},
limiter: {
max: 50,
duration: 1000, // 50 msgs/sec — conservative for Meta Tier 1
},
}
Fallback chain handling: On permanent failure from the primary channel, the worker creates a new Message row for the next channel in fallback_chain[] (status = QUEUED), links it via correlation_id, and enqueues a new job. The original Message transitions to FAILED with failed_reason = 'fallback_triggered:<channel>'.
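Returning to the default queue config: with attempts: 5 and an exponential backoff of 5000 ms, a transient failure is retried four times before the job counts as exhausted. The helper below computes that schedule, assuming BullMQ's built-in exponential strategy (delay × 2^(retry − 1)) with no jitter configured; `retryDelaysMs` is a hypothetical name used only for this illustration.

```typescript
// Delays before each retry for a job with `attempts` total attempts.
// The first attempt runs immediately, so there are attempts - 1 delays.
function retryDelaysMs(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(Math.round(baseDelayMs * Math.pow(2, retry - 1)));
  }
  return delays;
}

const schedule = retryDelaysMs(5, 5000);
const totalMs = schedule.reduce((sum, d) => sum + d, 0);
console.log(schedule, totalMs);
```

Under these assumptions the retries land 5 s, 10 s, 20 s, and 40 s after the preceding failure, so a persistently failing message reaches the fallback chain roughly 75 s after its first attempt, plus processing time.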
§4.1 DispatchWorker — BullMQ Processor
The DispatchWorker registers as a BullMQ processor on the message-dispatch queue. It consumes jobs, selects the appropriate adapter, invokes the provider API, parses the response, and transitions Message status.
@Injectable()
export class DispatchWorker implements OnModuleInit {
constructor(
@InjectQueue('message-dispatch')
private readonly queue: Queue<DispatchMessageJob>,
private readonly factory: CpaasAdapterFactory,
private readonly messageService: MessageService,
private readonly encryptionService: EncryptionService,
private readonly rateLimiter: RateLimiterService,
private readonly logger: Logger,
) {}
private worker?: Worker<DispatchMessageJob, string>;
onModuleInit() {
// BullMQ consumes jobs through a Worker (exported by 'bullmq');
// Queue.process() exists only in the legacy Bull v3 API.
// Concurrency: 10 parallel jobs (tunable based on CPU/memory)
this.worker = new Worker<DispatchMessageJob, string>(
'message-dispatch',
this.handleDispatch.bind(this),
{ connection: this.queue.opts.connection, concurrency: 10 },
);
// 'failed' fires after every failed attempt. Act only when transient retries
// are exhausted; permanent errors are already handled inside handleDispatch.
this.worker.on('failed', (job, error) => {
if (!job || error instanceof UnrecoverableError) return;
if (job.attemptsMade >= (job.opts.attempts ?? 1)) {
void this.handlePermanentFailure(job, error as CpaasError);
}
});
}
async handleDispatch(job: Job<DispatchMessageJob>): Promise<string> {
// Returns external_message_id on success; throws on failure
const {
channel,
channel_account_id,
provider_config,
message_id,
tenant_id,
} = job.data;
try {
// Decrypt provider secrets (API keys, access tokens)
const decrypted = await this.encryptionService.decrypt(
provider_config,
{ tenant_id, channel_account_id }
);
// Select adapter by channel type
const adapter = this.factory.getAdapter(channel);
// Acquire rate limit token (per-tenant, tier-aware for WhatsApp)
await this.rateLimiter.acquire(tenant_id, channel_account_id, channel, decrypted);
// Dispatch via adapter (construct API request, execute, parse response)
const external_message_id = await adapter.dispatch(job.data, decrypted);
// Update Message.external_message_id, status → SENT
await this.messageService.updateMessageAfterDispatch(message_id, external_message_id);
this.logger.debug(`Message ${message_id} sent via ${channel}, external_id: ${external_message_id}`);
return external_message_id;
} catch (error) {
if (error instanceof CpaasError) {
if (!error.isTransient) {
// Permanent failure: run fallback / FAILED handling now, then throw
// UnrecoverableError (from 'bullmq') so BullMQ skips the remaining attempts
await this.handlePermanentFailure(job, error);
throw new UnrecoverableError(error.message);
}
// Transient failure: rethrow so BullMQ retries with exponential backoff
throw error;
}
// Unexpected error: treat as transient
const message = error instanceof Error ? error.message : String(error);
throw new CpaasError('UNKNOWN', message, true, 'UNEXPECTED_ERROR');
}
}
async handlePermanentFailure(
job: Job<DispatchMessageJob>,
error: CpaasError
): Promise<void> {
const { message_id, fallback_chain, tenant_id, channel_account_id } = job.data;
this.logger.warn(
`Permanent failure for message ${message_id}: ${error.errorType} (${error.code})`
);
if (!fallback_chain || fallback_chain.length === 0) {
// No fallback; mark original message as FAILED
await this.messageService.updateMessageFailed(
message_id,
error.errorType,
error.message
);
return;
}
// Pop first channel from fallback_chain
const nextChannel = fallback_chain[0];
const remainingChain = fallback_chain.slice(1);
// Create new Message row for fallback channel
const fallbackMessage = await this.messageService.createFallbackMessage(
message_id,
nextChannel
);
// Enqueue new job for fallback channel
const newJobData: DispatchMessageJob = {
...job.data,
message_id: fallbackMessage.id,
channel: nextChannel,
fallback_chain: remainingChain,
};
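// BullMQ's Queue.add takes the job name first; 'dispatch-message' is an illustrative name
await this.queue.add('dispatch-message', newJobData, { delay: 0 });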
// Mark original message as FAILED with fallback_triggered reason
await this.messageService.updateMessageFailed(
message_id,
`fallback_triggered:${nextChannel}`,
`Falling back to ${nextChannel} after ${error.errorType}`
);
// NestJS Logger exposes log(), not info()
this.logger.log(`Fallback triggered: message ${message_id} → ${nextChannel}`);
}
}
§4.2 CpaasAdapterFactory
Factory pattern to select the correct adapter by channel type.
@Injectable()
export class CpaasAdapterFactory {
constructor(
private readonly whatsapp: WhatsAppAdapter,
private readonly email: EmailAdapter,
private readonly sms: SmsAdapter,
) {}
getAdapter(channel: 'WHATSAPP' | 'EMAIL' | 'SMS'): CpaasAdapter {
switch (channel) {
case 'WHATSAPP':
return this.whatsapp;
case 'EMAIL':
return this.email;
case 'SMS':
return this.sms;
default:
throw new Error(`Unsupported channel: ${channel}`);
}
}
}
§4.3 CpaasAdapter Base Interface
All provider-specific adapters implement this contract.
export interface CpaasAdapter {
/**
* Dispatch a message via the provider's API.
* @param job The DispatchMessageJob payload
* @param decrypted The decrypted provider_config (WhatsAppProviderConfig | EmailProviderConfig | SmsProviderConfig)
* @returns The provider's external_message_id
* @throws CpaasError (with code, message, isTransient flag, errorType)
*/
dispatch(
job: DispatchMessageJob,
decrypted: Record<string, unknown>
): Promise<string>;
}
export class CpaasError extends Error {
constructor(
public readonly code: string | number,
message: string,
public readonly isTransient: boolean,
public readonly errorType: string
) {
super(message);
this.name = 'CpaasError';
}
}
§4.4 WhatsAppAdapter — Meta Cloud API
Dispatches messages via Meta Cloud API (v18.0). Supports both template messages (pre-approved content, required whenever the recipient is outside the 24-hour customer service window) and free-form session messages (allowed only within 24 hours of the recipient's last inbound message).
@Injectable()
export class WhatsAppAdapter implements CpaasAdapter {
constructor(
private readonly httpClient: HttpService,
private readonly logger: Logger,
) {}
async dispatch(
job: DispatchMessageJob,
decrypted: WhatsAppProviderConfig
): Promise<string> {
// Decide: template message or session message?
// Template message: Use meta_template for marketing/pre-approved content
// Session message: Use rendered_content for transactional (must have prior 24h contact)
const isTemplateMessage = Boolean(job.meta_template);
let payload: Record<string, unknown>;
if (isTemplateMessage) {
// Template-based dispatch (marketing messages)
payload = {
messaging_product: 'whatsapp',
to: job.recipient.phone,
type: 'template',
template: {
name: job.meta_template.name,
namespace: job.meta_template.namespace,
language: { code: 'de' }, // Phase 1: German only
components: job.meta_template.components,
},
};
} else {
// Session message (transactional, 24h window)
payload = {
messaging_product: 'whatsapp',
to: job.recipient.phone,
type: 'text',
text: { body: job.rendered_content },
};
}
try {
const url = `https://graph.facebook.com/v18.0/${decrypted.phone_number_id}/messages`;
const response = await this.httpClient.post(url, payload, {
headers: {
Authorization: `Bearer ${decrypted.access_token}`,
'Content-Type': 'application/json',
},
}).toPromise();
// Extract external_message_id from response
const external_message_id = response.data?.messages?.[0]?.id;
if (!external_message_id) {
throw new Error('Meta Cloud API response missing message ID');
}
return external_message_id;
} catch (error) {
throw this.mapMetaError(error);
}
}
private mapMetaError(error: any): CpaasError {
const code = error.response?.data?.error?.code;
const message = error.response?.data?.error?.message || error.message;
// Classify by Meta error code (see error classification table below)
switch (code) {
case 429:
// Rate limit exceeded — transient
return new CpaasError(code, message, true, 'RATE_LIMITED');
case 131026:
// Recipient not on WhatsApp — permanent
return new CpaasError(code, message, false, 'RECIPIENT_NOT_ON_WHATSAPP');
case 131000:
// Unable to create message — transient (network/overload)
return new CpaasError(code, message, true, 'UNABLE_TO_CREATE_MESSAGE');
case 100:
// Invalid request parameters — permanent
return new CpaasError(code, message, false, 'INVALID_PARAMETERS');
default:
// Unknown error — assume transient for safety
return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
}
}
}
§4.5 EmailAdapter — Amazon SES
Dispatches emails via Amazon SES. Automatically detects HTML vs. plain-text content.
@Injectable()
export class EmailAdapter implements CpaasAdapter {
private readonly ses = new SESClient({ region: 'eu-central-1' });
constructor(
private readonly logger: Logger,
) {}
async dispatch(
job: DispatchMessageJob,
decrypted: EmailProviderConfig
): Promise<string> {
// Determine content type: HTML or plain text?
const isHtml = /<[a-z]/i.test(job.rendered_content);
const params = new SendEmailCommand({
Source: decrypted.sender_email,
Destination: { ToAddresses: [job.recipient.email] },
Message: {
Subject: {
Data: 'Notification from Busflow',
Charset: 'UTF-8',
},
Body: isHtml
? {
Html: { Data: job.rendered_content, Charset: 'UTF-8' },
}
: {
Text: { Data: job.rendered_content, Charset: 'UTF-8' },
},
},
ConfigurationSetName: decrypted.configuration_set_name,
...(decrypted.reply_to_email && { ReplyToAddresses: [decrypted.reply_to_email] }),
});
try {
const response = await this.ses.send(params);
return response.MessageId; // SES MessageId is external_message_id
} catch (error) {
throw this.mapSesError(error);
}
}
private mapSesError(error: any): CpaasError {
const code = error.name || error.Code;
const message = error.message;
// Classify by SES error type (see error classification table below)
switch (code) {
case 'MessageRejected':
// Permanent: recipient address rejected, suppression list
return new CpaasError(code, message, false, 'INVALID_RECIPIENT');
case 'SendingPausedException':
// Transient: account sending paused (manager action, usually temporary)
return new CpaasError(code, message, true, 'SENDING_PAUSED');
case 'ThrottlingException':
// Transient: sending rate exceeded
return new CpaasError(code, message, true, 'THROTTLED');
case 'ConfigurationSetDoesNotExistException':
// Permanent: configuration set misconfigured
return new CpaasError(code, message, false, 'CONFIG_ERROR');
default:
// Unknown — assume transient
return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
}
}
}
§4.6 SmsAdapter — Amazon SNS
Dispatches SMS via Amazon SNS. Injects Sender ID for DACH carriers.
@Injectable()
export class SmsAdapter implements CpaasAdapter {
private readonly sns = new SNSClient({ region: 'eu-central-1' });
constructor(
private readonly logger: Logger,
) {}
async dispatch(
job: DispatchMessageJob,
decrypted: SmsProviderConfig
): Promise<string> {
const senderId = decrypted.origination_type === 'SENDER_ID'
? decrypted.origination_identity
: undefined;
const params = new PublishCommand({
Message: job.rendered_content,
PhoneNumber: job.recipient.phone,
MessageAttributes: {
// Attribute names contain dots, so they must be quoted object keys
...(senderId && {
'AWS.SNS.SMS.SenderID': {
DataType: 'String',
StringValue: senderId,
},
}),
'AWS.SNS.SMS.SMSType': {
DataType: 'String',
StringValue: 'Transactional',
},
},
});
try {
const response = await this.sns.send(params);
return response.MessageId; // SNS MessageId is external_message_id
} catch (error) {
throw this.mapSnsError(error);
}
}
private mapSnsError(error: any): CpaasError {
const code = error.name || error.Code;
const message = error.message;
// Classify by SNS error type (see error classification table below)
switch (code) {
case 'InvalidParameterException':
// Permanent: invalid phone number
return new CpaasError(code, message, false, 'INVALID_RECIPIENT');
case 'KMSDisabledException':
// Transient: KMS key issue (usually resolves)
return new CpaasError(code, message, true, 'KMS_ERROR');
case 'UserError':
// May be transient or permanent; inspect message
if (message.includes('spending limit')) {
// Spending limit — transient (recovers when limit increases or after 24h)
return new CpaasError(code, message, true, 'SPENDING_LIMIT');
}
return new CpaasError(code, message, false, 'USER_ERROR');
default:
// Unknown — assume transient
return new CpaasError(code, message, true, 'UNKNOWN_ERROR');
}
}
}
§4.7 Error Classification Reference
| Provider | Error Code / Exception | Description | Transient? | Action |
|---|---|---|---|---|
| Meta (WhatsApp) | 429 | Rate limit exceeded | ✅ Yes | Backoff: Exponential (5s base from BullMQ config). Per-tenant rate limiting via RateLimiterService (§4.8). Respect messaging_tier from provider_config. |
| Meta | 131026 | Recipient not on WhatsApp | ❌ No | Permanent. Trigger fallback chain to SMS/Email. Do not retry. |
| Meta | 131000 | Unable to create message | ✅ Yes | Transient; BullMQ retries. May indicate network hiccup or Meta overload. |
| Meta | 100 | Invalid request parameters | ❌ No | Permanent. Log error; surface to dispatcher. Do not retry. |
| SES (Email) | MessageRejected | Recipient rejected (bounce list, invalid address) | ❌ No | Permanent. Do not retry. Trigger fallback chain if configured. |
| SES | SendingPausedException | Account sending paused | ✅ Yes | Transient; BullMQ retries. Dispatcher should investigate account status. |
| SES | ThrottlingException | Sending rate exceeded (SendingRateExceeded) | ✅ Yes | Transient; BullMQ retries with exponential backoff. Check account sending rate limits. |
| SES | ConfigurationSetDoesNotExistException | Configuration set misconfigured | ❌ No | Permanent. Validate provider_config.configuration_set_name against SES account. |
| SNS (SMS) | InvalidParameterException | Invalid phone number | ❌ No | Permanent. Validate E.164 format. Trigger fallback if configured. |
| SNS | KMSDisabledException | KMS key disabled (encryption issue) | ✅ Yes | Transient; BullMQ retries. Contact AWS support if persists. |
| SNS | UserError (spending limit) | Monthly spending limit reached | ✅ Yes | Transient (recovers after 24h or when operator increases limit). Queue for retry. Surface warning to dispatcher. |
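The reference table can double as a lookup structure, so that the three adapters' switch statements and this table cannot drift apart. The following is a sketch under that assumption; `CLASSIFICATION` and `classify` are hypothetical names, and the SNS UserError row is left out because its classification depends on the message text rather than the code alone.

```typescript
type Provider = 'META' | 'SES' | 'SNS';

interface Classification {
  transient: boolean;
  errorType: string;
}

// Rows mirror the table above; errorType values match the adapters' CpaasError types.
const CLASSIFICATION: Record<Provider, Record<string, Classification>> = {
  META: {
    '429': { transient: true, errorType: 'RATE_LIMITED' },
    '131026': { transient: false, errorType: 'RECIPIENT_NOT_ON_WHATSAPP' },
    '131000': { transient: true, errorType: 'UNABLE_TO_CREATE_MESSAGE' },
    '100': { transient: false, errorType: 'INVALID_PARAMETERS' },
  },
  SES: {
    MessageRejected: { transient: false, errorType: 'INVALID_RECIPIENT' },
    SendingPausedException: { transient: true, errorType: 'SENDING_PAUSED' },
    ThrottlingException: { transient: true, errorType: 'THROTTLED' },
    ConfigurationSetDoesNotExistException: { transient: false, errorType: 'CONFIG_ERROR' },
  },
  SNS: {
    InvalidParameterException: { transient: false, errorType: 'INVALID_RECIPIENT' },
    KMSDisabledException: { transient: true, errorType: 'KMS_ERROR' },
  },
};

// Unknown codes default to transient, matching the adapters' default branches.
function classify(provider: Provider, code: string | number): Classification {
  const rows = CLASSIFICATION[provider];
  const key = String(code);
  return Object.prototype.hasOwnProperty.call(rows, key)
    ? rows[key]
    : { transient: true, errorType: 'UNKNOWN_ERROR' };
}
```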
§4.8 Rate Limiter Configuration
Per-tenant rate limiting with tier-aware adjustment for WhatsApp messaging tiers. Each tenant holds an isolated token bucket per channel.
@Injectable()
export class RateLimiterService {
private buckets = new Map<string, TokenBucket>(); // Key: `${tenant_id}:${channel_account_id}`
async acquire(
tenant_id: string,
channel_account_id: string,
channel: string,
decrypted: WhatsAppProviderConfig | EmailProviderConfig | SmsProviderConfig
): Promise<void> {
const key = `${tenant_id}:${channel_account_id}`;
let bucket = this.buckets.get(key);
if (!bucket) {
const config = this.getConfig(channel, decrypted);
bucket = new TokenBucket(config.capacity, config.refillRate);
this.buckets.set(key, bucket);
}
await bucket.acquire(1);
}
private getConfig(
channel: string,
decrypted: any
): { capacity: number; refillRate: number } {
let capacity = 50;
let refillRate = 50; // default: 50 msg/sec, matching the queue limiter
if (channel === 'WHATSAPP' && decrypted.messaging_tier) {
const tier = decrypted.messaging_tier as string;
// Per-tier send rate in msg/sec (msg/min ÷ 60), used directly as the refill rate
const ratesPerSecond: Record<string, number> = {
TIER_1: 1, // 60 msg/min
TIER_2: 8.3, // 500 msg/min
TIER_3: 33.3, // 2000 msg/min
TIER_4: 50, // 3000 msg/min
UNLIMITED: 100,
};
refillRate = ratesPerSecond[tier] ?? 1;
}
return { capacity, refillRate };
}
}
class TokenBucket {
private tokens: number;
private lastRefill: number = Date.now();
constructor(
private readonly capacity: number,
private readonly refillRate: number // tokens per second
) {
this.tokens = capacity;
}
async acquire(tokensNeeded: number): Promise<void> {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.refillRate
);
this.lastRefill = now;
// Reserve the tokens before waiting so that concurrent callers queue up
// behind each other instead of all seeing the same balance. A negative
// balance is debt that later refills pay off.
this.tokens -= tokensNeeded;
if (this.tokens < 0) {
const waitTime = (-this.tokens / this.refillRate) * 1000;
await new Promise(resolve => setTimeout(resolve, Math.ceil(waitTime)));
}
}
}
NOTE
Phase 2 upgrade path: This in-memory TokenBucket works for single-instance Phase 1 deployments. For multi-instance scaling, migrate to a Redis-backed token bucket using an atomic Lua script. Key structure: communications:limiter:{tenant_id}:{channel_type}. The RateLimiterService interface stays the same — only the backing store changes.
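For the Redis migration, it helps to isolate the bucket arithmetic as a pure function of the stored state and the current time, since that is the part an atomic script would have to execute in one step. The sketch below shows one possible shape, not the planned implementation; `BucketState` and `takeToken` are hypothetical names, and the debt-based accounting (the balance may go negative while a caller waits) is an assumption.

```typescript
interface BucketState {
  tokens: number;       // may be negative: tokens already promised to waiting callers
  lastRefillMs: number; // timestamp of the last refill computation
}

// Refill, reserve one token, and report how long the caller must wait.
// Pure: the same inputs always produce the same outputs, so the logic can be
// ported to a server-side script that reads and writes the state atomically.
function takeToken(
  state: BucketState,
  nowMs: number,
  capacity: number,
  refillPerSec: number,
): { state: BucketState; waitMs: number } {
  const elapsedSec = Math.max(0, nowMs - state.lastRefillMs) / 1000;
  const refilled = Math.min(capacity, state.tokens + elapsedSec * refillPerSec);
  const remaining = refilled - 1;
  const waitMs = remaining < 0 ? Math.ceil((-remaining / refillPerSec) * 1000) : 0;
  return { state: { tokens: remaining, lastRefillMs: nowMs }, waitMs };
}
```

Passing the clock in as a parameter also makes the limiter testable without timers.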
§5 Template Resolution Algorithm
/**
* Lookup chain (per channel):
* 1. (tenant_id, trigger_event, channel, passenger_locale) — exact match
* 2. (tenant_id, trigger_event, channel, operator_default_locale) — locale fallback
* 3. (tenant_id, trigger_event, channel, 'de-DE') — ultimate fallback (DACH)
*
* If no template exists for a channel, that channel is skipped.
* The resolved template list only includes channels with both:
* - A matching NotificationTemplate (above lookup chain)
* - status = ACTIVE on the corresponding ChannelAccount
*/
async function resolveTemplates(
tenant_id: string,
trigger_event: string,
locale: string,
channels?: string[]
): Promise<{ channel: string; template: NotificationTemplate }[]>;
The n8n workflow receives $locales as [passenger_locale, operator_default_locale, 'de-DE'] and selects the first match per channel.
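The lookup chain can be illustrated against an in-memory template list. This is a simplified sketch: the real lookup runs in n8n against notification_templates, and the function name `resolveTemplatesInMemory`, the `NotificationTemplateRow` fields, and the sample data are assumptions made for the example.

```typescript
interface NotificationTemplateRow {
  tenant_id: string;
  trigger_event: string;
  channel: string;
  locale: string;
  body: string;
}

// For each channel, walk the locale chain in order and keep the first match.
// Channels with no match in any locale are skipped.
function resolveTemplatesInMemory(
  all: NotificationTemplateRow[],
  tenantId: string,
  triggerEvent: string,
  locales: string[],
  channels: string[],
): { channel: string; template: NotificationTemplateRow }[] {
  const resolved: { channel: string; template: NotificationTemplateRow }[] = [];
  for (const channel of channels) {
    let match: NotificationTemplateRow | undefined;
    for (const locale of locales) {
      for (const t of all) {
        if (
          t.tenant_id === tenantId &&
          t.trigger_event === triggerEvent &&
          t.channel === channel &&
          t.locale === locale
        ) {
          match = t;
          break;
        }
      }
      if (match) break;
    }
    if (match) resolved.push({ channel, template: match });
  }
  return resolved;
}
```

With locales ['en-GB', 'de-AT', 'de-DE'], a tenant that has an en-GB WhatsApp template and only a de-DE email template resolves WhatsApp to en-GB and email to de-DE, while SMS is dropped if no SMS template exists.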
§5.1 ChannelAccount ACTIVE Gate
Before dispatching on a channel, verify the ChannelAccount exists and is ACTIVE:
SELECT ca.id, ca.provider_config
FROM communications.channel_accounts ca
WHERE ca.tenant_id = :tenant_id
AND ca.channel_type = :channel
AND ca.status = 'ACTIVE'
LIMIT 1;
Location in pipeline: The n8n workflow performs this check after resolving the template. If no ACTIVE account exists for the channel:
- Log warning: template found but no ACTIVE ChannelAccount for {channel}
- Exclude that channel from the dispatch list
If ALL channels are either missing templates or inactive:
- n8n creates a Message row with status = FAILED, failed_reason = 'no_active_channels'
- No BullMQ job is enqueued
- Dispatcher sees FAILED message in Inbox; can retry once ChannelAccount is ACTIVE
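The gate described above reduces to a filter over the resolved channels plus one terminal case. The sketch below assumes the ChannelAccount rows have already been fetched; `gateChannels`, `GateResult`, and `ChannelAccountRow` are hypothetical names.

```typescript
interface ChannelAccountRow {
  channel_type: string;
  status: string;
}

interface GateResult {
  dispatch: string[];
  failed_reason?: 'no_active_channels';
}

// Keep only channels backed by an ACTIVE account; if none remain, report the
// failed_reason that the FAILED Message row should carry.
function gateChannels(resolvedChannels: string[], accounts: ChannelAccountRow[]): GateResult {
  const dispatch: string[] = [];
  for (const channel of resolvedChannels) {
    let active = false;
    for (const account of accounts) {
      if (account.channel_type === channel && account.status === 'ACTIVE') {
        active = true;
        break;
      }
    }
    if (active) {
      dispatch.push(channel);
    } else {
      console.warn(`template found but no ACTIVE ChannelAccount for ${channel}`);
    }
  }
  return dispatch.length > 0 ? { dispatch } : { dispatch, failed_reason: 'no_active_channels' };
}
```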
§6 Multi-Channel vs. Fallback Chain
| Pattern | When to Use | Example |
|---|---|---|
| Simultaneous Multi-Channel | Transactional notifications where both channels carry different value | BookingFullyPaid, InvoiceIssued |
| Fallback Chain | Operational notifications where delivery speed matters | IncidentBroadcast (WhatsApp → SMS → Email) |
| Single Channel | Channel-specific notifications | CheckoutAbandoned (Email only) |
NotificationRequest.channels controls the pattern:
- Omitted: Dispatch on all channels with a matching template (simultaneous).
- Specified: Dispatch only on listed channels.
DispatchMessageJob.fallback_chain controls fallback.
§7 trigger_event Registry
| trigger_event | Domain Event Source | Channels | Context Variables | Approval |
|---|---|---|---|---|
| BOOKING_CONFIRMED | BookingConfirmed ⚠️ | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, deposit_amount | No |
| BOOKING_FULLY_PAID | BookingFullyPaid | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, total_amount, invoice_url | No |
| BOOKING_CANCELLED | BookingCancelled | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, booking_reference, cancellation_fee | No |
| BOOKING_REFUNDED | BookingRefunded | | passenger_name, booking_reference, refund_amount | No |
| BOOKING_COMPLETED | BookingCompleted | | passenger_name, tour_name, operator_name, feedback_url | No |
| BOOKING_NOSHOW | BookingNoShow | | passenger_name, booking_reference, tour_name | No |
| CHECKOUT_ABANDONED | CheckoutAbandoned | EMAIL | passenger_name, tour_name, checkout_url, expiry_reason | No |
| CHECKOUT_ABANDONED_WHATSAPP | CheckoutAbandoned (+24h) | WHATSAPP | passenger_name, tour_name, checkout_url | No |
| FINAL_PAYMENT_DUE | FinalPaymentDue | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, remaining_amount, payment_link | No |
| FINAL_PAYMENT_OVERDUE | FinalPaymentOverdue | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, remaining_amount, payment_link, days_overdue | No |
| PASSENGER_CANCELLED | PassengerCancelled | | passenger_name, tour_name, cancellation_fee, refund_amount | No |
| ANCILLARY_CANCELLED | AncillaryCancelled | | passenger_name, ancillary_label, refund_amount | No |
| PASSENGER_ADDED | PassengerAdded | EMAIL, WHATSAPP | passenger_name, tour_name, departure_date, boarding_point_name | No |
| INVOICE_ISSUED | InvoiceIssued | | passenger_name, booking_reference, invoice_number, total_gross, invoice_pdf_url | No |
| SERVICE_LEG_STARTED | ServiceLegStarted | | passenger_name, boarding_point_name, tracking_url | No |
| SERVICE_LEG_CANCELLED | ServiceLegCancelled | WHATSAPP, EMAIL | passenger_name, tour_name, boarding_point_name, operator_phone | No |
| INCIDENT_BROADCAST | IncidentCreated (CRITICAL) | | See incident-broadcast-protocol.md §8. Specialized flow. | Yes |
| INCIDENT_ALLCLEAR | IncidentResolved (CRITICAL) | | See incident-broadcast-protocol.md §8. Specialized flow. | No |
| VEHICLE_SWAP_NOTIFICATION | VehicleSwapped | | passenger_name, old_vehicle, new_vehicle, seat_assignment | No |
| QUALIFICATION_EXPIRING | QualificationExpiring | | crew_member_name, qualification_type, expiry_date, days_remaining | No |
| VEHICLE_INSPECTION_OVERDUE | VehicleInspectionOverdue | | vehicle_license_plate, inspection_type, due_date, days_overdue | No |
| PRE_TRIP_REMINDER | Hasura Cron (T-24h) | WHATSAPP, EMAIL | passenger_name, tour_name, departure_date, boarding_point_name, boarding_time, passenger_instructions | No |
Events without external channels (e.g., FINANCIAL_LEDGER_CLOSED, SERVICE_LEG_COMPLETED, BOARDING_CONFLICT) target internal systems (Dispatch Board Hasura subscriptions) and do not generate Messages.
Events marked "Specialized flow" bypass the generic NotificationPipelineService and use dedicated NestJS handlers + n8n workflows. They rejoin at the DispatchMessageJob level for CPaaS sending.
NOTE
BOOKING_CONFIRMED — event catalog update. event-catalog.md now lists Communications as a consumer of BookingConfirmed (deposit confirmation EMAIL + WHATSAPP).
IMPORTANT
Seeding requirement: ProvisionTenant (ADR-003) must seed all templates above. Phase 1 seeds de-DE locale only. Operators can edit body content, but system-provided defaults must exist.
§8 CPaaS Delivery Webhook Specification
NOTE
Webhook routing: Meta supports WABA-level override callbacks via POST /<WABA_ID>/subscribed_apps. During channel provisioning, Busflow registers a per-tenant callback URL (/api/webhooks/meta/{tenant_id}) for each WABA. Both inbound messages and delivery status callbacks arrive at this per-tenant URL. The lookup keys below apply when resolving individual messages within the handler. See channel-provisioning-protocol.md §7 for webhook registration and inbox-protocol.md §7 for inbound message handling.
| Provider | Webhook Endpoint | Lookup Key | Status Mapping |
|---|---|---|---|
| Meta Cloud API (WhatsApp) — delivery | POST /api/webhooks/meta/{tenant_id} (per-WABA override) | external_message_id = Meta messages[].id | sent → SENT, delivered → DELIVERED, read → READ, failed → FAILED |
| Amazon SES (Email) | POST /api/webhooks/ses (SNS subscription) | external_message_id = SES MessageId | Delivery → DELIVERED, Bounce (permanent) → FAILED, Complaint → FAILED |
| Amazon SNS (SMS) | POST /api/webhooks/sns-sms (SNS subscription) | external_message_id = SNS MessageId | SUCCESS → DELIVERED, FAILURE → FAILED |
Handler: NestJS validates webhook signature (Meta: HMAC-SHA256 via X-Hub-Signature-256; SES/SNS: SNS message signature). For Meta, resolves tenant_id from the URL path parameter. Extracts external_message_id → queries messages WHERE external_message_id = :id → updates status, delivered_at/read_at/failed_reason. For SES/SNS (account-level webhooks), resolves tenant_id from the Message row.
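For the Meta branch of the handler, signature validation and status mapping could look like the sketch below. It assumes the handler has access to the raw, unparsed request body and the app secret; `verifyMetaSignature` and `mapMetaStatus` are hypothetical names. The X-Hub-Signature-256 header carries the hex HMAC-SHA256 of the payload prefixed with "sha256=".

```typescript
import { createHmac, timingSafeEqual } from 'crypto';

// Compare the header against an HMAC computed over the raw body.
// timingSafeEqual requires equal-length buffers, so check length first.
function verifyMetaSignature(rawBody: string, signatureHeader: string, appSecret: string): boolean {
  const expected = 'sha256=' + createHmac('sha256', appSecret).update(rawBody, 'utf8').digest('hex');
  const a = Buffer.from(expected, 'utf8');
  const b = Buffer.from(signatureHeader, 'utf8');
  return a.length === b.length && timingSafeEqual(a, b);
}

// Status mapping from the table above; unknown statuses map to undefined
// so the handler can ignore them instead of writing an invalid status.
const META_STATUS: Record<string, 'SENT' | 'DELIVERED' | 'READ' | 'FAILED'> = {
  sent: 'SENT',
  delivered: 'DELIVERED',
  read: 'READ',
  failed: 'FAILED',
};

function mapMetaStatus(status: string): 'SENT' | 'DELIVERED' | 'READ' | 'FAILED' | undefined {
  return Object.prototype.hasOwnProperty.call(META_STATUS, status) ? META_STATUS[status] : undefined;
}
```

The signature must be computed over the exact bytes received, so the check has to run before any JSON body parser rewrites the payload.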
§9 BatchDispatchJob Interface
For high-volume notifications (pre-trip reminders, mass cancellation):
/** Queue name: 'message-batch-dispatch' */
interface BatchDispatchJob {
batch_id: string;
message_ids: string[];
channel: 'WHATSAPP' | 'EMAIL' | 'SMS';
channel_account_id: string;
provider_config: Record<string, unknown>;
fallback_chain?: ('WHATSAPP' | 'SMS' | 'EMAIL')[];
}
The processor iterates message_ids[], fetches each Message's rendered_content and recipient from the database, and dispatches sequentially with rate limiting. Failed individual messages do not abort the batch.
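The "failed messages do not abort the batch" rule amounts to a per-message try/catch inside a sequential loop. A minimal sketch, assuming a `dispatchOne` callback that fetches, rate-limits, and sends a single message (the callback, `processBatch`, and `BatchResult` are hypothetical names):

```typescript
interface BatchResult {
  sent: string[];
  failed: { id: string; reason: string }[];
}

// Dispatch sequentially; record each failure and continue with the next message.
async function processBatch(
  messageIds: string[],
  dispatchOne: (messageId: string) => Promise<void>,
): Promise<BatchResult> {
  const result: BatchResult = { sent: [], failed: [] };
  for (const id of messageIds) {
    try {
      await dispatchOne(id);
      result.sent.push(id);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      result.failed.push({ id, reason });
    }
  }
  return result;
}
```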
§10 Contact & Conversation Handling
Contact Resolution
The n8n workflow resolves each recipient to a Contact record. It uses the same base logic as the inbound message ingestion handler (see inbox-protocol.md §8), but adds support for profiling:
- If passenger_profile_id is provided: Query contacts WHERE tenant_id = :tenant_id AND passenger_profile_id = :profile_id. Use existing Contact.
- Fallback: If no profile ID exists, use the standard Contact Resolution Algorithm.
- If no Contact exists: Create new Contact with identifiers = [{type: 'phone', value: :phone}, ...].
This lookup/upsert logic ensures a unified identity across both inbound and outbound channels.
Conversation Handling
If the NotificationRequest provides a conversation_context:
- Look up: Find an existing OPEN or SNOOZED conversation linked to the context. The context predicates are grouped in parentheses so that the tenant, contact, and status filters apply to every branch (AND binds tighter than OR in SQL):
SELECT id FROM communications.conversations
WHERE tenant_id = :tenant_id
AND contact_id = :contact_id
AND (
(:type = 'booking' AND booking_id = :id)
OR (:type = 'trip' AND trip_id = :id)
OR (:type = 'invoice' AND invoice_id = :id)
)
AND status != 'RESOLVED';
- Action: Reuse the found Conversation or create a new one with status = OPEN and the appropriate FK (booking_id, trip_id, or invoice_id) set.
Postgres schema note: Phase 1 uses nullable FKs (booking_id, trip_id, invoice_id) on the conversations table. A CHECK constraint ensures the system populates at most one context FK.
If the request omits conversation_context, n8n does not create or link a Conversation. The Message exists as a standalone outbound record.
§11 Error Handling & Resilience
n8n Webhook Failure (Circuit Breaker)
The NestJS handler wraps the n8n webhook call in a circuit breaker (opossum library, per workflow-orchestration.md §Error Handling):
Failure modes:
- Timeout (>30s): Transient; circuit breaker opens after 5 consecutive timeouts within 60s.
- 5xx response: Transient; same open threshold.
- 4xx response (e.g., 422 payload validation error): Likely permanent; log, open circuit, manual review required.
On circuit open:
- NestJS catches the call, logs ERROR severity.
- Enqueues the original NotificationRequest to a notifications_recovery BullMQ queue (separate from message-dispatch).
- Returns 200 to Hasura (idempotent fire-and-forget).
- A monitoring job (RecoveryWorker) periodically retries items in the recovery queue.
§11.1 RecoveryWorker Contract
Queue Name: notifications_recovery
interface RecoveryJob {
request: NotificationRequest;
attempt: number;
lastChangedAt: string;
}
Retry Strategy:
- Attempts 1-3: Retry with linear backoff (1 minute).
- Attempt 4: Move job to FAILED queue. Trigger ERROR alert for the operator (Phase 1: Log only; Phase 2: Dispatcher alert).
- Manual Intervention: A developer can replay failed recovery jobs via the BullMQ dashboard if n8n requires manual workflow fixing.
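The retry strategy can be written as a small decision function over RecoveryJob.attempt. The sketch reads "linear backoff (1 minute)" as a delay of attempt × 1 minute, which is one plausible interpretation and not confirmed by the source; `nextRecoveryAction` and `RecoveryAction` are hypothetical names.

```typescript
type RecoveryAction =
  | { kind: 'retry'; delayMs: number }
  | { kind: 'fail' };

const MAX_RECOVERY_RETRIES = 3;
const BACKOFF_STEP_MS = 60000; // 1 minute

// attempt is 1-based: attempts 1-3 are retried, attempt 4 moves the job to FAILED.
function nextRecoveryAction(attempt: number): RecoveryAction {
  if (attempt <= MAX_RECOVERY_RETRIES) {
    // Assumed linear schedule: 1 min, 2 min, 3 min
    return { kind: 'retry', delayMs: attempt * BACKOFF_STEP_MS };
  }
  return { kind: 'fail' };
}
```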
Circuit breaker config:
const circuitBreaker = new CircuitBreaker(async (req) => {
return await this.httpClient.post(n8nWebhookUrl, req).toPromise();
}, {
timeout: 30000,
errorThresholdPercentage: 50,
resetTimeout: 60000, // Half-open after 60s
rollingCountTimeout: 60000,
});
Orphaned QUEUED Messages (Stuck Message Detection)
If n8n creates a Message (status = QUEUED) but fails to enqueue the BullMQ job, the Message is orphaned. Detection:
Hasura Cron Trigger: message_stuck_queue_scan (every 5 minutes)
SELECT id, tenant_id FROM communications.messages
WHERE status = 'QUEUED'
AND sent_at < now() - interval '5 minutes'
ORDER BY sent_at ASC
LIMIT 100;
Action: Log WARN per tenant. Dispatcher alerts are future work (Phase 2).
ChannelAccount SUSPENDED During Dispatch
If a ChannelAccount transitions from ACTIVE to SUSPENDED (due to provider revocation or manager action) while messages are QUEUED, the BullMQ worker must detect and handle gracefully:
- Worker fetches ChannelAccount status before dispatch: SELECT status FROM channel_accounts WHERE id = :channel_account_id.
- If status != ACTIVE: Permanently fail the Message: UPDATE messages SET status = 'FAILED', failed_reason = 'channel_suspended'.
- No retry: This is a permanent failure; the dispatcher must reactivate the ChannelAccount or route to a fallback channel manually.
Fallback behavior: If fallback_chain specifies additional channels and the primary channel fails due to suspension (vs. permanent recipient error), the worker creates a new Message + job for the next channel in the chain. See §4 Fallback Chain Handling.
Template Not Found (All Channels)
If n8n resolves zero templates (either no matching template records or all matching channels are INACTIVE):
- n8n creates Message with status = FAILED, failed_reason = 'no_templates_found'.
- No BullMQ job is enqueued.
- Dispatcher sees FAILED message in Inbox.
§12 Pipeline Idempotency
The correlation_id column on messages stores the Hasura event_id. Before forwarding to n8n, the NestJS handler queries:
SELECT id FROM communications.messages WHERE correlation_id = :event_id LIMIT 1;
If a row exists, the event is a duplicate (Hasura at-least-once delivery) and the handler returns 200 without processing.
Schema enforcement: The messages table has a UNIQUE(correlation_id) WHERE correlation_id IS NOT NULL constraint. This prevents race conditions if two NestJS handlers process the same event_id simultaneously. The first INSERT succeeds; the second fails with a UNIQUE violation, which the handler catches to return an idempotent 200 result. See schema-communications.md §messages for the constraint definition.
Scope: This dedup protects against Hasura Event Trigger at-least-once delivery. It does NOT protect against:
- n8n webhook being called twice for the same event_id: If the NestJS call to n8n hangs and times out, the handler may retry and call n8n again. Ensure n8n is idempotent (e.g., by checking its own duplication log or using NotificationRequest.event_id as the cross-session idempotency key).
- BullMQ worker processing the same job twice: BullMQ ignores an add() whose jobId already exists in the queue, so this is prevented only when jobs are enqueued with a deterministic jobId (for example the message_id). See §4 for job configuration.
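The race described under "Schema enforcement" can be demonstrated with an in-memory stand-in for the UNIQUE(correlation_id) constraint. This is an illustration only: `InMemoryMessages` and `handleEvent` are hypothetical, and the error shape (a `code` property holding the Postgres SQLSTATE 23505 for unique_violation) assumes the violation surfaces the way a Postgres driver reports it; a Hasura mutation may wrap it differently.

```typescript
interface PgLikeError extends Error {
  code?: string;
}

// Stand-in for the messages table: rejects a second row with the same correlation_id.
class InMemoryMessages {
  private rows: { [correlationId: string]: string } = {};

  insert(correlationId: string, messageId: string): void {
    if (Object.prototype.hasOwnProperty.call(this.rows, correlationId)) {
      const err: PgLikeError = new Error('duplicate key value violates unique constraint');
      err.code = '23505'; // SQLSTATE unique_violation
      throw err;
    }
    this.rows[correlationId] = messageId;
  }
}

// The first delivery inserts; a concurrent or repeated delivery hits the
// constraint and is reported as a duplicate instead of an error.
function handleEvent(store: InMemoryMessages, eventId: string): 'processed' | 'duplicate' {
  try {
    store.insert(eventId, 'msg-' + eventId);
    return 'processed';
  } catch (err) {
    if ((err as PgLikeError).code === '23505') return 'duplicate';
    throw err;
  }
}
```

Both outcomes translate to a 200 response, which is what makes the handler safe under Hasura's at-least-once delivery.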