Integration Patterns: Connecting Zero-Cost Components for Maximum Efficiency
Status: Architecture Integration Guide
Research Focus: Seamless Component Integration with Cost Efficiency
Verified: Based on production patterns and real implementations
Executive Summary
The magic isn't in the components; it's in how they connect. This document reveals the integration patterns that bind n8n, Postmark, Google Cloud Run, and Next.js into a cohesive, cost-efficient platform. By mastering event-driven communication, smart caching, and efficient data flow, we create a system where every component scales independently while maintaining zero fixed costs.
The Integration Philosophy
Key Achievement: Seamless Zero-Cost Integration
| Integration Point | Traditional Approach | Our Approach | Benefit |
|---|---|---|---|
| API → n8n | Direct HTTP calls | Event-based triggers | 90% less coupling |
| n8n → Postmark | Synchronous sending | Batched async | 10x throughput |
| Frontend → Backend | REST polling | Server-sent events | 80% fewer requests |
| Services → Database | Direct connections | Connection pooling | 95% fewer connections |
Core Integration Architecture
1. Event-Driven Communication Pattern
The Foundation: Google Cloud Pub/Sub
Why Pub/Sub is Perfect for Zero-Cost Architecture:
- Usage-based pricing - No idle costs
- At-least-once delivery - Reliability built-in
- Push subscriptions - Wake up Cloud Run only when needed
- Dead-letter topics - Messages that exhaust their delivery attempts are set aside for inspection
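A push subscription delivers each message as an HTTP POST whose JSON body wraps the payload in base64. The sketch below shows that envelope being built and decoded with Node built-ins only. The helper names `encodePushEnvelope` and `decodePushEnvelope` are hypothetical and are not part of the Pub/Sub client library.

```javascript
// Hypothetical helpers illustrating the Pub/Sub push envelope shape:
// { message: { data: <base64>, messageId }, subscription }.

function encodePushEnvelope(event, messageId, subscription) {
  return {
    message: {
      data: Buffer.from(JSON.stringify(event), 'utf8').toString('base64'),
      messageId,
    },
    subscription,
  };
}

function decodePushEnvelope(body) {
  // Reject bodies that do not look like a push delivery.
  if (!body || !body.message || typeof body.message.data !== 'string') {
    throw new Error('Invalid Pub/Sub push envelope');
  }
  const json = Buffer.from(body.message.data, 'base64').toString('utf8');
  return { messageId: body.message.messageId, event: JSON.parse(json) };
}
```

Keeping the decode step in one function makes it easy to unit test the handler without a running subscription.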
Central Event Bus Implementation
// Event publisher service
const { PubSub } = require('@google-cloud/pubsub');
class EventBus {
constructor() {
this.pubsub = new PubSub({
projectId: process.env.GCP_PROJECT_ID
});
// Topic registry for type safety
this.topics = {
'campaign.created': 'campaign-events',
'campaign.scheduled': 'campaign-events',
'email.sent': 'email-events',
'email.bounced': 'email-events',
'user.signup': 'user-events',
'payment.received': 'payment-events'
};
}
async publish(eventType, data) {
const topicName = this.topics[eventType];
if (!topicName) {
throw new Error(`Unknown event type: ${eventType}`);
}
const topic = this.pubsub.topic(topicName);
// Structured event format
const message = {
eventType,
timestamp: new Date().toISOString(),
data,
metadata: {
version: '1.0',
source: process.env.SERVICE_NAME,
correlationId: generateCorrelationId()
}
};
// Publish; publishMessage({ json }) serializes the payload and resolves to the message ID
const messageId = await topic.publishMessage({ json: message });
console.log('Event published', {
eventType,
messageId,
correlationId: message.metadata.correlationId
});
return messageId;
}
}
// Usage in API
app.post('/campaigns', async (req, res) => {
// Create campaign
const campaign = await db.createCampaign(req.body);
// Publish event asynchronously
eventBus.publish('campaign.created', {
campaignId: campaign.id,
userId: req.user.id,
scheduledAt: campaign.scheduledAt
}).catch(console.error); // Don't block response
// Return immediately
res.json(campaign);
});
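The publisher above calls `generateCorrelationId()` without defining it. One possible implementation, assuming a random UUID is acceptable as a correlation ID and that an ID received from an upstream caller should be reused (the `incomingId` parameter is an assumption, not part of the original code):

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical implementation of the generateCorrelationId() helper
// that EventBus.publish() calls but the guide does not define.
function generateCorrelationId(incomingId) {
  // Reuse an upstream ID when present so one request keeps one ID
  // across services; otherwise mint a new random UUID.
  return incomingId || randomUUID();
}
```

Reusing the upstream ID is what lets log lines from the API, the event handler, and n8n be joined on a single value.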
Event Subscription Pattern
// Cloud Run service that processes events
app.post('/events/campaign', async (req, res) => {
// Pub/Sub push subscription
const message = req.body.message;
if (!message) {
res.status(400).send('Bad Request: missing message');
return;
}
try {
// Decode inside the try block so a malformed payload is caught and logged
const event = JSON.parse(
Buffer.from(message.data, 'base64').toString()
);
// Route to appropriate handler
switch (event.eventType) {
case 'campaign.created':
await handleCampaignCreated(event.data);
break;
case 'campaign.scheduled':
await handleCampaignScheduled(event.data);
break;
default:
console.warn('Unknown event type:', event.eventType);
}
// Acknowledge message
res.status(200).send();
} catch (error) {
console.error('Event processing failed:', error);
// Return error to trigger retry
res.status(500).send();
}
});
// Event handlers
async function handleCampaignCreated(data) {
// Trigger n8n workflow
await triggerN8nWorkflow('campaign-setup', {
campaignId: data.campaignId,
userId: data.userId
});
}
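Because Pub/Sub delivers at least once, a handler can receive the same message twice. A minimal in-memory sketch of an idempotent wrapper follows; the class name `IdempotentHandler` and its eviction limit are assumptions, and a deployment with several instances would need a shared store such as the Redis-based deduplicator shown later in this guide.

```javascript
// In-memory sketch of an idempotent consumer keyed by Pub/Sub messageId.
// Note: two concurrent deliveries of the same ID can still both run;
// a shared lock is needed to close that gap.
class IdempotentHandler {
  constructor(handler, maxEntries = 10000) {
    this.handler = handler;
    this.maxEntries = maxEntries;
    this.seen = new Set();
  }

  async handle(messageId, event) {
    if (this.seen.has(messageId)) {
      return { processed: false }; // duplicate delivery, safe to acknowledge
    }
    await this.handler(event);
    // Record only after success so a failed attempt can be retried.
    this.seen.add(messageId);
    if (this.seen.size > this.maxEntries) {
      // Evict the oldest entry; a Set iterates in insertion order.
      this.seen.delete(this.seen.values().next().value);
    }
    return { processed: true };
  }
}
```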
2. n8n Integration Patterns
n8n as the Automation Brain
Integration Strategy:
- n8n runs on Cloud Run (scales to zero)
- Workflows triggered by webhooks or events
- Direct database access for efficiency
- Postmark integration for email sending
n8n Workflow Trigger Service
// Service to trigger n8n workflows
class N8nIntegration {
constructor() {
this.baseUrl = process.env.N8N_WEBHOOK_URL;
this.apiKey = process.env.N8N_API_KEY;
// Workflow registry
this.workflows = {
'campaign-send': 'webhook/campaign-send-v3', // matches the webhook path of the bulk send workflow below
'welcome-series': 'webhook/welcome-email-series',
'abandoned-cart': 'webhook/abandoned-cart-recovery',
'data-export': 'webhook/bulk-data-export'
};
}
async triggerWorkflow(workflowName, data) {
const webhookPath = this.workflows[workflowName];
if (!webhookPath) {
throw new Error(`Unknown workflow: ${workflowName}`);
}
const url = `${this.baseUrl}/${webhookPath}`;
try {
const response = await axios.post(url, {
...data,
_metadata: {
triggeredAt: new Date().toISOString(),
source: 'nudgecampaign-api',
version: '1.0'
}
}, {
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
timeout: 5000 // Fast fail
});
return {
success: true,
executionId: response.data.executionId
};
} catch (error) {
// Log but don't throw - workflows should be async
console.error('Workflow trigger failed:', error.message);
// Queue for retry
await this.queueForRetry(workflowName, data);
return {
success: false,
queued: true
};
}
}
async queueForRetry(workflowName, data) {
// Use Cloud Tasks for reliable retry
const task = {
httpRequest: {
httpMethod: 'POST',
url: `${this.baseUrl}/retry`,
headers: {
'Content-Type': 'application/json',
},
body: Buffer.from(JSON.stringify({
workflowName,
data,
attempt: 1
})).toString('base64'),
},
scheduleTime: {
seconds: Math.floor(Date.now() / 1000) + 300, // 5 min delay
},
};
await cloudTasks.createTask({ parent: queuePath, task });
}
}
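The retry task above always waits a fixed five minutes and carries an `attempt` counter. If repeated failures should back off progressively, the delay could be derived from that counter. This is a sketch under assumed values: the 30-second base, the one-hour cap, and both function names are illustrative and do not come from the original service.

```javascript
// Hypothetical helper: delay in seconds before retry number `attempt` (1-based).
// Doubles each time and is capped at maxSeconds.
function retryDelaySeconds(attempt, baseSeconds = 30, maxSeconds = 3600) {
  const exponential = baseSeconds * 2 ** (attempt - 1);
  return Math.min(maxSeconds, exponential);
}

// Builds the scheduleTime object in the shape used by the task above.
function scheduleTimeFor(attempt, nowMs = Date.now()) {
  return { seconds: Math.floor(nowMs / 1000) + retryDelaySeconds(attempt) };
}
```

With these defaults the first retries wait 30, 60, and 120 seconds, and any attempt from the eighth onward waits the full hour.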
n8n Workflow Examples: Production Patterns
Complete Workflow Library: These are production-ready workflow patterns optimized for Cloud Run deployment and cost efficiency.
1. Bulk Campaign Send Workflow
{
"name": "Bulk Campaign Send - Production",
"meta": {
"templateCreatedBy": "nudgecampaign-v1",
"description": "Cost-optimized bulk email sending with error handling"
},
"nodes": [
{
"parameters": {
"path": "campaign-send-v3",
"responseMode": "immediately",
"responseData": "{ \"status\": \"queued\", \"message\": \"Campaign processing started\" }"
},
"name": "Campaign Trigger",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [240, 300],
"webhookId": "campaign-send-v3"
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT c.*, ct.html_content, ct.text_content, ct.subject FROM campaigns c JOIN campaign_templates ct ON c.template_id = ct.id WHERE c.id = $1 AND c.status = 'ready'",
"additionalFields": {
"queryParams": "={{ $json.campaignId }}"
}
},
"name": "Get Campaign",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [440, 300]
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ $json.length }}",
"operation": "larger",
"value2": 0
}
]
}
},
"name": "Campaign Exists?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [640, 300]
},
{
"parameters": {
"batchSize": 500,
"options": {
"reset": false
}
},
"name": "Batch Recipients",
"type": "n8n-nodes-base.splitInBatches",
"typeVersion": 1,
"position": [1040, 200]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT c.id, c.email, c.first_name, c.last_name, c.preferences, c.timezone FROM contacts c JOIN campaign_recipients cr ON c.id = cr.contact_id WHERE cr.campaign_id = $1 AND cr.status = 'pending' AND c.email_verified = true ORDER BY c.id LIMIT 500 OFFSET $2",
"additionalFields": {
"queryParams": "={{ [$json.campaignId, $json.batchIndex * 500] }}"
}
},
"name": "Get Recipients Batch",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [1240, 200]
},
{
"parameters": {
"jsCode": "// Efficient email personalization\nconst campaign = $input.first().json;\nconst recipients = $input.all().slice(1).map(item => item.json);\n\n// Batch personalization for performance\nconst personalizedEmails = recipients.map(recipient => {\n const personalizedSubject = campaign.subject\n .replace(/{{firstName}}/g, recipient.first_name || 'there')\n .replace(/{{lastName}}/g, recipient.last_name || '')\n .replace(/{{email}}/g, recipient.email);\n \n const personalizedHtml = campaign.html_content\n .replace(/{{firstName}}/g, recipient.first_name || 'there')\n .replace(/{{lastName}}/g, recipient.last_name || '')\n .replace(/{{email}}/g, recipient.email)\n .replace(/{{unsubscribeUrl}}/g, `https://nudgecampaign.com/unsubscribe?token=${recipient.id}`);\n \n const personalizedText = campaign.text_content\n .replace(/{{firstName}}/g, recipient.first_name || 'there')\n .replace(/{{lastName}}/g, recipient.last_name || '')\n .replace(/{{email}}/g, recipient.email)\n .replace(/{{unsubscribeUrl}}/g, `https://nudgecampaign.com/unsubscribe?token=${recipient.id}`);\n \n return {\n From: campaign.from_email || 'hello@nudgecampaign.com',\n To: recipient.email,\n Subject: personalizedSubject,\n HtmlBody: personalizedHtml,\n TextBody: personalizedText,\n MessageStream: \"marketing\",\n Tag: `campaign-${campaign.id}`,\n Metadata: {\n campaignId: campaign.id.toString(),\n contactId: recipient.id.toString(),\n batchId: campaign.batchId || '1'\n }\n };\n});\n\nreturn personalizedEmails.map(email => ({ json: email }));"
},
"name": "Personalize Emails",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [1440, 200]
},
{
"parameters": {
"operation": "sendBatch",
"messages": "={{ $json }}"
},
"name": "Send via Postmark",
"type": "n8n-nodes-base.postmark",
"typeVersion": 1,
"position": [1640, 200]
},
{
"parameters": {
"operation": "executeQuery",
"query": "UPDATE campaign_recipients SET status = 'sent', sent_at = NOW(), message_id = $2 WHERE campaign_id = $1 AND contact_id = ANY($3)",
"additionalFields": {
"queryParams": "={{ [$json.campaignId, $json.MessageID, $json.contactIds] }}"
}
},
"name": "Update Send Status",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [1840, 200]
},
{
"parameters": {
"jsCode": "// Error logging and retry logic\nconst error = $input.first().json;\n\n// Classify error type\nlet errorType = 'unknown';\nlet shouldRetry = false;\nlet retryDelay = 0;\n\nif (error.message) {\n if (error.message.includes('rate limit') || error.message.includes('429')) {\n errorType = 'rate_limit';\n shouldRetry = true;\n retryDelay = Math.min(300000, Math.pow(2, error.retryCount || 0) * 1000);\n } else if (error.message.includes('timeout') || error.message.includes('502') || error.message.includes('503')) {\n errorType = 'temporary';\n shouldRetry = true;\n retryDelay = 30000;\n } else if (error.message.includes('invalid email') || error.message.includes('bounce')) {\n errorType = 'permanent';\n shouldRetry = false;\n }\n}\n\n// Log error details\nawait $executeWorkflow('Log Error', {\n error: error,\n errorType: errorType,\n timestamp: new Date().toISOString(),\n workflowId: $workflow.id,\n executionId: $execution.id\n});\n\nreturn [{\n json: {\n errorType,\n shouldRetry,\n retryDelay,\n originalError: error\n }\n}];"
},
"name": "Handle Errors",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [1640, 400]
}
],
"connections": {
"Campaign Trigger": {
"main": [
[
{
"node": "Get Campaign",
"type": "main",
"index": 0
}
]
]
},
"Get Campaign": {
"main": [
[
{
"node": "Campaign Exists?",
"type": "main",
"index": 0
}
]
]
},
"Campaign Exists?": {
"main": [
[
{
"node": "Batch Recipients",
"type": "main",
"index": 0
}
]
]
},
"Batch Recipients": {
"main": [
[
{
"node": "Get Recipients Batch",
"type": "main",
"index": 0
}
]
]
},
"Get Recipients Batch": {
"main": [
[
{
"node": "Personalize Emails",
"type": "main",
"index": 0
}
]
]
},
"Personalize Emails": {
"main": [
[
{
"node": "Send via Postmark",
"type": "main",
"index": 0
}
]
]
},
"Send via Postmark": {
"main": [
[
{
"node": "Update Send Status",
"type": "main",
"index": 0
}
]
]
}
}
}
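The personalization logic inside the "Personalize Emails" node is easier to verify when extracted into a plain function. The sketch below uses the same `{{token}}` syntax and fallbacks as the workflow; the function name `personalize` and the choice to leave unknown tokens untouched are assumptions.

```javascript
// Standalone version of the token replacement done in "Personalize Emails".
// Known tokens are replaced; unknown tokens are left in place so they
// are easy to spot in a test send.
function personalize(template, recipient) {
  const values = {
    firstName: recipient.first_name || 'there',
    lastName: recipient.last_name || '',
    email: recipient.email,
  };
  return template.replace(/\{\{(\w+)\}\}/g, (match, key) =>
    Object.prototype.hasOwnProperty.call(values, key) ? values[key] : match
  );
}
```

A single pass with one regular expression also avoids the repeated `.replace()` chains for subject, HTML body, and text body.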
2. Welcome Series Automation
{
"name": "Welcome Email Series - Drip Campaign",
"meta": {
"templateCreatedBy": "nudgecampaign-v1",
"description": "Automated welcome series with intelligent timing"
},
"nodes": [
{
"parameters": {
"path": "user-signup",
"responseMode": "immediately"
},
"name": "User Signup Trigger",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [240, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO email_sequences (user_id, sequence_type, status, created_at) VALUES ($1, 'welcome', 'active', NOW()) RETURNING id",
"additionalFields": {
"queryParams": "={{ $json.userId }}"
}
},
"name": "Initialize Sequence",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [440, 300]
},
{
"parameters": {
"unit": "minutes",
"amount": 5
},
"name": "Wait 5 Minutes",
"type": "n8n-nodes-base.wait",
"typeVersion": 1,
"position": [640, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT u.*, es.id as sequence_id FROM users u JOIN email_sequences es ON u.id = es.user_id WHERE u.id = $1 AND es.sequence_type = 'welcome' AND es.status = 'active'",
"additionalFields": {
"queryParams": "={{ $json.userId }}"
}
},
"name": "Get User Data",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [840, 300]
},
{
"parameters": {
"operation": "send",
"From": "welcome@nudgecampaign.com",
"To": "={{ $json.email }}",
"Subject": "Welcome to NudgeCampaign!",
"HtmlBody": "<!DOCTYPE html><html><body><h1>Welcome {{ $json.first_name || 'there' }}!</h1><p>Thanks for joining NudgeCampaign. We're excited to help you grow your email marketing.</p><p><a href=\"https://app.nudgecampaign.com/onboarding\" style=\"background: #4CAF50; color: white; padding: 12px 24px; text-decoration: none; border-radius: 4px;\">Get Started</a></p></body></html>",
"MessageStream": "transactional",
"Tag": "welcome-email-1",
"Metadata": {
"sequenceId": "={{ $json.sequence_id }}",
"emailNumber": "1",
"sequenceType": "welcome"
}
},
"name": "Send Welcome Email",
"type": "n8n-nodes-base.postmark",
"typeVersion": 1,
"position": [1040, 300]
},
{
"parameters": {
"unit": "days",
"amount": 1
},
"name": "Wait 1 Day",
"type": "n8n-nodes-base.wait",
"typeVersion": 1,
"position": [1240, 300]
},
{
"parameters": {
"operation": "send",
"From": "tips@nudgecampaign.com",
"To": "={{ $json.email }}",
"Subject": "Your first campaign: A quick guide",
"HtmlBody": "<!DOCTYPE html><html><body><h1>Ready to create your first campaign?</h1><p>Hi {{ $json.first_name || 'there' }},</p><p>Here's how to create your first email campaign in 3 simple steps...</p><ol><li>Choose your template</li><li>Add your contacts</li><li>Schedule and send</li></ol><p><a href=\"https://app.nudgecampaign.com/campaigns/new\">Create Your First Campaign</a></p></body></html>",
"MessageStream": "transactional",
"Tag": "welcome-email-2",
"Metadata": {
"sequenceId": "={{ $json.sequence_id }}",
"emailNumber": "2",
"sequenceType": "welcome"
}
},
"name": "Send Tips Email",
"type": "n8n-nodes-base.postmark",
"typeVersion": 1,
"position": [1440, 300]
},
{
"parameters": {
"unit": "days",
"amount": 3
},
"name": "Wait 3 Days",
"type": "n8n-nodes-base.wait",
"typeVersion": 1,
"position": [1640, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT COUNT(*) as campaign_count FROM campaigns WHERE user_id = $1",
"additionalFields": {
"queryParams": "={{ $json.userId }}"
}
},
"name": "Check Usage",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [1840, 300]
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ $json.campaign_count }}",
"operation": "equal",
"value2": 0
}
]
}
},
"name": "No Campaigns Yet?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [2040, 300]
},
{
"parameters": {
"operation": "send",
"From": "support@nudgecampaign.com",
"To": "={{ $json.email }}",
"Subject": "Need help getting started?",
"HtmlBody": "<!DOCTYPE html><html><body><h1>We're here to help!</h1><p>Hi {{ $json.first_name || 'there' }},</p><p>We noticed you haven't created your first campaign yet. No worries - we're here to help!</p><p>Would you like a quick call to get you set up? Or prefer to try our interactive tutorial?</p><p><a href=\"https://calendly.com/nudgecampaign/setup\">Book a 15-min Setup Call</a> | <a href=\"https://app.nudgecampaign.com/tutorial\">Try the Tutorial</a></p></body></html>",
"MessageStream": "transactional",
"Tag": "welcome-email-help"
},
"name": "Send Help Offer",
"type": "n8n-nodes-base.postmark",
"typeVersion": 1,
"position": [2240, 200]
}
],
"connections": {
"User Signup Trigger": {
"main": [
[
{
"node": "Initialize Sequence",
"type": "main",
"index": 0
}
]
]
},
"Initialize Sequence": {
"main": [
[
{
"node": "Wait 5 Minutes",
"type": "main",
"index": 0
}
]
]
},
"Wait 5 Minutes": {
"main": [
[
{
"node": "Get User Data",
"type": "main",
"index": 0
}
]
]
},
"Get User Data": {
"main": [
[
{
"node": "Send Welcome Email",
"type": "main",
"index": 0
}
]
]
},
"Send Welcome Email": {
"main": [
[
{
"node": "Wait 1 Day",
"type": "main",
"index": 0
}
]
]
},
"Wait 1 Day": {
"main": [
[
{
"node": "Send Tips Email",
"type": "main",
"index": 0
}
]
]
},
"Send Tips Email": {
"main": [
[
{
"node": "Wait 3 Days",
"type": "main",
"index": 0
}
]
]
},
"Wait 3 Days": {
"main": [
[
{
"node": "Check Usage",
"type": "main",
"index": 0
}
]
]
},
"Check Usage": {
"main": [
[
{
"node": "No Campaigns Yet?",
"type": "main",
"index": 0
}
]
]
},
"No Campaigns Yet?": {
"main": [
[
{
"node": "Send Help Offer",
"type": "main",
"index": 0
}
]
]
}
}
}
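The three Wait nodes in this workflow are cumulative, so the send time of each email is the sum of all waits before it. A small sketch that computes the timeline relative to signup; the `steps` table restates the waits and tags from the workflow, while the function name `sendTimes` is an assumption.

```javascript
const MS = { minutes: 60 * 1000, hours: 60 * 60 * 1000, days: 24 * 60 * 60 * 1000 };

// Wait nodes from the workflow, in order, with the tag of the email each precedes.
// The last email is only sent when the user has no campaigns yet.
const steps = [
  { wait: { unit: 'minutes', amount: 5 }, email: 'welcome-email-1' },
  { wait: { unit: 'days', amount: 1 }, email: 'welcome-email-2' },
  { wait: { unit: 'days', amount: 3 }, email: 'welcome-email-help' },
];

// Returns the cumulative send time of each email relative to signup.
function sendTimes(signupMs, sequence = steps) {
  let elapsed = 0;
  return sequence.map(({ wait, email }) => {
    elapsed += wait.amount * MS[wait.unit];
    return { email, sendAt: new Date(signupMs + elapsed).toISOString() };
  });
}
```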
3. Webhook Event Processing
{
"name": "Postmark Webhook Handler",
"meta": {
"description": "Process email events from Postmark webhooks"
},
"nodes": [
{
"parameters": {
"path": "postmark-webhook",
"responseMode": "immediately",
"responseData": "{ \"status\": \"received\" }"
},
"name": "Postmark Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [240, 300]
},
{
"parameters": {
"jsCode": "// Process different types of Postmark events\nconst events = $input.all();\nconst processedEvents = [];\n\nevents.forEach(item => {\n const event = item.json;\n \n // Extract common fields\n const baseEvent = {\n messageId: event.MessageID,\n recipient: event.Recipient,\n eventType: event.RecordType,\n timestamp: event.ReceivedAt,\n tag: event.Tag,\n metadata: event.Metadata || {}\n };\n \n // Add event-specific fields\n switch (event.RecordType) {\n case 'Delivery':\n processedEvents.push({\n ...baseEvent,\n deliveredAt: event.DeliveredAt,\n details: event.Details\n });\n break;\n \n case 'Bounce':\n processedEvents.push({\n ...baseEvent,\n bounceType: event.Type,\n bounceDescription: event.Description,\n bounceCode: event.TypeCode,\n canActivate: event.CanActivate\n });\n break;\n \n case 'SpamComplaint':\n processedEvents.push({\n ...baseEvent,\n complaintType: event.Type,\n complaintDetails: event.Details\n });\n break;\n \n case 'Open':\n processedEvents.push({\n ...baseEvent,\n firstOpen: event.FirstOpen,\n client: event.Client,\n os: event.OS,\n platform: event.Platform\n });\n break;\n \n case 'Click':\n processedEvents.push({\n ...baseEvent,\n clickedUrl: event.OriginalLink,\n client: event.Client,\n os: event.OS,\n platform: event.Platform\n });\n break;\n }\n});\n\nreturn processedEvents.map(event => ({ json: event }));"
},
"name": "Process Events",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [440, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO email_events (message_id, recipient, event_type, event_data, created_at) VALUES ($1, $2, $3, $4, NOW()) ON CONFLICT (message_id, event_type) DO UPDATE SET event_data = $4, updated_at = NOW()",
"additionalFields": {
"queryParams": "={{ [$json.messageId, $json.recipient, $json.eventType, JSON.stringify($json)] }}"
}
},
"name": "Store Event",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [640, 300]
},
{
"parameters": {
"conditions": {
"string": [
{
"value1": "={{ $json.eventType }}",
"operation": "equal",
"value2": "Bounce"
}
]
}
},
"name": "Is Bounce?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [840, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "UPDATE contacts SET email_status = 'bounced', bounce_count = bounce_count + 1, last_bounce_at = NOW(), bounce_reason = $2 WHERE email = $1",
"additionalFields": {
"queryParams": "={{ [$json.recipient, $json.bounceDescription] }}"
}
},
"name": "Update Contact Status",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [1040, 200]
}
],
"connections": {
"Postmark Webhook": {
"main": [
[
{
"node": "Process Events",
"type": "main",
"index": 0
}
]
]
},
"Process Events": {
"main": [
[
{
"node": "Store Event",
"type": "main",
"index": 0
}
]
]
},
"Store Event": {
"main": [
[
{
"node": "Is Bounce?",
"type": "main",
"index": 0
}
]
]
},
"Is Bounce?": {
"main": [
[
{
"node": "Update Contact Status",
"type": "main",
"index": 0
}
]
]
}
}
}
3. Postmark Integration Patterns
Postmark as the Delivery Engine
Integration Best Practices:
- Batch API calls (up to 500 emails)
- Use message streams for separation
- Implement webhook handlers for events
- Track everything with metadata
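Batching to the 500-message limit comes down to splitting the recipient list into fixed-size slices. The service wrapper in this section calls a `chunk` helper without defining it; a minimal stand-in, assuming no utility library is available, could look like this:

```javascript
// Hypothetical stand-in for the chunk() helper used by sendBatch().
// Splits an array into consecutive slices of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```

For 1,201 queued emails this yields three batches of 500, 500, and 201, which means three API calls instead of 1,201.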
Postmark Service Wrapper
const { ServerClient: PostmarkClient } = require('postmark');
class PostmarkService {
constructor() {
this.client = new PostmarkClient(process.env.POSTMARK_TOKEN);
// Separate clients for different streams
this.streams = {
transactional: new PostmarkClient(process.env.POSTMARK_TRANSACTIONAL_TOKEN),
marketing: new PostmarkClient(process.env.POSTMARK_MARKETING_TOKEN)
};
// Batch processor for efficiency
this.batchProcessor = new BatchProcessor(
this.sendBatch.bind(this),
{ batchSize: 500, flushInterval: 5000 }
);
}
async sendTransactional(email) {
return this.streams.transactional.sendEmail({
...email,
MessageStream: 'transactional'
});
}
async queueMarketing(email) {
// Queue for batch processing
return this.batchProcessor.add({
...email,
MessageStream: 'marketing'
});
}
async sendBatch(emails) {
// Split into chunks of 500 (Postmark limit)
const chunks = chunk(emails, 500);
const results = await Promise.all(
// Callback parameter is named batch to avoid shadowing the chunk() helper
chunks.map(batch =>
this.streams.marketing.sendEmailBatch(batch)
)
);
// Flatten results
return results.flat();
}
// Webhook handler for Postmark events
async handleWebhook(req, res) {
// Verify the request before touching the payload
if (!this.verifyWebhookSignature(req)) {
return res.status(401).send('Unauthorized');
}
// Accept either a single event object or an array of events
const events = Array.isArray(req.body) ? req.body : [req.body];
// Process events one at a time before acknowledging
for (const event of events) {
await this.processEmailEvent(event);
}
res.status(200).send('OK');
}
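async processEmailEvent(event) {
// Map Postmark RecordType values to registered event types;
// publish() throws for names missing from the EventBus topic registry
const eventTypes = { Bounce: 'email.bounced' };
const eventType = eventTypes[event.RecordType];
if (eventType) {
await eventBus.publish(eventType, {
messageId: event.MessageID,
recipient: event.Recipient,
type: event.RecordType,
timestamp: event.ReceivedAt,
metadata: event.Metadata
});
}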
// Update metrics
await this.updateMetrics(event);
}
}
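The wrapper calls `verifyWebhookSignature` but does not show it. One possible check, assuming the webhook URL is protected with HTTP Basic auth credentials (an assumption; the guide does not say how the endpoint is secured), is sketched below. The function names are hypothetical.

```javascript
const { timingSafeEqual } = require('node:crypto');

// Constant-time string comparison. timingSafeEqual throws when the
// lengths differ, so the length is checked first.
function safeEqual(a, b) {
  const bufA = Buffer.from(a, 'utf8');
  const bufB = Buffer.from(b, 'utf8');
  return bufA.length === bufB.length && timingSafeEqual(bufA, bufB);
}

// Returns true when the Authorization header carries the expected
// Basic auth credentials. `headers` uses lowercase keys, as Node provides them.
function verifyBasicAuth(headers, expectedUser, expectedPassword) {
  const header = headers.authorization || '';
  if (!header.startsWith('Basic ')) return false;
  const decoded = Buffer.from(header.slice(6), 'base64').toString('utf8');
  const separator = decoded.indexOf(':');
  if (separator === -1) return false;
  // Evaluate both comparisons so timing does not reveal which field failed.
  const userOk = safeEqual(decoded.slice(0, separator), expectedUser);
  const passwordOk = safeEqual(decoded.slice(separator + 1), expectedPassword);
  return userOk && passwordOk;
}
```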
4. Next.js Frontend Integration
Next.js as the User Interface
Integration Patterns:
- Server Components for initial data
- API routes proxy to Cloud Run
- Real-time updates via Server-Sent Events
- Optimistic UI updates
API Route Proxy Pattern
// app/api/campaigns/route.ts
import { NextRequest, NextResponse } from 'next/server';
const API_BASE = process.env.CLOUD_RUN_API_URL;
export async function GET(request: NextRequest) {
const session = await getServerSession();
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}
// Proxy to Cloud Run API
const response = await fetch(`${API_BASE}/campaigns`, {
headers: {
'Authorization': `Bearer ${session.accessToken}`,
'X-User-Id': session.user.id
},
// Important: Don't cache API responses
next: { revalidate: 0 }
});
const data = await response.json();
// Pass the upstream status through so errors are not reported as 200
return NextResponse.json(data, { status: response.status });
}
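export async function POST(request: NextRequest) {
const session = await getServerSession();
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}
const body = await request.json();
// Optimistic response
const tempId = `temp_${Date.now()}`;
// Fire-and-forget creation: the platform may stop work once the response
// is sent, so a failure here is only visible in the logs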
fetch(`${API_BASE}/campaigns`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${session.accessToken}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(body)
}).catch(console.error);
// Return immediately with temp data
return NextResponse.json({
id: tempId,
...body,
status: 'creating',
createdAt: new Date().toISOString()
});
}
Real-time Updates Pattern
// app/api/events/route.ts - Server-Sent Events
export async function GET(request: NextRequest) {
const session = await getServerSession();
if (!session) {
return new Response('Unauthorized', { status: 401 });
}
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// Subscribe to Pub/Sub for this user
const subscription = await subscribeToUserEvents(session.user.id);
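subscription.on('message', (message) => {
const event = JSON.parse(message.data.toString());
// Name the SSE event so the client's addEventListener(eventType) handlers fire
controller.enqueue(
encoder.encode(`event: ${event.eventType}\ndata: ${JSON.stringify(event)}\n\n`)
);
// Acknowledge so Pub/Sub does not redeliver the message
message.ack();
});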
// Heartbeat to keep connection alive
const heartbeat = setInterval(() => {
controller.enqueue(encoder.encode(': heartbeat\n\n'));
}, 30000);
// Cleanup on close
request.signal.addEventListener('abort', () => {
clearInterval(heartbeat);
subscription.close();
});
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
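// Client component using events (lives in its own file; 'use client' must precede the imports)
'use client';
import { useEffect, useState } from 'react';
export function CampaignDashboard() {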
const [campaigns, setCampaigns] = useState([]);
useEffect(() => {
// Initial data fetch
fetchCampaigns().then(setCampaigns);
// Subscribe to updates
const events = new EventSource('/api/events');
events.addEventListener('campaign.updated', (e) => {
const update = JSON.parse(e.data);
setCampaigns(prev =>
prev.map(c => c.id === update.id ? update : c)
);
});
return () => events.close();
}, []);
return <CampaignList campaigns={campaigns} />;
}
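An `EventSource` listener registered with `addEventListener('campaign.updated', ...)` only fires for frames that carry a matching `event:` field; frames without one go to `onmessage`. A small formatter makes the wire format explicit. The function name `formatSseEvent` is an assumption.

```javascript
// Formats one Server-Sent Events frame.
// A named frame is dispatched to listeners registered for that name;
// an unnamed frame is delivered to the generic message handler.
function formatSseEvent(data, eventName) {
  const lines = [];
  if (eventName) lines.push(`event: ${eventName}`);
  // JSON.stringify never emits raw newlines, so one data line is enough.
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n';
}
```

On the server this would be used as `controller.enqueue(encoder.encode(formatSseEvent(event, event.eventType)))`.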
5. Database Integration Patterns
Efficient Database Access
Key Patterns:
- Connection pooling with PgBouncer
- Read replicas for analytics
- Materialized views for performance
- Event sourcing for audit trail
Connection Pool Management
// Shared connection pool for serverless
const { Pool } = require('pg');
const { createHash } = require('crypto');
class DatabasePool {
constructor() {
this.pools = new Map();
this.defaultConfig = {
host: process.env.DB_HOST,
port: 6432, // PgBouncer port
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
// Serverless optimizations
max: 2,
idleTimeoutMillis: 10000,
connectionTimeoutMillis: 3000,
statement_timeout: 30000,
query_timeout: 30000
};
}
getPool(role = 'default') {
if (!this.pools.has(role)) {
const config = this.getConfigForRole(role);
const pool = new Pool(config);
// Add event handlers
pool.on('error', (err) => {
console.error('Pool error:', err);
this.pools.delete(role);
});
this.pools.set(role, pool);
}
return this.pools.get(role);
}
getConfigForRole(role) {
switch (role) {
case 'readonly':
return {
...this.defaultConfig,
host: process.env.DB_REPLICA_HOST,
user: process.env.DB_READONLY_USER,
password: process.env.DB_READONLY_PASSWORD
};
case 'analytics':
return {
...this.defaultConfig,
host: process.env.DB_ANALYTICS_HOST,
statement_timeout: 300000 // 5 minutes for analytics
};
default:
return this.defaultConfig;
}
}
async query(text, params, options = {}) {
const pool = this.getPool(options.role || 'default');
// Query fingerprinting for monitoring
const fingerprint = this.getQueryFingerprint(text);
const start = Date.now();
try {
const result = await pool.query(text, params);
// Log slow queries
const duration = Date.now() - start;
if (duration > 1000) {
console.warn('Slow query:', {
fingerprint,
duration,
rows: result.rowCount
});
}
return result;
} catch (error) {
console.error('Query error:', {
fingerprint,
error: error.message
});
throw error;
}
}
getQueryFingerprint(query) {
// Remove values for fingerprinting
const normalized = query
.replace(/\$\d+/g, '$?')
.replace(/\s+/g, ' ')
.trim();
return createHash('md5').update(normalized).digest('hex');
}
}
// Singleton instance
const db = new DatabasePool();
// Usage examples
const getUser = async (userId) => {
const result = await db.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
return result.rows[0];
};
const getAnalytics = async (startDate, endDate) => {
const result = await db.query(
`SELECT DATE(created_at) as date, COUNT(*) as count
FROM events
WHERE created_at BETWEEN $1 AND $2
GROUP BY DATE(created_at)`,
[startDate, endDate],
{ role: 'analytics' } // Use analytics connection
);
return result.rows;
};
Advanced Integration Patterns
1. Circuit Breaker Pattern
class CircuitBreaker {
constructor(fn, options = {}) {
this.fn = fn;
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 60000;
this.state = 'CLOSED';
this.failures = 0;
this.nextAttempt = Date.now();
}
async call(...args) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
}
this.state = 'HALF_OPEN';
}
try {
const result = await this.fn(...args);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
this.failures = 0;
this.state = 'CLOSED';
}
onFailure() {
this.failures++;
if (this.failures >= this.failureThreshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.resetTimeout;
}
}
}
// Usage with external services
const postmarkBreaker = new CircuitBreaker(
async (emails) => postmark.sendEmailBatch(emails),
{ failureThreshold: 3, resetTimeout: 30000 }
);
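To see the state transitions without waiting for real timeouts, the breaker can be given an injectable clock. The following is a condensed copy of the class above; the `now` option is an assumption added purely for testability and is not part of the original.

```javascript
// Condensed copy of the CircuitBreaker above with an injectable clock.
class CircuitBreaker {
  constructor(fn, { failureThreshold = 5, resetTimeout = 60000, now = Date.now } = {}) {
    this.fn = fn;
    this.failureThreshold = failureThreshold;
    this.resetTimeout = resetTimeout;
    this.now = now;
    this.state = 'CLOSED';
    this.failures = 0;
    this.nextAttempt = 0;
  }

  async call(...args) {
    if (this.state === 'OPEN') {
      // Fail fast until the reset timeout has elapsed.
      if (this.now() < this.nextAttempt) throw new Error('Circuit breaker is OPEN');
      this.state = 'HALF_OPEN'; // let one trial call through
    }
    try {
      const result = await this.fn(...args);
      this.failures = 0;
      this.state = 'CLOSED';
      return result;
    } catch (error) {
      this.failures++;
      if (this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        this.nextAttempt = this.now() + this.resetTimeout;
      }
      throw error;
    }
  }
}
```

While the breaker is open, the wrapped function is not invoked at all, so a failing dependency costs no outbound requests.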
2. Saga Pattern for Distributed Transactions
class CampaignSaga {
constructor() {
this.steps = [];
this.compensations = [];
}
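async execute(campaignData) {
const context = { campaignData };
// Reset per run so a reused saga instance does not replay old compensations
this.compensations = [];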
try {
// Step 1: Create campaign
const campaign = await this.createCampaign(context);
this.compensations.push(() => this.deleteCampaign(campaign.id));
// Step 2: Setup n8n workflow
const workflow = await this.setupWorkflow(campaign);
this.compensations.push(() => this.deleteWorkflow(workflow.id));
// Step 3: Schedule sending
const schedule = await this.scheduleSending(campaign);
this.compensations.push(() => this.cancelSchedule(schedule.id));
// Step 4: Initialize analytics
await this.initializeAnalytics(campaign);
return { success: true, campaign };
} catch (error) {
// Compensate in reverse order
console.error('Saga failed, compensating...', error);
for (const compensation of this.compensations.reverse()) {
try {
await compensation();
} catch (compError) {
console.error('Compensation failed:', compError);
}
}
throw error;
}
}
}
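The same compensate-in-reverse idea can be expressed as a generic runner that takes the steps as data. This is a sketch, not the campaign saga itself: the step shape `{ name, run, compensate }` and the function name `runSaga` are assumptions.

```javascript
// Generic saga runner. Each step is { name, run, compensate? }.
// The result of each run() is stored on the context under the step name.
async function runSaga(steps, context = {}) {
  const completed = [];
  try {
    for (const step of steps) {
      context[step.name] = await step.run(context);
      completed.push(step);
    }
    return context;
  } catch (error) {
    // Undo completed steps newest-first; keep going if one undo fails.
    for (const step of completed.reverse()) {
      if (!step.compensate) continue;
      try {
        await step.compensate(context);
      } catch (compError) {
        console.error(`Compensation failed for ${step.name}:`, compError);
      }
    }
    throw error;
  }
}
```

Because the list of completed steps is local to each call, one runner can serve many concurrent sagas without shared state.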
3. CQRS Pattern Implementation
// Command side - write operations
class CampaignCommands {
async createCampaign(data) {
// Validate
await this.validateCampaignData(data);
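// Create in write database
const result = await db.query(
'INSERT INTO campaigns (...) VALUES (...) RETURNING *',
[...values]
);
// db.query resolves to a pg result object; the inserted row is in rows[0]
const campaign = result.rows[0];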
// Publish event
await eventBus.publish('campaign.created', {
campaignId: campaign.id,
data: campaign
});
return campaign;
}
}
// Query side - read operations
class CampaignQueries {
async getCampaignStats(campaignId) {
// Read from materialized view (updated async)
const stats = await db.query(
'SELECT * FROM campaign_stats_mv WHERE campaign_id = $1',
[campaignId],
{ role: 'readonly' }
);
return stats.rows[0];
}
}
// Event handler updates read model
async function handleCampaignEvent(event) {
switch (event.eventType) {
case 'campaign.created':
await updateCampaignStatsView(event.data);
break;
case 'email.sent':
await incrementSentCount(event.data);
break;
}
}
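The read side can be illustrated without a database by projecting events into an in-memory map. In this sketch the `Map` stands in for `campaign_stats_mv`, the class name `CampaignStatsProjection` is hypothetical, and events are assumed to use the `eventType` and `data` fields of the event bus message format.

```javascript
// In-memory read model standing in for the campaign_stats_mv view.
class CampaignStatsProjection {
  constructor() {
    this.stats = new Map();
  }

  // Applies one event to the read model.
  apply(event) {
    const { campaignId } = event.data;
    switch (event.eventType) {
      case 'campaign.created':
        this.stats.set(campaignId, { campaignId, sent: 0 });
        break;
      case 'email.sent': {
        const row = this.stats.get(campaignId);
        if (row) row.sent += 1; // ignore sends for unknown campaigns
        break;
      }
      default:
        break; // events this projection does not care about
    }
  }

  get(campaignId) {
    return this.stats.get(campaignId);
  }
}
```

Queries read only from the projection, so a dashboard request never touches the write tables.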
Cost-Efficient Integration Strategies
1. Request Deduplication
// Prevent duplicate processing
class RequestDeduplicator {
constructor(redis) {
this.redis = redis;
this.ttl = 3600; // 1 hour
}
async process(requestId, processFn) {
const key = `dedup:${requestId}`;
// Try to acquire lock
const acquired = await this.redis.set(
key,
'processing',
'NX',
'EX',
this.ttl
);
if (!acquired) {
// Already processing or processed
const status = await this.redis.get(key);
if (status === 'completed') {
const result = await this.redis.get(`${key}:result`);
return JSON.parse(result);
}
throw new Error('Request already processing');
}
try {
const result = await processFn();
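// Store the result first, then mark as completed, so a concurrent
// caller never sees 'completed' without a result to read
await this.redis.set(
`${key}:result`,
JSON.stringify(result),
'EX',
this.ttl
);
await this.redis.set(key, 'completed', 'EX', this.ttl);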
return result;
} catch (error) {
// Remove lock on failure
await this.redis.del(key);
throw error;
}
}
}
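Because Pub/Sub delivers at least once, the same event can reach a handler twice. The sketch below shows how the deduplication flow absorbs a redelivery. It is a condensed, function-based variant rather than the class itself, and `FakeRedis` is a hypothetical in-memory stand-in that implements only the `set` (with `NX`), `get`, and `del` calls used here and ignores expiry.

```javascript
// Minimal in-memory stand-in for Redis (assumption: expiry is ignored).
class FakeRedis {
  constructor() { this.store = new Map(); }
  async set(key, value, ...flags) {
    // NX: only set when the key does not exist yet
    if (flags.includes('NX') && this.store.has(key)) return null;
    this.store.set(key, value);
    return 'OK';
  }
  async get(key) { return this.store.has(key) ? this.store.get(key) : null; }
  async del(key) { return this.store.delete(key) ? 1 : 0; }
}

// Condensed version of the deduplication flow.
async function processOnce(redis, requestId, processFn) {
  const key = `dedup:${requestId}`;
  const acquired = await redis.set(key, 'processing', 'NX', 'EX', 3600);
  if (!acquired) {
    if ((await redis.get(key)) === 'completed') {
      return JSON.parse(await redis.get(`${key}:result`));
    }
    throw new Error('Request already processing');
  }
  try {
    const result = await processFn();
    await redis.set(`${key}:result`, JSON.stringify(result), 'EX', 3600);
    await redis.set(key, 'completed', 'EX', 3600);
    return result;
  } catch (error) {
    await redis.del(key); // release the lock so a retry can run
    throw error;
  }
}

async function demo() {
  const redis = new FakeRedis();
  let sends = 0;
  const sendEmail = async () => { sends += 1; return { messageId: 'm-1' }; };
  const first = await processOnce(redis, 'evt-42', sendEmail);
  const second = await processOnce(redis, 'evt-42', sendEmail); // simulated redelivery
  return { first, second, sends };
}

demo().then((outcome) => console.log('sends:', outcome.sends)); // sends: 1
```

The second call returns the cached result without invoking `sendEmail` again, so a redelivered event costs one Redis round trip instead of a second outbound email.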
2. Batch Aggregation Pattern
// Aggregate multiple requests into batches
class BatchAggregator {
constructor(options = {}) {
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 1000;
this.queue = [];
this.promises = new Map();
this.timer = null;
}
async add(item) {
return new Promise((resolve, reject) => {
const id = generateId(); // assumed helper that returns a unique string
this.queue.push({ id, item });
this.promises.set(id, { resolve, reject });
if (this.queue.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
});
}
async flush() {
if (this.queue.length === 0) return;
const batch = this.queue.splice(0, this.batchSize);
clearTimeout(this.timer);
this.timer = null;
try {
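// Process entire batch with one API call. processBatch() is not defined here:
// a subclass is expected to provide it and return results in input order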
const results = await this.processBatch(
batch.map(item => item.item)
);
// Resolve individual promises
batch.forEach((item, index) => {
const { resolve } = this.promises.get(item.id);
resolve(results[index]);
this.promises.delete(item.id);
});
} catch (error) {
// Reject all promises in batch
batch.forEach(item => {
const { reject } = this.promises.get(item.id);
reject(error);
this.promises.delete(item.id);
});
}
}
}
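The aggregator leaves the batch-processing function to the caller. As a hedged illustration of how that wiring might look, the sketch below uses a closure-based variant named `createBatcher` (a hypothetical name) and a stub `sendBatch` that stands in for a real batch API call. It keeps `resolve` and `reject` on each queue entry, so no ID generator or promise map is needed.

```javascript
// Closure-based variant of the aggregator (a sketch). `processBatch` is
// supplied by the caller and must return one result per item, in input order.
function createBatcher(processBatch, { batchSize = 100, flushInterval = 1000 } = {}) {
  let queue = [];
  let timer = null;

  async function flush() {
    clearTimeout(timer);
    timer = null;
    const batch = queue;
    queue = [];
    if (batch.length === 0) return;
    try {
      const results = await processBatch(batch.map((entry) => entry.item));
      batch.forEach((entry, index) => entry.resolve(results[index]));
    } catch (error) {
      batch.forEach((entry) => entry.reject(error));
    }
  }

  return function add(item) {
    return new Promise((resolve, reject) => {
      queue.push({ item, resolve, reject });
      if (queue.length >= batchSize) {
        flush(); // size threshold reached: send immediately
      } else if (!timer) {
        timer = setTimeout(flush, flushInterval); // otherwise wait briefly for more
      }
    });
  };
}

async function demo() {
  let apiCalls = 0;
  // Hypothetical batch sender standing in for a real batch API call
  const sendBatch = async (emails) => {
    apiCalls += 1;
    return emails.map((email) => ({ to: email.to, status: 'queued' }));
  };
  const enqueue = createBatcher(sendBatch, { batchSize: 3, flushInterval: 50 });
  const results = await Promise.all([
    enqueue({ to: 'a@example.com' }),
    enqueue({ to: 'b@example.com' }),
    enqueue({ to: 'c@example.com' }),
  ]);
  return { results, apiCalls };
}

demo().then(({ results, apiCalls }) => {
  console.log(results.length, 'results from', apiCalls, 'API call');
});
// 3 results from 1 API call
```

Each caller still awaits its own promise, so batching stays invisible to calling code. The cost is added latency of up to `flushInterval` for requests that arrive when traffic is low.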
Monitoring & Observability
Distributed Tracing Implementation
// Trace context propagation
class TraceContext {
static extract(headers) {
return {
traceId: headers['x-trace-id'] || generateTraceId(),
spanId: generateSpanId(),
parentSpanId: headers['x-span-id']
};
}
static inject(context, headers) {
headers['x-trace-id'] = context.traceId;
headers['x-span-id'] = context.spanId;
if (context.parentSpanId) {
headers['x-parent-span-id'] = context.parentSpanId;
}
return headers;
}
}
// Middleware for trace propagation
app.use((req, res, next) => {
req.traceContext = TraceContext.extract(req.headers);
// Log with trace context
req.log = (message, data = {}) => {
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
traceId: req.traceContext.traceId,
spanId: req.traceContext.spanId,
service: 'nudgecampaign-api',
message,
...data
}));
};
next();
});
// Propagate to downstream services
const callService = async (url, data, traceContext) => {
const headers = TraceContext.inject(traceContext, {
'Content-Type': 'application/json'
});
return axios.post(url, data, { headers });
};
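The tracing code above calls `generateTraceId()` and `generateSpanId()` without defining them. One possible definition is sketched below, together with a simulated hop between two services. The ID sizes are an assumption borrowed from W3C Trace Context (16 random bytes per trace, 8 per span); the header names are the ones used above.

```javascript
const { randomBytes } = require('node:crypto');

// Assumed ID formats: 16 random bytes per trace, 8 per span, hex-encoded.
const generateTraceId = () => randomBytes(16).toString('hex');
const generateSpanId = () => randomBytes(8).toString('hex');

// Same logic as TraceContext.extract: reuse the caller's trace ID,
// start a new span, and remember the caller's span as the parent.
function extract(headers) {
  return {
    traceId: headers['x-trace-id'] || generateTraceId(),
    spanId: generateSpanId(),
    parentSpanId: headers['x-span-id'],
  };
}

// Same logic as TraceContext.inject: write the current context into
// the headers of an outbound request.
function inject(context, headers) {
  headers['x-trace-id'] = context.traceId;
  headers['x-span-id'] = context.spanId;
  if (context.parentSpanId) {
    headers['x-parent-span-id'] = context.parentSpanId;
  }
  return headers;
}

// Simulate service A (no incoming trace headers) calling service B.
const atServiceA = extract({});
const outbound = inject(atServiceA, { 'Content-Type': 'application/json' });
const atServiceB = extract(outbound);

console.log(atServiceB.traceId === atServiceA.traceId);     // true
console.log(atServiceB.parentSpanId === atServiceA.spanId); // true
```

The trace ID stays constant across the hop while each service gets its own span ID, which is what lets log lines from different services be joined into one request timeline.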
Integration Best Practices
Design Principles
- Loose Coupling: Services communicate through events
- Async by Default: Never block on external calls
- Graceful Degradation: Handle service failures
- Idempotency: Safe to retry any operation
- Observability: Trace every request across services
- Cost Awareness: Batch operations when possible
- Security: Encrypt data in transit and at rest
- Scalability: Every component can scale independently
Implementation Checklist
- Use Pub/Sub for service communication
- Implement circuit breakers for external services
- Add request deduplication
- Use connection pooling efficiently
- Implement distributed tracing
- Add comprehensive error handling
- Monitor integration points
- Document API contracts
Conclusion
The power of our zero-cost architecture lies not just in the individual components, but in how they work together:
- Event-driven communication eliminates tight coupling
- Async patterns help contain cascading failures
- Smart batching reduces API costs
- Connection pooling minimizes database overhead
Result: A system in which every component scales independently while maintaining zero fixed costs
By following these integration patterns, we create a platform that's not just cost-efficient, but also:
- Resilient to failures
- Scalable to millions of users
- Maintainable by small teams
- Performant under load
Related Documents
- Technology Stack Analysis - Component selection rationale
- Serverless Architecture - Scale-to-zero patterns
- Cost Optimization Strategy - Cost reduction techniques
Integration patterns based on production systems handling millions of events with zero downtime and minimal costs. Last updated: 2025-07-27