#7 feat: optimize real-time market data performance

Спојено
muhammad.uzair споји(ла) 1 комит(е) из MQL-Development/feature/cherry-picked-to-master у MQL-Development/master пре 3 месеци
2 измењених фајлова са 109 додато и 126 уклоњено
  1. 100 124
      src/controllers/livePriceController.js
  2. 9 2
      src/server.js

+ 100 - 124
src/controllers/livePriceController.js

@@ -58,162 +58,140 @@ class LivePriceController {
58 58
       });
59 59
 
60 60
       if (!livePrice) {
61
-        const error = new Error('Live price not found for this symbol');
62
-        error.statusCode = 404;
63
-        return next(error);
61
+        return res.status(404).json({ success: false, message: 'Live price not found' });
64 62
       }
65 63
 
66
-      res.json({
67
-        success: true,
68
-        data: livePrice
69
-      });
64
+      res.json({ success: true, data: livePrice });
70 65
     } catch (error) {
71 66
       next(error);
72 67
     }
73 68
   }
74 69
 
75
-  // Update or create live price
70
+  // ✅ Upsert single live price (used by EA if sending one at a time)
76 71
   async upsertLivePrice(req, res, next) {
77 72
     try {
78 73
       const { symbolId, price, bid, ask, bidSize, askSize } = req.body;
79 74
 
80
-      // Verify symbol exists and is active
81 75
       const symbol = await Symbol.findByPk(symbolId);
82
-      if (!symbol) {
83
-        const error = new Error('Symbol not found');
84
-        error.statusCode = 404;
85
-        return next(error);
76
+      if (!symbol || !symbol.isActive) {
77
+        return res.status(400).json({ success: false, message: 'Invalid or inactive symbol' });
86 78
       }
87 79
 
88
-      if (!symbol.isActive) {
89
-        const error = new Error('Cannot update live price for inactive symbol');
90
-        error.statusCode = 400;
91
-        return next(error);
92
-      }
80
+      const now = new Date();
93 81
 
94
-      const [livePrice, created] = await LivePrice.upsert({
95
-        symbolId: parseInt(symbolId),
96
-        price,
97
-        bid,
98
-        ask,
82
+      // 🔥 Emit immediately — no DB blocking
83
+      const io = req.app.get('io');
84
+      const eventData = {
85
+        symbol: symbol.symbol,
86
+        symbolId,
87
+        price: Number(price).toFixed(5),
88
+        bid: Number(bid).toFixed(5),
89
+        ask: Number(ask).toFixed(5),
99 90
         bidSize,
100 91
         askSize,
101
-        lastUpdated: new Date()
102
-      });
103
-
104
-      // Emit WebSocket event for real-time updates
105
-      const io = req.app.get('io');
106
-      if (io) {
107
-        const eventData = {
108
-          symbol: symbol.symbol,
109
-          symbolId: symbol.id,
110
-          price,
111
-          bid,
112
-          ask,
113
-          bidSize,
114
-          askSize,
115
-          lastUpdated: livePrice.lastUpdated,
116
-          exchange: symbol.exchange,
117
-          instrumentType: symbol.instrumentType
118
-        };
119
-
120
-        // Emit to all clients subscribed to this symbol
121
-        io.to(`symbol:${symbol.symbol}`).emit('livePriceUpdate', eventData);
122
-
123
-        // Also emit to general live price updates
124
-        io.emit('livePriceUpdate', eventData);
125
-      }
126
-
127
-      res.status(created ? 201 : 200).json({
128
-        success: true,
129
-        data: livePrice,
130
-        message: created ? 'Live price created successfully' : 'Live price updated successfully'
92
+        lastUpdated: now,
93
+        exchange: symbol.exchange,
94
+        instrumentType: symbol.instrumentType
95
+      };
96
+
97
+      io?.to(`symbol:${symbol.symbol}`).emit('livePriceUpdate', eventData);
98
+      io?.emit('livePriceUpdate', eventData);
99
+
100
+      // Respond immediately
101
+      res.json({ success: true, message: 'Emitted instantly', data: eventData });
102
+
103
+      // Background DB upsert (async)
104
+      setImmediate(async () => {
105
+        try {
106
+          await LivePrice.upsert({
107
+            symbolId,
108
+            price,
109
+            bid,
110
+            ask,
111
+            bidSize,
112
+            askSize,
113
+            lastUpdated: now
114
+          });
115
+        } catch (err) {
116
+          console.error('[DB ERROR] upsertLivePrice:', err);
117
+        }
131 118
       });
132 119
     } catch (error) {
133 120
       next(error);
134 121
     }
135 122
   }
136 123
 
137
-  // Bulk update live prices
124
+  // ✅ BULK UPDATE (Optimized for speed & instant WebSocket)
138 125
   async bulkUpdateLivePrices(req, res, next) {
139 126
     try {
140 127
       const { prices } = req.body;
141
-
142 128
       if (!Array.isArray(prices) || prices.length === 0) {
143
-        const error = new Error('Prices array is required');
144
-        error.statusCode = 400;
145
-        return next(error);
129
+        return res.status(400).json({ success: false, message: 'Prices array is required' });
146 130
       }
147 131
 
148
-      // Verify all symbols exist and are active
132
+      const io = req.app.get('io');
133
+      const now = new Date();
134
+
135
+      // Step 1: Fetch all active symbols once
149 136
       const symbolIds = prices.map(p => p.symbolId);
150
-      const existingSymbols = await Symbol.findAll({
151
-        where: {
152
-          id: symbolIds,
153
-          isActive: true
154
-        },
155
-        attributes: ['id']
137
+      const symbols = await Symbol.findAll({
138
+        where: { id: symbolIds, isActive: true },
139
+        attributes: ['id', 'symbol', 'exchange', 'instrumentType']
156 140
       });
157 141
 
158
-      const existingSymbolIds = existingSymbols.map(s => s.id);
159
-      const invalidSymbolIds = symbolIds.filter(id => !existingSymbolIds.includes(id));
142
+      const symbolMap = new Map(symbols.map(s => [s.id, s]));
143
+
144
+      // Step 2: Emit immediately (non-blocking)
145
+      for (const p of prices) {
146
+        const s = symbolMap.get(p.symbolId);
147
+        if (!s) continue;
148
+
149
+        const payload = {
150
+          symbol: s.symbol,
151
+          symbolId: s.id,
152
+          price: Number(p.price).toFixed(5),
153
+          bid: Number(p.bid).toFixed(5),
154
+          ask: Number(p.ask).toFixed(5),
155
+          bidSize: p.bidSize,
156
+          askSize: p.askSize,
157
+          lastUpdated: now,
158
+          exchange: s.exchange,
159
+          instrumentType: s.instrumentType
160
+        };
160 161
 
161
-      if (invalidSymbolIds.length > 0) {
162
-        const error = new Error(`Invalid or inactive symbol IDs: ${invalidSymbolIds.join(', ')}`);
163
-        error.statusCode = 400;
164
-        return next(error);
165
-      }
166
-
167
-      // Prepare data for bulk upsert
168
-      const upsertData = prices.map(price => ({
169
-        symbolId: parseInt(price.symbolId),
170
-        price: price.price,
171
-        bid: price.bid,
172
-        ask: price.ask,
173
-        bidSize: price.bidSize,
174
-        askSize: price.askSize,
175
-        lastUpdated: new Date()
176
-      }));
177
-
178
-      const updatedPrices = [];
179
-      const io = req.app.get('io');
180
-
181
-      for (const data of upsertData) {
182
-        const [livePrice] = await LivePrice.upsert(data);
183
-        updatedPrices.push(livePrice);
184
-
185
-        // Emit WebSocket event for each updated price
186
-        if (io) {
187
-          const symbol = await Symbol.findByPk(data.symbolId);
188
-          if (symbol) {
189
-            const eventData = {
190
-              symbol: symbol.symbol,
191
-              symbolId: symbol.id,
192
-              price: data.price,
193
-              bid: data.bid,
194
-              ask: data.ask,
195
-              bidSize: data.bidSize,
196
-              askSize: data.askSize,
197
-              lastUpdated: data.lastUpdated,
198
-              exchange: symbol.exchange,
199
-              instrumentType: symbol.instrumentType
200
-            };
201
-
202
-            // Emit to all clients subscribed to this symbol
203
-            io.to(`symbol:${symbol.symbol}`).emit('livePriceUpdate', eventData);
204
-
205
-            // Also emit to general live price updates
206
-            io.emit('livePriceUpdate', eventData);
207
-          }
208
-        }
162
+        io?.to(`symbol:${s.symbol}`).emit('livePriceUpdate', payload);
163
+        io?.emit('livePriceUpdate', payload);
209 164
       }
210 165
 
166
+      // Step 3: Respond immediately
211 167
       res.json({
212 168
         success: true,
213
-        data: updatedPrices,
214
-        message: `${updatedPrices.length} live prices updated successfully`
169
+        message: `${prices.length} live prices emitted instantly`,
170
+      });
171
+
172
+      // Step 4: Async background write (DB non-blocking)
173
+      setImmediate(async () => {
174
+        try {
175
+          await LivePrice.bulkCreate(
176
+            prices.map(p => ({
177
+              symbolId: p.symbolId,
178
+              price: p.price,
179
+              bid: p.bid,
180
+              ask: p.ask,
181
+              bidSize: p.bidSize,
182
+              askSize: p.askSize,
183
+              lastUpdated: now
184
+            })),
185
+            { updateOnDuplicate: ['price', 'bid', 'ask', 'bidSize', 'askSize', 'lastUpdated'] }
186
+          );
187
+
188
+          console.log(`[DB] ${prices.length} prices written at ${now.toISOString()}`);
189
+        } catch (err) {
190
+          console.error('[DB ERROR] bulkUpdateLivePrices:', err);
191
+        }
215 192
       });
216 193
     } catch (error) {
194
+      console.error('[ERROR] bulkUpdateLivePrices:', error);
217 195
       next(error);
218 196
     }
219 197
   }
@@ -293,16 +271,14 @@ class LivePriceController {
293 271
         where: { symbolId: parseInt(symbolId) }
294 272
       });
295 273
 
296
-      if (deletedRowsCount === 0) {
297
-        const error = new Error('Live price not found for this symbol');
298
-        error.statusCode = 404;
299
-        return next(error);
274
+      if (!deletedRowsCount) {
275
+        return res.status(404).json({ success: false, message: 'Live price not found' });
300 276
       }
301 277
 
302
-      res.json({
303
-        success: true,
278
+      res.json({ 
279
+        success: true, 
304 280
         message: 'Live price deleted successfully'
305
-      });
281
+       });
306 282
     } catch (error) {
307 283
       next(error);
308 284
     }

+ 9 - 2
src/server.js

@@ -15,13 +15,20 @@ const startServer = async () => {
15 15
     // Create HTTP server
16 16
     const server = createServer(app);
17 17
 
18
-    // Initialize Socket.io
18
+    // Initialize Socket.io - Force WebSocket transport to prevent polling fallback
19 19
     const io = new Server(server, {
20 20
       cors: {
21 21
         origin: process.env.CORS_ORIGIN || "*",
22 22
         methods: ["GET", "POST"],
23 23
         credentials: true
24
-      }
24
+      },
25
+      // Force WebSocket transport only - no HTTP polling fallback
26
+      transports: ['websocket'],
27
+      // Connection management
28
+      pingTimeout: 60000,  // 60 seconds
29
+      pingInterval: 25000, // 25 seconds
30
+      // Allow Engine.IO v3 for compatibility
31
+      allowEIO3: true
25 32
     });
26 33
 
27 34
     // Socket.io connection handling