各种事件通告机制在高频交易平台中的对比试验(一)

论坛 期权论坛 股票     
期权匿名问答   2023-2-18 23:40   2834   1
一、试验背景

我们的高频交易平台实盘运行两年了,遇到了大大小小各种各样的问题,一直在思考怎么再缩短哪怕一点点的策略响应延迟。当然,缩短到交易所的物理距离,以及升级硬件是最简单粗暴(但是有效)的方法,不过军备竞赛不是这篇文章讨论的焦点。今天我想聚焦在通告机制上,看看不同的通告机制,会有多大的差别,以及不同的机制适用于高频交易平台中的哪些地方。
我们的交易平台是事件驱动的,比如说,品种A的tick行情到达、账户B的挂单回报到达、、、这些事件都需要以某种方式在系统各线程(进程)间传递。进程间通告机制(IPC)的对比我希望在下一篇文章中写出,这篇文章先写出线程间的对比。实际上我们的平台开始也是多线程的,后来因为python的GIL问题,才重构成了多进程架构的,以后有机会也可以写个文章分享出来。看看是否有大神愿意共同探讨、指点。
理论上,一个交易平台(的实盘部分)甚至可以只有一个工作线程,比如说只运行单个策略实例,直接用CTP的回调线程去执行策略代码,实际上我也确实看到过有人这样做。但那种方式我认为还是值得商榷,因为行情回调和订单回调不是由同一个线程来执行的,依然需要在大量的地方思考竞争条件、数据锁定的问题。并且,这就不是“事件驱动”,而退化成了“行情驱动”。试想如果你订阅的某个品种持续几秒钟没有行情过来,而你又希望响应其它事件(比如离收盘只剩3秒了,需要紧急清仓)会怎么样?而且我们的平台不止有一个行情源(股市、期货),也不止运行一个策略实例,显然不可能让单个行情线程一竿子执行到底。
这就催生了我们第一个版本的架构:另开一个线程专门执行策略代码。当然后来还有第二个版本,也就是线程池的,如果本文点赞量过万了,就动笔给大家汇报线程池的版本吧。
总之呢,因此我们就必须要考虑:行情线程、账户线程、风控线程、、、如何将各种事件高效地传递给策略线程,再由策略线程统一去调用策略相应的事件响应代码。打个比喻就是:你把饭做好了,总得摇一下铃铛,你的皇后娘娘才知道来吃。至于是摇铃铛通知她快呢?还是发微信、发邮件更快呢?我们就来试验一下吧。

二、试验思路

今天我对比了在Linux上几种常见线程间通告机制(条件变量、Linux信号量、POSIX消息队列、原子变量盲目等待)的时延,思路就是:
1.事件的生产者(行情、账户等)发出通告之前记录一下时戳,事件的消费者(策略线程)收到通告之后再取一次时戳,二者相减得到时延。如此统计个十万、百万次,再看平均值。当然最好还能看看中位数、最大最小值(因为Linux毕竟不是实时操作系统,用户代码在执行到任何位置时都有可能被挂起,OS也不会保证最多挂你多久就恢复,我就见识过自己的进程被挂了十几秒<严重内存不足的情况下,被强制换出了>!如此就会导致一些零星的反常极值)。
2.在用c++具体试验的时候,注意不要用std::system_clock::now()来取时戳,因为它并不保证是均匀单调递增的!它是wall clock时间,在你程序运行期间,有可能被NTP后台服务调整,导致我们的测算不准。我这里用了std::steady_clock::now(),它记录的是从开机启动以来所经过的纳秒数,并且快慢完全取决于本机硬件(相对稳定),和其它系统完全无关。哪位大神知道更好的方案,万望指点!
3.生产者每次发完通告之后,一定要睡那么一小会儿(比如1ms)再发,不能搞成加特林机关枪连续地突突。因为那样的话生产者有可能快过消费者,消费者始终应接不暇,出现发送时戳和接收时戳错配,导致测量错误。

三、试验结果(平均时延):

1.STL条件变量: 2464.92ns
2.Linux信号量: 2373.58ns
3.POSIX消息队列: 2624.94ns
4.原子变量盲目等待: 107.003ns
四、结论及更多考量

这个结果毫不意外,正常情况下“Linux信号量”机制是采用的Linux本土方法,自然要比“STL条件变量”快那么一点点,也许我用的STL(gcc9)就是用信号量来实现条件变量的?“POSIX消息队列”更慢一点也可以理解,毕竟它还要完成队列(实质是共享内存)操作,而且它实质上是一个进程间通信机制,我们也有理由怀疑它底层是由信号量实现通告操作的。最惊艳的当然是“原子变量盲目等待”了,快了几十倍啊!当然这样的话每个策略线程必须要费掉一个CPU核,如果你是土豪,而且运行的策略实例很少的话,也许可以试试?哈哈!
五、核心代码

以下是几种机制的核心代码,完整代码我放在github的。
1.条件变量

steady_clock::time_point s_post_time {};
mutex                                s_mutex;
condition_variable                s_cv;
bool                                s_ready { false };

int                                s_recv_count = 0;
steady_clock::duration                s_total_delay {};

void ThreadProducer() {
        cout << "producer:started..." << endl;
        int post_count = 0;
        while( post_count < TOTAL_NOTES ) {
                {
                        unique_lock<mutex> ulk( s_mutex );
                        if( s_ready )        // 消费者尚未处理完前一个通知
                                continue;

                        s_post_time = steady_clock::now();
                        s_ready = true;
                }
                s_cv.notify_one();
                ++post_count;
                this_thread::sleep_for( SEND_INTERVEL );
        }
        cout << "producer:ended." << endl;
};

void ThreadConsumer() {
        cout << "consumer:started..." << endl;
        while( s_recv_count < TOTAL_NOTES ) {
                unique_lock<mutex> ulk( s_mutex );
                s_cv.wait( ulk, []() { return s_ready; } );

                auto recv_time = steady_clock::now();
                s_total_delay += recv_time - s_post_time;
                ++s_recv_count;
                s_ready = false;
                ulk.unlock();
        }
        cout << "consumer:ended." << endl;
};
2.Linux信号量

void ThreadProducer() {
        cout << "producer:started..." << endl;
        for( int j = 0; j < TOTAL_NOTES; ++j ) {
//                 s_post_time.store( steady_clock::now(), std::memory_order_release );
                s_post_time = steady_clock::now();
                if( sem_post( &s_note_sem ) != 0 ) {
                        cerr << "producer:" << strerror( errno ) << endl;
                        break;
                }
                this_thread::sleep_for( SEND_INTERVEL );
        }
        cout << "producer:ended." << endl;
};

void ThreadConsumer() {
        cout << "consumer:started..." << endl;
        while( s_recv_count < TOTAL_NOTES ) {
                if( sem_wait( &s_note_sem ) != 0 ) {
                        cerr << "consumer:" << strerror( errno ) << endl;
                        return;
                }

                auto recv_time = steady_clock::now();
//                 s_total_delay += recv_time - s_post_time.load( std::memory_order_acquire );
                s_total_delay += recv_time - s_post_time;
                ++s_recv_count;
        }
        cout << "consumer:ended." << endl;
};
3.POSIX消息队列

void ThreadProducer() {
        auto mq = mq_open( MQ_NAME, O_WRONLY | O_NONBLOCK );
        mq_attr attr {};
        if( mq_getattr( mq, &attr ) == 0 )
                cout << "\nproducer.mq_flags  :" << attr.mq_flags
                         << "\nproducer.mq_maxmsg :" << attr.mq_maxmsg
                         << "\nproducer.mq_msgsize:" << attr.mq_msgsize
                         << "\nproducer.mq_curmsgs:" << attr.mq_curmsgs
                         << endl;
        else
                cerr << "producer:" << strerror( errno ) << endl;

        Msg_u s_buf;
        for( int j = 0; j < TOTAL_NOTES; ++j ) {
                s_buf.send_time = steady_clock::now();
                if( mq_send( mq, s_buf.as_buffer, sizeof( Msg_u ), 0 ) != 0 ) {
                        // 错误信息要先留存
                        string send_err = strerror( errno );
                        cerr << "producer:" << send_err << endl;
                        if( mq_getattr( mq, &attr ) < 0 )
                                cerr << "producer:获取属性也失败(" << strerror( errno ) << ")!" << endl;
                        if( attr.mq_curmsgs >= attr.mq_maxmsg - 1 )
                                cerr << "producer:可能队列已满!" << endl;
                        break;
                }
                this_thread::sleep_for( SEND_INTERVEL );
        }

        mq_close( mq );
        cout << "producer:ended." << endl;
};

void threadConsumer() {
        mq_attr cfg {};
//        cfg.mq_curmsgs = 0;
//        cfg.mq_flags = O_CREAT | O_RDONLY | O_WRONLY;
        cfg.mq_maxmsg = 16;
        cfg.mq_msgsize = sizeof( Msg_u );

        auto mq = mq_open( MQ_NAME, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &cfg );
        if( mq_getattr( mq, &cfg ) == 0 )
                cout << "\nconsumer.mq_flags  :" << cfg.mq_flags
                         << "\nconsumer.mq_maxmsg :" << cfg.mq_maxmsg
                         << "\nconsumer.mq_msgsize:" << cfg.mq_msgsize
                         << "\nconsumer.mq_curmsgs:" << cfg.mq_curmsgs
                         << endl;
        else
                cerr << "consumer:" << strerror( errno ) << endl;

        Msg_u r_buf;
        while( s_recv_count < TOTAL_NOTES ) {
                auto bytes = mq_receive( mq, r_buf.as_buffer, sizeof( Msg_u ), 0 );
                auto recv_time = steady_clock::now();
                if( bytes != sizeof( Msg_u ) ) {
                        cerr << "consumer:" << strerror( errno ) << endl;
                        mq_close( mq );
                        return;
                }

                s_total_delay += recv_time - r_buf.send_time;
                ++s_recv_count;
        }

        mq_close( mq );
        cout << "consumer:ended." << endl;
};

其中MQ_NAME, TOTAL_NOTES, SEND_INTERVEL统一定义在另一个头文件,分别是:
const char MQ_NAME[] = "/comp_notify";
constexpr int                TOTAL_NOTES { 100000 };
constexpr steady_clock::duration SEND_INTERVEL { 1ms };
分享到 :
0 人收藏

1 个回复

倒序浏览
2#
期权匿名回答  16级独孤 | 2023-2-18 23:41:33 发帖IP地址来自 中国
沙发自占
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:400157
帖子:80032
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP