Browse Source

Merge branch 'feature/cherry-picked-to-master' of MQL-Development/market-data-service into master

muhammad.uzair 3 months ago
parent
commit
332211fddc

+ 356 - 50
MT5/Experts/MarketDataSender.mq5

@@ -15,7 +15,7 @@ input int LivePriceIntervalSeconds = 5;
15 15
 string symbols[];
16 16
 int symbolIds[];
17 17
 datetime lastSend = 0;
18
-
18
+datetime lastCandleSync = 0;
19 19
 //+------------------------------------------------------------------+
20 20
 int OnInit()
21 21
 {
@@ -28,6 +28,8 @@ int OnInit()
28 28
 
29 29
    Print("✅ Symbols initialized: ", ArraySize(symbols));
30 30
    SendAllHistoricalCandles();
31
+   EventSetTimer(60);  // ⏱️ Trigger OnTimer() every 30 minutes
32
+   Print("✅ Timer set: SendAllHistoricalCandles() will run every 30 minutes.");
31 33
    return(INIT_SUCCEEDED);
32 34
 }
33 35
 
@@ -135,22 +137,63 @@ bool SyncSymbolsWithDatabase()
135 137
    return true;
136 138
 }
137 139
 
140
+//+------------------------------------------------------------------+
141
+//+------------------------------------------------------------------+
142
+//| Find exact symbolId from JSON response                           |
143
+//+------------------------------------------------------------------+
144
+//+------------------------------------------------------------------+
145
+//| Robust JSON search: matches exact symbol only                    |
138 146
 //+------------------------------------------------------------------+
139 147
 int FindSymbolId(string response, string symbolName)
140 148
 {
141
-   string searchPattern = StringFormat("\"symbol\":\"%s\"", symbolName);
142
-   int symbolPos = StringFind(response, searchPattern);
143
-   if(symbolPos < 0) return -1;
149
+   int pos = 0;
150
+   string patternSymbol = "\"symbol\":\"";
151
+   string patternId = "\"id\":";
144 152
 
145
-   int idPos = StringFind(response, "\"id\":", symbolPos);
146
-   if(idPos < 0) return -1;
153
+   while(true)
154
+   {
155
+      // find each symbol occurrence
156
+      int symPos = StringFind(response, patternSymbol, pos);
157
+      if(symPos < 0)
158
+         break;
147 159
 
148
-   int startPos = idPos + 5;
149
-   int endPos = StringFind(response, ",", startPos);
150
-   if(endPos < 0) endPos = StringFind(response, "}", startPos);
160
+      // extract actual symbol value
161
+      int symStart = symPos + StringLen(patternSymbol);
162
+      int symEnd = StringFind(response, "\"", symStart);
163
+      if(symEnd < 0) break;
151 164
 
152
-   string idStr = StringSubstr(response, startPos, endPos - startPos);
153
-   return (int)StringToInteger(idStr);
165
+      string foundSymbol = StringSubstr(response, symStart, symEnd - symStart);
166
+
167
+      // 🟩 DEBUG LOG: show all symbols found
168
+      // ✅ exact match check (case-sensitive)
169
+      if(foundSymbol == symbolName)
170
+      {
171
+         // find id that comes *before* this symbol entry
172
+         int blockStart = StringFind(response, patternId, symPos - 100);
173
+         if(blockStart < 0)
174
+            blockStart = StringFind(response, patternId, symPos);
175
+
176
+         if(blockStart >= 0)
177
+         {
178
+            int idStart = blockStart + StringLen(patternId);
179
+            int idEnd = StringFind(response, ",", idStart);
180
+            if(idEnd < 0) idEnd = StringFind(response, "}", idStart);
181
+
182
+            string idStr = StringSubstr(response, idStart, idEnd - idStart);
183
+            int id = (int)StringToInteger(idStr);
184
+
185
+            // 🟩 Success log
186
+            Print("✅ Exact match found → symbol='", symbolName, "' | ID=", id);
187
+            return id;
188
+         }
189
+      }
190
+
191
+      // move to next
192
+      pos = symEnd + 1;
193
+   }
194
+
195
+   Print("⚠️ No exact match for symbol ", symbolName);
196
+   return -1;
154 197
 }
155 198
 
156 199
 //+------------------------------------------------------------------+
@@ -161,11 +204,53 @@ int CreateSymbolInDatabase(string symbolName)
161 204
    string exchange = "MT5";
162 205
    string instrumentType = "forex";
163 206
 
207
+   // --- Clean suffixes like ".pro", ".m", ".r", "_i" ---
208
+   int dotPos = StringFind(symbolName, ".");
209
+   if(dotPos > 0)
210
+      symbolName = StringSubstr(symbolName, 0, dotPos);
211
+
212
+   // --- Try basic split for 6-char pairs like EURUSD, GBPJPY, BTCUSD ---
164 213
    if(StringLen(symbolName) >= 6)
165 214
    {
166 215
       baseAsset = StringSubstr(symbolName, 0, 3);
167 216
       quoteAsset = StringSubstr(symbolName, 3, 3);
168 217
    }
218
+   else
219
+   {
220
+      // --- Try alternate detection ---
221
+      if(StringFind(symbolName, "USD") >= 0)
222
+      {
223
+         int pos = StringFind(symbolName, "USD");
224
+         baseAsset = StringSubstr(symbolName, 0, pos);
225
+         quoteAsset = "USD";
226
+      }
227
+      else if(StringFind(symbolName, "EUR") >= 0)
228
+      {
229
+         int pos = StringFind(symbolName, "EUR");
230
+         baseAsset = StringSubstr(symbolName, 0, pos);
231
+         quoteAsset = "EUR";
232
+      }
233
+      else
234
+      {
235
+         // Fallback safe defaults
236
+         baseAsset = symbolName;
237
+         quoteAsset = "USD";
238
+      }
239
+   }
240
+
241
+   // --- Final safety: ensure no empty fields ---
242
+   if(StringLen(baseAsset) == 0) baseAsset = "UNKNOWN";
243
+   if(StringLen(quoteAsset) == 0) quoteAsset = "USD";
244
+
245
+   // --- Decide instrument type ---
246
+   if(StringFind(symbolName, "BTC") == 0 || StringFind(symbolName, "ETH") == 0)
247
+      instrumentType = "crypto";
248
+   else if(StringFind(symbolName, "XAU") == 0 || StringFind(symbolName, "XAG") == 0)
249
+      instrumentType = "metal";
250
+   else if(StringFind(symbolName, "US30") == 0 || StringFind(symbolName, "NAS") == 0)
251
+      instrumentType = "index";
252
+   else
253
+      instrumentType = "forex";
169 254
 
170 255
    string json = StringFormat(
171 256
       "{\"symbol\":\"%s\",\"baseAsset\":\"%s\",\"quoteAsset\":\"%s\",\"exchange\":\"%s\",\"instrumentType\":\"%s\",\"isActive\":true}",
@@ -178,18 +263,15 @@ int CreateSymbolInDatabase(string symbolName)
178 263
 
179 264
    char postData[];
180 265
    StringToCharArray(json, postData, 0, CP_UTF8);
181
-
182
-   // ✅ FIX: Remove trailing null terminator from JSON
183 266
    if(ArraySize(postData) > 0 && postData[ArraySize(postData) - 1] == 0)
184 267
       ArrayResize(postData, ArraySize(postData) - 1);
185 268
 
186 269
    char result[];
187
-
188 270
    int res = WebRequest("POST", url, headers, 5000, postData, result, resultHeaders);
189 271
 
190 272
    if(res != 201 && res != 200)
191 273
    {
192
-      Print("❌ Failed to create symbol: ", res, " Response: ", CharArrayToString(result));
274
+      PrintFormat("❌ Failed to create symbol %s | HTTP %d | Response: %s", symbolName, res, CharArrayToString(result));
193 275
       return -1;
194 276
    }
195 277
 
@@ -205,10 +287,62 @@ int CreateSymbolInDatabase(string symbolName)
205 287
    return (int)StringToInteger(idStr);
206 288
 }
207 289
 
290
+//+------------------------------------------------------------------+
291
+//| Fetch latest stored candle openTime from API                     |
292
+//+------------------------------------------------------------------+
293
+datetime GetLatestCandleTime(int symbolId)
294
+{
295
+   string url = ApiBaseUrl + "/api/candles/" + IntegerToString(symbolId) + "/latest";
296
+   string headers = "Content-Type: application/json\r\n";
297
+   string resultHeaders = "";
298
+   char result[];
299
+   char emptyData[];
300
+
301
+   ResetLastError();
302
+   int res = WebRequest("GET", url, headers, 10000, emptyData, result, resultHeaders);
303
+
304
+   if(res != 200)
305
+   {
306
+      Print("⚠️ Could not fetch latest candle for symbolId=", symbolId, " (HTTP ", res, ")");
307
+      return 0;
308
+   }
309
+
310
+   string response = CharArrayToString(result);
311
+   int pos = StringFind(response, "\"openTime\":\"");
312
+   if(pos < 0)
313
+   {
314
+      Print("⚠️ No openTime found in response for symbolId=", symbolId);
315
+      return 0;
316
+   }
317
+
318
+   pos += StringLen("\"openTime\":\"");
319
+   int end = StringFind(response, "\"", pos);
320
+   string openTimeStr = StringSubstr(response, pos, end - pos);
321
+
322
+   // --- Parse ISO8601 to datetime ---
323
+   int year  = (int)StringToInteger(StringSubstr(openTimeStr, 0, 4));
324
+   int month = (int)StringToInteger(StringSubstr(openTimeStr, 5, 2));
325
+   int day   = (int)StringToInteger(StringSubstr(openTimeStr, 8, 2));
326
+   int hour  = (int)StringToInteger(StringSubstr(openTimeStr, 11, 2));
327
+   int min   = (int)StringToInteger(StringSubstr(openTimeStr, 14, 2));
328
+   int sec   = (int)StringToInteger(StringSubstr(openTimeStr, 17, 2));
329
+
330
+   MqlDateTime t;
331
+   t.year = year; t.mon = month; t.day = day;
332
+   t.hour = hour; t.min = min; t.sec = sec;
333
+
334
+   datetime dt = StructToTime(t);
335
+   Print("🕓 Latest stored candle openTime for symbolId=", symbolId, " → ", TimeToString(dt, TIME_DATE|TIME_SECONDS));
336
+   return dt;
337
+}
338
+
208 339
 //+------------------------------------------------------------------+
209 340
 //+------------------------------------------------------------------+
210 341
 //| Send all historical candles to the API (Fixed Version)           |
211 342
 //+------------------------------------------------------------------+
343
+//+------------------------------------------------------------------+
344
+//| Send all historical candles to the API (Fixed + Timeout Safe)    |
345
+//+------------------------------------------------------------------+
212 346
 void SendAllHistoricalCandles()
213 347
 {
214 348
    Print("Starting historical upload for ", ArraySize(symbols), " symbols...");
@@ -216,90 +350,200 @@ void SendAllHistoricalCandles()
216 350
    for(int i = 0; i < ArraySize(symbols); i++)
217 351
    {
218 352
       string sym = symbols[i];
353
+      int symbolId = symbolIds[i];
354
+      if(symbolId <= 0) continue;
355
+
356
+      // --- Get last stored candle time ---
357
+      datetime latestApiTime = GetLatestCandleTime(symbolId);
219 358
 
220
-      // --- Ensure data is ready ---
359
+      // --- Ensure history data is available ---
221 360
       Sleep(300);
222 361
       int tries = 0;
223
-      while(!SeriesInfoInteger(sym, HistoricalTimeframe, SERIES_SYNCHRONIZED) && tries < 10)
362
+      bool historyReady = false;
363
+
364
+      while(tries < 10)
224 365
       {
225
-         Print("⏳ Waiting for ", sym, " history to load...");
366
+         if(SeriesInfoInteger(sym, HistoricalTimeframe, SERIES_SYNCHRONIZED))
367
+         {
368
+            historyReady = true;
369
+            break;
370
+         }
371
+         PrintFormat("⏳ Waiting for %s history to load... (try %d/10)", sym, tries + 1);
226 372
          Sleep(500);
227 373
          tries++;
228 374
       }
229 375
 
230
-      // --- Now copy candles ---
376
+      if(!historyReady)
377
+      {
378
+         PrintFormat("⚠️ Skipping %s — history not loaded after 10 tries (~5s timeout).", sym);
379
+         continue;
380
+      }
381
+
382
+      // --- Copy rates ---
231 383
       MqlRates rates[];
232 384
       ResetLastError();
233 385
       int copied = CopyRates(sym, HistoricalTimeframe, 0, HistoricalCandleCount, rates);
234
-      int err = GetLastError();
235 386
 
236 387
       if(copied <= 0)
237 388
       {
238
-         Print("⚠️ Failed to copy candles for ", sym, " (copied=", copied, ", err=", err, ")");
389
+         int err = GetLastError();
390
+         PrintFormat("⚠️ Failed to copy candles for %s (error %d)", sym, err);
239 391
          continue;
240 392
       }
241 393
 
242
-      Print("✅ Copied ", copied, " candles for ", sym);
394
+      PrintFormat("✅ Copied %d candles for %s", copied, sym);
243 395
 
244
-      // --- Print a few sample candles ---
245
-      int sampleCount = MathMin(5, copied); // show up to 5 examples
246
-      for(int j = 0; j < sampleCount; j++)
396
+      // --- Filter new candles ---
397
+      int startIndex = 0;
398
+      for(int j = 0; j < copied; j++)
247 399
       {
248
-         MqlRates r = rates[j];
249
-         string openTime = TimeToString(r.time, TIME_DATE|TIME_SECONDS);
250
-         string closeTime = TimeToString(r.time + PeriodSeconds(HistoricalTimeframe), TIME_DATE|TIME_SECONDS);
251
-         PrintFormat(
252
-            "🕒 [%s] %s → %s | O=%.5f H=%.5f L=%.5f C=%.5f | Vol=%.2f",
253
-            sym, openTime, closeTime, r.open, r.high, r.low, r.close, r.tick_volume
254
-         );
400
+         if(rates[j].time > latestApiTime)
401
+         {
402
+            startIndex = j;
403
+            break;
404
+         }
255 405
       }
256 406
 
257
-      // --- Send candles in batches to API ---
258
-      int sentTotal = 0;
407
+      int newCount = copied - startIndex;
408
+      if(newCount <= 0)
409
+      {
410
+         PrintFormat("ℹ️ No new candles to send for %s", sym);
411
+         continue;
412
+      }
413
+
414
+      PrintFormat("🆕 Sending %d new candles for %s after %s", newCount, sym, TimeToString(latestApiTime, TIME_DATE|TIME_SECONDS));
415
+
416
+      // --- Send new candles in batches ---
259 417
       int batchSize = 200;
260
-      for(int start = 0; start < copied; start += batchSize)
418
+      int sentTotal = 0;
419
+
420
+      for(int start = startIndex; start < copied; start += batchSize)
261 421
       {
262 422
          int size = MathMin(batchSize, copied - start);
263
-         int symbolId = symbolIds[i];
264
-         if(symbolId <= 0) continue;
265
-
266 423
          string json = BuildCandleJSONFromRates(symbolId, rates, start, size);
267 424
          string url = ApiBaseUrl + "/api/candles/bulk";
268 425
          string response;
426
+
269 427
          bool ok = SendJSON(url, json, response);
270 428
          if(!ok)
271 429
          {
272
-            Print("❌ Failed to send candle batch for ", sym, " start=", start);
430
+            PrintFormat("❌ Failed to send candle batch for %s (start=%d)", sym, start);
273 431
             break;
274 432
          }
433
+
275 434
          sentTotal += size;
276
-         Print("📤 Sent candles for ", sym, ": ", sentTotal, "/", copied);
435
+         PrintFormat("📤 Sent %d/%d new candles for %s", sentTotal, newCount, sym);
277 436
       }
278 437
    }
279
-   Print("✅ Historical upload finished.");
438
+
439
+   Print("✅ Incremental candle upload finished.");
280 440
 }
281 441
 
282 442
 
443
+//+------------------------------------------------------------------+
444
+//| Send live prices of all active symbols                           |
283 445
 //+------------------------------------------------------------------+
284 446
 void SendLivePrices()
285 447
 {
286 448
    bool firstItem = true;
287 449
    string json = "{\"prices\":[";
288
-
289 450
    int sentCount = 0;
451
+
290 452
    for(int i = 0; i < ArraySize(symbols); i++)
291 453
    {
292 454
       string sym = symbols[i];
455
+      int symId = symbolIds[i];
456
+      if(symId <= 0) continue;
457
+
458
+      // Ensure symbol is visible in Market Watch
459
+      if(!SymbolSelect(sym, true))
460
+      {
461
+         Print("⚠️ Failed to select symbol: ", sym);
462
+         continue;
463
+      }
464
+
465
+      // Read primary prices
293 466
       double bid = SymbolInfoDouble(sym, SYMBOL_BID);
294 467
       double ask = SymbolInfoDouble(sym, SYMBOL_ASK);
295 468
       double last = SymbolInfoDouble(sym, SYMBOL_LAST);
296
-      if(bid <= 0 || ask <= 0 || last <= 0) continue;
297 469
 
298
-      int symId = symbolIds[i];
299
-      if(symId <= 0) continue;
470
+      // If last = 0 (some providers), use midprice as fallback
471
+      if(last <= 0 && bid > 0 && ask > 0)
472
+         last = (bid + ask) / 2.0;
300 473
 
301
-      string item = StringFormat("{\"symbolId\":%d,\"price\":%.8f,\"bid\":%.8f,\"ask\":%.8f,\"bidSize\":%.8f,\"askSize\":%.8f}",
302
-                                 symId, last, bid, ask, 0.0, 0.0);
474
+      // Skip if prices are still invalid
475
+      if(bid <= 0 || ask <= 0 || last <= 0)
476
+      {
477
+         Print("⚠️ Skipping symbol ", sym, " — invalid bid/ask/last (", DoubleToString(bid,8), "/", DoubleToString(ask,8), "/", DoubleToString(last,8), ")");
478
+         continue;
479
+      }
480
+
481
+      // Initialize sizes
482
+      double bidSize = 0.0;
483
+      double askSize = 0.0;
484
+
485
+      // Try to fetch market depth (book) and classify volumes by price vs bid/ask
486
+      MqlBookInfo book[];
487
+      if(MarketBookGet(sym, book) && ArraySize(book) > 0)
488
+      {
489
+         for(int j = 0; j < ArraySize(book); j++)
490
+         {
491
+            double p = book[j].price;
492
+            double v = book[j].volume;
493
+
494
+            // If price is >= ask => ask side
495
+            if(p >= ask) askSize += v;
496
+            // If price is <= bid => bid side
497
+            else if(p <= bid) bidSize += v;
498
+            else
499
+            {
500
+               // price in-between -> assign to nearer side
501
+               double distToBid = MathAbs(p - bid);
502
+               double distToAsk = MathAbs(ask - p);
503
+               if(distToBid <= distToAsk) bidSize += v; else askSize += v;
504
+            }
505
+         }
506
+         PrintFormat("ℹ️ MarketBook for %s → bid=%.8f ask=%.8f bidSize=%.2f askSize=%.2f (book entries=%d)", sym, bid, ask, bidSize, askSize, ArraySize(book));
507
+      }
508
+      else
509
+      {
510
+         // MarketBook not available or empty
511
+         // Try SymbolInfoTick as fallback
512
+         MqlTick tick;
513
+         if(SymbolInfoTick(sym, tick))
514
+         {
515
+            // tick.volume is aggregated tick volume — not exact bid/ask sizes but better than zero
516
+            double tickVol = (double)tick.volume;
517
+            if(tickVol > 0.0)
518
+            {
519
+               // assign tick volume to both sides conservatively
520
+               if(bidSize <= 0) bidSize = tickVol;
521
+               if(askSize <= 0) askSize = tickVol;
522
+               PrintFormat("ℹ️ tick fallback for %s → tick.volume=%.2f", sym, tickVol);
523
+            }
524
+            else
525
+            {
526
+               Print("ℹ️ tick available but volume zero for ", sym);
527
+            }
528
+         }
529
+         else
530
+         {
531
+            Print("ℹ️ MarketBook and tick not available for ", sym);
532
+         }
533
+      }
534
+
535
+      // Final safety: ensure API-required positive numbers
536
+      // If a side is zero or negative, set to minimal positive 1.0
537
+      if(bidSize <= 0.0) bidSize = 1.0;
538
+      if(askSize <= 0.0) askSize = 1.0;
539
+
540
+      // Build JSON item for this symbol
541
+      string item = StringFormat(
542
+         "{\"symbolId\":%d,\"price\":%.8f,\"bid\":%.8f,\"ask\":%.8f,\"bidSize\":%.8f,\"askSize\":%.8f}",
543
+         symId, last, bid, ask, bidSize, askSize
544
+      );
545
+
546
+      // Add to aggregate payload
303 547
       if(!firstItem) json += ",";
304 548
       json += item;
305 549
       firstItem = false;
@@ -310,17 +554,25 @@ void SendLivePrices()
310 554
 
311 555
    if(sentCount == 0)
312 556
    {
313
-      Print("No valid live prices to send right now.");
557
+      Print("⚠️ No valid live prices to send right now. Check if market is open and symbols have tick data.");
314 558
       return;
315 559
    }
316 560
 
561
+   // Log URL and truncated payload for debugging
317 562
    string url = ApiBaseUrl + "/api/live-prices/bulk";
563
+   int maxShow = 1000;
564
+   string payloadLog = (StringLen(json) > maxShow) ? StringSubstr(json, 0, maxShow) + "...(truncated)" : json;
565
+   Print("📤 Calling API: ", url);
566
+   Print("📦 Payload (truncated): ", payloadLog);
567
+
568
+   // Send and report
318 569
    string response;
319 570
    bool ok = SendJSON(url, json, response);
571
+
320 572
    if(ok)
321
-      Print("✅ Sent ", sentCount, " live prices.");
573
+      Print("✅ Successfully sent ", sentCount, " live prices to API.");
322 574
    else
323
-      Print("❌ Failed to send live prices (sent ", sentCount, " items). Response: ", response);
575
+      Print("❌ Failed to send live prices (", sentCount, " items). API response: ", response);
324 576
 }
325 577
 
326 578
 //+------------------------------------------------------------------+
@@ -408,3 +660,57 @@ bool SendJSON(string url, string json, string &response)
408 660
 }
409 661
 
410 662
 //+------------------------------------------------------------------+
663
+//+------------------------------------------------------------------+
664
+//| Cleanup old candles (keep only last 1000)                        |
665
+//+------------------------------------------------------------------+
666
+void CleanupOldCandles(int symbolId)
667
+{
668
+   string url = ApiBaseUrl + "/api/candles/cleanup/" + IntegerToString(symbolId) + "?keep=1000";
669
+   string headers = "Content-Type: application/json\r\n";
670
+   string resultHeaders = "";
671
+   char result[];
672
+   char emptyData[];
673
+
674
+   ResetLastError();
675
+   int res = WebRequest("DELETE", url, headers, 10000, emptyData, result, resultHeaders);
676
+
677
+   string response = CharArrayToString(result);
678
+   if(res == 200 || res == 204)
679
+      Print("🧹 Cleanup successful for symbolId=", symbolId, " → kept last 1000 candles.");
680
+   else
681
+      Print("⚠️ Cleanup failed for symbolId=", symbolId, " HTTP=", res, " Response=", response);
682
+}
683
+
684
+//+------------------------------------------------------------------+
685
+//| Timer event: runs every 60 seconds                               |
686
+//+------------------------------------------------------------------+
687
+void OnTimer()
688
+{
689
+   datetime now = TimeCurrent();
690
+
691
+   // ✅ Run full candle sync only once every minute
692
+   if(now - lastCandleSync >= 600)
693
+   {
694
+      Print("⏰ Running scheduled candle sync and cleanup...");
695
+      SendAllHistoricalCandles();
696
+
697
+      // ✅ After uploading candles, clean up old ones
698
+      for(int i = 0; i < ArraySize(symbols); i++)
699
+      {
700
+         int symId = symbolIds[i];
701
+         if(symId <= 0) continue;
702
+         CleanupOldCandles(symId);
703
+         Sleep(500); // small delay to avoid API overload
704
+      }
705
+
706
+      lastCandleSync = now;
707
+      Print("✅ Candle sync + cleanup cycle completed.");
708
+   }
709
+}
710
+
711
+
712
+
713
+void OnDeinit(const int reason)
714
+{
715
+   EventKillTimer();
716
+}

+ 1 - 1
README.md

@@ -658,7 +658,7 @@ DB_USER=your_username
658 658
 DB_PASSWORD=your_password
659 659
 
660 660
 # Server Configuration
661
-PORT=3000
661
+PORT=3001
662 662
 NODE_ENV=development
663 663
 
664 664
 # JWT Configuration (if needed for authentication)

+ 341 - 0
package-lock.json

@@ -17,6 +17,8 @@
17 17
         "morgan": "^1.10.1",
18 18
         "pg": "^8.16.3",
19 19
         "sequelize": "^6.37.7",
20
+        "socket.io": "^4.8.1",
21
+        "socket.io-client": "^4.8.1",
20 22
         "winston": "^3.18.3"
21 23
       },
22 24
       "devDependencies": {
@@ -1187,6 +1189,12 @@
1187 1189
         "text-hex": "1.0.x"
1188 1190
       }
1189 1191
     },
1192
+    "node_modules/@socket.io/component-emitter": {
1193
+      "version": "3.1.2",
1194
+      "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz",
1195
+      "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==",
1196
+      "license": "MIT"
1197
+    },
1190 1198
     "node_modules/@standard-schema/spec": {
1191 1199
       "version": "1.0.0",
1192 1200
       "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.0.0.tgz",
@@ -1249,6 +1257,15 @@
1249 1257
         "@babel/types": "^7.28.2"
1250 1258
       }
1251 1259
     },
1260
+    "node_modules/@types/cors": {
1261
+      "version": "2.8.19",
1262
+      "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.19.tgz",
1263
+      "integrity": "sha512-mFNylyeyqN93lfe/9CSxOGREz8cpzAhH+E93xJ4xWQf62V8sQ/24reV2nyzUWM6H6Xji+GGHpkbLe7pVoUEskg==",
1264
+      "license": "MIT",
1265
+      "dependencies": {
1266
+        "@types/node": "*"
1267
+      }
1268
+    },
1252 1269
     "node_modules/@types/debug": {
1253 1270
       "version": "4.1.12",
1254 1271
       "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz",
@@ -1887,6 +1904,15 @@
1887 1904
       "dev": true,
1888 1905
       "license": "MIT"
1889 1906
     },
1907
+    "node_modules/base64id": {
1908
+      "version": "2.0.0",
1909
+      "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz",
1910
+      "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==",
1911
+      "license": "MIT",
1912
+      "engines": {
1913
+        "node": "^4.5.0 || >= 5.9"
1914
+      }
1915
+    },
1890 1916
     "node_modules/baseline-browser-mapping": {
1891 1917
       "version": "2.8.17",
1892 1918
       "resolved": "https://registry.npmjs.org/baseline-browser-mapping/-/baseline-browser-mapping-2.8.17.tgz",
@@ -2661,6 +2687,125 @@
2661 2687
         "node": ">= 0.8"
2662 2688
       }
2663 2689
     },
2690
+    "node_modules/engine.io": {
2691
+      "version": "6.6.4",
2692
+      "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.6.4.tgz",
2693
+      "integrity": "sha512-ZCkIjSYNDyGn0R6ewHDtXgns/Zre/NT6Agvq1/WobF7JXgFff4SeDroKiCO3fNJreU9YG429Sc81o4w5ok/W5g==",
2694
+      "license": "MIT",
2695
+      "dependencies": {
2696
+        "@types/cors": "^2.8.12",
2697
+        "@types/node": ">=10.0.0",
2698
+        "accepts": "~1.3.4",
2699
+        "base64id": "2.0.0",
2700
+        "cookie": "~0.7.2",
2701
+        "cors": "~2.8.5",
2702
+        "debug": "~4.3.1",
2703
+        "engine.io-parser": "~5.2.1",
2704
+        "ws": "~8.17.1"
2705
+      },
2706
+      "engines": {
2707
+        "node": ">=10.2.0"
2708
+      }
2709
+    },
2710
+    "node_modules/engine.io-client": {
2711
+      "version": "6.6.3",
2712
+      "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.3.tgz",
2713
+      "integrity": "sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==",
2714
+      "license": "MIT",
2715
+      "dependencies": {
2716
+        "@socket.io/component-emitter": "~3.1.0",
2717
+        "debug": "~4.3.1",
2718
+        "engine.io-parser": "~5.2.1",
2719
+        "ws": "~8.17.1",
2720
+        "xmlhttprequest-ssl": "~2.1.1"
2721
+      }
2722
+    },
2723
+    "node_modules/engine.io-client/node_modules/debug": {
2724
+      "version": "4.3.7",
2725
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
2726
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
2727
+      "license": "MIT",
2728
+      "dependencies": {
2729
+        "ms": "^2.1.3"
2730
+      },
2731
+      "engines": {
2732
+        "node": ">=6.0"
2733
+      },
2734
+      "peerDependenciesMeta": {
2735
+        "supports-color": {
2736
+          "optional": true
2737
+        }
2738
+      }
2739
+    },
2740
+    "node_modules/engine.io-parser": {
2741
+      "version": "5.2.3",
2742
+      "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz",
2743
+      "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==",
2744
+      "license": "MIT",
2745
+      "engines": {
2746
+        "node": ">=10.0.0"
2747
+      }
2748
+    },
2749
+    "node_modules/engine.io/node_modules/accepts": {
2750
+      "version": "1.3.8",
2751
+      "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz",
2752
+      "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==",
2753
+      "license": "MIT",
2754
+      "dependencies": {
2755
+        "mime-types": "~2.1.34",
2756
+        "negotiator": "0.6.3"
2757
+      },
2758
+      "engines": {
2759
+        "node": ">= 0.6"
2760
+      }
2761
+    },
2762
+    "node_modules/engine.io/node_modules/debug": {
2763
+      "version": "4.3.7",
2764
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
2765
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
2766
+      "license": "MIT",
2767
+      "dependencies": {
2768
+        "ms": "^2.1.3"
2769
+      },
2770
+      "engines": {
2771
+        "node": ">=6.0"
2772
+      },
2773
+      "peerDependenciesMeta": {
2774
+        "supports-color": {
2775
+          "optional": true
2776
+        }
2777
+      }
2778
+    },
2779
+    "node_modules/engine.io/node_modules/mime-db": {
2780
+      "version": "1.52.0",
2781
+      "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz",
2782
+      "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
2783
+      "license": "MIT",
2784
+      "engines": {
2785
+        "node": ">= 0.6"
2786
+      }
2787
+    },
2788
+    "node_modules/engine.io/node_modules/mime-types": {
2789
+      "version": "2.1.35",
2790
+      "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz",
2791
+      "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
2792
+      "license": "MIT",
2793
+      "dependencies": {
2794
+        "mime-db": "1.52.0"
2795
+      },
2796
+      "engines": {
2797
+        "node": ">= 0.6"
2798
+      }
2799
+    },
2800
+    "node_modules/engine.io/node_modules/negotiator": {
2801
+      "version": "0.6.3",
2802
+      "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz",
2803
+      "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==",
2804
+      "license": "MIT",
2805
+      "engines": {
2806
+        "node": ">= 0.6"
2807
+      }
2808
+    },
2664 2809
     "node_modules/error-ex": {
2665 2810
       "version": "1.3.4",
2666 2811
       "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.4.tgz",
@@ -5906,6 +6051,173 @@
5906 6051
         "node": ">=8"
5907 6052
       }
5908 6053
     },
6054
+    "node_modules/socket.io": {
6055
+      "version": "4.8.1",
6056
+      "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.1.tgz",
6057
+      "integrity": "sha512-oZ7iUCxph8WYRHHcjBEc9unw3adt5CmSNlppj/5Q4k2RIrhl8Z5yY2Xr4j9zj0+wzVZ0bxmYoGSzKJnRl6A4yg==",
6058
+      "license": "MIT",
6059
+      "dependencies": {
6060
+        "accepts": "~1.3.4",
6061
+        "base64id": "~2.0.0",
6062
+        "cors": "~2.8.5",
6063
+        "debug": "~4.3.2",
6064
+        "engine.io": "~6.6.0",
6065
+        "socket.io-adapter": "~2.5.2",
6066
+        "socket.io-parser": "~4.2.4"
6067
+      },
6068
+      "engines": {
6069
+        "node": ">=10.2.0"
6070
+      }
6071
+    },
6072
+    "node_modules/socket.io-adapter": {
6073
+      "version": "2.5.5",
6074
+      "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz",
6075
+      "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==",
6076
+      "license": "MIT",
6077
+      "dependencies": {
6078
+        "debug": "~4.3.4",
6079
+        "ws": "~8.17.1"
6080
+      }
6081
+    },
6082
+    "node_modules/socket.io-adapter/node_modules/debug": {
6083
+      "version": "4.3.7",
6084
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
6085
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
6086
+      "license": "MIT",
6087
+      "dependencies": {
6088
+        "ms": "^2.1.3"
6089
+      },
6090
+      "engines": {
6091
+        "node": ">=6.0"
6092
+      },
6093
+      "peerDependenciesMeta": {
6094
+        "supports-color": {
6095
+          "optional": true
6096
+        }
6097
+      }
6098
+    },
6099
+    "node_modules/socket.io-client": {
6100
+      "version": "4.8.1",
6101
+      "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz",
6102
+      "integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==",
6103
+      "license": "MIT",
6104
+      "dependencies": {
6105
+        "@socket.io/component-emitter": "~3.1.0",
6106
+        "debug": "~4.3.2",
6107
+        "engine.io-client": "~6.6.1",
6108
+        "socket.io-parser": "~4.2.4"
6109
+      },
6110
+      "engines": {
6111
+        "node": ">=10.0.0"
6112
+      }
6113
+    },
6114
+    "node_modules/socket.io-client/node_modules/debug": {
6115
+      "version": "4.3.7",
6116
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
6117
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
6118
+      "license": "MIT",
6119
+      "dependencies": {
6120
+        "ms": "^2.1.3"
6121
+      },
6122
+      "engines": {
6123
+        "node": ">=6.0"
6124
+      },
6125
+      "peerDependenciesMeta": {
6126
+        "supports-color": {
6127
+          "optional": true
6128
+        }
6129
+      }
6130
+    },
6131
+    "node_modules/socket.io-parser": {
6132
+      "version": "4.2.4",
6133
+      "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
6134
+      "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==",
6135
+      "license": "MIT",
6136
+      "dependencies": {
6137
+        "@socket.io/component-emitter": "~3.1.0",
6138
+        "debug": "~4.3.1"
6139
+      },
6140
+      "engines": {
6141
+        "node": ">=10.0.0"
6142
+      }
6143
+    },
6144
+    "node_modules/socket.io-parser/node_modules/debug": {
6145
+      "version": "4.3.7",
6146
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
6147
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
6148
+      "license": "MIT",
6149
+      "dependencies": {
6150
+        "ms": "^2.1.3"
6151
+      },
6152
+      "engines": {
6153
+        "node": ">=6.0"
6154
+      },
6155
+      "peerDependenciesMeta": {
6156
+        "supports-color": {
6157
+          "optional": true
6158
+        }
6159
+      }
6160
+    },
6161
+    "node_modules/socket.io/node_modules/accepts": {
6162
+      "version": "1.3.8",
6163
+      "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz",
6164
+      "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==",
6165
+      "license": "MIT",
6166
+      "dependencies": {
6167
+        "mime-types": "~2.1.34",
6168
+        "negotiator": "0.6.3"
6169
+      },
6170
+      "engines": {
6171
+        "node": ">= 0.6"
6172
+      }
6173
+    },
6174
+    "node_modules/socket.io/node_modules/debug": {
6175
+      "version": "4.3.7",
6176
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
6177
+      "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
6178
+      "license": "MIT",
6179
+      "dependencies": {
6180
+        "ms": "^2.1.3"
6181
+      },
6182
+      "engines": {
6183
+        "node": ">=6.0"
6184
+      },
6185
+      "peerDependenciesMeta": {
6186
+        "supports-color": {
6187
+          "optional": true
6188
+        }
6189
+      }
6190
+    },
6191
+    "node_modules/socket.io/node_modules/mime-db": {
6192
+      "version": "1.52.0",
6193
+      "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz",
6194
+      "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
6195
+      "license": "MIT",
6196
+      "engines": {
6197
+        "node": ">= 0.6"
6198
+      }
6199
+    },
6200
+    "node_modules/socket.io/node_modules/mime-types": {
6201
+      "version": "2.1.35",
6202
+      "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz",
6203
+      "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
6204
+      "license": "MIT",
6205
+      "dependencies": {
6206
+        "mime-db": "1.52.0"
6207
+      },
6208
+      "engines": {
6209
+        "node": ">= 0.6"
6210
+      }
6211
+    },
6212
+    "node_modules/socket.io/node_modules/negotiator": {
6213
+      "version": "0.6.3",
6214
+      "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz",
6215
+      "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==",
6216
+      "license": "MIT",
6217
+      "engines": {
6218
+        "node": ">= 0.6"
6219
+      }
6220
+    },
5909 6221
     "node_modules/source-map": {
5910 6222
       "version": "0.6.1",
5911 6223
       "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
@@ -6745,6 +7057,35 @@
6745 7057
         "node": "^14.17.0 || ^16.13.0 || >=18.0.0"
6746 7058
       }
6747 7059
     },
7060
+    "node_modules/ws": {
7061
+      "version": "8.17.1",
7062
+      "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
7063
+      "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
7064
+      "license": "MIT",
7065
+      "engines": {
7066
+        "node": ">=10.0.0"
7067
+      },
7068
+      "peerDependencies": {
7069
+        "bufferutil": "^4.0.1",
7070
+        "utf-8-validate": ">=5.0.2"
7071
+      },
7072
+      "peerDependenciesMeta": {
7073
+        "bufferutil": {
7074
+          "optional": true
7075
+        },
7076
+        "utf-8-validate": {
7077
+          "optional": true
7078
+        }
7079
+      }
7080
+    },
7081
+    "node_modules/xmlhttprequest-ssl": {
7082
+      "version": "2.1.2",
7083
+      "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz",
7084
+      "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==",
7085
+      "engines": {
7086
+        "node": ">=0.4.0"
7087
+      }
7088
+    },
6748 7089
     "node_modules/xtend": {
6749 7090
       "version": "4.0.2",
6750 7091
       "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",

+ 2 - 0
package.json

@@ -30,6 +30,8 @@
30 30
     "morgan": "^1.10.1",
31 31
     "pg": "^8.16.3",
32 32
     "sequelize": "^6.37.7",
33
+    "socket.io": "^4.8.1",
34
+    "socket.io-client": "^4.8.1",
33 35
     "winston": "^3.18.3"
34 36
   },
35 37
   "devDependencies": {

+ 64 - 0
src/controllers/candleController.js

@@ -120,6 +120,29 @@ class CandleController {
120 120
 
121 121
       const candle = await Candle1h.create(candleData);
122 122
 
123
+      // Emit WebSocket event for real-time updates
124
+      const io = req.app.get('io');
125
+      if (io) {
126
+        const eventData = {
127
+          symbol: symbol.symbol,
128
+          symbolId: symbol.id,
129
+          openTime: candle.openTime,
130
+          open: candle.open,
131
+          high: candle.high,
132
+          low: candle.low,
133
+          close: candle.close,
134
+          volume: candle.volume,
135
+          exchange: symbol.exchange,
136
+          instrumentType: symbol.instrumentType
137
+        };
138
+
139
+        // Emit to all clients subscribed to this symbol
140
+        io.to(`symbol:${symbol.symbol}`).emit('candleUpdate', eventData);
141
+
142
+        // Also emit to general candle updates
143
+        io.emit('candleUpdate', eventData);
144
+      }
145
+
123 146
       res.status(201).json({
124 147
         success: true,
125 148
         data: candle,
@@ -159,6 +182,47 @@ class CandleController {
159 182
 
160 183
       const createdCandles = await Candle1h.bulkCreate(candles);
161 184
 
185
+      // Emit WebSocket events for real-time updates
186
+      const io = req.app.get('io');
187
+      if (io) {
188
+        // Group candles by symbol for efficient emission
189
+        const symbolGroups = {};
190
+        for (const candle of createdCandles) {
191
+          if (!symbolGroups[candle.symbolId]) {
192
+            symbolGroups[candle.symbolId] = [];
193
+          }
194
+          symbolGroups[candle.symbolId].push(candle);
195
+        }
196
+
197
+        // Emit events for each symbol
198
+        for (const [symbolId, symbolCandles] of Object.entries(symbolGroups)) {
199
+          const symbol = await Symbol.findByPk(symbolId);
200
+          if (symbol) {
201
+            // Emit the latest candle for this symbol (most recent)
202
+            const latestCandle = symbolCandles.sort((a, b) => new Date(b.openTime) - new Date(a.openTime))[0];
203
+
204
+            const eventData = {
205
+              symbol: symbol.symbol,
206
+              symbolId: symbol.id,
207
+              openTime: latestCandle.openTime,
208
+              open: latestCandle.open,
209
+              high: latestCandle.high,
210
+              low: latestCandle.low,
211
+              close: latestCandle.close,
212
+              volume: latestCandle.volume,
213
+              exchange: symbol.exchange,
214
+              instrumentType: symbol.instrumentType
215
+            };
216
+
217
+            // Emit to all clients subscribed to this symbol
218
+            io.to(`symbol:${symbol.symbol}`).emit('candleUpdate', eventData);
219
+
220
+            // Also emit to general candle updates
221
+            io.emit('candleUpdate', eventData);
222
+          }
223
+        }
224
+      }
225
+
162 226
       res.status(201).json({
163 227
         success: true,
164 228
         data: createdCandles,

+ 50 - 0
src/controllers/livePriceController.js

@@ -90,6 +90,29 @@ class LivePriceController {
90 90
         lastUpdated: new Date()
91 91
       });
92 92
 
93
+      // Emit WebSocket event for real-time updates
94
+      const io = req.app.get('io');
95
+      if (io) {
96
+        const eventData = {
97
+          symbol: symbol.symbol,
98
+          symbolId: symbol.id,
99
+          price,
100
+          bid,
101
+          ask,
102
+          bidSize,
103
+          askSize,
104
+          lastUpdated: livePrice.lastUpdated,
105
+          exchange: symbol.exchange,
106
+          instrumentType: symbol.instrumentType
107
+        };
108
+
109
+        // Emit to all clients subscribed to this symbol
110
+        io.to(`symbol:${symbol.symbol}`).emit('livePriceUpdate', eventData);
111
+
112
+        // Also emit to general live price updates
113
+        io.emit('livePriceUpdate', eventData);
114
+      }
115
+
93 116
       res.status(created ? 201 : 200).json({
94 117
         success: true,
95 118
         data: livePrice,
@@ -142,9 +165,36 @@ class LivePriceController {
142 165
       }));
143 166
 
144 167
       const updatedPrices = [];
168
+      const io = req.app.get('io');
169
+
145 170
       for (const data of upsertData) {
146 171
         const [livePrice] = await LivePrice.upsert(data);
147 172
         updatedPrices.push(livePrice);
173
+
174
+        // Emit WebSocket event for each updated price
175
+        if (io) {
176
+          const symbol = await Symbol.findByPk(data.symbolId);
177
+          if (symbol) {
178
+            const eventData = {
179
+              symbol: symbol.symbol,
180
+              symbolId: symbol.id,
181
+              price: data.price,
182
+              bid: data.bid,
183
+              ask: data.ask,
184
+              bidSize: data.bidSize,
185
+              askSize: data.askSize,
186
+              lastUpdated: data.lastUpdated,
187
+              exchange: symbol.exchange,
188
+              instrumentType: symbol.instrumentType
189
+            };
190
+
191
+            // Emit to all clients subscribed to this symbol
192
+            io.to(`symbol:${symbol.symbol}`).emit('livePriceUpdate', eventData);
193
+
194
+            // Also emit to general live price updates
195
+            io.emit('livePriceUpdate', eventData);
196
+          }
197
+        }
148 198
       }
149 199
 
150 200
       res.json({

+ 55 - 1
src/server.js

@@ -1,6 +1,8 @@
1 1
 const app = require('./app');
2 2
 const { testConnection } = require('./config/database');
3 3
 const logger = require('./utils/logger');
4
+const { createServer } = require('http');
5
+const { Server } = require('socket.io');
4 6
 
5 7
 const PORT = process.env.PORT || 3000;
6 8
 
@@ -10,11 +12,63 @@ const startServer = async () => {
10 12
     // Test database connection
11 13
     await testConnection();
12 14
 
15
+    // Create HTTP server
16
+    const server = createServer(app);
17
+
18
+    // Initialize Socket.io
19
+    const io = new Server(server, {
20
+      cors: {
21
+        origin: process.env.CORS_ORIGIN || "*",
22
+        methods: ["GET", "POST"],
23
+        credentials: true
24
+      }
25
+    });
26
+
27
+    // Socket.io connection handling
28
+    io.on('connection', (socket) => {
29
+      logger.info(`Client connected: ${socket.id}`);
30
+
31
+      // Handle subscription to symbols
32
+      socket.on('subscribe', (symbols) => {
33
+        if (Array.isArray(symbols)) {
34
+          symbols.forEach(symbol => {
35
+            socket.join(`symbol:${symbol}`);
36
+            logger.info(`Client ${socket.id} subscribed to ${symbol}`);
37
+          });
38
+        } else {
39
+          socket.join(`symbol:${symbols}`);
40
+          logger.info(`Client ${socket.id} subscribed to ${symbols}`);
41
+        }
42
+      });
43
+
44
+      // Handle unsubscription
45
+      socket.on('unsubscribe', (symbols) => {
46
+        if (Array.isArray(symbols)) {
47
+          symbols.forEach(symbol => {
48
+            socket.leave(`symbol:${symbol}`);
49
+            logger.info(`Client ${socket.id} unsubscribed from ${symbol}`);
50
+          });
51
+        } else {
52
+          socket.leave(`symbol:${symbols}`);
53
+          logger.info(`Client ${socket.id} unsubscribed from ${symbols}`);
54
+        }
55
+      });
56
+
57
+      // Handle disconnect
58
+      socket.on('disconnect', () => {
59
+        logger.info(`Client disconnected: ${socket.id}`);
60
+      });
61
+    });
62
+
63
+    // Make io accessible to routes/controllers
64
+    app.set('io', io);
65
+
13 66
     // Start the server
14
-    const server = app.listen(PORT, () => {
67
+    server.listen(PORT, () => {
15 68
       logger.info(`Market Data Service is running on port ${PORT}`);
16 69
       logger.info(`Environment: ${process.env.NODE_ENV || 'development'}`);
17 70
       logger.info(`Health check available at: http://localhost:${PORT}/health`);
71
+      logger.info(`WebSocket server ready for connections`);
18 72
     });
19 73
 
20 74
     // Graceful shutdown