-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathserver.js
More file actions
127 lines (114 loc) · 3.6 KB
/
server.js
File metadata and controls
127 lines (114 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
const SMTPServer = require('smtp-server').SMTPServer;
const config = require('./config');
const { parseEmail } = require('./services/emailParser');
const { sendToWebhook } = require('./services/webhookService');
require('aws-sdk/lib/maintenance_mode_message').suppress = true;
const Queue = require('better-queue');
const winston = require('winston');
require('winston-daily-rotate-file');
// Application-wide logger. Every entry is timestamped and serialised as JSON,
// then written to both the console and a date-stamped file under logs/.
const logFormat = winston.format.combine(
  winston.format.timestamp(),
  winston.format.json(),
);

const consoleTransport = new winston.transports.Console();

// Rotation/retention settings are passed straight to winston-daily-rotate-file.
const rotatingFileTransport = new winston.transports.DailyRotateFile({
  filename: 'logs/application-%DATE%.log',
  datePattern: 'YYYY-MM-DD',
  zippedArchive: true,
  maxSize: '20m',
  maxFiles: '90d',
});

const logger = winston.createLogger({
  level: 'info',
  format: logFormat,
  transports: [consoleTransport, rotatingFileTransport],
});
/**
 * Verify that every required setting is available on `config`.
 *
 * A setting counts as missing when the key is absent or when its value is
 * `undefined`/`null` (for example, a key populated from an unset environment
 * variable). Falsy-but-meaningful values such as `0` or `false` are accepted.
 *
 * @throws {Error} When one or more required settings are missing; the message
 *   lists all of them, in the order they are declared below.
 */
function validateConfig() {
  const requiredKeys = ['PORT', 'SMTP_SECURE', 'WEBHOOK_URL', 'WEBHOOK_CONCURRENCY'];
  // `== null` matches both null and undefined, but not 0, false or ''.
  const missingKeys = requiredKeys.filter((key) => config[key] == null);
  if (missingKeys.length > 0) {
    throw new Error(`Missing required configuration: ${missingKeys.join(', ')}`);
  }
}
/**
 * Queue that delivers parsed emails to the webhook.
 *
 * Each task calls `sendToWebhook(parsed)`. A failed call is retried up to
 * `maxRetries` times, waiting 1s, 2s, 3s before the successive retries; when
 * the last retry also fails the error is handed to the queue callback.
 *
 * @param {*} parsed - The value pushed onto the queue (output of `parseEmail`).
 * @param {Function} cb - Completion callback: `cb(null)` on success,
 *   `cb(error)` once all retries are exhausted.
 */
const webhookQueue = new Queue(async function (parsed, cb) {
  const maxRetries = 3;
  let retries = 0;
  const attemptWebhook = async () => {
    try {
      await sendToWebhook(parsed);
    } catch (error) {
      logger.error('Webhook error:', { message: error.message, stack: error.stack });
      if (error.response) {
        logger.error('Webhook response error:', {
          status: error.response.status,
          data: error.response.data,
        });
      }
      if (retries < maxRetries) {
        retries++;
        logger.info(`Retrying webhook (attempt ${retries}/${maxRetries})`);
        // Linear back-off: the n-th retry waits n seconds.
        setTimeout(attemptWebhook, 1000 * retries);
      } else {
        cb(error);
      }
      return;
    }
    // Success is reported outside the try block on purpose: an exception
    // thrown while logging or completing the task must not be mistaken for a
    // webhook failure, which would re-send a payload that was already delivered.
    logger.info('Successfully sent to webhook');
    cb(null);
  };
  // Not awaited: completion is signalled through `cb`, not the returned promise.
  attemptWebhook();
}, { concurrent: config.WEBHOOK_CONCURRENCY || 5 });
// True once the listening socket is bound; lets the error listener tell a
// failed startup apart from an error raised while the server is running.
let isListening = false;

/**
 * SMTP server that parses each incoming message and queues it for webhook
 * delivery. The AUTH command is disabled; TLS mode comes from
 * `config.SMTP_SECURE`.
 */
const server = new SMTPServer({
  /**
   * Parse the message stream and enqueue the result.
   * The SMTP transaction is acknowledged (`callback()`) once the message has
   * been queued, not once the webhook has been called.
   */
  onData(stream, session, callback) {
    parseEmail(stream)
      .then((parsed) => {
        webhookQueue.push(parsed);
        logger.info('Email added to queue', { queueSize: webhookQueue.getStats().total });
        callback();
      })
      .catch((error) => {
        // Also reached when enqueueing or acknowledging above throws.
        logger.error('Parsing error:', { message: error.message, stack: error.stack });
        callback(new Error('Failed to parse email'));
      });
  },
  disabledCommands: ['AUTH'],
  secure: config.SMTP_SECURE,
});

// Per the smtp-server documentation, errors are reported through the server's
// 'error' event (there is no `onError` constructor option), and Node's
// `listen` callback is only a 'listening' listener that never receives an
// error. Startup failures such as a port already in use therefore arrive here.
server.on('error', (error) => {
  if (!isListening) {
    logger.error('Failed to start SMTP server:', { message: error.message, stack: error.stack });
    process.exit(1);
    return;
  }
  logger.error('SMTP server error:', { message: error.message, stack: error.stack });
});

server.listen(config.PORT, '0.0.0.0', () => {
  isListening = true;
  logger.info(`SMTP server listening on port ${config.PORT} on all interfaces`);
});
// Exit status to use once the server has closed; null until a shutdown starts.
let shutdownExitCode = null;

/**
 * Close the SMTP server and exit the process once it has closed.
 *
 * Only the first call starts the shutdown. Later calls (a second signal, or an
 * error raised while closing) are logged and can only raise the pending exit
 * status, so `server.close` is invoked at most once.
 *
 * @param {string} reason - Human-readable cause, written to the log.
 * @param {number} [exitCode=0] - Process exit status; pass non-zero when the
 *   shutdown is caused by a failure.
 */
function gracefulShutdown(reason, exitCode = 0) {
  if (shutdownExitCode !== null) {
    // A failure during an otherwise clean shutdown must still be reported
    // through the exit status.
    shutdownExitCode = Math.max(shutdownExitCode, exitCode);
    logger.info(`Shutdown already in progress, ignoring: ${reason}`);
    return;
  }
  shutdownExitCode = exitCode;
  logger.info(`Shutting down: ${reason}`);
  server.close(() => {
    logger.info('Server closed. Exiting process.');
    process.exit(shutdownExitCode);
  });
}

// Crashes shut down with a non-zero status so a process supervisor can tell
// them apart from a requested stop.
process.on('uncaughtException', (err) => {
  logger.error('Uncaught exception:', { message: err.message, stack: err.stack });
  gracefulShutdown('Uncaught exception', 1);
});

process.on('unhandledRejection', (reason) => {
  // `message` and `stack` are not enumerable on Error objects, so they are
  // copied out explicitly instead of passing the Error itself as metadata.
  const details = reason instanceof Error
    ? { message: reason.message, stack: reason.stack }
    : { reason };
  logger.error('Unhandled Rejection:', details);
  gracefulShutdown('Unhandled rejection', 1);
});

process.on('SIGTERM', () => {
  gracefulShutdown('SIGTERM signal received');
});

process.on('SIGINT', () => {
  gracefulShutdown('SIGINT signal received');
});
// Startup check: log the problem and exit with status 1 when required
// configuration is missing.
// NOTE(review): this runs after the queue and server have been constructed
// earlier in this file; consider moving it ahead of them.
try {
  validateConfig();
} catch ({ message }) {
  logger.error('Configuration error:', { message });
  process.exit(1);
}