Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c++ - Does ZeroMQ have a notification/callback event/message for when data arrives?

I am trying to integrate ZMQ into an existing Windows application that relies heavily on MFC sockets (CAsyncSocket).

I've got a CWinThread-derived UI thread (without a GUI) that communicates with a server asynchronously using CAsyncSocket. I would like to add a ZMQ inproc communication line to pass the data received from the server (on a REQ/REP basis) to other threads within the application.

Using CAsyncSocket, the OnReceive method is called by the MFC framework whenever new data is available on the socket to be received (that might be an over-simplification to the hardcore MFC gurus out there).

Is there any such mechanism in ZMQ? Or do I have to add an additional dedicated WorkerThread that the UI thread launches to handle my ZMQ communications to the rest of the app? The traffic on both pipelines is minimal so I really don't want to have to create 2 separate threads if I can get by with 1.

Note, I've got the basics working, I'm just having problems with synchronization. If I use blocking recv/send with ZMQ, it starves my CAsyncSocket: the thread never processes its Windows messages, so it sometimes never receives the data from the server that ZMQ is supposed to be delivering. But if I use non-blocking ZMQ calls, the thread frequently ends up sitting idle because it doesn't know when to read from the ZMQ socket.

1 Answer

Ultimately, the answer is no. ZeroMQ currently has no callback or notification for data arrival that you can hook into. I was also unable to find any fork that adds this functionality.

I was unable to get ZMQ working alongside the traditional OnReceive calls provided by the MFC socket framework within a single thread, and adding a second thread dedicated to ZMQ would have defeated the whole purpose of using it (it is being used for thread synchronization).

The implementation that worked for me dropped MFC sockets entirely. It uses ZMQ for both my inproc server (for communicating with other threads) and my TCP (non-ZMQ) server connection, and makes a blocking polling call (zmq_poll()) in the OnIdle() method, returning 1 every time to create a busy loop. The OnIdle() override:

BOOL CMyThreadClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    // Watch both sockets for incoming data.
    zmq_pollitem_t items [] = {
        { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
        { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    int iResult = zmq_poll(&items[0], static_cast<int>(sizeof(items) / sizeof(items[0])), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d\n", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d\n", iResult);
        //  Process inproc messages
        iResult = zmq_send(m_pZMQInprocServer, &sMessage, sizeof(sMessage), 0); // flags = 0: block
        TRACE("inproc send result: %d\n", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be a ZMQ_IDENTITY identifier at the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = sizeof(id);
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt result: %d, id size: %u\n", iResult, static_cast<unsigned>(id_size));
        // id_size was overwritten by zmq_getsockopt, so pass the full buffer size here
        iResult = zmq_recv(m_pZMQTCPSocket, id, sizeof(id), ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQTCPSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }

    return TRUE; // always ask for more idle time so OnIdle() is called again (the busy loop)
}

Note: This answer requires using ZMQ 4 or later since earlier versions of ZMQ will not communicate with a regular TCP socket connection.

