一、试验背景
我们的高频交易平台实盘运行两年了,遇到了大大小小各种各样的问题,一直在思考怎么再缩短哪怕一点点的策略响应延迟。当然,缩短到交易所的物理距离,以及升级硬件是最简单粗暴(但是有效)的方法,不过军备竞赛不是这篇文章讨论的焦点。今天我想聚焦在通告机制上,看看不同的通告机制,会有多大的差别,以及不同的机制适用于高频交易平台中的哪些地方。
我们的交易平台是事件驱动的,比如说,品种A的tick行情到达、账户B的挂单回报到达、、、这些事件都需要以某种方式在系统各线程(进程)间传递。进程间通告机制(IPC)的对比我希望在下一篇文章中写出,这篇文章先写出线程间的对比。实际上我们的平台开始也是多线程的,后来因为python的GIL问题,才重构成了多进程架构的,以后有机会也可以写个文章分享出来。看看是否有大神愿意共同探讨、指点。
理论上,一个交易平台(的实盘部分)甚至可以只有一个工作线程,比如说只运行单个策略实例,直接用CTP的回调线程去执行策略代码,实际上我也确实看到过有人这样做。但那种方式我认为还是值得商榷,因为行情回调和订单回调不是由同一个线程来执行的,依然需要在大量的地方思考竞争条件、数据锁定的问题。并且,这就不是“事件驱动”,而退化成了“行情驱动”。试想如果你订阅的某个品种持续几秒钟没有行情过来,而你又希望响应其它事件(比如离收盘只剩3秒了,需要紧急清仓)会怎么样?而且我们的平台不止有一个行情源(股市、期货),也不止运行一个策略实例,显然不可能让单个行情线程一竿子执行到底。
这就催生了我们第一个版本的架构:另开一个线程专门执行策略代码。当然后来还有第二个版本,也就是线程池的,如果本文点赞量过万了,就动笔给大家汇报线程池的版本吧。
总之呢,因此我们就必须要考虑:行情线程、账户线程、风控线程、、、如何将各种事件高效地传递给策略线程,再由策略线程统一去调用策略相应的事件响应代码。打个比喻就是:你把饭做好了,总得摇一下铃铛,你的皇后娘娘才知道来吃。至于是摇铃铛通知她快呢?还是发微信、发邮件更快呢?我们就来试验一下吧。
二、试验思路
今天我对比了在Linux上几种常见线程间通告机制(条件变量、Linux信号量、POSIX消息队列、原子变量盲目等待)的时延,思路就是:
1.事件的生产者(行情、账户等)发出通告之前记录一下时戳,事件的消费者(策略线程)收到通告之后再取一次时戳,二者相减得到时延。如此统计个十万、百万次,再看平均值。当然最好还能看看中位数、最大最小值(因为Linux毕竟不是实时操作系统,用户代码在执行到任何位置时都有可能被挂起,OS也不会保证最多挂你多久就恢复,我就见识过自己的进程被挂了十几秒<严重内存不足的情况下,被强制换出了>!如此就会导致一些零星的反常极值)。
2.在用c++具体试验的时候,注意不要用std::system_clock::now()来取时戳,因为它并不保证是均匀单调递增的!它是wall clock时间,在你程序运行期间,有可能被NTP后台服务调整,导致我们的测算不准。我这里用了std::steady_clock::now(),它记录的是从开机启动以来所经过的纳秒数,并且快慢完全取决于本机硬件(相对稳定),和其它系统完全无关。哪位大神知道更好的方案,万望指点!
3.生产者每次发完通告之后,一定要睡那么一小会儿(比如1ms)再发,不能搞成加特林机关枪连续地突突。因为那样的话生产者有可能快过消费者,消费者始终应接不暇,出现发送时戳和接收时戳错配,导致测量错误。
三、试验结果(平均时延):
1.STL条件变量: 2464.92ns
2.Linux信号量: 2373.58ns
3.POSIX消息队列: 2624.94ns
4.原子变量盲目等待: 107.003ns
四、结论及更多考量
这个结果毫不意外,正常情况下“Linux信号量”机制是采用的Linux本土方法,自然要比“STL条件变量”快那么一点点,也许我用的STL(gcc9)就是用信号量来实现条件变量的?“POSIX消息队列”更慢一点也可以理解,毕竟它还要完成队列(实质是共享内存)操作,而且它实质上是一个进程间通信机制,我们也有理由怀疑它底层是由信号量实现通告操作的。最惊艳的当然是“原子变量盲目等待”了,快了几十倍啊!当然这样的话每个策略线程必须要费掉一个CPU核,如果你是土豪,而且运行的策略实例很少的话,也许可以试试?哈哈!
五、核心代码
以下是几种机制的核心代码,完整代码我放在github的。
1.条件变量
steady_clock::time_point s_post_time {};
mutex s_mutex;
condition_variable s_cv;
bool s_ready { false };
int s_recv_count = 0;
steady_clock::duration s_total_delay {};
void ThreadProducer() {
cout << &#34;producer:started...&#34; << endl;
int post_count = 0;
while( post_count < TOTAL_NOTES ) {
{
unique_lock<mutex> ulk( s_mutex );
if( s_ready ) // 消费者尚未处理完前一个通知
continue;
s_post_time = steady_clock::now();
s_ready = true;
}
s_cv.notify_one();
++post_count;
this_thread::sleep_for( SEND_INTERVEL );
}
cout << &#34;producer:ended.&#34; << endl;
};
void ThreadConsumer() {
cout << &#34;consumer:started...&#34; << endl;
while( s_recv_count < TOTAL_NOTES ) {
unique_lock<mutex> ulk( s_mutex );
s_cv.wait( ulk, []() { return s_ready; } );
auto recv_time = steady_clock::now();
s_total_delay += recv_time - s_post_time;
++s_recv_count;
s_ready = false;
ulk.unlock();
}
cout << &#34;consumer:ended.&#34; << endl;
};
2.Linux信号量
void ThreadProducer() {
cout << &#34;producer:started...&#34; << endl;
for( int j = 0; j < TOTAL_NOTES; ++j ) {
// s_post_time.store( steady_clock::now(), std::memory_order_release );
s_post_time = steady_clock::now();
if( sem_post( &s_note_sem ) != 0 ) {
cerr << &#34;producer:&#34; << strerror( errno ) << endl;
break;
}
this_thread::sleep_for( SEND_INTERVEL );
}
cout << &#34;producer:ended.&#34; << endl;
};
void ThreadConsumer() {
cout << &#34;consumer:started...&#34; << endl;
while( s_recv_count < TOTAL_NOTES ) {
if( sem_wait( &s_note_sem ) != 0 ) {
cerr << &#34;consumer:&#34; << strerror( errno ) << endl;
return;
}
auto recv_time = steady_clock::now();
// s_total_delay += recv_time - s_post_time.load( std::memory_order_acquire );
s_total_delay += recv_time - s_post_time;
++s_recv_count;
}
cout << &#34;consumer:ended.&#34; << endl;
};
3.POSIX消息队列
void ThreadProducer() {
auto mq = mq_open( MQ_NAME, O_WRONLY | O_NONBLOCK );
mq_attr attr {};
if( mq_getattr( mq, &attr ) == 0 )
cout << &#34;\nproducer.mq_flags :&#34; << attr.mq_flags
<< &#34;\nproducer.mq_maxmsg :&#34; << attr.mq_maxmsg
<< &#34;\nproducer.mq_msgsize:&#34; << attr.mq_msgsize
<< &#34;\nproducer.mq_curmsgs:&#34; << attr.mq_curmsgs
<< endl;
else
cerr << &#34;producer:&#34; << strerror( errno ) << endl;
Msg_u s_buf;
for( int j = 0; j < TOTAL_NOTES; ++j ) {
s_buf.send_time = steady_clock::now();
if( mq_send( mq, s_buf.as_buffer, sizeof( Msg_u ), 0 ) != 0 ) {
// 错误信息要先留存
string send_err = strerror( errno );
cerr << &#34;producer:&#34; << send_err << endl;
if( mq_getattr( mq, &attr ) < 0 )
cerr << &#34;producer:获取属性也失败(&#34; << strerror( errno ) << &#34;)!&#34; << endl;
if( attr.mq_curmsgs >= attr.mq_maxmsg - 1 )
cerr << &#34;producer:可能队列已满!&#34; << endl;
break;
}
this_thread::sleep_for( SEND_INTERVEL );
}
mq_close( mq );
cout << &#34;producer:ended.&#34; << endl;
};
void threadConsumer() {
mq_attr cfg {};
// cfg.mq_curmsgs = 0;
// cfg.mq_flags = O_CREAT | O_RDONLY | O_WRONLY;
cfg.mq_maxmsg = 16;
cfg.mq_msgsize = sizeof( Msg_u );
auto mq = mq_open( MQ_NAME, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &cfg );
if( mq_getattr( mq, &cfg ) == 0 )
cout << &#34;\nconsumer.mq_flags :&#34; << cfg.mq_flags
<< &#34;\nconsumer.mq_maxmsg :&#34; << cfg.mq_maxmsg
<< &#34;\nconsumer.mq_msgsize:&#34; << cfg.mq_msgsize
<< &#34;\nconsumer.mq_curmsgs:&#34; << cfg.mq_curmsgs
<< endl;
else
cerr << &#34;consumer:&#34; << strerror( errno ) << endl;
Msg_u r_buf;
while( s_recv_count < TOTAL_NOTES ) {
auto bytes = mq_receive( mq, r_buf.as_buffer, sizeof( Msg_u ), 0 );
auto recv_time = steady_clock::now();
if( bytes != sizeof( Msg_u ) ) {
cerr << &#34;consumer:&#34; << strerror( errno ) << endl;
mq_close( mq );
return;
}
s_total_delay += recv_time - r_buf.send_time;
++s_recv_count;
}
mq_close( mq );
cout << &#34;consumer:ended.&#34; << endl;
};
其中MQ_NAME, TOTAL_NOTES, SEND_INTERVEL统一定义在另一个头文件,分别是:
const char MQ_NAME[] = &#34;/comp_notify&#34;;
constexpr int TOTAL_NOTES { 100000 };
constexpr steady_clock::duration SEND_INTERVEL { 1ms }; |
|