//+------------------------------------------------------------------+
//| MarketDataSender.mq5                                             |
//|                                                                  |
//| Expert Advisor that, for every symbol selected in Market Watch:  |
//|   * makes sure the symbol exists in the remote API database,     |
//|   * uploads historical candles for several timeframes,           |
//|   * pushes live bid/ask/last prices while ticks arrive,          |
//|   * periodically re-syncs candles and asks the API to prune.     |
//|                                                                  |
//| The API host must be whitelisted in the terminal options,        |
//| otherwise every WebRequest() call fails with -1.                 |
//+------------------------------------------------------------------+
#property strict
#property description "Fetches all symbols' candles and live prices, sends to API."

// NOTE(review): the original file contained a bare '#include' directive whose
// target header was lost. Nothing below references a library symbol, so the
// directive is left commented out; restore the real header here if one is needed.
// #include (target unknown)

// NOTE: do not put trailing comments on 'input' lines - MetaTrader shows them
// as the parameter label in the EA properties dialog.
input string          ApiBaseUrl               = "http://market-price.insightbull.io";
input int             HistoricalCandleCount    = 1000;
// NOTE(review): HistoricalTimeframe is not read anywhere in this file; kept for compatibility.
input ENUM_TIMEFRAMES HistoricalTimeframe      = PERIOD_H1;
input int             LivePriceIntervalSeconds = 5;

// How often OnTimer() is invoked, in seconds.
#define TIMER_PERIOD_SECONDS         60
// Minimum time between two full candle sync + cleanup cycles, in seconds.
#define CANDLE_SYNC_INTERVAL_SECONDS 600
// Number of candles per bulk upload request.
#define CANDLE_BATCH_SIZE            200
// Number of candles the cleanup endpoint is asked to keep.
#define CLEANUP_KEEP_CANDLES         1000

// Globals
string   symbols[];            // Market Watch symbol names
int      symbolIds[];          // API database ID per symbol, -1 when unknown
bool     bookSubscribed[];     // true where MarketBookAdd() succeeded
datetime lastSend       = 0;   // server time of the last live-price push
datetime lastCandleSync = 0;   // server time of the last full candle sync

// --- Supported timeframes (parallel arrays: enum value and API label) ---
ENUM_TIMEFRAMES Timeframes[]       = { PERIOD_M15, PERIOD_M30, PERIOD_H1, PERIOD_D1, PERIOD_W1, PERIOD_MN1 };
string          TimeframeStrings[] = { "15m", "30m", "1h", "1D", "1W", "1M" };

//+------------------------------------------------------------------+
//| EA initialisation: sync symbols, upload history, start the timer |
//+------------------------------------------------------------------+
int OnInit()
{
   Print("Initializing MarketDataSender EA...");

   if(!InitializeSymbols())
   {
      Print("❌ Failed to initialize symbols.");
      return(INIT_FAILED);
   }
   Print("✅ Symbols initialized: ", ArraySize(symbols));

   SendAllHistoricalCandles();
   // Remember the initial upload so the first timer event does not
   // immediately repeat the whole sync.
   lastCandleSync = TimeCurrent();

   // MarketBookGet() only returns data for symbols subscribed with MarketBookAdd().
   SubscribeMarketBooks();

   ResetLastError();
   if(EventSetTimer(TIMER_PERIOD_SECONDS))
      PrintFormat("✅ Timer set: checked every %d s; candle sync + cleanup run every %d s.",
                  TIMER_PERIOD_SECONDS, CANDLE_SYNC_INTERVAL_SECONDS);
   else
      Print("⚠️ Failed to start timer (error ", GetLastError(), "); periodic candle sync is disabled.");

   return(INIT_SUCCEEDED);
}

//+------------------------------------------------------------------+
//| Tick handler: pushes live prices at most once per                |
//| LivePriceIntervalSeconds (measured in server time).              |
//+------------------------------------------------------------------+
void OnTick()
{
   if(TimeCurrent() - lastSend >= LivePriceIntervalSeconds)
   {
      SendLivePrices();
      lastSend = TimeCurrent();
   }
}

//+------------------------------------------------------------------+
//| Load Market Watch symbols and resolve their database IDs.        |
//| Returns false when there are no symbols or the API sync fails.   |
//+------------------------------------------------------------------+
bool InitializeSymbols()
{
   int total = SymbolsTotal(true);   // true = only symbols selected in Market Watch
   if(total <= 0)
   {
      Print("❌ No symbols found!");
      return false;
   }

   ArrayResize(symbols, total);
   ArrayResize(symbolIds, total);
   for(int i = 0; i < total; i++)
   {
      symbols[i]   = SymbolName(i, true);
      symbolIds[i] = -1;
   }

   if(!SyncSymbolsWithDatabase())
   {
      Print("❌ Failed to sync symbols with database");
      return false;
   }
   return true;
}

//+------------------------------------------------------------------+
//| Subscribe to Depth of Market for every symbol with a valid ID.   |
//| Failures are tolerated: SendLivePrices() falls back to tick data.|
//+------------------------------------------------------------------+
void SubscribeMarketBooks()
{
   int total = ArraySize(symbols);
   ArrayResize(bookSubscribed, total);
   for(int i = 0; i < total; i++)
   {
      bookSubscribed[i] = false;
      if(symbolIds[i] <= 0)
         continue;
      bookSubscribed[i] = MarketBookAdd(symbols[i]);
   }
}

//+------------------------------------------------------------------+
//| Release every Depth of Market subscription made above.           |
//| Safe to call more than once.                                     |
//+------------------------------------------------------------------+
void ReleaseMarketBooks()
{
   int total = MathMin(ArraySize(bookSubscribed), ArraySize(symbols));
   for(int i = 0; i < total; i++)
   {
      if(!bookSubscribed[i])
         continue;
      MarketBookRelease(symbols[i]);
      bookSubscribed[i] = false;
   }
}

//+------------------------------------------------------------------+
//| Strip a broker suffix that starts with a dot (".pro", ".m", ...) |
//| Names without a dot, or starting with one, are returned as-is.   |
//+------------------------------------------------------------------+
string NormalizeSymbolName(string symbolName)
{
   int dotPos = StringFind(symbolName, ".");
   if(dotPos > 0)
      return StringSubstr(symbolName, 0, dotPos);
   return symbolName;
}

//+------------------------------------------------------------------+
//| Fetch the symbol list from the API and fill symbolIds[].         |
//| Symbols missing from the response are created one by one.        |
//| Returns false only when the list itself cannot be fetched.       |
//+------------------------------------------------------------------+
bool SyncSymbolsWithDatabase()
{
   Print("Syncing symbols with database...");

   string url           = ApiBaseUrl + "/api/symbols";
   string headers       = "Content-Type: application/json\r\n";
   string resultHeaders = "";
   char   result[];
   char   emptyData[];   // WebRequest() needs a data array even for GET

   ResetLastError();
   int res = WebRequest("GET", url, headers, 5000, emptyData, result, resultHeaders);
   if(res == -1)
   {
      int err = GetLastError();
      Print("❌ WebRequest connection error: ", err, " URL=", url);
      return false;
   }
   if(res != 200)
   {
      Print("❌ Failed to fetch symbols from API: HTTP ", res, " Response: ", CharArrayToString(result));
      return false;
   }

   string symbolsResponse = CharArrayToString(result);
   if(StringFind(symbolsResponse, "\"data\"") < 0)
      Print("⚠️ Unexpected response format from symbols API: ", symbolsResponse);

   for(int i = 0; i < ArraySize(symbols); i++)
   {
      string symbolName = symbols[i];
      int    symbolId   = FindSymbolId(symbolsResponse, symbolName);

      // CreateSymbolInDatabase() stores the suffix-stripped name, so a symbol
      // such as "EURUSD.pro" must also be looked up as "EURUSD"; otherwise it
      // would be re-created on every start.
      if(symbolId <= 0)
      {
         string storedName = NormalizeSymbolName(symbolName);
         if(storedName != symbolName)
            symbolId = FindSymbolId(symbolsResponse, storedName);
      }

      if(symbolId > 0)
      {
         symbolIds[i] = symbolId;
         Print("✅ Found existing symbol: ", symbolName, " (ID: ", symbolId, ")");
         continue;
      }

      Sleep(300);   // prevent overload (0.3 second delay)
      symbolId = CreateSymbolInDatabase(symbolName);
      if(symbolId > 0)
      {
         symbolIds[i] = symbolId;
         Print("✅ Created new symbol: ", symbolName, " (ID: ", symbolId, ")");
      }
      else
      {
         Print("❌ Failed to create symbol: ", symbolName, " (ID: ", symbolId, ")");
         symbolIds[i] = -1;
      }
   }
   return true;
}

//+------------------------------------------------------------------+
//| Index of the '{' that opens the JSON object containing pos,      |
//| or -1. Braces inside string values are not treated specially.    |
//+------------------------------------------------------------------+
int FindEnclosingObjectStart(const string &text, int pos)
{
   int depth = 0;
   for(int i = pos - 1; i >= 0; i--)
   {
      ushort ch = StringGetCharacter(text, i);
      if(ch == '}')
         depth++;
      else if(ch == '{')
      {
         if(depth == 0)
            return i;
         depth--;
      }
   }
   return -1;
}

//+------------------------------------------------------------------+
//| Index of the '}' that closes the JSON object containing pos,     |
//| or -1. Braces inside string values are not treated specially.    |
//+------------------------------------------------------------------+
int FindEnclosingObjectEnd(const string &text, int pos)
{
   int depth = 0;
   int len   = StringLen(text);
   for(int i = pos; i < len; i++)
   {
      ushort ch = StringGetCharacter(text, i);
      if(ch == '{')
         depth++;
      else if(ch == '}')
      {
         if(depth == 0)
            return i;
         depth--;
      }
   }
   return -1;
}

//+------------------------------------------------------------------+
//| Parse a non-negative integer starting at pos, skipping leading   |
//| whitespace and an optional opening quote. Returns -1 when no     |
//| digit is found.                                                  |
//+------------------------------------------------------------------+
int ParseJsonIntAt(const string &text, int pos)
{
   int len = StringLen(text);
   while(pos < len)
   {
      ushort ch = StringGetCharacter(text, pos);
      if(ch != ' ' && ch != '\t' && ch != '\r' && ch != '\n' && ch != '"')
         break;
      pos++;
   }

   int firstDigit = pos;
   while(pos < len)
   {
      ushort ch = StringGetCharacter(text, pos);
      if(ch < '0' || ch > '9')
         break;
      pos++;
   }

   if(pos == firstDigit)
      return -1;
   return (int)StringToInteger(StringSubstr(text, firstDigit, pos - firstDigit));
}

//+------------------------------------------------------------------+
//| Find the "id" of the record whose "symbol" equals symbolName     |
//| exactly (case-sensitive). Returns -1 when there is no match.     |
//|                                                                  |
//| The id is searched only inside the JSON object that contains the |
//| matching "symbol" field, so a neighbouring record's id is never  |
//| returned regardless of record length or field order.             |
//+------------------------------------------------------------------+
int FindSymbolId(string response, string symbolName)
{
   const string patternSymbol = "\"symbol\":\"";
   const string patternId     = "\"id\":";
   int pos = 0;

   while(true)
   {
      // Locate the next "symbol":"..." field.
      int symPos = StringFind(response, patternSymbol, pos);
      if(symPos < 0)
         break;

      int symStart = symPos + StringLen(patternSymbol);
      int symEnd   = StringFind(response, "\"", symStart);
      if(symEnd < 0)
         break;

      if(StringSubstr(response, symStart, symEnd - symStart) == symbolName)
      {
         // Bound the id search to the record that owns this symbol field.
         int objStart = FindEnclosingObjectStart(response, symPos);
         int objEnd   = FindEnclosingObjectEnd(response, symEnd + 1);
         if(objStart < 0)
            objStart = 0;
         if(objEnd < 0)
            objEnd = StringLen(response);

         int idPos = StringFind(response, patternId, objStart);
         if(idPos >= 0 && idPos < objEnd)
         {
            int id = ParseJsonIntAt(response, idPos + StringLen(patternId));
            if(id > 0)
            {
               Print("✅ Exact match found → symbol='", symbolName, "' | ID=", id);
               return id;
            }
         }
      }

      pos = symEnd + 1;   // move to the next occurrence
   }

   Print("⚠️ No exact match for symbol ", symbolName);
   return -1;
}

//+------------------------------------------------------------------+
//| Create a symbol record through POST /api/symbols.                |
//| The name is stored without its dot suffix; base/quote assets and |
//| the instrument type are guessed from the name.                   |
//| Returns the new database ID, or -1 on any failure.               |
//+------------------------------------------------------------------+
int CreateSymbolInDatabase(string symbolName)
{
   string baseAsset      = "";
   string quoteAsset     = "";
   string exchange       = "MT5";
   string instrumentType = "forex";

   // --- Clean dot suffixes like ".pro", ".m", ".r" ---
   symbolName = NormalizeSymbolName(symbolName);

   // --- Basic split for 6+ character names like EURUSD, GBPJPY, BTCUSD ---
   if(StringLen(symbolName) >= 6)
   {
      baseAsset  = StringSubstr(symbolName, 0, 3);
      quoteAsset = StringSubstr(symbolName, 3, 3);
   }
   else
   {
      // --- Shorter names: split around a known quote currency ---
      int usdPos = StringFind(symbolName, "USD");
      int eurPos = StringFind(symbolName, "EUR");
      if(usdPos >= 0)
      {
         baseAsset  = StringSubstr(symbolName, 0, usdPos);
         quoteAsset = "USD";
      }
      else if(eurPos >= 0)
      {
         baseAsset  = StringSubstr(symbolName, 0, eurPos);
         quoteAsset = "EUR";
      }
      else
      {
         // Fallback safe defaults
         baseAsset  = symbolName;
         quoteAsset = "USD";
      }
   }

   // --- Final safety: ensure no empty fields ---
   if(StringLen(baseAsset) == 0)
      baseAsset = "UNKNOWN";
   if(StringLen(quoteAsset) == 0)
      quoteAsset = "USD";

   // --- Decide instrument type from the name prefix ---
   if(StringFind(symbolName, "BTC") == 0 || StringFind(symbolName, "ETH") == 0)
      instrumentType = "crypto";
   else if(StringFind(symbolName, "XAU") == 0 || StringFind(symbolName, "XAG") == 0)
      instrumentType = "metal";
   else if(StringFind(symbolName, "US30") == 0 || StringFind(symbolName, "NAS") == 0)
      instrumentType = "index";

   string json = StringFormat(
      "{\"symbol\":\"%s\",\"baseAsset\":\"%s\",\"quoteAsset\":\"%s\",\"exchange\":\"%s\",\"instrumentType\":\"%s\",\"isActive\":true}",
      symbolName, baseAsset, quoteAsset, exchange, instrumentType);

   string url           = ApiBaseUrl + "/api/symbols";
   string headers       = "Content-Type: application/json\r\n";
   string resultHeaders = "";

   char postData[];
   StringToCharArray(json, postData, 0, WHOLE_ARRAY, CP_UTF8);
   // StringToCharArray appends a terminating zero that must not be sent.
   if(ArraySize(postData) > 0 && postData[ArraySize(postData) - 1] == 0)
      ArrayResize(postData, ArraySize(postData) - 1);

   char result[];
   ResetLastError();
   int res = WebRequest("POST", url, headers, 5000, postData, result, resultHeaders);
   if(res == -1)
   {
      // Transport failure: there is no HTTP response, only a terminal error code.
      PrintFormat("❌ Failed to create symbol %s | WebRequest error %d", symbolName, GetLastError());
      return -1;
   }
   if(res != 201 && res != 200)
   {
      PrintFormat("❌ Failed to create symbol %s | HTTP %d | Response: %s",
                  symbolName, res, CharArrayToString(result));
      return -1;
   }

   string createResponse = CharArrayToString(result);
   string idKey          = "\"id\":";
   int    idPos          = StringFind(createResponse, idKey);
   if(idPos < 0)
      return -1;

   return ParseJsonIntAt(createResponse, idPos + StringLen(idKey));
}

//+------------------------------------------------------------------+
//| Fetch latest stored candle openTime from API.                    |
//| Returns 0 when the request fails or the timestamp cannot be      |
//| read; the caller then treats every local candle as new.          |
//| The timestamp fields are used as written (no time-zone shift),   |
//| mirroring ToISO8601().                                           |
//+------------------------------------------------------------------+
datetime GetLatestCandleTime(int symbolId, string timeframe)
{
   string url = ApiBaseUrl + "/api/candles/" + IntegerToString(symbolId) + "/latest?timeframe=" + timeframe;
   string headers       = "Content-Type: application/json\r\n";
   string resultHeaders = "";
   char   result[];
   char   emptyData[];

   ResetLastError();
   int res = WebRequest("GET", url, headers, 10000, emptyData, result, resultHeaders);
   if(res != 200)
   {
      Print("⚠️ Could not fetch latest candle for symbolId=", symbolId, " timeframe=", timeframe, " (HTTP ", res, ")");
      return 0;
   }

   string response = CharArrayToString(result);
   string key      = "\"openTime\":\"";
   int    pos      = StringFind(response, key);
   if(pos < 0)
   {
      Print("⚠️ No openTime found in response for symbolId=", symbolId, " timeframe=", timeframe);
      return 0;
   }

   pos += StringLen(key);
   int end = StringFind(response, "\"", pos);
   // "YYYY-MM-DDTHH:MM:SS" needs at least 19 characters.
   if(end < 0 || end - pos < 19)
   {
      Print("⚠️ Malformed openTime in response for symbolId=", symbolId, " timeframe=", timeframe);
      return 0;
   }

   string openTimeStr = StringSubstr(response, pos, end - pos);

   MqlDateTime t;
   ZeroMemory(t);
   t.year = (int)StringToInteger(StringSubstr(openTimeStr, 0, 4));
   t.mon  = (int)StringToInteger(StringSubstr(openTimeStr, 5, 2));
   t.day  = (int)StringToInteger(StringSubstr(openTimeStr, 8, 2));
   t.hour = (int)StringToInteger(StringSubstr(openTimeStr, 11, 2));
   t.min  = (int)StringToInteger(StringSubstr(openTimeStr, 14, 2));
   t.sec  = (int)StringToInteger(StringSubstr(openTimeStr, 17, 2));

   datetime dt = StructToTime(t);
   PrintFormat("🕓 Latest stored candle for %s (symbolId=%d) = %s",
               timeframe, symbolId, TimeToString(dt, TIME_DATE|TIME_SECONDS));
   return dt;
}

//+------------------------------------------------------------------+
//| Send all historical candles to the API.                          |
//| For every symbol with a valid ID and every supported timeframe,  |
//| uploads the candles that are newer than the latest one the API   |
//| already has, in batches of CANDLE_BATCH_SIZE.                    |
//+------------------------------------------------------------------+
void SendAllHistoricalCandles()
{
   Print("Starting multi-timeframe historical upload for ", ArraySize(symbols), " symbols...");

   for(int i = 0; i < ArraySize(symbols); i++)
   {
      string sym      = symbols[i];
      int    symbolId = symbolIds[i];
      if(symbolId <= 0)
         continue;

      // --- Loop through all timeframes ---
      for(int tfIndex = 0; tfIndex < ArraySize(Timeframes); tfIndex++)
      {
         // Let the terminal stop or remove the EA in the middle of a long upload.
         if(IsStopped())
            return;

         ENUM_TIMEFRAMES tf    = Timeframes[tfIndex];
         string          tfStr = TimeframeStrings[tfIndex];
         PrintFormat("📊 Processing %s timeframe for %s", tfStr, sym);

         datetime latestApiTime = GetLatestCandleTime(symbolId, tfStr);
         Sleep(300);

         // Wait (up to 10 x 0.5 s) for the terminal to synchronise this series.
         bool historyReady = false;
         for(int tries = 0; tries < 10 && !IsStopped(); tries++)
         {
            if(SeriesInfoInteger(sym, tf, SERIES_SYNCHRONIZED))
            {
               historyReady = true;
               break;
            }
            PrintFormat("⏳ Waiting for %s (%s) history to load... (try %d/10)", sym, tfStr, tries + 1);
            Sleep(500);
         }
         if(!historyReady)
         {
            PrintFormat("⚠️ Skipping %s (%s) — history not loaded.", sym, tfStr);
            continue;
         }

         MqlRates rates[];
         ResetLastError();
         int copied = CopyRates(sym, tf, 0, HistoricalCandleCount, rates);
         if(copied <= 0)
         {
            int err = GetLastError();
            PrintFormat("⚠️ Failed to copy %s candles (%s) (error %d)", sym, tfStr, err);
            continue;
         }

         // rates[] is ordered oldest -> newest. Find the first candle newer than
         // what the API already stores. Starting from 'copied' means "nothing
         // new" when no candle qualifies (starting from 0 would wrongly resend
         // the whole history).
         int startIndex = copied;
         for(int j = 0; j < copied; j++)
         {
            if(rates[j].time > latestApiTime)
            {
               startIndex = j;
               break;
            }
         }

         int newCount = copied - startIndex;
         if(newCount <= 0)
         {
            PrintFormat("ℹ️ No new %s candles for %s", tfStr, sym);
            continue;
         }
         PrintFormat("🆕 Sending %d new %s candles for %s", newCount, tfStr, sym);

         string url       = ApiBaseUrl + "/api/candles/bulk";
         int    sentTotal = 0;
         for(int start = startIndex; start < copied; start += CANDLE_BATCH_SIZE)
         {
            int    size = (int)MathMin(CANDLE_BATCH_SIZE, copied - start);
            string json = BuildCandleJSONFromRates(symbolId, rates, start, size, tfStr, tf);

            string response;
            if(!SendJSON(url, json, response))
            {
               PrintFormat("❌ Failed to send %s batch for %s (start=%d)", tfStr, sym, start);
               break;
            }
            sentTotal += size;
            PrintFormat("📤 Sent %d/%d %s candles for %s", sentTotal, newCount, tfStr, sym);
         }
      }
   }

   Print("✅ Multi-timeframe candle upload finished.");
}

//+------------------------------------------------------------------+
//| Send live prices of all active symbols.                          |
//| Builds one bulk payload with price/bid/ask and bid/ask sizes     |
//| and posts it to /api/live-prices/bulk.                           |
//| Sizes come from the order book when available, else from the     |
//| last tick volume, else default to 1.0 (both are forced positive  |
//| before sending).                                                 |
//+------------------------------------------------------------------+
void SendLivePrices()
{
   bool   firstItem = true;
   string json      = "{\"prices\":[";
   int    sentCount = 0;

   for(int i = 0; i < ArraySize(symbols); i++)
   {
      string sym   = symbols[i];
      int    symId = symbolIds[i];
      if(symId <= 0)
         continue;

      // Ensure symbol is visible in Market Watch
      if(!SymbolSelect(sym, true))
      {
         Print("⚠️ Failed to select symbol: ", sym);
         continue;
      }

      // Read primary prices
      double bid  = SymbolInfoDouble(sym, SYMBOL_BID);
      double ask  = SymbolInfoDouble(sym, SYMBOL_ASK);
      double last = SymbolInfoDouble(sym, SYMBOL_LAST);

      // If last = 0 (some providers), use midprice as fallback
      if(last <= 0 && bid > 0 && ask > 0)
         last = (bid + ask) / 2.0;

      // Skip if prices are still invalid
      if(bid <= 0 || ask <= 0 || last <= 0)
      {
         Print("⚠️ Skipping symbol ", sym, " — invalid bid/ask/last (",
               DoubleToString(bid, 8), "/", DoubleToString(ask, 8), "/", DoubleToString(last, 8), ")");
         continue;
      }

      double bidSize = 0.0;
      double askSize = 0.0;

      // Order book: classify each level by its price relative to bid/ask.
      MqlBookInfo book[];
      if(MarketBookGet(sym, book) && ArraySize(book) > 0)
      {
         for(int j = 0; j < ArraySize(book); j++)
         {
            double p = book[j].price;
            double v = (double)book[j].volume;

            if(p >= ask)
               askSize += v;
            else if(p <= bid)
               bidSize += v;
            else
            {
               // price in-between -> assign to nearer side
               double distToBid = MathAbs(p - bid);
               double distToAsk = MathAbs(ask - p);
               if(distToBid <= distToAsk)
                  bidSize += v;
               else
                  askSize += v;
            }
         }
         PrintFormat("ℹ️ MarketBook for %s → bid=%.8f ask=%.8f bidSize=%.2f askSize=%.2f (book entries=%d)",
                     sym, bid, ask, bidSize, askSize, ArraySize(book));
      }
      else
      {
         // MarketBook not available or empty: fall back to the last tick.
         MqlTick tick;
         if(SymbolInfoTick(sym, tick))
         {
            // tick.volume is not a bid/ask size, but it is better than zero.
            double tickVol = (double)tick.volume;
            if(tickVol > 0.0)
            {
               // assign tick volume to both sides conservatively
               if(bidSize <= 0)
                  bidSize = tickVol;
               if(askSize <= 0)
                  askSize = tickVol;
               PrintFormat("ℹ️ tick fallback for %s → tick.volume=%.2f", sym, tickVol);
            }
            else
               Print("ℹ️ tick available but volume zero for ", sym);
         }
         else
            Print("ℹ️ MarketBook and tick not available for ", sym);
      }

      // Final safety: a side that is zero or negative is set to a minimal positive 1.0.
      if(bidSize <= 0.0)
         bidSize = 1.0;
      if(askSize <= 0.0)
         askSize = 1.0;

      // Build JSON item for this symbol
      string item = StringFormat(
         "{\"symbolId\":%d,\"price\":%.8f,\"bid\":%.8f,\"ask\":%.8f,\"bidSize\":%.8f,\"askSize\":%.8f}",
         symId, last, bid, ask, bidSize, askSize);

      if(!firstItem)
         json += ",";
      json += item;
      firstItem = false;
      sentCount++;
   }
   json += "]}";

   if(sentCount == 0)
   {
      Print("⚠️ No valid live prices to send right now. Check if market is open and symbols have tick data.");
      return;
   }

   // Log URL and truncated payload for debugging
   string url        = ApiBaseUrl + "/api/live-prices/bulk";
   int    maxShow    = 1000;
   string payloadLog = (StringLen(json) > maxShow) ? StringSubstr(json, 0, maxShow) + "...(truncated)" : json;
   Print("📤 Calling API: ", url);
   Print("📦 Payload (truncated): ", payloadLog);

   // Send and report
   string response;
   if(SendJSON(url, json, response))
      Print("✅ Successfully sent ", sentCount, " live prices to API.");
   else
      Print("❌ Failed to send live prices (", sentCount, " items). API response: ", response);
}

//+------------------------------------------------------------------+
//| Format a datetime as "YYYY-MM-DDTHH:MM:SS.000Z".                 |
//| The fields are written as-is; no time-zone conversion is done.   |
//| NOTE(review): the 'Z' suffix claims UTC - confirm the values     |
//| passed in (bar open times) really are UTC for this broker.       |
//+------------------------------------------------------------------+
string ToISO8601(datetime t)
{
   MqlDateTime st;
   TimeToStruct(t, st);
   return StringFormat("%04d-%02d-%02dT%02d:%02d:%02d.000Z",
                       st.year, st.mon, st.day, st.hour, st.min, st.sec);
}

//+------------------------------------------------------------------+
//| Build the bulk-candle JSON payload for rates[startIndex ..       |
//| startIndex+count-1] (clipped to the array size).                 |
//|   timeframe - label sent to the API ("15m", "1h", ...)           |
//|   tf        - matching period, used to derive closeTime          |
//| Bars with a non-positive time are skipped. Volume falls back to  |
//| 1 when tick volume is zero; quoteVolume falls back to volume     |
//| when real volume is zero. closeTime is open + PeriodSeconds(tf). |
//+------------------------------------------------------------------+
string BuildCandleJSONFromRates(int symbolId, MqlRates &rates[], int startIndex, int count,
                                string timeframe, ENUM_TIMEFRAMES tf)
{
   string json      = "{\"candles\":[";
   bool   first     = true;
   int    ratesSize = ArraySize(rates);
   int    stop      = (int)MathMin(startIndex + count, ratesSize);
   int    periodSec = PeriodSeconds(tf);

   for(int i = startIndex; i < stop; i++)
   {
      MqlRates r = rates[i];
      if(r.time <= 0)
         continue;

      string openTime  = ToISO8601(r.time);
      string closeTime = ToISO8601((datetime)(r.time + periodSec));

      double volume      = (r.tick_volume > 0 ? (double)r.tick_volume : 1.0);
      double quoteVolume = (r.real_volume > 0 ? (double)r.real_volume : volume);

      // Prices use 8 decimals (same as the live-price payload) so symbols
      // quoted with more than 5 digits are not truncated.
      string one = StringFormat(
         "{\"symbolId\":%d,\"timeframe\":\"%s\",\"openTime\":\"%s\",\"closeTime\":\"%s\",\"open\":%.8f,\"high\":%.8f,\"low\":%.8f,\"close\":%.8f,\"volume\":%.5f,\"tradesCount\":%d,\"quoteVolume\":%.5f}",
         symbolId, timeframe, openTime, closeTime,
         r.open, r.high, r.low, r.close,
         volume, (int)volume, quoteVolume);

      if(!first)
         json += ",";
      json += one;
      first = false;
   }

   json += "]}";
   return json;
}

//+------------------------------------------------------------------+
//| POST a JSON document to url.                                     |
//|   response - receives the response body; empty when the request  |
//|              could not be sent at all.                           |
//| Returns true for HTTP 200/201, false otherwise.                  |
//+------------------------------------------------------------------+
bool SendJSON(string url, string json, string &response)
{
   response = "";   // never hand a stale value back to the caller

   char postData[];
   StringToCharArray(json, postData, 0, WHOLE_ARRAY, CP_UTF8);
   // Remove the trailing null terminator added by StringToCharArray.
   if(ArraySize(postData) > 0 && postData[ArraySize(postData) - 1] == 0)
      ArrayResize(postData, ArraySize(postData) - 1);

   if(ArraySize(postData) <= 0)
   {
      Print("❌ Empty postData for URL: ", url);
      return false;
   }

   char   result[];
   string headers       = "Content-Type: application/json\r\n";
   string resultHeaders = "";
   int    timeout       = 15000;

   ResetLastError();
   int res = WebRequest("POST", url, headers, timeout, postData, result, resultHeaders);
   if(res == -1)
   {
      int err = GetLastError();
      Print("WebRequest error: ", err, " url=", url);
      return false;
   }

   response = CharArrayToString(result);
   if(res == 200 || res == 201)
      return true;

   Print("HTTP status ", res, " response: ", response);
   return false;
}

//+------------------------------------------------------------------+
//| Cleanup old candles: asks the API to keep only the last          |
//| CLEANUP_KEEP_CANDLES candles for symbolId. Result is only logged.|
//+------------------------------------------------------------------+
void CleanupOldCandles(int symbolId)
{
   string url = ApiBaseUrl + "/api/candles/cleanup/" + IntegerToString(symbolId)
                + "?keep=" + IntegerToString(CLEANUP_KEEP_CANDLES);
   string headers       = "Content-Type: application/json\r\n";
   string resultHeaders = "";
   char   result[];
   char   emptyData[];

   ResetLastError();
   int res = WebRequest("DELETE", url, headers, 10000, emptyData, result, resultHeaders);
   if(res == -1)
   {
      Print("⚠️ Cleanup failed for symbolId=", symbolId, " WebRequest error ", GetLastError());
      return;
   }

   string response = CharArrayToString(result);
   if(res == 200 || res == 204)
      Print("🧹 Cleanup successful for symbolId=", symbolId, " → kept last ", CLEANUP_KEEP_CANDLES, " candles.");
   else
      Print("⚠️ Cleanup failed for symbolId=", symbolId, " HTTP=", res, " Response=", response);
}

//+------------------------------------------------------------------+
//| Timer event: fires every TIMER_PERIOD_SECONDS; the candle sync   |
//| and cleanup run only when CANDLE_SYNC_INTERVAL_SECONDS of server |
//| time have passed since the previous cycle.                       |
//+------------------------------------------------------------------+
void OnTimer()
{
   datetime now = TimeCurrent();
   if(now - lastCandleSync < CANDLE_SYNC_INTERVAL_SECONDS)
      return;

   Print("⏰ Running scheduled candle sync and cleanup...");
   SendAllHistoricalCandles();

   // After uploading candles, clean up old ones.
   for(int i = 0; i < ArraySize(symbols); i++)
   {
      if(IsStopped())
         return;

      int symId = symbolIds[i];
      if(symId <= 0)
         continue;

      CleanupOldCandles(symId);
      Sleep(500);   // small delay to avoid API overload
   }

   lastCandleSync = now;
   Print("✅ Candle sync + cleanup cycle completed.");
}

//+------------------------------------------------------------------+
//| EA shutdown: stop the timer and drop order-book subscriptions.   |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
   EventKillTimer();
   ReleaseMarketBooks();
}