||
- //+------------------------------------------------------------------+
- //| MarketDataSender.mq5 (Final Fixed Version) |
- //+------------------------------------------------------------------+
- #property strict
- #property description "Fetches all symbols' candles and live prices, sends to API."
- #include <Trade\SymbolInfo.mqh>
- input string ApiBaseUrl = "http://market-price.insightbull.io";
- input int HistoricalCandleCount = 1000;
- input ENUM_TIMEFRAMES HistoricalTimeframe = PERIOD_H1;
- input int LivePriceIntervalSeconds = 5;
- // Globals shared by all handlers. NOTE(review): the HistoricalTimeframe input above is not referenced by the code in this file; uploads iterate Timeframes[] instead.
- string symbols[]; // symbol names taken from Market Watch; index-aligned with symbolIds[]
- int symbolIds[]; // API id for symbols[i]; -1 while unresolved (such symbols are skipped everywhere)
- datetime lastSend = 0; // TimeCurrent() recorded after the most recent live-price upload
- datetime lastCandleSync = 0; // TimeCurrent() recorded by the most recent scheduled candle sync
- // --- Supported timeframes: the two arrays below are parallel (same index = same timeframe, enum value vs. API label) ---
- ENUM_TIMEFRAMES Timeframes[] = { PERIOD_M15, PERIOD_M30, PERIOD_H1, PERIOD_D1, PERIOD_W1, PERIOD_MN1 };
- string TimeframeStrings[] = { "15m", "30m", "1h", "1D", "1W", "1M" };
- //+------------------------------------------------------------------+
- // Expert initialisation: resolves the API ids of all Market Watch symbols,
- // performs the first historical upload and starts the 60-second timer.
- // Returns INIT_FAILED when the symbol list cannot be built or synced,
- // INIT_SUCCEEDED otherwise (a timer failure is logged but not fatal, because
- // live prices are still driven by OnTick()).
- int OnInit()
- {
-    Print("Initializing MarketDataSender EA...");
-    if(!InitializeSymbols())
-    {
-       Print("❌ Failed to initialize symbols.");
-       return(INIT_FAILED);
-    }
-    Print("✅ Symbols initialized: ", ArraySize(symbols));
-    SendAllHistoricalCandles();
-    // Remember the initial upload; otherwise lastCandleSync stays 0 and the very
-    // first OnTimer() call (60 s later) repeats the whole upload straight away.
-    lastCandleSync = TimeCurrent();
-    // The timer fires every 60 seconds; OnTimer() throttles the actual candle
-    // sync to one run per 600 seconds.
-    ResetLastError();
-    if(!EventSetTimer(60))
-       Print("❌ Failed to start timer (error ", GetLastError(), "); scheduled candle sync is disabled.");
-    else
-       Print("✅ Timer set: OnTimer() fires every 60 seconds; candle sync runs every 10 minutes.");
-    return(INIT_SUCCEEDED);
- }
- //+------------------------------------------------------------------+
- // Tick handler: uploads live prices, but no more often than once per
- // LivePriceIntervalSeconds (measured with TimeCurrent()).
- void OnTick()
- {
-    // Too soon since the previous upload: nothing to do on this tick
-    if(TimeCurrent() - lastSend < LivePriceIntervalSeconds)
-       return;
-    SendLivePrices();
-    lastSend = TimeCurrent();
- }
- //+------------------------------------------------------------------+
- // Loads every symbol selected in Market Watch into symbols[], marks each id
- // as unresolved (-1) and then resolves the ids through the API.
- // Returns false when Market Watch is empty or the API sync fails.
- bool InitializeSymbols()
- {
-    const int watchCount = SymbolsTotal(true);
-    if(watchCount <= 0)
-    {
-       Print("❌ No symbols found!");
-       return false;
-    }
-    ArrayResize(symbols, watchCount);
-    ArrayResize(symbolIds, watchCount);
-    int idx = 0;
-    while(idx < watchCount)
-    {
-       symbols[idx] = SymbolName(idx, true);
-       symbolIds[idx] = -1;
-       idx++;
-    }
-    if(SyncSymbolsWithDatabase())
-       return true;
-    Print("❌ Failed to sync symbols with database");
-    return false;
- }
- //+------------------------------------------------------------------+
- // Downloads the symbol list from GET {ApiBaseUrl}/api/symbols and fills
- // symbolIds[]: a symbol found in the response takes its id from there, any
- // other symbol is created through CreateSymbolInDatabase().
- // Returns false only when the GET itself fails (transport error or a status
- // other than 200). A failed creation leaves symbolIds[i] at -1 and does not
- // abort the sync.
- bool SyncSymbolsWithDatabase()
- {
-    Print("Syncing symbols with database...");
-    string endpoint = ApiBaseUrl + "/api/symbols";
-    string requestHeaders = "Content-Type: application/json\r\n";
-    string replyHeaders = "";
-    char reply[];
-    char noBody[]; // a GET has no payload, but WebRequest still takes a data array
-    ResetLastError();
-    int status = WebRequest("GET", endpoint, requestHeaders, 5000, noBody, reply, replyHeaders);
-    if(status == -1)
-    {
-       int err = GetLastError();
-       Print("❌ WebRequest connection error: ", err, " URL=", endpoint);
-       return false;
-    }
-    if(status != 200)
-    {
-       Print("❌ Failed to fetch symbols from API: HTTP ", status, " Response: ", CharArrayToString(reply));
-       return false;
-    }
-    string listing = CharArrayToString(reply);
-    // A missing "data" key is only reported; processing continues regardless
-    if(StringFind(listing, "\"data\"") < 0)
-       Print("⚠️ Unexpected response format from symbols API: ", listing);
-    for(int i = 0; i < ArraySize(symbols); i++)
-    {
-       string name = symbols[i];
-       int resolvedId = FindSymbolId(listing, name);
-       if(resolvedId > 0)
-       {
-          symbolIds[i] = resolvedId;
-          Print("✅ Found existing symbol: ", name, " (ID: ", resolvedId, ")");
-          continue;
-       }
-       // Not known to the API yet: pause 0.3 s between creations, then create it
-       Sleep(300);
-       resolvedId = CreateSymbolInDatabase(name);
-       if(resolvedId > 0)
-       {
-          symbolIds[i] = resolvedId;
-          Print("✅ Created new symbol: ", name, " (ID: ", resolvedId, ")");
-          continue;
-       }
-       Print("❌ Failed to create symbol: ", name, " (ID: ", resolvedId, ")");
-       symbolIds[i] = -1;
-    }
-    return true;
- }
- //+------------------------------------------------------------------+
- //| Find the id of the JSON object whose "symbol" equals symbolName |
- //+------------------------------------------------------------------+
- // Scans `response` for every "symbol":"<value>" pair and compares <value>
- // with symbolName (exact, case-sensitive). For a match, the enclosing JSON
- // object is delimited by walking outwards to its '{' and '}' and the first
- // "id": inside that span is parsed, so an id belonging to a neighbouring
- // object can no longer be picked up, whichever side of "symbol" the id is on.
- // Limitations: braces inside string values are not recognised, the keys must
- // be written without whitespace ("symbol":"X", "id":N), and an "id" inside a
- // nested object that precedes the top-level one would be found first.
- // Returns the parsed id, or -1 when no object matches.
- int FindSymbolId(string response, string symbolName)
- {
-    const string symbolKey = "\"symbol\":\"";
-    const string idKey = "\"id\":";
-    const int responseLen = StringLen(response);
-    int searchFrom = 0;
-    while(true)
-    {
-       int keyPos = StringFind(response, symbolKey, searchFrom);
-       if(keyPos < 0)
-          break;
-       int valueStart = keyPos + StringLen(symbolKey);
-       int valueEnd = StringFind(response, "\"", valueStart);
-       if(valueEnd < 0)
-          break;
-       // Whatever happens below, the next search resumes after this value
-       searchFrom = valueEnd + 1;
-       if(StringSubstr(response, valueStart, valueEnd - valueStart) != symbolName)
-          continue;
-       // Walk left to the '{' that opens the object holding this "symbol" key,
-       // skipping over any nested objects that were already closed.
-       int objStart = 0;
-       int depth = 0;
-       for(int left = keyPos - 1; left >= 0; left--)
-       {
-          ushort ch = StringGetCharacter(response, left);
-          if(ch == '}')
-             depth++;
-          else if(ch == '{')
-          {
-             if(depth == 0)
-             {
-                objStart = left;
-                break;
-             }
-             depth--;
-          }
-       }
-       // Walk right to the '}' that closes the same object
-       int objEnd = responseLen;
-       depth = 0;
-       for(int right = valueEnd + 1; right < responseLen; right++)
-       {
-          ushort ch = StringGetCharacter(response, right);
-          if(ch == '{')
-             depth++;
-          else if(ch == '}')
-          {
-             if(depth == 0)
-             {
-                objEnd = right;
-                break;
-             }
-             depth--;
-          }
-       }
-       int idPos = StringFind(response, idKey, objStart);
-       if(idPos < 0 || idPos >= objEnd)
-          continue; // this object carries no id; keep scanning
-       int idStart = idPos + StringLen(idKey);
-       // The number ends at the next comma or at the end of the object
-       int idEnd = StringFind(response, ",", idStart);
-       if(idEnd < 0 || idEnd > objEnd)
-          idEnd = objEnd;
-       int id = (int)StringToInteger(StringSubstr(response, idStart, idEnd - idStart));
-       Print("✅ Exact match found → symbol='", symbolName, "' | ID=", id);
-       return id;
-    }
-    Print("⚠️ No exact match for symbol ", symbolName);
-    return -1;
- }
- //+------------------------------------------------------------------+
- // Registers a symbol through POST {ApiBaseUrl}/api/symbols and returns the
- // id parsed from the response, or -1 on any failure.
- // The name is first cut at its first '.', then split into base/quote assets
- // and classified (crypto / metal / index / forex) by simple prefix rules.
- // NOTE(review): the name sent to the API has its '.suffix' removed, while
- // SyncSymbolsWithDatabase() looks symbols up by their full Market Watch name.
- // Confirm which form the API returns; otherwise suffixed symbols are never
- // found and get re-created on every start.
- int CreateSymbolInDatabase(string symbolName)
- {
-    string baseAsset = "";
-    string quoteAsset = "";
-    string exchange = "MT5";
-    string instrumentType = "forex";
-    // --- Strip a dot-suffix such as ".pro", ".m", ".r" ---
-    int dotPos = StringFind(symbolName, ".");
-    if(dotPos > 0)
-       symbolName = StringSubstr(symbolName, 0, dotPos);
-    // --- Names of 6+ characters are split 3/3 (EURUSD, GBPJPY, BTCUSD) ---
-    if(StringLen(symbolName) >= 6)
-    {
-       baseAsset = StringSubstr(symbolName, 0, 3);
-       quoteAsset = StringSubstr(symbolName, 3, 3);
-    }
-    else
-    {
-       // --- Shorter names: look for a known quote currency inside the name ---
-       int usdPos = StringFind(symbolName, "USD");
-       int eurPos = StringFind(symbolName, "EUR");
-       if(usdPos >= 0)
-       {
-          baseAsset = StringSubstr(symbolName, 0, usdPos);
-          quoteAsset = "USD";
-       }
-       else if(eurPos >= 0)
-       {
-          baseAsset = StringSubstr(symbolName, 0, eurPos);
-          quoteAsset = "EUR";
-       }
-       else
-       {
-          // Fallback defaults
-          baseAsset = symbolName;
-          quoteAsset = "USD";
-       }
-    }
-    // --- Final safety: never send empty fields ---
-    if(StringLen(baseAsset) == 0) baseAsset = "UNKNOWN";
-    if(StringLen(quoteAsset) == 0) quoteAsset = "USD";
-    // --- Instrument type by name prefix ---
-    if(StringFind(symbolName, "BTC") == 0 || StringFind(symbolName, "ETH") == 0)
-       instrumentType = "crypto";
-    else if(StringFind(symbolName, "XAU") == 0 || StringFind(symbolName, "XAG") == 0)
-       instrumentType = "metal";
-    else if(StringFind(symbolName, "US30") == 0 || StringFind(symbolName, "NAS") == 0)
-       instrumentType = "index";
-    else
-       instrumentType = "forex";
-    string json = StringFormat(
-       "{\"symbol\":\"%s\",\"baseAsset\":\"%s\",\"quoteAsset\":\"%s\",\"exchange\":\"%s\",\"instrumentType\":\"%s\",\"isActive\":true}",
-       symbolName, baseAsset, quoteAsset, exchange, instrumentType
-    );
-    string url = ApiBaseUrl + "/api/symbols";
-    string headers = "Content-Type: application/json\r\n";
-    string resultHeaders = "";
-    char postData[];
-    // Argument order is (text, array, start, count, codepage). The codepage
-    // must be the 5th argument; passed as the 4th it is taken as a length limit.
-    StringToCharArray(json, postData, 0, WHOLE_ARRAY, CP_UTF8);
-    // Drop the terminating zero so it is not sent as part of the body
-    if(ArraySize(postData) > 0 && postData[ArraySize(postData) - 1] == 0)
-       ArrayResize(postData, ArraySize(postData) - 1);
-    char result[];
-    ResetLastError();
-    int res = WebRequest("POST", url, headers, 5000, postData, result, resultHeaders);
-    if(res == -1)
-    {
-       // Transport-level failure: there is no HTTP status, only a terminal error code
-       PrintFormat("❌ WebRequest error %d while creating symbol %s", GetLastError(), symbolName);
-       return -1;
-    }
-    if(res != 201 && res != 200)
-    {
-       PrintFormat("❌ Failed to create symbol %s | HTTP %d | Response: %s", symbolName, res, CharArrayToString(result));
-       return -1;
-    }
-    string createResponse = CharArrayToString(result);
-    int idPos = StringFind(createResponse, "\"id\":");
-    if(idPos < 0) return -1;
-    int startPos = idPos + 5; // 5 = length of "id": including both quotes
-    int endPos = StringFind(createResponse, ",", startPos);
-    if(endPos < 0) endPos = StringFind(createResponse, "}", startPos);
-    // No terminator at all: take the rest of the response instead of a negative length
-    if(endPos < 0) endPos = StringLen(createResponse);
-    string idStr = StringSubstr(createResponse, startPos, endPos - startPos);
-    return (int)StringToInteger(idStr);
- }
- //+------------------------------------------------------------------+
- //| Fetch latest stored candle openTime from API |
- //+------------------------------------------------------------------+
- // Requests GET {ApiBaseUrl}/api/candles/{symbolId}/latest?timeframe={timeframe}
- // and converts the first "openTime":"YYYY-MM-DD?HH:MM:SS..." value in the
- // response into a datetime. Anything after the seconds (fraction, zone
- // designator) is ignored; the digits are taken at face value, which mirrors
- // how ToISO8601() in this file writes them.
- // Returns 0 when the request does not answer 200, when no openTime is
- // present, or when the timestamp is too short to parse.
- datetime GetLatestCandleTime(int symbolId, string timeframe)
- {
-    string url = ApiBaseUrl + "/api/candles/" + IntegerToString(symbolId) + "/latest?timeframe=" + timeframe;
-    string headers = "Content-Type: application/json\r\n";
-    string resultHeaders = "";
-    char result[];
-    char emptyData[];
-    ResetLastError();
-    int res = WebRequest("GET", url, headers, 10000, emptyData, result, resultHeaders);
-    if(res != 200)
-    {
-       Print("⚠️ Could not fetch latest candle for symbolId=", symbolId, " timeframe=", timeframe, " (HTTP ", res, ")");
-       return 0;
-    }
-    string response = CharArrayToString(result);
-    const string key = "\"openTime\":\"";
-    int pos = StringFind(response, key);
-    if(pos < 0)
-    {
-       Print("⚠️ No openTime found in response for symbolId=", symbolId, " timeframe=", timeframe);
-       return 0;
-    }
-    pos += StringLen(key);
-    int end = StringFind(response, "\"", pos);
-    // "YYYY-MM-DDTHH:MM:SS" needs 19 characters; reject an unterminated or
-    // shorter value instead of building a date from empty fields.
-    if(end < 0 || end - pos < 19)
-    {
-       Print("⚠️ Malformed openTime in response for symbolId=", symbolId, " timeframe=", timeframe);
-       return 0;
-    }
-    string openTimeStr = StringSubstr(response, pos, end - pos);
-    MqlDateTime t;
-    t.year = (int)StringToInteger(StringSubstr(openTimeStr, 0, 4));
-    t.mon = (int)StringToInteger(StringSubstr(openTimeStr, 5, 2));
-    t.day = (int)StringToInteger(StringSubstr(openTimeStr, 8, 2));
-    t.hour = (int)StringToInteger(StringSubstr(openTimeStr, 11, 2));
-    t.min = (int)StringToInteger(StringSubstr(openTimeStr, 14, 2));
-    t.sec = (int)StringToInteger(StringSubstr(openTimeStr, 17, 2));
-    // Not parsed from the text; set explicitly so the struct is fully initialised
-    t.day_of_week = 0;
-    t.day_of_year = 0;
-    datetime dt = StructToTime(t);
-    PrintFormat("🕓 Latest stored candle for %s (symbolId=%d) = %s", timeframe, symbolId, TimeToString(dt, TIME_DATE|TIME_SECONDS));
-    return dt;
- }
- //+------------------------------------------------------------------+
- //| Send historical candles of every symbol/timeframe to the API |
- //+------------------------------------------------------------------+
- // For each symbol with a resolved id and each entry of Timeframes[]:
- //   1. asks the API for the newest stored candle (GetLatestCandleTime),
- //   2. waits up to 10 x 500 ms for the terminal history to synchronise,
- //   3. copies the last HistoricalCandleCount bars,
- //   4. uploads only the bars that open after the stored one, 200 per request.
- // A failed batch stops the upload for that symbol/timeframe only.
- // NOTE(review): the bar that opens exactly at the stored time is not
- // re-sent, so a bar first uploaded while still forming keeps its partial
- // values. Confirm whether the API expects that bar to be refreshed.
- void SendAllHistoricalCandles()
- {
-    Print("Starting multi-timeframe historical upload for ", ArraySize(symbols), " symbols...");
-    const string bulkUrl = ApiBaseUrl + "/api/candles/bulk";
-    const int batchSize = 200;
-    for(int i = 0; i < ArraySize(symbols); i++)
-    {
-       string sym = symbols[i];
-       int symbolId = symbolIds[i];
-       if(symbolId <= 0) continue;
-       // --- Loop through all timeframes ---
-       for(int tfIndex = 0; tfIndex < ArraySize(Timeframes); tfIndex++)
-       {
-          ENUM_TIMEFRAMES tf = Timeframes[tfIndex];
-          string tfStr = TimeframeStrings[tfIndex];
-          PrintFormat("📊 Processing %s timeframe for %s", tfStr, sym);
-          datetime latestApiTime = GetLatestCandleTime(symbolId, tfStr);
-          Sleep(300);
-          // Give the terminal a few seconds to synchronise this series
-          int tries = 0;
-          bool historyReady = false;
-          while(tries < 10)
-          {
-             if(SeriesInfoInteger(sym, tf, SERIES_SYNCHRONIZED))
-             {
-                historyReady = true;
-                break;
-             }
-             PrintFormat("⏳ Waiting for %s (%s) history to load... (try %d/10)", sym, tfStr, tries + 1);
-             Sleep(500);
-             tries++;
-          }
-          if(!historyReady)
-          {
-             PrintFormat("⚠️ Skipping %s (%s) — history not loaded.", sym, tfStr);
-             continue;
-          }
-          MqlRates rates[];
-          ResetLastError();
-          int copied = CopyRates(sym, tf, 0, HistoricalCandleCount, rates);
-          if(copied <= 0)
-          {
-             int err = GetLastError();
-             PrintFormat("⚠️ Failed to copy %s candles (%s) (error %d)", sym, tfStr, err);
-             continue;
-          }
-          // Start from "nothing is new": if no bar is newer than the stored one,
-          // startIndex stays at `copied` and newCount becomes 0. (Starting from
-          // 0 would silently re-send the whole history in that case.)
-          int startIndex = copied;
-          for(int j = 0; j < copied; j++)
-          {
-             if(rates[j].time > latestApiTime)
-             {
-                startIndex = j;
-                break;
-             }
-          }
-          int newCount = copied - startIndex;
-          if(newCount <= 0)
-          {
-             PrintFormat("ℹ️ No new %s candles for %s", tfStr, sym);
-             continue;
-          }
-          PrintFormat("🆕 Sending %d new %s candles for %s", newCount, tfStr, sym);
-          int sentTotal = 0;
-          for(int start = startIndex; start < copied; start += batchSize)
-          {
-             int size = MathMin(batchSize, copied - start);
-             string json = BuildCandleJSONFromRates(symbolId, rates, start, size, tfStr, tf);
-             string response;
-             bool ok = SendJSON(bulkUrl, json, response);
-             if(!ok)
-             {
-                PrintFormat("❌ Failed to send %s batch for %s (start=%d)", tfStr, sym, start);
-                break;
-             }
-             sentTotal += size;
-             PrintFormat("📤 Sent %d/%d %s candles for %s", sentTotal, newCount, tfStr, sym);
-          }
-       }
-    }
-    Print("✅ Multi-timeframe candle upload finished.");
- }
- //+------------------------------------------------------------------+
- //| Send live prices of all active symbols |
- //+------------------------------------------------------------------+
- // Builds a single {"prices":[...]} payload with one entry per symbol that has
- // a resolved id and valid quotes, then posts it to /api/live-prices/bulk.
- // Sizes come from the market book when MarketBookGet() delivers entries,
- // otherwise from the last tick's volume; a side that is still not positive
- // is sent as 1.0.
- void SendLivePrices()
- {
-    string payload = "{\"prices\":[";
-    int included = 0;
-    const int symbolTotal = ArraySize(symbols);
-    for(int s = 0; s < symbolTotal; s++)
-    {
-       string name = symbols[s];
-       int apiId = symbolIds[s];
-       if(apiId <= 0)
-          continue;
-       // The symbol has to be selected in Market Watch before it can be quoted
-       if(!SymbolSelect(name, true))
-       {
-          Print("⚠️ Failed to select symbol: ", name);
-          continue;
-       }
-       double bid = SymbolInfoDouble(name, SYMBOL_BID);
-       double ask = SymbolInfoDouble(name, SYMBOL_ASK);
-       double last = SymbolInfoDouble(name, SYMBOL_LAST);
-       // No last-trade price: fall back to the mid price
-       if(last <= 0 && bid > 0 && ask > 0)
-          last = (bid + ask) / 2.0;
-       if(bid <= 0 || ask <= 0 || last <= 0)
-       {
-          Print("⚠️ Skipping symbol ", name, " — invalid bid/ask/last (", DoubleToString(bid,8), "/", DoubleToString(ask,8), "/", DoubleToString(last,8), ")");
-          continue;
-       }
-       double bidSize = 0.0;
-       double askSize = 0.0;
-       MqlBookInfo depth[];
-       if(MarketBookGet(name, depth) && ArraySize(depth) > 0)
-       {
-          // Sort each book level onto a side by its price relative to bid/ask
-          const int levels = ArraySize(depth);
-          for(int lv = 0; lv < levels; lv++)
-          {
-             double levelPrice = depth[lv].price;
-             double levelVolume = (double)depth[lv].volume;
-             if(levelPrice >= ask)
-                askSize += levelVolume;
-             else if(levelPrice <= bid)
-                bidSize += levelVolume;
-             else if(MathAbs(levelPrice - bid) <= MathAbs(ask - levelPrice))
-                bidSize += levelVolume; // inside the spread, nearer to (or tied with) bid
-             else
-                askSize += levelVolume; // inside the spread, nearer to ask
-          }
-          PrintFormat("ℹ️ MarketBook for %s → bid=%.8f ask=%.8f bidSize=%.2f askSize=%.2f (book entries=%d)", name, bid, ask, bidSize, askSize, ArraySize(depth));
-       }
-       else
-       {
-          // No book data: use the last tick's volume for both sides
-          MqlTick lastTick;
-          if(!SymbolInfoTick(name, lastTick))
-             Print("ℹ️ MarketBook and tick not available for ", name);
-          else
-          {
-             double tickVol = (double)lastTick.volume;
-             if(tickVol > 0.0)
-             {
-                // Both sizes are still 0 here, so both take the tick volume
-                bidSize = tickVol;
-                askSize = tickVol;
-                PrintFormat("ℹ️ tick fallback for %s → tick.volume=%.2f", name, tickVol);
-             }
-             else
-                Print("ℹ️ tick available but volume zero for ", name);
-          }
-       }
-       // Sizes are always sent as positive numbers
-       if(bidSize <= 0.0) bidSize = 1.0;
-       if(askSize <= 0.0) askSize = 1.0;
-       if(included > 0)
-          payload += ",";
-       payload += StringFormat(
-          "{\"symbolId\":%d,\"price\":%.8f,\"bid\":%.8f,\"ask\":%.8f,\"bidSize\":%.8f,\"askSize\":%.8f}",
-          apiId, last, bid, ask, bidSize, askSize
-       );
-       included++;
-    }
-    payload += "]}";
-    if(included == 0)
-    {
-       Print("⚠️ No valid live prices to send right now. Check if market is open and symbols have tick data.");
-       return;
-    }
-    string url = ApiBaseUrl + "/api/live-prices/bulk";
-    // Only the first 1000 characters of the payload are written to the log
-    int maxShow = 1000;
-    string payloadLog = payload;
-    if(StringLen(payload) > maxShow)
-       payloadLog = StringSubstr(payload, 0, maxShow) + "...(truncated)";
-    Print("📤 Calling API: ", url);
-    Print("📦 Payload (truncated): ", payloadLog);
-    string response;
-    if(SendJSON(url, payload, response))
-       Print("✅ Successfully sent ", included, " live prices to API.");
-    else
-       Print("❌ Failed to send live prices (", included, " items). API response: ", response);
- }
- //+------------------------------------------------------------------+
- // Formats a datetime as "YYYY-MM-DDTHH:MM:SS.000Z". The fields are taken
- // from TimeToStruct() unchanged; no time-zone conversion is performed here.
- string ToISO8601(datetime t)
- {
-    MqlDateTime parts;
-    TimeToStruct(t, parts);
-    string datePart = StringFormat("%04d-%02d-%02d", parts.year, parts.mon, parts.day);
-    string timePart = StringFormat("%02d:%02d:%02d", parts.hour, parts.min, parts.sec);
-    return datePart + "T" + timePart + ".000Z";
- }
- // Serialises rates[startIndex .. startIndex+count-1] (clipped to the array
- // size) into a {"candles":[...]} JSON document. Bars with a non-positive
- // time are skipped. closeTime is the open time plus PeriodSeconds(tf);
- // volume is the tick volume (1 when it is not positive), tradesCount is that
- // volume as an integer and quoteVolume is the real volume when positive,
- // otherwise the same value as volume.
- string BuildCandleJSONFromRates(int symbolId, MqlRates &rates[], int startIndex, int count, string timeframe, ENUM_TIMEFRAMES tf)
- {
-    int stop = startIndex + count;
-    if(stop > ArraySize(rates))
-       stop = ArraySize(rates);
-    const int barSeconds = PeriodSeconds(tf);
-    string json = "{\"candles\":[";
-    int emitted = 0;
-    for(int idx = startIndex; idx < stop; idx++)
-    {
-       if(rates[idx].time <= 0)
-          continue;
-       datetime opensAt = (datetime)rates[idx].time;
-       datetime closesAt = (datetime)(rates[idx].time + (datetime)barSeconds);
-       double volume;
-       if(rates[idx].tick_volume > 0)
-          volume = (double)rates[idx].tick_volume;
-       else
-          volume = 1;
-       double quoteVolume;
-       if(rates[idx].real_volume > 0)
-          quoteVolume = (double)rates[idx].real_volume;
-       else
-          quoteVolume = volume;
-       if(emitted > 0)
-          json += ",";
-       json += StringFormat(
-          "{\"symbolId\":%d,\"timeframe\":\"%s\",\"openTime\":\"%s\",\"closeTime\":\"%s\",\"open\":%.5f,\"high\":%.5f,\"low\":%.5f,\"close\":%.5f,\"volume\":%.5f,\"tradesCount\":%d,\"quoteVolume\":%.5f}",
-          symbolId, timeframe, ToISO8601(opensAt), ToISO8601(closesAt),
-          rates[idx].open, rates[idx].high, rates[idx].low, rates[idx].close,
-          volume, (int)volume, quoteVolume
-       );
-       emitted++;
-    }
-    json += "]}";
-    return json;
- }
- //+------------------------------------------------------------------+
- // POSTs `json` to `url` with Content-Type: application/json (15 s timeout).
- // `response` receives the response body; it is cleared first, so after an
- // early failure it is empty rather than whatever the caller had in it.
- // Returns true for HTTP 200/201, false for any other status, for a transport
- // error (-1) or for an empty payload.
- bool SendJSON(string url, string json, string &response)
- {
-    response = "";
-    char postData[];
-    // Argument order is (text, array, start, count, codepage). Passing CP_UTF8
-    // as the 4th argument makes it the character limit (65001) and leaves the
-    // codepage at its ANSI default, truncating larger payloads.
-    StringToCharArray(json, postData, 0, WHOLE_ARRAY, CP_UTF8);
-    // Drop the terminating zero so it is not sent as part of the body
-    if(ArraySize(postData) > 0 && postData[ArraySize(postData) - 1] == 0)
-       ArrayResize(postData, ArraySize(postData) - 1);
-    if(ArraySize(postData) <= 0)
-    {
-       Print("❌ Empty postData for URL: ", url);
-       return false;
-    }
-    char result[];
-    string headers = "Content-Type: application/json\r\n";
-    string resultHeaders = "";
-    int timeout = 15000;
-    ResetLastError();
-    int res = WebRequest("POST", url, headers, timeout, postData, result, resultHeaders);
-    if(res == -1)
-    {
-       int err = GetLastError();
-       Print("WebRequest error: ", err, " url=", url);
-       return false;
-    }
-    response = CharArrayToString(result, 0, WHOLE_ARRAY, CP_UTF8);
-    if(res == 200 || res == 201)
-       return true;
-    Print("HTTP status ", res, " response: ", response);
-    return false;
- }
- //+------------------------------------------------------------------+
- //| Cleanup old candles (keep only last 1000) |
- //+------------------------------------------------------------------+
- // Issues DELETE {ApiBaseUrl}/api/candles/cleanup/{symbolId}?keep=1000 and
- // logs the outcome; HTTP 200 and 204 count as success. Nothing is returned.
- void CleanupOldCandles(int symbolId)
- {
-    string endpoint = ApiBaseUrl + "/api/candles/cleanup/" + IntegerToString(symbolId) + "?keep=1000";
-    string requestHeaders = "Content-Type: application/json\r\n";
-    string replyHeaders = "";
-    char reply[];
-    char noBody[];
-    ResetLastError();
-    int status = WebRequest("DELETE", endpoint, requestHeaders, 10000, noBody, reply, replyHeaders);
-    string replyText = CharArrayToString(reply);
-    bool succeeded = (status == 200 || status == 204);
-    if(!succeeded)
-    {
-       Print("⚠️ Cleanup failed for symbolId=", symbolId, " HTTP=", status, " Response=", replyText);
-       return;
-    }
-    Print("🧹 Cleanup successful for symbolId=", symbolId, " → kept last 1000 candles.");
- }
- //+------------------------------------------------------------------+
- //| Timer event: candle sync + cleanup, throttled to every 600 s |
- //+------------------------------------------------------------------+
- // Called on every timer event. Does nothing until at least 600 seconds of
- // TimeCurrent() have passed since lastCandleSync; then it re-runs the
- // historical upload and asks the API to clean up each resolved symbol.
- void OnTimer()
- {
-    datetime now = TimeCurrent();
-    // Less than 600 seconds (10 minutes) since the last sync: skip this event
-    if(now - lastCandleSync < 600)
-       return;
-    Print("⏰ Running scheduled candle sync and cleanup...");
-    SendAllHistoricalCandles();
-    // After the upload, trim old candles for every symbol that has an id
-    const int symbolTotal = ArraySize(symbols);
-    for(int s = 0; s < symbolTotal; s++)
-    {
-       if(symbolIds[s] <= 0)
-          continue;
-       CleanupOldCandles(symbolIds[s]);
-       Sleep(500); // short pause between cleanup requests
-    }
-    // The timestamp taken before the sync started is stored, not the finish time
-    lastCandleSync = now;
-    Print("✅ Candle sync + cleanup cycle completed.");
- }
- // Expert shutdown: stops the timer created in OnInit(). `reason` is not used.
- void OnDeinit(const int reason)
- {
-    EventKillTimer();
- }
|