1 /**************************************************************************
3 * Copyright 2013-2014 RAD Game Tools and Valve Software
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 **************************************************************************/
33 #include <sys/types.h>
// Debug logging helper. When _DEBUG or DEBUG is defined, DEBUG_PRINT forwards
// its printf-style arguments to fprintf(stderr, ...). The `args...` / `## args`
// spelling is the GNU named-variadic-macro form, so this relies on a
// GCC-compatible preprocessor.
42 #if defined(_DEBUG) || defined(DEBUG)
43 #define DEBUG_PRINT(fmt, args...) fprintf( stderr, fmt, ## args )
// Alternate definition: expands to nothing, so its arguments are discarded by
// the preprocessor and never evaluated.
45 #define DEBUG_PRINT(...) /* Don't do anything in release builds */
// pthread entry points, defined further down in this file. Each one is handed
// the owning channelmgr instance through `arg` (Connect/Accept pass `this` to
// pthread_create) and calls the matching Drive*Loop member on it.
48 void *RecvThreadLoop(void *arg);
49 void *ReqRespRecvThreadLoop(void *arg);
50 void *SendThreadLoop(void *arg);
51 void *ReqRespSendThreadLoop(void *arg);
// Makes names from namespace `network` usable unqualified in this file
// (presumably where channelmgr and channel are declared - confirm).
53 using namespace network;
// Default constructor: initialises the manager to an idle state with no worker
// threads, an empty server name, no callbacks, no channels and no
// request/response send queue.
56 channelmgr::channelmgr()
// Zero thread handles mean "no thread"; Disconnect() tests the two ReqResp
// handles for non-zero before joining them.
59 m_threadRecv = (pthread_t) 0;
60 m_threadReqRespRecv = (pthread_t) 0;
61 m_threadSend = (pthread_t) 0;
62 m_threadReqRespSend = (pthread_t) 0;
// Zero the whole server-name buffer. Connect() later copies at most
// sizeof(m_server)-1 bytes into it, so the final byte stays '\0'.
67 memset(m_server, '\0', sizeof(m_server));
// No callbacks registered yet; Connect()/Accept() install them.
69 m_recvCallbackfn = NULL;
70 m_reqCallbackfn = NULL;
71 m_recvCallbackParam = NULL;
72 m_reqCallbackParam = NULL;
// Channels and the request/response queue are allocated by the worker threads
// (see the Drive*Loop members), not here.
73 m_pReqRespChannel = NULL;
74 m_pSendChannel = NULL;
75 m_pRecvChannel = NULL;
78 m_pReqRespSendQueue = NULL;
// Destructor for channelmgr.
81 channelmgr::~channelmgr()
// Client-side setup. Records the server name, then starts one worker thread
// for each service bit present in serviceMask (REQRESP, SENDASYNC, RECVASYNC)
// and waits until every started thread has reported itself ready.
//
//   szServer          - server name/address; copied into m_server.
//   port              - NOTE(review): presumably saved as the base port that the
//                       worker threads read from m_basePort - confirm.
//   serviceMask       - bitwise OR of REQRESP / RECVASYNC / SENDASYNC.
//   recvCallbackfn    - callback stored for the receive thread; only stored
//                       when RECVASYNC is requested.
//   recvCallbackParam - opaque pointer passed back to recvCallbackfn.
//
// Returns EC_NOTALLOWED when serviceMask selects none of the three services.
87 channelmgr::Connect(char *szServer, int port, char serviceMask, FnReadCallbackPtr recvCallbackfn, void *recvCallbackParam)
91 // int portReqResp = port;
92 // int portSend = port+1; // Needs to match portRecv in channelmgr::Accept
93 // int portRecv = port+2; // Needs to match portSend in channelmgr::Accept
98 // Need to connect to the three different ports on the server, if requested
100 if (0 == (serviceMask & (REQRESP|RECVASYNC|SENDASYNC)))
102 // No service was requested?
104 return EC_NOTALLOWED;
// Copy at most sizeof(m_server)-1 bytes. The buffer is zeroed by the
// constructor and by Disconnect(), so the untouched last byte terminates it.
109 strncpy(m_server, szServer, sizeof(m_server)-1);
// Each ready flag starts out true so that the wait at the end ignores services
// that were not requested; it is flipped to false only when a thread is
// started for that service.
111 m_fReadyReqResp = true;
112 if (serviceMask & REQRESP)
114 m_fReadyReqResp = false;
115 // spin up a thread to handle the request/response send side
116 err = pthread_create(&m_threadReqRespSend, NULL, ReqRespSendThreadLoop, (void *)this);
125 m_fReadySendAsync = true;
126 if (serviceMask & SENDASYNC)
128 m_fReadySendAsync = false;
130 // spin up a thread to handle the async send channel
131 err = pthread_create(&m_threadSend, NULL, SendThreadLoop, (void *)this);
141 m_fReadyRecvAsync = true;
142 if (serviceMask & RECVASYNC)
144 m_fReadyRecvAsync = false;
// Install the callback before the thread exists, since DriveRecvLoop invokes
// m_recvCallbackfn for every message it reads.
145 m_recvCallbackfn = recvCallbackfn;
146 m_recvCallbackParam = recvCallbackParam;
148 // spin up a thread to handle the recv channel
149 err = pthread_create(&m_threadRecv, NULL, RecvThreadLoop, (void *)this);
158 // Wait for threads to spin up and signal that they're ready.
// Busy-wait with an empty loop body: the worker threads set these flags to
// true once their channel is connected.
// NOTE(review): the flags are written on other threads; confirm they are
// declared so this cross-thread read is well-defined, and that a worker that
// fails before setting its flag cannot leave this loop spinning forever.
161 while ((false == m_fReadyRecvAsync) || (false == m_fReadySendAsync) || (false == m_fReadyReqResp));
// Server-side setup. Starts one worker thread for each service bit present in
// serviceMask (REQRESP, RECVASYNC, SENDASYNC), installing the matching
// callbacks first, and waits until every started thread has reported ready.
//
//   port              - NOTE(review): presumably saved as the base port that the
//                       worker threads read from m_basePort - confirm.
//   serviceMask       - bitwise OR of REQRESP / RECVASYNC / SENDASYNC.
//   fLocal            - NOTE(review): presumably saved in m_fLocal, which the
//                       worker threads pass to channel::Connect - confirm.
//   readcallbackfn    - callback stored for the async receive thread.
//   readCallbackParam - opaque pointer passed back to readcallbackfn.
//   reqCallbackfn     - callback stored for the request/response thread.
//   reqCallbackParam  - opaque pointer passed back to reqCallbackfn.
//
// Returns EC_NOTALLOWED when serviceMask selects none of the three services.
175 channelmgr::Accept(int port, char serviceMask, bool fLocal, FnReadCallbackPtr readcallbackfn, void *readCallbackParam, FnRRCallbackPtr reqCallbackfn, void *reqCallbackParam)
179 // int portReqResp = port;
180 // int portSend = port+2; // Needs to match portRecv in channelmgr::Connect
181 // int portRecv = port+1; // Needs to match portSend in channelmgr::Connect
185 if (0 == (serviceMask & (REQRESP|RECVASYNC|SENDASYNC)))
187 // No service was requested?
189 return EC_NOTALLOWED;
// Each ready flag starts out true so that the wait at the end ignores services
// that were not requested.
196 m_fReadyReqResp = true;
197 if (serviceMask & REQRESP)
199 // spin up a thread to handle the reqresp channel
200 m_fReadyReqResp = false;
// Callbacks are installed before the thread is created because
// DriveReqRespRecvLoop calls m_reqCallbackfn for each request it reads.
202 m_reqCallbackfn = reqCallbackfn;
203 m_reqCallbackParam = reqCallbackParam;
205 err = pthread_create(&m_threadReqRespRecv, NULL, ReqRespRecvThreadLoop, (void *)this);
214 m_fReadyRecvAsync = true;
215 if (serviceMask & RECVASYNC)
217 m_fReadyRecvAsync = false;
218 // spin up a thread to handle the recv channel
219 m_recvCallbackfn = readcallbackfn;
220 m_recvCallbackParam = readCallbackParam;
222 err = pthread_create(&m_threadRecv, NULL, RecvThreadLoop, (void *)this);
231 m_fReadySendAsync = true;
232 if (serviceMask & SENDASYNC)
234 m_fReadySendAsync = false;
236 // spin up a thread to handle the async send channel
237 err = pthread_create(&m_threadSend, NULL, SendThreadLoop, (void *)this);
246 // Wait for threads to spin up and signal that they're ready.
// Busy-wait with an empty loop body: the worker threads set these flags to
// true once their channel is connected.
// NOTE(review): the flags are written on other threads; confirm they are
// declared so this cross-thread read is well-defined, and that a worker that
// fails before setting its flag cannot leave this loop spinning forever.
249 while ((false == m_fReadyRecvAsync) || (false == m_fReadySendAsync) || (false == m_fReadyReqResp));
// Request/response send (client side only). Enqueues the request on
// m_pReqRespSendQueue, then reads the reply from m_pReqRespChannel and passes
// it to respCallbackfn.
//
//   cbData / pbData   - size and bytes of the request. Both are reused as the
//                       out-parameters of ReadMsg, so by the time the callback
//                       runs they describe the response.
//   respCallbackfn    - invoked as (respCallbackParam, cbData, pbData).
//   respCallbackParam - opaque pointer passed back to respCallbackfn.
//
// Returns EC_NOTALLOWED when m_fServer is true.
// m_pReqRespSendQueue and m_pReqRespChannel are created by the
// request/response send thread (DriveReqRespSendLoop), so Connect() with
// REQRESP must have completed before this is called.
262 channelmgr::SendData(unsigned int cbData, char *pbData, FnReadCallbackPtr respCallbackfn, void *respCallbackParam)
265 queues::MTQ_CODE mtqec = MTQ_NONE;
267 // Don't allow this call if we accepted connections (i.e. this is the "server" side of things)
268 if (true == m_fServer)
269 return EC_NOTALLOWED;
// NOTE(review): the trailing 500, 2 look like wait/retry parameters - confirm
// against MtQueue::Enqueue.
271 mtqec = m_pReqRespSendQueue->Enqueue(cbData, pbData, 500, 2);
272 if (MTQ_NONE != mtqec)
// A full queue is logged with its own message, separate from other failures.
274 if (MTQ_FULL == mtqec)
276 DEBUG_PRINT("%s:%d %s Unable to send Request (QFull) (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
281 DEBUG_PRINT("%s:%d %s Unable to send Request (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
// Read the reply on the calling thread.
// NOTE(review): the trailing 2, 0 look like retry/timeout parameters - confirm
// against channel::ReadMsg.
286 ec = m_pReqRespChannel->ReadMsg(&cbData, &pbData, 2, 0);
289 DEBUG_PRINT("%s:%d %s Unable to read response (error = %x)\n", __FILE__, __LINE__, __func__, ec);
// NOTE(review): no NULL check on respCallbackfn is visible before this call -
// confirm callers always supply one.
293 (*respCallbackfn)(respCallbackParam, cbData, pbData);
// Asynchronous send. Enqueues the message on m_pSendQueue; the send thread
// (DriveSendLoop) dequeues it and writes it to the send channel.
//
//   cbData / pbData - size and bytes of the message to queue.
//
// m_pSendQueue is created by the send thread, so Connect()/Accept() with
// SENDASYNC must have completed before this is called.
303 channelmgr::SendData(unsigned int cbData, char *pbData)
306 queues::MTQ_CODE mtqec = MTQ_NONE;
// NOTE(review): the trailing 500, 2 look like wait/retry parameters - confirm
// against MtQueue::Enqueue.
308 mtqec = m_pSendQueue->Enqueue(cbData, pbData, 500, 2);
309 if (MTQ_NONE != mtqec)
// A full queue is logged with its own message, separate from other failures.
311 if (MTQ_FULL == mtqec)
313 DEBUG_PRINT("%s:%d %s Unable to send (QFull) (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
318 DEBUG_PRINT("%s:%d %s Unable to send (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
// Reports whether an error is recorded on this manager.
//
//   perrno - NOTE(review): presumably receives the stored m_errno value -
//            confirm.
//
// Returns true when m_errno is non-zero.
328 channelmgr::HasError(int *perrno)
335 return (0 != m_errno);
// Shuts the manager down: joins the worker threads, disconnects and deletes
// every channel that was created, deletes the request/response send queue and
// resets members so the object can be reused.
// NOTE(review): the worker loops exit when m_fTerminate becomes true;
// presumably it is set before the joins below - confirm.
339 channelmgr::Disconnect()
344 // Wait for the recvthread to notice and clean itself up
346 pthread_join(m_threadRecv, NULL);
349 pthread_join(m_threadSend, NULL);
// The request/response threads are joined only when their handle is non-zero,
// i.e. when Accept() (recv side) or Connect() (send side) started them.
351 if (m_threadReqRespRecv)
352 pthread_join(m_threadReqRespRecv, NULL);
354 if (m_threadReqRespSend)
355 pthread_join(m_threadReqRespSend, NULL);
357 // Clean up all remaining channels
358 if (NULL != m_pReqRespChannel)
360 m_pReqRespChannel->Disconnect();
361 delete m_pReqRespChannel;
362 m_pReqRespChannel = NULL;
365 if (NULL != m_pSendChannel)
367 m_pSendChannel->Disconnect();
368 delete m_pSendChannel;
369 m_pSendChannel = NULL;
372 if (NULL != m_pRecvChannel)
374 m_pRecvChannel->Disconnect();
375 delete m_pRecvChannel;
376 m_pRecvChannel = NULL;
// The request/response queue was allocated by DriveReqRespSendLoop.
385 if (m_pReqRespSendQueue)
387 delete m_pReqRespSendQueue;
388 m_pReqRespSendQueue = NULL;
// Return to the idle state.
// NOTE(review): only the two receive-side thread handles are visibly reset
// here; confirm m_threadSend and m_threadReqRespSend are cleared as well,
// otherwise a later Disconnect() could join a stale handle.
391 m_threadRecv = (pthread_t) 0;
392 m_threadReqRespRecv= (pthread_t) 0;
// Clear the terminate flag so worker loops started later will run.
394 m_fTerminate = false;
397 memset(m_server, '\0', sizeof(m_server));
399 m_recvCallbackfn = NULL;
400 m_reqCallbackfn = NULL;
401 m_recvCallbackParam = NULL;
402 m_reqCallbackParam = NULL;
409 // Handle the recv thread
// pthread entry point for the async receive thread.
//   arg - the owning channelmgr (Connect/Accept pass `this` to pthread_create).
// Runs channelmgr::DriveRecvLoop() on that instance.
412 void *RecvThreadLoop(void *arg)
414 channelmgr *pChannelMgr = (channelmgr *) arg;
416 pChannelMgr->DriveRecvLoop();
// Body of the async receive thread. Names the thread, creates and connects
// m_pRecvChannel, signals readiness, then reads messages until m_fTerminate is
// set, passing each one to m_recvCallbackfn.
422 channelmgr::DriveRecvLoop()
426 unsigned int cbData = 0;
428 pthread_t thisThread = pthread_self();
431 // Set this thread's name:
432 err = pthread_setname_np(thisThread, "VOGLRecvThd");
435 DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
438 m_pRecvChannel = new channel();
// NOTE(review): plain `new` reports failure by throwing std::bad_alloc rather
// than returning NULL, so this check only fires if the build disables
// exceptions or replaces operator new.
439 if (NULL == m_pRecvChannel)
441 // Unable to allocate a new channel
443 DEBUG_PRINT("%s:%d %s Unable to allocate new channel for Recv - OOM?\n", __FILE__, __LINE__, __func__);
// Accepting side: this Connect overload takes a port and m_fLocal, and its
// failure is logged as "Failed to accept". Uses base port + 1.
449 portRecv = m_basePort+1; // Needs to match portSend in channelmgr::Connect
450 ec = m_pRecvChannel->Connect(portRecv, 0, m_fLocal);
453 DEBUG_PRINT("%s:%d %s Failed to accept on port %d for Recv. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portRecv, ec, errno);
// Connecting side: dials m_server on base port + 2.
459 portRecv = m_basePort+2; // Needs to match portSend in channelmgr::Accept
460 ec = m_pRecvChannel->Connect(m_server, portRecv, 100, 500); // loop 100 times, waiting 500MS between attempts
463 DEBUG_PRINT("%s:%d %s Failed to connect to %s on port %d for Recv. returned %d, errno %d\n", __FILE__, __LINE__, __func__, m_server, portRecv, ec, errno);
// Releases the busy-wait at the end of Connect()/Accept().
468 m_fReadyRecvAsync = true;
470 while (!m_fTerminate)
// ReadMsg fills cbData/pbData with the next message.
// NOTE(review): the trailing 5, 100 look like retry/timeout parameters -
// confirm against channel::ReadMsg.
473 ec = m_pRecvChannel->ReadMsg(&cbData, &pbData, 5, 100); // 500
474 //if (EC_TIMEOUT == ec)
476 // Timeouts are not really errors
482 // Keep going on the server side. We'll accept new connections as they come along.
485 m_pRecvChannel->Disconnect();
489 // On the client side, we need to just keep going.
490 //DEBUG_PRINT("%s:%d %s Error reading message (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);
// Deliver the message to the callback installed by Connect()/Accept().
// NOTE(review): who frees pbData after the callback is not evident here -
// confirm ownership.
494 (*m_recvCallbackfn)(m_recvCallbackParam, cbData, pbData);
508 // Handle the Async Send thread
// pthread entry point for the async send thread.
//   arg - the owning channelmgr (Connect/Accept pass `this` to pthread_create).
// Runs channelmgr::DriveSendLoop() on that instance.
511 void *SendThreadLoop(void *arg)
513 channelmgr *pChannelMgr = (channelmgr *) arg;
515 pChannelMgr->DriveSendLoop();
// Body of the async send thread. Names the thread, creates the send queue and
// the send channel, connects the channel, signals readiness, then repeatedly
// dequeues messages and writes them with SendDataInt() until m_fTerminate is
// set.
521 channelmgr::DriveSendLoop()
524 queues::MTQ_CODE mtqec = MTQ_NONE;
527 unsigned int message_size = 0;
528 char *message = NULL;
529 pthread_t thisThread = pthread_self();
532 // Set this thread's name:
533 err = pthread_setname_np(thisThread, "VOGLSendThd");
536 DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
540 // Generate the sendQ to hold the messages.
541 m_pSendQueue = new queues::MtQueue();
// NOTE(review): plain `new` reports failure by throwing std::bad_alloc rather
// than returning NULL, so the NULL checks in this function only fire if the
// build disables exceptions or replaces operator new.
542 if (NULL == m_pSendQueue)
544 // Unable to allocate a new send queue
549 mtqec = m_pSendQueue->Initialize(1000); // Not sure how large this should be. This is a guess.
550 if (MTQ_NONE != mtqec)
556 m_pSendChannel = new channel();
557 if (NULL == m_pSendChannel)
559 // Unable to allocate a new channel
// Accepting side: this Connect overload takes a port and m_fLocal, and its
// failure is logged as "Failed to accept". Uses base port + 2.
566 portSend = m_basePort+2; // Needs to match portRecv in channelmgr::Connect
567 ec = m_pSendChannel->Connect(portSend, 0, m_fLocal);
570 DEBUG_PRINT("%s:%d %s Failed to accept on port %d for Send. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portSend, ec, errno);
// Connecting side: dials m_server on base port + 1.
576 portSend = m_basePort+1; // Needs to match portRecv in channelmgr::Accept
577 ec = m_pSendChannel->Connect(m_server, portSend, 100, 500); // loop 100 times, waiting 500MS between attempts
580 DEBUG_PRINT("%s:%d %s Failed to connect to %s on port %d for Send. returned %d, errno %d\n", __FILE__, __LINE__, __func__, m_server, portSend, ec, errno);
// Releases the busy-wait at the end of Connect()/Accept().
585 m_fReadySendAsync = true;
587 while (!m_fTerminate)
// fmessageSent tracks whether the currently held message has already gone out.
// NOTE(review): presumably the previous buffer is released here once sent -
// confirm.
589 if (fmessageSent && message)
598 // if we sent the last message dequeued, get a new one to send.
// NOTE(review): the trailing 20, 5 look like wait/retry parameters - confirm
// against MtQueue::Dequeue.
599 mtqec = m_pSendQueue->Dequeue(&message_size, &message, 20, 5);
600 if (MTQ_NONE != mtqec)
602 // Just keep going round until you get a message to send.
605 DEBUG_PRINT("%s:%d %s DQ'd message to send (size = %d)\n", __FILE__, __LINE__, __func__, message_size);
606 fmessageSent = false; // we have a new message to send
608 ec = this->SendDataInt(message_size, message);
611 DEBUG_PRINT("%s:%d %s Unable to send message (error = %x)\n", __FILE__, __LINE__, __func__, ec);
614 DEBUG_PRINT("%s:%d %s sent message (size = %d)\n", __FILE__, __LINE__, __func__, message_size);
615 fmessageSent = true; // the message was sent successfully
// Internal helper used by DriveSendLoop: writes one message to m_pSendChannel.
//
//   cbData / pbData - size and bytes of the message to write.
//
// The outcome of WriteMsg is kept in `ec` and logged.
628 channelmgr::SendDataInt(unsigned int cbData, char *pbData)
// NOTE(review): the trailing 2, 500 look like retry/timeout parameters -
// confirm against channel::WriteMsg.
632 ec = m_pSendChannel->WriteMsg(cbData, pbData, 2, 500);
635 DEBUG_PRINT("%s:%d %s Unable to send (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);
639 DEBUG_PRINT("%s:%d %s sent(internal) message (size = %d)\n", __FILE__, __LINE__, __func__, cbData);
646 // Handle the reqresp thread
// pthread entry point for the request/response receive thread, which is
// started by Accept().
//   arg - the owning channelmgr (Accept passes `this` to pthread_create).
// Runs channelmgr::DriveReqRespRecvLoop() on that instance.
649 void *ReqRespRecvThreadLoop(void *arg)
651 channelmgr *pChannelMgr = (channelmgr *) arg;
653 pChannelMgr->DriveReqRespRecvLoop();
// Body of the request/response receive thread. Names the thread, creates
// m_pReqRespChannel and accepts on the base port, signals readiness, then
// until m_fTerminate is set: reads a request, lets m_reqCallbackfn produce the
// response, and writes that response back on the same channel.
659 channelmgr::DriveReqRespRecvLoop()
663 unsigned int cbReq = 0, cbResp = 0;
664 char *pbReq = NULL, *pbResp = NULL;
665 pthread_t thisThread = pthread_self();
// Request/response traffic uses the base port itself; the async channels use
// base + 1 and base + 2.
666 int portReqResp = m_basePort;
669 // Set this thread's name:
670 err = pthread_setname_np(thisThread, "VOGLRRThd");
673 DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
676 m_pReqRespChannel = new channel();
// NOTE(review): plain `new` reports failure by throwing std::bad_alloc rather
// than returning NULL, so this check only fires if the build disables
// exceptions or replaces operator new.
677 if (NULL == m_pReqRespChannel)
679 // Unable to allocate a new channel
684 // This thread should only be running when m_fServer is true.
685 ec = m_pReqRespChannel->Connect(portReqResp, 5, m_fLocal);
688 DEBUG_PRINT("%s:%d %s Failed to accept on port %d for ReqResp. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portReqResp, ec, errno);
// Releases the busy-wait at the end of Accept().
692 m_fReadyReqResp = true;
694 while (!m_fTerminate)
// ReadMsg fills cbReq/pbReq with the next request.
// NOTE(review): the trailing 5, 100 look like retry/timeout parameters -
// confirm against channel::ReadMsg.
706 ec = m_pReqRespChannel->ReadMsg(&cbReq, &pbReq, 5, 100); //100
707 if (EC_TIMEOUT == ec)
709 // Timeouts are not really errors
715 DEBUG_PRINT("%s:%d %s Error reading request (error = %x)\n", __FILE__, __LINE__, __func__, ec);
// The callback receives the request and fills cbResp/pbResp with the reply.
// NOTE(review): ownership of pbReq and pbResp after this point is not evident
// here - confirm who frees them.
719 (*m_reqCallbackfn)(m_reqCallbackParam, cbReq, pbReq, &cbResp, &pbResp);
721 ec = m_pReqRespChannel->WriteMsg(cbResp, pbResp, 5, 0); //100
722 if (EC_TIMEOUT == ec)
724 // Timeouts are not really errors
730 DEBUG_PRINT("%s:%d %s Unable to write response (error = %x)\n", __FILE__, __LINE__, __func__, ec);
745 // Handle the ReqResp send thread
// pthread entry point for the request/response send thread, which is started
// by Connect().
//   arg - the owning channelmgr (Connect passes `this` to pthread_create).
// Runs channelmgr::DriveReqRespSendLoop() on that instance.
748 void *ReqRespSendThreadLoop(void *arg)
750 channelmgr *pChannelMgr = (channelmgr *) arg;
752 pChannelMgr->DriveReqRespSendLoop();
// Body of the request/response send thread. Names the thread, creates
// m_pReqRespSendQueue and m_pReqRespChannel, connects the channel to m_server
// on the base port, signals readiness, then until m_fTerminate is set dequeues
// requests and writes them with SendDataRRInt(). The replies are read by
// SendData() on the caller's thread, not here.
758 channelmgr::DriveReqRespSendLoop()
761 queues::MTQ_CODE mtqec = MTQ_NONE;
763 unsigned int message_size = 0;
764 char *message = NULL;
765 pthread_t thisThread = pthread_self();
// Request/response traffic uses the base port itself.
766 int portReqResp = m_basePort;
768 // Set this thread's name:
769 err = pthread_setname_np(thisThread, "VOGLRRSndThd");
772 DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
776 // Generate the sendQ to hold the messages.
777 m_pReqRespSendQueue = new queues::MtQueue();
// NOTE(review): plain `new` reports failure by throwing std::bad_alloc rather
// than returning NULL, so the NULL checks in this function only fire if the
// build disables exceptions or replaces operator new.
778 if (NULL == m_pReqRespSendQueue)
780 // Unable to allocate a new send queue
785 mtqec = m_pReqRespSendQueue->Initialize(1000); // Not sure how large this should be. This is a guess.
786 if (MTQ_NONE != mtqec)
792 m_pReqRespChannel = new channel();
793 if (NULL == m_pReqRespChannel)
795 // Unable to allocate a new channel
800 ec = m_pReqRespChannel->Connect(m_server, portReqResp, 100, 500); // loop 100 times, waiting 500MS between attempts
// Releases the busy-wait at the end of Connect().
805 m_fReadyReqResp = true;
807 while (!m_fTerminate)
// NOTE(review): the trailing 20, 5 look like wait/retry parameters - confirm
// against MtQueue::Dequeue.
814 mtqec = m_pReqRespSendQueue->Dequeue(&message_size, &message, 20, 5);
815 if (MTQ_NONE != mtqec)
817 // Just keep going round until you get a message to send.
821 ec = this->SendDataRRInt(message_size, message);
824 DEBUG_PRINT("%s:%d %s Unable to send message (error = %x)\n", __FILE__, __LINE__, __func__, ec);
// Internal helper used by DriveReqRespSendLoop: writes one request to
// m_pReqRespChannel.
//
//   cbData / pbData - size and bytes of the request to write.
//
// Returns EC_NOTALLOWED when m_fServer is true.
838 channelmgr::SendDataRRInt(unsigned int cbData, char *pbData)
842 // Don't allow this call if we accepted connections (i.e. this is the "server" side of things)
843 if (true == m_fServer)
844 return EC_NOTALLOWED;
// NOTE(review): the trailing 2, 0 look like retry/timeout parameters - confirm
// against channel::WriteMsg.
846 ec = m_pReqRespChannel->WriteMsg(cbData, pbData, 2, 0);
849 DEBUG_PRINT("%s:%d %s Unable to send request (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);