摘要:
此工程分三部分来解读,相互之间是独立的
1.发起请求部分:向行情服务器发送请求数据包后,发起请求部分就此结束,不用管了。
(1)接收StkNet发过来的单个请求
(2)启动了一个分笔数据请求线程,在交易时间内每隔3秒向服务器请求所有股票分笔数据行情。
2.数据接收部分:用事件机制接收行情服务器发送过来的数据
接收数据后存入类中的缓冲区,然后对此缓冲区的数据进行解码,解码后的数据包存入类中的另一个缓冲区后,不管了。
3.数据推送部分:数据分发线程
启动了一个数据分发线程,从缓冲区取出数据,用消息发送数据到StkNet的窗口句柄,不管了。
一、发起请求部分
(一)开启主动请求所有股票分笔数据的线程
//功能:1.每隔3秒向获取缓冲区的数据
// 2.接收数据空闲时,发送队列里缓存的行情数据请求命令。
// 缺陷:用线程发起请求,数据有延迟,应该改用websocket主动推送
条件:获取本地股票分笔成交文件,股票数量》0;
只在交易时间内发起请求,可以修改本地时间。
UINT TWAutoReportThreadMain(LPVOID pParam)
请求数据
CTWSocket的socket的事件机制把收到的数据先用CTSCache解码,CTSCache解码后存在了缓冲区,GetStockByCodeEx()利用CTSCache类直接从缓冲区中读取数据
int WINAPI GetStockByCodeEx(char * pszStockCode,int nMarket,RCV_REPORT_STRUCTEx * pBuf)
{
return ( CTSCache::GetInstance().GetStockByCodeEx( pszStockCode, nMarket, pBuf ) ? 1 : -1 );
}
数据请求、接收的实现过程
NetTS对外提供
int WINAPI RequestStockData( int nDataType, STOCK_STRUCTEx* pStocks, int nSize, int nKType, int nDataCount );
由StkNet调用此函数发起请求行情。包括请求的数据类型、股票代码的集合等。
用socket的事件机制接收到原始数据后,先进行解码,然后存入变量缓冲区。
(二)分笔数请求过程
1.发起分笔数据请求
CTWSocket::RequestReport(TW_STOCK* pStock, int nSize)
TW_ASK ask;
int lenSend = ConstructAskReportBuffer(ask, stocks, putsize);
if (lenSend > 0)
//发送分笔数据请求 by freeman
Send(&ask, lenSend);
二、数据接收部分
1.用CTWSocket::OnReceive(int nErrorCode)的事件机制接收数据
(1)接收到的数据先 存入NetTS\TWSocket.h的CTWSockt::m_rbuffer变量缓冲区(BYTE m_rbuffer[0x10000]; 64k 字节接收缓冲区
//数据接收事件
void CTWSocket::OnReceive(int nErrorCode)
{
char szText[256];
sprintf(szText, "→step 1.数据到达事件");
g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);
m_timeReceiveLast = time(NULL);
//读取数据事件,存入m_rbuffer缓冲区
int nReceive = Receive(m_rbuffer, sizeof(m_rbuffer));
sprintf(szText, "→step 2.读取数据,调用Receive(m_rbuffer, sizeof(m_rbuffer)),存入m_rbuffer缓冲区,字节数:%d", nReceive);
g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);
if (nReceive > 0)
{
//调用程序对收到的数据解码
sprintf(szText, "→step 3.读取数据长度>0,调用CTSCache::GetInstance().OnReceive(m_rbuffer, nReceive)处理");
g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);
//对收到的数据包进行解码
CTSCache::GetInstance().OnReceive(m_rbuffer, nReceive);
if (nReceive < 256 && TryGetLength(m_rbuffer, nReceive) < 256) // 收到小块包,说明大包接收完毕
m_bIsReceiving = FALSE;
if (nReceive == sizeof(m_rbuffer))
OnReceive(nErrorCode);
}
CSocket::OnReceive(nErrorCode);
}
(2) 用CTSCache::OnReceive( BYTE * buf, size_t len )函数对CTWSockt::m_rbuffer变量缓冲区数据进行解码
解码过程
首先将CTWSockt::m_rbuffer的数据拷贝到将数据拷贝到CTSCache::m_buffer 512k。然后开始解码
//对收到的数据包进行解码 by freeman
int CTSCache::OnReceive( BYTE * buf, size_t len )
{
char szText[256];
sprintf(szText, "→step 4.准备解码,准备调用DecodePacket() sizeof(m_buffer)=%d len=%d", sizeof(m_buffer), len);
g_pWinTrace->Debug()->Send("CTSCache::OnReceive", szText);
CSingleLock lock(&m_mutexBuffer,TRUE);
if( NULL == buf || len <= 0 )
return 0;
if( len > sizeof(m_buffer) )
return 0;
if( m_nBufLen + len > sizeof(m_buffer) )
m_nBufLen = 0; // discard old
//将数据拷贝到CTSCache::m_buffer 512k bytes buffer by freeman
memcpy( m_buffer+m_nBufLen, buf, len );
m_nBufLen += len;
int packets = DecodePacket();
while( packets > 0 )
packets = DecodePacket();
return len;
}
//解码数据包
int CTSCache::DecodePacket( )
{
if( m_nBufLen <= 0 )
return 0;
int nPacketLen = FindFirstPacketLength();
if( nPacketLen > 0 && nPacketLen <= (int)m_nBufLen )
{
TryGetPacket( nPacketLen );
DiscardPacket( nPacketLen );
return 1;
}
if( m_nBufLen > sizeof(m_buffer)/2 )
m_nBufLen = 0; // truncate if too big and no packets found.
return 0;
}
int CTSCache::TryGetPacket( int nPacketLen )
尝试对CTSCache::m_buffer的数据解码,如果是分笔数据,转换为RCV_DATA结构,存入prcvdata 变量 by freeman
nLen = TryGetReport( m_buffer, nPacketLen, prcvdata );
NetTS:解码的分笔数据存入了CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
BOOL CTSCache::PushReport( char * pszStockCode, RCV_REPORT_STRUCTEx * pBuf )
{
if( NULL == pszStockCode || strlen(pszStockCode) <= 0 || NULL == pBuf )
return FALSE;
CSingleLock lock(&m_mutexReports,TRUE);
// look for it in map
void * rValue = NULL;
if( m_mapReports.Lookup( pszStockCode, rValue ) )
{
int nNo = (int)rValue;
ASSERT( nNo >= 0 && nNo < m_aReports.GetSize() && 0 == strncmp(pszStockCode, m_aReports.ElementAt(nNo).m_szLabel, sizeof(m_aReports.ElementAt(nNo).m_szLabel)) );
if( nNo >= 0 && nNo < m_aReports.GetSize()
&& ( 0 == (m_aReports.ElementAt(nNo).m_szLabel[0] )
|| 0 == strncmp(pszStockCode, m_aReports.ElementAt(nNo).m_szLabel, sizeof(m_aReports.ElementAt(nNo).m_szLabel)) ) )
return PushReport( nNo, pBuf );
else // something error
m_mapReports.RemoveKey( pszStockCode );
}
ASSERT( pBuf->m_cbSize == sizeof(RCV_REPORT_STRUCTEx) );
pBuf->m_cbSize = sizeof(RCV_REPORT_STRUCTEx);
//存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
int nNo = m_aReports.Add( *pBuf );
//修改CTSCache::m_mapReports相关信息
m_mapReports.SetAt( pszStockCode, (void *)nNo );
return TRUE;
}
如果通过解码,判断此数据包时分笔数据包,则:
(1)存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
(2)消息号和数据存入CTSCache::m_aPackets,结构如下:
m_aPackets是一个TS_PACKET类型的数组
CTSPacketArray m_aPackets;
typedef CArray< TS_PACKET, TS_PACKET &> CTSPacketArray;
TS_PACKET包含了消息号和数据体
typedef struct _ts_packet_t {
UINT m_nMsgType;
PRCV_DATA m_pRCV_DATA;
} TS_PACKET, PTS_PACKET;
数据体结构
typedef struct tagRCV_DATA
{
int m_wDataType; // 文件类型
int m_nPacketNum; // 记录数,参见注一
RCV_FILE_HEADEx m_File; // 文件接口
BOOL m_bDISK; // 文件是否已存盘的文件
union
{
RCV_REPORT_STRUCTEx * m_pReport;
RCV_HISTORY_STRUCTEx * m_pDay;
RCV_MINUTE_STRUCTEx * m_pMinute;
RCV_POWER_STRUCTEx * m_pPower;
RCV_MULTISORT_STRUCTEx * m_pMultisort;
void * m_pData; // 参见注二
};
} RCV_DATA,*PRCV_DATA;
if( nLen > 0 )
{
//存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
PushReport( prcvdata->m_pReport, prcvdata->m_nPacketNum );
//消息号和数据存入CTSCache::m_aPackets by freeman
PushPacket( RCV_REPORT, prcvdata );
prcvdata = NULL;
return 1;
}
CTSCache::PushPacket()
//消息号和数据存入CTSCache::m_aPackets by freeman
BOOL CTSCache::PushPacket( UINT nMsgType, PRCV_DATA pRCV_DATA )
{
ASSERT( pRCV_DATA && pRCV_DATA->m_pData );
CSingleLock lock(&m_mutexPackets,TRUE);
TS_PACKET packet;
packet.m_nMsgType = nMsgType;
packet.m_pRCV_DATA = pRCV_DATA;
//消息号和数据存入CTSCache::m_aPackets by freeman
m_aPackets.Add( packet );
return TRUE;
}
三、数据发送部分
(一)初始化该类时,启动数据分发线程。
//初始化该类时,启动数据分发线程。
CTSWnd::CTSWnd()
{
CTSWnd::GetInstance().StartDispatchThread();
}
(二)数据分发线程,从缓冲区取出数据,用消息发送数据到StkNet的窗口句柄
// 分发数据线程,行情服务器主动推送线程 2019/06/08 by freeman
UINT TSDispatchThreadMain(LPVOID pParam)
{
char szText[256];
sprintf(szText, "→start CTSWnd::StartDispatchThread()");
g_pWinTrace->Debug()->Send("NetTS:TSWnd.cpp", szText);
while(TRUE)
{
UINT nMsgType = 0;
PRCV_DATA pRCV_DATA = NULL;
//从缓冲区中弹出数据,然后向已注册的窗口句柄发送消息。StkNet 向NetTS类注册了窗口句柄。 by freeman
if (CTSCache::GetInstance().PopPacket(nMsgType, pRCV_DATA))
{
ASSERT(pRCV_DATA && pRCV_DATA->m_pData);
if (pRCV_DATA && pRCV_DATA->m_pData)
CTSWnd::GetInstance().SendMessage(nMsgType, (LPARAM)pRCV_DATA);
char szText[256];
sprintf(szText, "↑DataType:%d PacketNum:%d", pRCV_DATA->m_wDataType, pRCV_DATA->m_nPacketNum);
g_pWinTrace->Debug()->Send("TSDispatchThreadMain", szText);
CTSCache::GetInstance().FreePacket(pRCV_DATA);
}
Sleep(1);
// User wants to quit program
if (WAIT_OBJECT_0 == WaitForSingleObject(CTSWnd::m_hEventKillDispatchThread,0))
{
SetEvent(CTSWnd::m_hEventDispatchThreadKilled);
AfxEndThread(0, TRUE);
return 0;
}
}
}
从CTSCache::m_aPackets缓冲区(数组)中弹出数据
//从CTSCache::m_aPackets缓冲区(数组)中弹出数据
BOOL CTSCache::PopPacket( UINT & nMsgType, PRCV_DATA & pRCV_DATA )
{
CSingleLock lock(&m_mutexPackets,TRUE);
if( m_aPackets.GetSize() > 0 )
{
nMsgType = m_aPackets.ElementAt(0).m_nMsgType;
pRCV_DATA = m_aPackets.ElementAt(0).m_pRCV_DATA;
m_aPackets.RemoveAt(0);
return TRUE;
}
return FALSE;
}