← source index

src/Commands.cpp

671 lines  ·  25.4 K  ·  cpp
#include "Commands.h"

Commands::Commands()
    : mLRUCache(false, 2)
    , mActiveWebSocId(0)
    , mIsAppRunning(false)
    , mMsgCountPerSec(0)
    , mMsgCountTotal(0)
    , mMaxWriterMs(0)
    , mAvgWriterMs(0)
    , mAvgCounter(0)
    , mDisableReaderThread(false)
{
    mOrderBookMap.reserve(mDisplayData.maxSubsCount);

    mComMap.emplace("h",            CommandInstructions::HELP);
    mComMap.emplace("help",         CommandInstructions::HELP);
    mComMap.emplace("c",            CommandInstructions::CONNECT);
    mComMap.emplace("connect",      CommandInstructions::CONNECT);
    mComMap.emplace("s",            CommandInstructions::SHOW);
    mComMap.emplace("show",         CommandInstructions::SHOW);
    mComMap.emplace("sd",           CommandInstructions::SET_DEPTH);
    mComMap.emplace("set_depth",    CommandInstructions::SET_DEPTH);
    mComMap.emplace("q",            CommandInstructions::QUIT);
    mComMap.emplace("quit",         CommandInstructions::QUIT);

    mComLookup[(uint8_t) CommandInstructions::HELP] = {{"h","help"}, "Showing all available commands"};
    mComLookup[(uint8_t) CommandInstructions::CONNECT] = {{"c","connect"}, "Creates a websocket connection to specified uri"};
    mComLookup[(uint8_t) CommandInstructions::SHOW] = {{"s","show"}, "Shows websocket connection details"};
    mComLookup[(uint8_t) CommandInstructions::SET_DEPTH] = {{"sd","set_depth"}, "Sets the shown order book depth"};
    mComLookup[(uint8_t) CommandInstructions::QUIT] = {{"q","quit"}, "Quitting the program"};
}

std::tuple<bool, std::string, std::string, std::vector<std::string>, std::string> Commands::parseCommand(const std::string& cmd, bool isFromCmd)
{
    std::vector<std::string> argv;
    boost::split(argv, cmd, boost::is_any_of(" "));
    int argc = argv.size();

    if(cmd.size() == 0) return {false, "Run with -h option for help.", "", {}, ""};

    mCommandHistory.emplace_back(cmd);
    if(mCommandHistory.size() > mDisplayData.maxCommandHistory) mCommandHistory.erase(mCommandHistory.begin());

    if(argc == 0)
    {
        std::string err = "Not enough arguments provided. [cmd: " + cmd + ']';
        return {false, err, "", {}, ""};
    }

    std::string rawCmd = argv[0];

    if(rawCmd[0] != '-') 
    {
        std::string err = "Command must start with '-' symbol. [cmd: " + cmd + ']';
        return {false, err, "", {}, ""};
    }

    if(rawCmd.size() == 1)
    {
        std::string err = "Command is missing option\n\tUsage: ./run.sh -[opt] [args]\n\tExample: ./run.sh -h\n\t [cmd: " + cmd + ']';
        return {false, err, "", {}, ""};
    }

    std::string normCmd = rawCmd.substr(1);
    std::vector<std::string> rawArgs;

    if(argc >= 2 + isFromCmd) rawArgs = std::vector<std::string>(argv.begin() + 1, argv.begin() + argc);
    
    return {true, "", normCmd, rawArgs, cmd};
}

std::tuple<LogStatus, std::string, CommandInstructions> Commands::runCommand(const std::string& command, const std::vector<std::string>& args, const std::string& fullComm)
{
    if(command.size() == 0) return{LogStatus::__NONE__, "", CommandInstructions::__NONE__};

    auto foundCommand = mComMap.find(command);

    if(foundCommand == mComMap.end())
    {
        std::string err = "Unknown command. [" + command + ']';
        return {LogStatus::ERROR, err, CommandInstructions::__NONE__};
    }

    CommandInstructions cmd = foundCommand->second;

    switch(cmd)
    {
        case CommandInstructions::HELP:
        {
            std::stringstream helpMsgBody("");
            helpMsgBody << "List of commands:\n";

            for(const auto& valuePair : mComLookup)
            {
                const auto& cmds = valuePair.first;
                const auto& cmdsDescr = valuePair.second;
                std::string commandsStr = "";
                helpMsgBody << '\t';

                for(const auto& cmd : cmds)
                {
                    commandsStr += '-' + cmd + ' ';
                }

                helpMsgBody << commandsStr + '\t' + cmdsDescr + '\n';
            }

            helpMsgBody << "Example: -c api.gemini.com 443 /v2/marketdata BTCUSD";
            return {LogStatus::INFO, helpMsgBody.str(), cmd};
        }

        case CommandInstructions::CONNECT:
        {
            if(args.size() != 4)
            {
                std::string msg = "The connect command requires 4 args. [e.g.: './run.sh -c [endpoint host] [endpoint port] [target] [instrument,instrument,...]'] but received ['" + fullComm + "'] instead";
                return {LogStatus::ERROR, msg, cmd};
            }

            const auto& endpoint = args[0];
            const auto& port = args[1];
            const auto& target = args[2];
            const auto& instruments = args[3];

            std::vector<std::string> instrBuffer;
            boost::split(instrBuffer, instruments, boost::is_any_of(","));

            if(instrBuffer.size() > mDisplayData.maxSubsCount)
            {
                std::string err = "The app only supports a maximum of [" + std::to_string(mDisplayData.maxSubsCount) + "] subscriptions";
                mLog.warn(err);
                instrBuffer.resize(mDisplayData.maxSubsCount);
            }

            auto [id, err] = mWebSock.connect(endpoint, port, target, instruments);

            if(mLRUCache.get(id) == -1)
            {
                mLRUCache.push(id, [this](int id)
                {
                    mWebSock.close(id);
                });
            }

            if (id <= 0)
            {
                std::string msg = "The WebSocket connection failed. ['" + fullComm + "'] Reason: " + err;
                return {LogStatus::ERROR, msg, cmd};
            }

            //If there already exists an active websocket, swap to the new one
            if(mActiveWebSocId)
            {
                mActiveWebSocId = id;
                return {LogStatus::INFO, err, cmd};
            }
            
            mActiveWebSocId = id;
            mIsAppRunning = true;
            std::string finMsg = "Successfully connected to [" + err + "] on connection id [" + std::to_string(id) + ']';
            mLog.info(finMsg);
            connectionHandler(instrBuffer);
            break;
        }

        case CommandInstructions::SHOW:
        {
            break;
        }

        case CommandInstructions::SET_DEPTH:
        {
            if(args.size() != 1)
            {
                std::string msg = "The set_depth command requires 1 arg. [e.g.: './run.sh -c [depth]'] but received ['" + std::to_string(args.size()) + "'] instead";
                return {LogStatus::ERROR, msg, cmd};
            }

            const auto& depth = args[0];

            try
            {
                mDisplayData.orderBookDepth = std::stoi(depth);
            }
            catch(const std::exception& e)
            {
                std::string msg = "The depth value must be a valid number/ Value given was [" + depth + "]";
                return {LogStatus::ERROR, msg, cmd};
            }

            break;
        }

        case CommandInstructions::QUIT:
        {
            mIsAppRunning = false;
            return {LogStatus::INFO, "Closing program...", CommandInstructions::QUIT};
        }

        default:
        {
            std::string msg = "Unhandled command [" + command + "]";
            return {LogStatus::INFO, msg, cmd};
        }
    }

    return {LogStatus::__NONE__, "", CommandInstructions::__NONE__};
}

void Commands::geminiMarketDataFeedHandlerSubscription(std::vector<std::string> instrBuffer, int id)
{
    //Sending sub message
    nlohmann::json subsMsg;
    subsMsg["type"] = "subscribe";
    subsMsg["subscriptions"] = {{
        {"name", "l2"},
        {"symbols", instrBuffer},
        {"top_of_book", false} //Docs specify it is false by default anyways
    }};

    auto [sendSubId, sendSubErr] = mWebSock.send(id, subsMsg.dump());
    if(sendSubId <= 0)
    {
        std::string err = "Failed to send subscription msg: [" + sendSubErr + ']';
        mLog.error(err);
        //mIsAppRunning = false;
        return;
    }
}

void Commands::geminiMarketDataFeedHandler(std::vector<std::string> instrBuffer, int id)
{
    std::string sym = "";
    auto [getSymKeyid, getSymKeyErr] = mWebSock.getSymKey(id);
    if(getSymKeyid <= 0) 
    {
        std::string err = "Failed to get sym key: [" + getSymKeyErr + ']';
        mLog.error(err);
        mIsAppRunning = false;
        return;
    }
    else sym = getSymKeyErr;

    geminiMarketDataFeedHandlerSubscription(instrBuffer, id);

    long long ctr = 0;
    long long secPassed = 0;
    auto handlerClockReference = std::chrono::steady_clock::now();
    while(mIsThreadRunning)
    {
        auto startClockReference = std::chrono::steady_clock::now();
        auto [recvStatus, recvBuffer] = mWebSock.recv(id);
#ifdef DISABLE_NCURSES
        std::cout << "RECV: " << recvBuffer << "ERR: " << recvErr << '\n';
#endif

        //Critical path
        if(recvStatus > 0) 
        {
            auto startClockCritPath = std::chrono::steady_clock::now();
            nlohmann::json recvJson = nlohmann::json::parse(recvBuffer);

            //The first message received from a successfull subscription seems to be a snapshort, ideally I would treat this as an initialization step
            //TODO I should resize/reserve  the sizes for my datastructures here (maps, unordered_maps, vectors, etc). Only once.
            OrderEntryItem item(recvJson);

            OrderEntryItem::L2Data l2data = item.getL2Data();

            if(l2data.type == "l2_updates")
            {
                //After reconnection recovery, re-enable the reader thread
                if(mDisableReaderThread == true) mDisableReaderThread = false;

#ifdef ENABLE_MUTEX
                std::lock_guard<std::mutex> lock(mOrderBookMapMtx);
                auto orderBook = mOrderBookMap.at(l2data.symbol);
#elif defined ENABLE_ATOMIC_LOADSTORE
                auto& sharedOrderBook = mOrderBookMap.at(l2data.symbol);
                auto atomicOrderBook = std::atomic_load(&sharedOrderBook);
                std::shared_ptr<OrderBook> orderBook = std::make_shared<OrderBook>(*atomicOrderBook); //Making a copy to updated on, then atomic store back
                //std::shared_ptr<OrderBook> orderBook = atomicOrderBook; //very bad direct access...  
#endif   
#ifndef ENABLE_ATOMIC_WAITFREE
           
                for(const auto& order : l2data.changes)
                {
                    if(order.quantity)
                        orderBook->addOrder(order.side, order.price, order.quantity);
                    else 
                        orderBook->removeOrder(order.side, order.price);
                }
#else
                mSPSCQueue.enqueue(std::move(item));
#endif
              
#ifdef ENABLE_ATOMIC_LOADSTORE
                //Once done writing to the orderbook, store into the cache (pointer moving)
                std::atomic_store(&sharedOrderBook, orderBook);     
#endif
                auto endClockCritPath = std::chrono::steady_clock::now();
                mWriterTime = std::chrono::duration_cast<std::chrono::microseconds>(endClockCritPath - startClockCritPath).count();
                auto handlerClockNow = std::chrono::duration_cast<std::chrono::microseconds>(endClockCritPath - handlerClockReference).count();
                long long writerTm = mWriterTime;
                auto wms = mWriterTime * 0.001;
                if(wms > mMaxWriterMs) mMaxWriterMs = wms;
                mAvgCounter++;
                ctr++;
                mAvgWriterMs += std::round((wms - mAvgWriterMs) / mAvgCounter);

                if(handlerClockNow >= 1000000)
                {
                    secPassed++;
                    mMsgCountTotal += ctr;
                    mMsgCountPerSec = mMsgCountTotal / secPassed;
                    ctr = 0;
                    
                    handlerClockReference = endClockCritPath;
                }
            }
            else if(!(l2data.type == "trade" || l2data.type == "heartbeat"))
            {
                mLog.info(recvBuffer);
            }

            //if(recvJson["result"] == "error")
            //{
            //    std::string err = "Error while receiving inbound marked data: [" + recvJson["reason"].dump() + "] for subscription msg [" + subsMsg.dump() + ']';
            //    mLog.error(err);
            //    //TODO handle failover here??
            //}
        }
        else if(recvStatus == 0 || recvStatus == -3) //Connection does not exist or disconnected or outage
        {
            //TODO I should 'wait' for a reconnect recovery up to a configurable timeout, maybe
            mLog.error(recvBuffer);
            mLog.warn("Disconnected. Attempting reconnect...");
            auto [reconStatus, reconErr] = mWebSock.reconnect(id);

            if(reconStatus <= 0)
            {
                mLog.error(reconErr);
                std::this_thread::sleep_for(std::chrono::seconds(5));
            }
            else 
            {
                std::string reconMsg = "Reconnection successful on id [" + std::to_string(id) + ']';
                mLog.info(reconMsg);
                mDisableReaderThread = true;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                for(const auto& instr : instrBuffer)
                    mOrderBookMap.at(instr)->clear();
                geminiMarketDataFeedHandlerSubscription(instrBuffer, id);
            }

            continue;
        }
        else if(recvStatus == -2) //Timeout
        {
            //TODO I should 'wait' for a reconnect recovery up to a configurable timeout, maybe
            mLog.error(recvBuffer);
            mDisableReaderThread = true;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            for(const auto& instr : instrBuffer)
                mOrderBookMap.at(instr)->clear();
            geminiMarketDataFeedHandlerSubscription(instrBuffer, id);
            std::this_thread::sleep_for(std::chrono::seconds(5));
            continue;
        }
        else
        {
            mLog.error(recvBuffer);
        }

        //auto endClockReference = std::chrono::steady_clock::now();
        //mWriterTime = std::chrono::duration_cast<std::chrono::microseconds>(endClockReference - startClockReference).count();
        //long long writerTm = mWriterTime;
        //auto wms = mWriterTime / 1000;
        //if(wms > mMaxWriterMs) mMaxWriterMs = wms;
        //mAvgCounter++;
        //mAvgWriterMs += (wms - mAvgWriterMs) / mAvgCounter;
    }

    mWebSock.close(id);

    /*
    //Sending unsub message
    nlohmann::json unsubsMsg;
    unsubsMsg["type"] = "unsubscribe";
    unsubsMsg["subscriptions"] = {{
        {"name", "l2"},
        {"symbols", instrBuffer}
    }};

    auto [sendUnsubId, sendUnsubErr] = mWebSock.send(id, unsubsMsg.dump());
    if(sendUnsubId <= 0)
    {
        std::string err = "Failed to send subscription msg: [" + sendUnsubErr + ']';
        mLog.error(err);
        mIsAppRunning = false;
        return;
    }*/
}

void Commands::connectionHandler(const std::vector<std::string>& instrBuffer)
{
#ifndef DISABLE_NCURSES
    initscr();
    cbreak();
    noecho();
    curs_set(0);

    //getmaxyx(stdscr, yWinMax, xWinMax);

    mOrderBookDisplay = newwin(mDisplayData.hOrdBook, mDisplayData.wOrdBook, mDisplayData.yOrdBook, mDisplayData.xOrdBook);
    mAppCmdDisplay = newwin(mDisplayData.hAppCmd, mDisplayData.wAppCmd, mDisplayData.yAppCmd, mDisplayData.xAppCmd);
    mOutputDisplay = newwin(mDisplayData.hOutput, mDisplayData.wOutput, mDisplayData.yOutput, mDisplayData.xOutput);

    nodelay(mOrderBookDisplay, TRUE); 
    nodelay(mAppCmdDisplay, TRUE);//without this, the rendering seems stuck when using getch()
    nodelay(mOutputDisplay, TRUE);
#endif
    bool displayClockTrigger = true;
    
    mIsThreadRunning = true;

    //I should init my instr subs cache before I start the writer and reader...
    for(const auto& instr : instrBuffer)
        mOrderBookMap[instr] = std::make_shared<OrderBook>(instr);

    std::thread marketDataHandler(&Commands::geminiMarketDataFeedHandler, this, instrBuffer, mActiveWebSocId);

    std::string command = "";
    std::string output = "";
    std::string escChar = "";
    std::string stats = "";
    std::string debug = "";
    bool escCharSet = false;
    auto currentCommandIt = mCommandHistory.end();
    auto displayClockReference = std::chrono::steady_clock::now();
    while(mIsAppRunning)
    {
        //This is primarily used to disable the reader thread temporarily so that the writer thread can catch up with critical data sync-ing (used after reconnection)
        if(mDisableReaderThread)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }

        auto startClockReference = std::chrono::steady_clock::now();
#ifndef DISABLE_NCURSES
        debug = "DEBUG-> ";

        auto displayClockNow = std::chrono::steady_clock::now();
        auto startMs = std::chrono::duration_cast<std::chrono::milliseconds>(displayClockNow - displayClockReference).count();

        if(startMs >= mDisplayData.displayClockTriggerDelay)
        {
            displayClockTrigger = !displayClockTrigger;
            displayClockReference = displayClockNow;
        }

        int ch = wgetch(mAppCmdDisplay);

        if(ch != ERR) 
        {
            //I tried the key macros here.. failed miserably, defaulted to the good ol' ASCII table
            if(ch <= 127)
            {
                switch(ch)
                {
                    case 27: //Manually handling escape characters for now...
                    {
                        escCharSet = true;
                        break;
                    }

                    case 10:
                    {
                        auto [status, err, normCmd, rawArgs, entireComm] = parseCommand(command);
                        currentCommandIt = mCommandHistory.end();
                        if(!status && err.size() != 0)
                        {
                            output = err;
                            mLog.error(err);
                        }
                        else
                        {
                            auto [res_type, res, cmd] = runCommand(normCmd, rawArgs, entireComm);
                            if(res_type != LogStatus::__NONE__)
                            {
                                output = res;
                                mLog.log(res_type, res);
                            }
                            else output = "";
                        }

                        command = "";
                        break;
                    }

                    case 127:
                    {
                        if (!command.empty()) command.pop_back();
                        break;
                    }

                    default:
                    {
                        if(!escCharSet)
                        {
                            if(command.size() < mDisplayData.maxDisplayCmdInput) command += ch;
                        }
                        else 
                        {
                            escChar.push_back(ch);
  
                            if(escChar == "[B")
                            {
                                auto prevCommandIt = std::next(currentCommandIt);

                                if(prevCommandIt != mCommandHistory.begin())
                                {
                                    command = *prevCommandIt;
                                    currentCommandIt = prevCommandIt;
                                }
                            }
                            if(escChar == "[A")
                            {
                                auto nextCommandIt = std::prev(currentCommandIt);
                                
                                if(nextCommandIt != mCommandHistory.end())
                                {
                                    command = *nextCommandIt;
                                    currentCommandIt = nextCommandIt;
                                }
                            }

                            //if(escChar == "[D");
                            //if(escChar == "[C");

                            if(escChar.size() >= 2)
                            {
                                escChar = "";
                                escCharSet = false;
                            }
                        }
                    }
                }
            }
            else
            {
                //Handling special cases separately
                switch(ch)
                {
                    case KEY_RESIZE:
                    {
                        //Forcing fixed size + pos, avoiding ncurses implicit resizing problems
                        wresize(mOrderBookDisplay, mDisplayData.hOrdBook, mDisplayData.wOrdBook);
                        mvwin(mOrderBookDisplay, mDisplayData.yOrdBook, mDisplayData.xOrdBook);
                        wresize(mAppCmdDisplay, mDisplayData.hAppCmd, mDisplayData.wAppCmd);
                        mvwin(mAppCmdDisplay, mDisplayData.yAppCmd, mDisplayData.xAppCmd);
                        wresize(mOutputDisplay, mDisplayData.hOutput, mDisplayData.wOutput);
                        mvwin(mOutputDisplay, mDisplayData.yOutput, mDisplayData.xOutput);
                    }
                }
            }
        }

        //Rendering top window
        werase(mOrderBookDisplay);
        box(mOrderBookDisplay, 0, 0);
        std::string topTitle = "Live data";
        mvwprintw(mOrderBookDisplay, 0, (mDisplayData.wOrdBook - topTitle.size()) * 0.5, "%s", topTitle.c_str());
        
        int bookAlign = 1;
        mDisplayData.currentSubsCount = 0;

#if defined ENABLE_ATOMIC_WAITFREE
        OrderEntryItem queueData;
        if(mSPSCQueue.dequeue(queueData))
        {
            auto l2data = queueData.getL2Data();
            auto orderBook = mOrderBookMap.at(l2data.symbol);
            for(const auto& order : l2data.changes)
            {
                if(order.quantity)
                    orderBook->addOrder(order.side, order.price, order.quantity);
                else 
                    orderBook->removeOrder(order.side, order.price);
            }
        }
#endif
        for (auto& [symbol, cachedBook] : mOrderBookMap)
        {
#ifdef ENABLE_MUTEX
            std::lock_guard<std::mutex> lock(mOrderBookMapMtx);
            std::weak_ptr<OrderBook> weakBook = cachedBook;
            auto bookSnap = weakBook.lock();
#elif defined ENABLE_ATOMIC_LOADSTORE
            auto bookSnap = std::atomic_load(&cachedBook);
            if(!bookSnap) continue;
#elif defined ENABLE_ATOMIC_WAITFREE
            const auto& bookSnap = cachedBook;
#endif
            std::string book = bookSnap->toStr(mDisplayData.orderBookDepth);

            if(book != "")
            {
                std::vector<std::string> bookLinesBuffer;
                boost::split(bookLinesBuffer, book, boost::is_any_of("\n"));
                int verticalOffset = std::floor(mDisplayData.currentSubsCount / mDisplayData.maxSubsCountPerRow);

                if(mDisplayData.currentSubsCount % mDisplayData.maxSubsCountPerRow == 0) bookAlign = 1;

                for(int i = 0; i < bookLinesBuffer.size(); i++)
                {
                    const auto& bookLine = bookLinesBuffer[i];
                    mvwprintw(mOrderBookDisplay, verticalOffset + verticalOffset * (mDisplayData.orderBookDepth + 1) + i + 1, bookAlign + 1, "%s", bookLine.c_str());
                }
                
                mDisplayData.currentSubsCount++;
                bookAlign += bookSnap->getBookWidth() + 5; //5 is just an offest, distance between books
            }

            debug += symbol + ":" + std::to_string(bookSnap.use_count()) + ':' + std::to_string(bookSnap->getBidBookSize()) + ':' + std::to_string(bookSnap->getAskBookSize()) + ' ';
        }
        
        wrefresh(mOrderBookDisplay);

        //Rendering bottom window
        werase(mAppCmdDisplay);
        box(mAppCmdDisplay, 0, 0);
        if(displayClockTrigger) mvwprintw(mAppCmdDisplay, std::floor(mDisplayData.hAppCmd * 0.5), 2, "%s", "> ");
        mvwprintw(mAppCmdDisplay, std::floor(mDisplayData.hAppCmd * 0.5), 4, "%s", command.c_str());
        wrefresh(mAppCmdDisplay);

        //Rendering output window
        werase(mOutputDisplay);
        mvwprintw(mOutputDisplay, 0, 1, "%s", stats.c_str());
        mvwprintw(mOutputDisplay, 1, 1, "%s", debug.c_str());
        if(output != "") mvwprintw(mOutputDisplay, 3, 1, "%s", output.c_str());
        wrefresh(mOutputDisplay);

        //std::this_thread::sleep_for(std::chrono::milliseconds(100));
        auto endClockReference = std::chrono::steady_clock::now();
        auto microsec = std::chrono::duration_cast<std::chrono::microseconds>(endClockReference - startClockReference).count();

        auto rms = microsec / 1000;
        auto rus = microsec % 1000;
        auto wms = mWriterTime / 1000;
        auto wus = mWriterTime % 1000;

        //RMS = reader ms
        //WMS = writer ms
        //WMMS = writer max ms - take with a pinch of salt, it's not accurate
        //WAMS = writer avg ms - take with a pinch of salt, it's not accurate
        stats = "STATS-> WMMS: " + std::to_string(mMaxWriterMs) + " WAMS: "  + std::to_string(mAvgWriterMs) + " RMS:" + 
            std::to_string(rms) + "." + std::to_string(rus) + " WMS: " + std::to_string(wms) + "." + std::to_string(wus) +
            " MPS: " + std::to_string(mMsgCountPerSec) + " MSGS: " + std::to_string(mMsgCountTotal);
#else
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
    }

#ifndef DISABLE_NCURSES
    endwin();
#endif
    mIsThreadRunning = false;
    marketDataHandler.join();
}