#include "Commands.h"
Commands::Commands()
: mLRUCache(false, 2)
, mActiveWebSocId(0)
, mIsAppRunning(false)
, mMsgCountPerSec(0)
, mMsgCountTotal(0)
, mMaxWriterMs(0)
, mAvgWriterMs(0)
, mAvgCounter(0)
, mDisableReaderThread(false)
{
mOrderBookMap.reserve(mDisplayData.maxSubsCount);
mComMap.emplace("h", CommandInstructions::HELP);
mComMap.emplace("help", CommandInstructions::HELP);
mComMap.emplace("c", CommandInstructions::CONNECT);
mComMap.emplace("connect", CommandInstructions::CONNECT);
mComMap.emplace("s", CommandInstructions::SHOW);
mComMap.emplace("show", CommandInstructions::SHOW);
mComMap.emplace("sd", CommandInstructions::SET_DEPTH);
mComMap.emplace("set_depth", CommandInstructions::SET_DEPTH);
mComMap.emplace("q", CommandInstructions::QUIT);
mComMap.emplace("quit", CommandInstructions::QUIT);
mComLookup[(uint8_t) CommandInstructions::HELP] = {{"h","help"}, "Showing all available commands"};
mComLookup[(uint8_t) CommandInstructions::CONNECT] = {{"c","connect"}, "Creates a websocket connection to specified uri"};
mComLookup[(uint8_t) CommandInstructions::SHOW] = {{"s","show"}, "Shows websocket connection details"};
mComLookup[(uint8_t) CommandInstructions::SET_DEPTH] = {{"sd","set_depth"}, "Sets the shown order book depth"};
mComLookup[(uint8_t) CommandInstructions::QUIT] = {{"q","quit"}, "Quitting the program"};
}
std::tuple<bool, std::string, std::string, std::vector<std::string>, std::string> Commands::parseCommand(const std::string& cmd, bool isFromCmd)
{
std::vector<std::string> argv;
boost::split(argv, cmd, boost::is_any_of(" "));
int argc = argv.size();
if(cmd.size() == 0) return {false, "Run with -h option for help.", "", {}, ""};
mCommandHistory.emplace_back(cmd);
if(mCommandHistory.size() > mDisplayData.maxCommandHistory) mCommandHistory.erase(mCommandHistory.begin());
if(argc == 0)
{
std::string err = "Not enough arguments provided. [cmd: " + cmd + ']';
return {false, err, "", {}, ""};
}
std::string rawCmd = argv[0];
if(rawCmd[0] != '-')
{
std::string err = "Command must start with '-' symbol. [cmd: " + cmd + ']';
return {false, err, "", {}, ""};
}
if(rawCmd.size() == 1)
{
std::string err = "Command is missing option\n\tUsage: ./run.sh -[opt] [args]\n\tExample: ./run.sh -h\n\t [cmd: " + cmd + ']';
return {false, err, "", {}, ""};
}
std::string normCmd = rawCmd.substr(1);
std::vector<std::string> rawArgs;
if(argc >= 2 + isFromCmd) rawArgs = std::vector<std::string>(argv.begin() + 1, argv.begin() + argc);
return {true, "", normCmd, rawArgs, cmd};
}
std::tuple<LogStatus, std::string, CommandInstructions> Commands::runCommand(const std::string& command, const std::vector<std::string>& args, const std::string& fullComm)
{
if(command.size() == 0) return{LogStatus::__NONE__, "", CommandInstructions::__NONE__};
auto foundCommand = mComMap.find(command);
if(foundCommand == mComMap.end())
{
std::string err = "Unknown command. [" + command + ']';
return {LogStatus::ERROR, err, CommandInstructions::__NONE__};
}
CommandInstructions cmd = foundCommand->second;
switch(cmd)
{
case CommandInstructions::HELP:
{
std::stringstream helpMsgBody("");
helpMsgBody << "List of commands:\n";
for(const auto& valuePair : mComLookup)
{
const auto& cmds = valuePair.first;
const auto& cmdsDescr = valuePair.second;
std::string commandsStr = "";
helpMsgBody << '\t';
for(const auto& cmd : cmds)
{
commandsStr += '-' + cmd + ' ';
}
helpMsgBody << commandsStr + '\t' + cmdsDescr + '\n';
}
helpMsgBody << "Example: -c api.gemini.com 443 /v2/marketdata BTCUSD";
return {LogStatus::INFO, helpMsgBody.str(), cmd};
}
case CommandInstructions::CONNECT:
{
if(args.size() != 4)
{
std::string msg = "The connect command requires 4 args. [e.g.: './run.sh -c [endpoint host] [endpoint port] [target] [instrument,instrument,...]'] but received ['" + fullComm + "'] instead";
return {LogStatus::ERROR, msg, cmd};
}
const auto& endpoint = args[0];
const auto& port = args[1];
const auto& target = args[2];
const auto& instruments = args[3];
std::vector<std::string> instrBuffer;
boost::split(instrBuffer, instruments, boost::is_any_of(","));
if(instrBuffer.size() > mDisplayData.maxSubsCount)
{
std::string err = "The app only supports a maximum of [" + std::to_string(mDisplayData.maxSubsCount) + "] subscriptions";
mLog.warn(err);
instrBuffer.resize(mDisplayData.maxSubsCount);
}
auto [id, err] = mWebSock.connect(endpoint, port, target, instruments);
if(mLRUCache.get(id) == -1)
{
mLRUCache.push(id, [this](int id)
{
mWebSock.close(id);
});
}
if (id <= 0)
{
std::string msg = "The WebSocket connection failed. ['" + fullComm + "'] Reason: " + err;
return {LogStatus::ERROR, msg, cmd};
}
//If there already exists an active websocket, swap to the new one
if(mActiveWebSocId)
{
mActiveWebSocId = id;
return {LogStatus::INFO, err, cmd};
}
mActiveWebSocId = id;
mIsAppRunning = true;
std::string finMsg = "Successfully connected to [" + err + "] on connection id [" + std::to_string(id) + ']';
mLog.info(finMsg);
connectionHandler(instrBuffer);
break;
}
case CommandInstructions::SHOW:
{
break;
}
case CommandInstructions::SET_DEPTH:
{
if(args.size() != 1)
{
std::string msg = "The set_depth command requires 1 arg. [e.g.: './run.sh -c [depth]'] but received ['" + std::to_string(args.size()) + "'] instead";
return {LogStatus::ERROR, msg, cmd};
}
const auto& depth = args[0];
try
{
mDisplayData.orderBookDepth = std::stoi(depth);
}
catch(const std::exception& e)
{
std::string msg = "The depth value must be a valid number/ Value given was [" + depth + "]";
return {LogStatus::ERROR, msg, cmd};
}
break;
}
case CommandInstructions::QUIT:
{
mIsAppRunning = false;
return {LogStatus::INFO, "Closing program...", CommandInstructions::QUIT};
}
default:
{
std::string msg = "Unhandled command [" + command + "]";
return {LogStatus::INFO, msg, cmd};
}
}
return {LogStatus::__NONE__, "", CommandInstructions::__NONE__};
}
void Commands::geminiMarketDataFeedHandlerSubscription(std::vector<std::string> instrBuffer, int id)
{
//Sending sub message
nlohmann::json subsMsg;
subsMsg["type"] = "subscribe";
subsMsg["subscriptions"] = {{
{"name", "l2"},
{"symbols", instrBuffer},
{"top_of_book", false} //Docs specify it is false by default anyways
}};
auto [sendSubId, sendSubErr] = mWebSock.send(id, subsMsg.dump());
if(sendSubId <= 0)
{
std::string err = "Failed to send subscription msg: [" + sendSubErr + ']';
mLog.error(err);
//mIsAppRunning = false;
return;
}
}
void Commands::geminiMarketDataFeedHandler(std::vector<std::string> instrBuffer, int id)
{
std::string sym = "";
auto [getSymKeyid, getSymKeyErr] = mWebSock.getSymKey(id);
if(getSymKeyid <= 0)
{
std::string err = "Failed to get sym key: [" + getSymKeyErr + ']';
mLog.error(err);
mIsAppRunning = false;
return;
}
else sym = getSymKeyErr;
geminiMarketDataFeedHandlerSubscription(instrBuffer, id);
long long ctr = 0;
long long secPassed = 0;
auto handlerClockReference = std::chrono::steady_clock::now();
while(mIsThreadRunning)
{
auto startClockReference = std::chrono::steady_clock::now();
auto [recvStatus, recvBuffer] = mWebSock.recv(id);
#ifdef DISABLE_NCURSES
std::cout << "RECV: " << recvBuffer << "ERR: " << recvErr << '\n';
#endif
//Critical path
if(recvStatus > 0)
{
auto startClockCritPath = std::chrono::steady_clock::now();
nlohmann::json recvJson = nlohmann::json::parse(recvBuffer);
//The first message received from a successfull subscription seems to be a snapshort, ideally I would treat this as an initialization step
//TODO I should resize/reserve the sizes for my datastructures here (maps, unordered_maps, vectors, etc). Only once.
OrderEntryItem item(recvJson);
OrderEntryItem::L2Data l2data = item.getL2Data();
if(l2data.type == "l2_updates")
{
//After reconnection recovery, re-enable the reader thread
if(mDisableReaderThread == true) mDisableReaderThread = false;
#ifdef ENABLE_MUTEX
std::lock_guard<std::mutex> lock(mOrderBookMapMtx);
auto orderBook = mOrderBookMap.at(l2data.symbol);
#elif defined ENABLE_ATOMIC_LOADSTORE
auto& sharedOrderBook = mOrderBookMap.at(l2data.symbol);
auto atomicOrderBook = std::atomic_load(&sharedOrderBook);
std::shared_ptr<OrderBook> orderBook = std::make_shared<OrderBook>(*atomicOrderBook); //Making a copy to updated on, then atomic store back
//std::shared_ptr<OrderBook> orderBook = atomicOrderBook; //very bad direct access...
#endif
#ifndef ENABLE_ATOMIC_WAITFREE
for(const auto& order : l2data.changes)
{
if(order.quantity)
orderBook->addOrder(order.side, order.price, order.quantity);
else
orderBook->removeOrder(order.side, order.price);
}
#else
mSPSCQueue.enqueue(std::move(item));
#endif
#ifdef ENABLE_ATOMIC_LOADSTORE
//Once done writing to the orderbook, store into the cache (pointer moving)
std::atomic_store(&sharedOrderBook, orderBook);
#endif
auto endClockCritPath = std::chrono::steady_clock::now();
mWriterTime = std::chrono::duration_cast<std::chrono::microseconds>(endClockCritPath - startClockCritPath).count();
auto handlerClockNow = std::chrono::duration_cast<std::chrono::microseconds>(endClockCritPath - handlerClockReference).count();
long long writerTm = mWriterTime;
auto wms = mWriterTime * 0.001;
if(wms > mMaxWriterMs) mMaxWriterMs = wms;
mAvgCounter++;
ctr++;
mAvgWriterMs += std::round((wms - mAvgWriterMs) / mAvgCounter);
if(handlerClockNow >= 1000000)
{
secPassed++;
mMsgCountTotal += ctr;
mMsgCountPerSec = mMsgCountTotal / secPassed;
ctr = 0;
handlerClockReference = endClockCritPath;
}
}
else if(!(l2data.type == "trade" || l2data.type == "heartbeat"))
{
mLog.info(recvBuffer);
}
//if(recvJson["result"] == "error")
//{
// std::string err = "Error while receiving inbound marked data: [" + recvJson["reason"].dump() + "] for subscription msg [" + subsMsg.dump() + ']';
// mLog.error(err);
// //TODO handle failover here??
//}
}
else if(recvStatus == 0 || recvStatus == -3) //Connection does not exist or disconnected or outage
{
//TODO I should 'wait' for a reconnect recovery up to a configurable timeout, maybe
mLog.error(recvBuffer);
mLog.warn("Disconnected. Attempting reconnect...");
auto [reconStatus, reconErr] = mWebSock.reconnect(id);
if(reconStatus <= 0)
{
mLog.error(reconErr);
std::this_thread::sleep_for(std::chrono::seconds(5));
}
else
{
std::string reconMsg = "Reconnection successful on id [" + std::to_string(id) + ']';
mLog.info(reconMsg);
mDisableReaderThread = true;
std::this_thread::sleep_for(std::chrono::seconds(1));
for(const auto& instr : instrBuffer)
mOrderBookMap.at(instr)->clear();
geminiMarketDataFeedHandlerSubscription(instrBuffer, id);
}
continue;
}
else if(recvStatus == -2) //Timeout
{
//TODO I should 'wait' for a reconnect recovery up to a configurable timeout, maybe
mLog.error(recvBuffer);
mDisableReaderThread = true;
std::this_thread::sleep_for(std::chrono::seconds(1));
for(const auto& instr : instrBuffer)
mOrderBookMap.at(instr)->clear();
geminiMarketDataFeedHandlerSubscription(instrBuffer, id);
std::this_thread::sleep_for(std::chrono::seconds(5));
continue;
}
else
{
mLog.error(recvBuffer);
}
//auto endClockReference = std::chrono::steady_clock::now();
//mWriterTime = std::chrono::duration_cast<std::chrono::microseconds>(endClockReference - startClockReference).count();
//long long writerTm = mWriterTime;
//auto wms = mWriterTime / 1000;
//if(wms > mMaxWriterMs) mMaxWriterMs = wms;
//mAvgCounter++;
//mAvgWriterMs += (wms - mAvgWriterMs) / mAvgCounter;
}
mWebSock.close(id);
/*
//Sending unsub message
nlohmann::json unsubsMsg;
unsubsMsg["type"] = "unsubscribe";
unsubsMsg["subscriptions"] = {{
{"name", "l2"},
{"symbols", instrBuffer}
}};
auto [sendUnsubId, sendUnsubErr] = mWebSock.send(id, unsubsMsg.dump());
if(sendUnsubId <= 0)
{
std::string err = "Failed to send subscription msg: [" + sendUnsubErr + ']';
mLog.error(err);
mIsAppRunning = false;
return;
}*/
}
void Commands::connectionHandler(const std::vector<std::string>& instrBuffer)
{
#ifndef DISABLE_NCURSES
initscr();
cbreak();
noecho();
curs_set(0);
//getmaxyx(stdscr, yWinMax, xWinMax);
mOrderBookDisplay = newwin(mDisplayData.hOrdBook, mDisplayData.wOrdBook, mDisplayData.yOrdBook, mDisplayData.xOrdBook);
mAppCmdDisplay = newwin(mDisplayData.hAppCmd, mDisplayData.wAppCmd, mDisplayData.yAppCmd, mDisplayData.xAppCmd);
mOutputDisplay = newwin(mDisplayData.hOutput, mDisplayData.wOutput, mDisplayData.yOutput, mDisplayData.xOutput);
nodelay(mOrderBookDisplay, TRUE);
nodelay(mAppCmdDisplay, TRUE);//without this, the rendering seems stuck when using getch()
nodelay(mOutputDisplay, TRUE);
#endif
bool displayClockTrigger = true;
mIsThreadRunning = true;
//I should init my instr subs cache before I start the writer and reader...
for(const auto& instr : instrBuffer)
mOrderBookMap[instr] = std::make_shared<OrderBook>(instr);
std::thread marketDataHandler(&Commands::geminiMarketDataFeedHandler, this, instrBuffer, mActiveWebSocId);
std::string command = "";
std::string output = "";
std::string escChar = "";
std::string stats = "";
std::string debug = "";
bool escCharSet = false;
auto currentCommandIt = mCommandHistory.end();
auto displayClockReference = std::chrono::steady_clock::now();
while(mIsAppRunning)
{
//This is primarily used to disable the reader thread temporarily so that the writer thread can catch up with critical data sync-ing (used after reconnection)
if(mDisableReaderThread)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
auto startClockReference = std::chrono::steady_clock::now();
#ifndef DISABLE_NCURSES
debug = "DEBUG-> ";
auto displayClockNow = std::chrono::steady_clock::now();
auto startMs = std::chrono::duration_cast<std::chrono::milliseconds>(displayClockNow - displayClockReference).count();
if(startMs >= mDisplayData.displayClockTriggerDelay)
{
displayClockTrigger = !displayClockTrigger;
displayClockReference = displayClockNow;
}
int ch = wgetch(mAppCmdDisplay);
if(ch != ERR)
{
//I tried the key macros here.. failed miserably, defaulted to the good ol' ASCII table
if(ch <= 127)
{
switch(ch)
{
case 27: //Manually handling escape characters for now...
{
escCharSet = true;
break;
}
case 10:
{
auto [status, err, normCmd, rawArgs, entireComm] = parseCommand(command);
currentCommandIt = mCommandHistory.end();
if(!status && err.size() != 0)
{
output = err;
mLog.error(err);
}
else
{
auto [res_type, res, cmd] = runCommand(normCmd, rawArgs, entireComm);
if(res_type != LogStatus::__NONE__)
{
output = res;
mLog.log(res_type, res);
}
else output = "";
}
command = "";
break;
}
case 127:
{
if (!command.empty()) command.pop_back();
break;
}
default:
{
if(!escCharSet)
{
if(command.size() < mDisplayData.maxDisplayCmdInput) command += ch;
}
else
{
escChar.push_back(ch);
if(escChar == "[B")
{
auto prevCommandIt = std::next(currentCommandIt);
if(prevCommandIt != mCommandHistory.begin())
{
command = *prevCommandIt;
currentCommandIt = prevCommandIt;
}
}
if(escChar == "[A")
{
auto nextCommandIt = std::prev(currentCommandIt);
if(nextCommandIt != mCommandHistory.end())
{
command = *nextCommandIt;
currentCommandIt = nextCommandIt;
}
}
//if(escChar == "[D");
//if(escChar == "[C");
if(escChar.size() >= 2)
{
escChar = "";
escCharSet = false;
}
}
}
}
}
else
{
//Handling special cases separately
switch(ch)
{
case KEY_RESIZE:
{
//Forcing fixed size + pos, avoiding ncurses implicit resizing problems
wresize(mOrderBookDisplay, mDisplayData.hOrdBook, mDisplayData.wOrdBook);
mvwin(mOrderBookDisplay, mDisplayData.yOrdBook, mDisplayData.xOrdBook);
wresize(mAppCmdDisplay, mDisplayData.hAppCmd, mDisplayData.wAppCmd);
mvwin(mAppCmdDisplay, mDisplayData.yAppCmd, mDisplayData.xAppCmd);
wresize(mOutputDisplay, mDisplayData.hOutput, mDisplayData.wOutput);
mvwin(mOutputDisplay, mDisplayData.yOutput, mDisplayData.xOutput);
}
}
}
}
//Rendering top window
werase(mOrderBookDisplay);
box(mOrderBookDisplay, 0, 0);
std::string topTitle = "Live data";
mvwprintw(mOrderBookDisplay, 0, (mDisplayData.wOrdBook - topTitle.size()) * 0.5, "%s", topTitle.c_str());
int bookAlign = 1;
mDisplayData.currentSubsCount = 0;
#if defined ENABLE_ATOMIC_WAITFREE
OrderEntryItem queueData;
if(mSPSCQueue.dequeue(queueData))
{
auto l2data = queueData.getL2Data();
auto orderBook = mOrderBookMap.at(l2data.symbol);
for(const auto& order : l2data.changes)
{
if(order.quantity)
orderBook->addOrder(order.side, order.price, order.quantity);
else
orderBook->removeOrder(order.side, order.price);
}
}
#endif
for (auto& [symbol, cachedBook] : mOrderBookMap)
{
#ifdef ENABLE_MUTEX
std::lock_guard<std::mutex> lock(mOrderBookMapMtx);
std::weak_ptr<OrderBook> weakBook = cachedBook;
auto bookSnap = weakBook.lock();
#elif defined ENABLE_ATOMIC_LOADSTORE
auto bookSnap = std::atomic_load(&cachedBook);
if(!bookSnap) continue;
#elif defined ENABLE_ATOMIC_WAITFREE
const auto& bookSnap = cachedBook;
#endif
std::string book = bookSnap->toStr(mDisplayData.orderBookDepth);
if(book != "")
{
std::vector<std::string> bookLinesBuffer;
boost::split(bookLinesBuffer, book, boost::is_any_of("\n"));
int verticalOffset = std::floor(mDisplayData.currentSubsCount / mDisplayData.maxSubsCountPerRow);
if(mDisplayData.currentSubsCount % mDisplayData.maxSubsCountPerRow == 0) bookAlign = 1;
for(int i = 0; i < bookLinesBuffer.size(); i++)
{
const auto& bookLine = bookLinesBuffer[i];
mvwprintw(mOrderBookDisplay, verticalOffset + verticalOffset * (mDisplayData.orderBookDepth + 1) + i + 1, bookAlign + 1, "%s", bookLine.c_str());
}
mDisplayData.currentSubsCount++;
bookAlign += bookSnap->getBookWidth() + 5; //5 is just an offest, distance between books
}
debug += symbol + ":" + std::to_string(bookSnap.use_count()) + ':' + std::to_string(bookSnap->getBidBookSize()) + ':' + std::to_string(bookSnap->getAskBookSize()) + ' ';
}
wrefresh(mOrderBookDisplay);
//Rendering bottom window
werase(mAppCmdDisplay);
box(mAppCmdDisplay, 0, 0);
if(displayClockTrigger) mvwprintw(mAppCmdDisplay, std::floor(mDisplayData.hAppCmd * 0.5), 2, "%s", "> ");
mvwprintw(mAppCmdDisplay, std::floor(mDisplayData.hAppCmd * 0.5), 4, "%s", command.c_str());
wrefresh(mAppCmdDisplay);
//Rendering output window
werase(mOutputDisplay);
mvwprintw(mOutputDisplay, 0, 1, "%s", stats.c_str());
mvwprintw(mOutputDisplay, 1, 1, "%s", debug.c_str());
if(output != "") mvwprintw(mOutputDisplay, 3, 1, "%s", output.c_str());
wrefresh(mOutputDisplay);
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto endClockReference = std::chrono::steady_clock::now();
auto microsec = std::chrono::duration_cast<std::chrono::microseconds>(endClockReference - startClockReference).count();
auto rms = microsec / 1000;
auto rus = microsec % 1000;
auto wms = mWriterTime / 1000;
auto wus = mWriterTime % 1000;
//RMS = reader ms
//WMS = writer ms
//WMMS = writer max ms - take with a pinch of salt, it's not accurate
//WAMS = writer avg ms - take with a pinch of salt, it's not accurate
stats = "STATS-> WMMS: " + std::to_string(mMaxWriterMs) + " WAMS: " + std::to_string(mAvgWriterMs) + " RMS:" +
std::to_string(rms) + "." + std::to_string(rus) + " WMS: " + std::to_string(wms) + "." + std::to_string(wus) +
" MPS: " + std::to_string(mMsgCountPerSec) + " MSGS: " + std::to_string(mMsgCountTotal);
#else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
}
#ifndef DISABLE_NCURSES
endwin();
#endif
mIsThreadRunning = false;
marketDataHandler.join();
}