commit 1780ef83e0554657761332aa7fe896732aba9205 (HEAD, refs/heads/master) Author: Adam Sutton Date: Tue Apr 29 11:01:15 2014 +0100 WIP [pvr.tvh] - put general message processing in thread This is to ensure that demux data gets processed efficiently. In reality I could (should?) also split out the less critical demux status messages. But I think at this stage that's probably not necessary. diff --git a/addons/pvr.tvh/src/HTSPConnection.cpp b/addons/pvr.tvh/src/HTSPConnection.cpp index cb75214..33bb17c 100644 --- a/addons/pvr.tvh/src/HTSPConnection.cpp +++ b/addons/pvr.tvh/src/HTSPConnection.cpp @@ -253,16 +253,16 @@ bool CHTSPConnection::ReadMessage ( void ) if (!(method = htsmsg_get_str(msg, "method"))) { tvherror("message without a method"); - goto done; + htsmsg_destroy(msg); + return true; } tvhtrace("receive message [%s]", method); - /* Pass */ - tvh->ProcessMessage(method, msg); + /* Pass (if return is true, message is finished) */ + if (tvh->ProcessMessage(method, msg)) + htsmsg_destroy(msg); + // TODO: maybe a copy should be made if it needs to be kept? 
- /* Free */ -done: - htsmsg_destroy(msg); return true; } diff --git a/addons/pvr.tvh/src/Tvheadend.cpp b/addons/pvr.tvh/src/Tvheadend.cpp index 78c0810..0927fda 100644 --- a/addons/pvr.tvh/src/Tvheadend.cpp +++ b/addons/pvr.tvh/src/Tvheadend.cpp @@ -722,74 +722,102 @@ bool CTvheadend::ProcessMessage ( const char *method, htsmsg_t *msg ) if (m_vfs.ProcessMessage(method, msg)) return true; - /* Lock */ - { - CLockObject lock(m_mutex); - - /* Channels */ - if (!strcmp("channelAdd", method) || !strcmp("channelUpdate", method)) - ParseChannelUpdate(msg); - else if (!strcmp("channelDelete", method)) - ParseChannelDelete(msg); - - /* Tags */ - else if (!strcmp("tagAdd", method) || !strcmp("tagUpdate", method)) - ParseTagUpdate(msg); - else if (!strcmp("tagDelete", method)) - ParseTagDelete(msg); - - /* Recordings */ - else if (!strcmp("dvrEntryAdd", method) || - !strcmp("dvrEntryUpdate", method)) - ParseRecordingUpdate(msg); - else if (!strcmp("dvrEntryDelete", method)) - ParseRecordingDelete(msg); - - /* EPG */ - else if (!strcmp("eventAdd", method) || !strcmp("eventUpdate", method)) - ParseEventUpdate(msg); - else if (!strcmp("eventDelete", method)) - ParseEventDelete(msg); - - /* ASync complete */ - else if (!strcmp("initialSyncCompleted", method)) - SyncCompleted(); - - /* Unknown */ - else - tvhdebug("unhandled message [%s]", method); - } + /* Store */ + m_queue.Push(CHTSPMessage(method, msg)); + return false; +} + +void* CTvheadend::Process ( void ) +{ + CHTSPMessage msg; + const char *method; - /* Process events - * Note: due to potential deadly embrace this must be done without the - * m_mutex held! 
- */ - SHTSPEventList::const_iterator it; - for (it = m_events.begin(); it != m_events.end(); ++it) + while (!IsStopped()) { - switch (it->m_type) + /* Check Q */ + // this is a bit horrible, but meh + if (!m_queue.Pop(msg, 2000)) + continue; + if (!msg.m_msg) + continue; + method = msg.m_method.c_str(); + + /* Scope lock for processing */ { - case HTSP_EVENT_TAG_UPDATE: - PVR->TriggerChannelGroupsUpdate(); - break; - case HTSP_EVENT_CHN_UPDATE: - PVR->TriggerChannelUpdate(); - break; - case HTSP_EVENT_REC_UPDATE: - PVR->TriggerTimerUpdate(); - PVR->TriggerRecordingUpdate(); - break; - case HTSP_EVENT_EPG_UPDATE: - PVR->TriggerEpgUpdate(it->m_idx); - break; - case HTSP_EVENT_NONE: - break; + CLockObject lock(m_mutex); + + /* Channels */ + if (!strcmp("channelAdd", method) || + !strcmp("channelUpdate", method)) + ParseChannelUpdate(msg.m_msg); + else if (!strcmp("channelDelete", method)) + ParseChannelDelete(msg.m_msg); + + /* Tags */ + else if (!strcmp("tagAdd", method) || + !strcmp("tagUpdate", method)) + ParseTagUpdate(msg.m_msg); + else if (!strcmp("tagDelete", method)) + ParseTagDelete(msg.m_msg); + + /* Recordings */ + else if (!strcmp("dvrEntryAdd", method) || + !strcmp("dvrEntryUpdate", method)) + ParseRecordingUpdate(msg.m_msg); + else if (!strcmp("dvrEntryDelete", method)) + ParseRecordingDelete(msg.m_msg); + + /* EPG */ + else if (!strcmp("eventAdd", method) || + !strcmp("eventUpdate", method)) + ParseEventUpdate(msg.m_msg); + else if (!strcmp("eventDelete", method)) + ParseEventDelete(msg.m_msg); + + /* ASync complete */ + else if (!strcmp("initialSyncCompleted", method)) + SyncCompleted(); + + /* Unknown */ + else + tvhdebug("unhandled message [%s]", method); } + + /* Manual delete rather than waiting */ + htsmsg_destroy(msg.m_msg); + msg.m_msg = NULL; + + /* Process events + * Note: due to potential deadly embrace this must be done without the + * m_mutex held! 
+ */ + SHTSPEventList::const_iterator it; + for (it = m_events.begin(); it != m_events.end(); ++it) + { + switch (it->m_type) + { + case HTSP_EVENT_TAG_UPDATE: + PVR->TriggerChannelGroupsUpdate(); + break; + case HTSP_EVENT_CHN_UPDATE: + PVR->TriggerChannelUpdate(); + break; + case HTSP_EVENT_REC_UPDATE: + PVR->TriggerTimerUpdate(); + PVR->TriggerRecordingUpdate(); + break; + case HTSP_EVENT_EPG_UPDATE: + PVR->TriggerEpgUpdate(it->m_idx); + break; + case HTSP_EVENT_NONE: + break; + } + } + m_events.clear(); } - m_events.clear(); /* Local */ - return true; + return NULL; } void CTvheadend::SyncCompleted ( void ) diff --git a/addons/pvr.tvh/src/Tvheadend.h b/addons/pvr.tvh/src/Tvheadend.h index 0bcc28c..9b0bcd9 100644 --- a/addons/pvr.tvh/src/Tvheadend.h +++ b/addons/pvr.tvh/src/Tvheadend.h @@ -32,6 +32,7 @@ #include "CircBuffer.h" #include "HTSPTypes.h" #include +#include #include #include @@ -83,10 +84,12 @@ class CHTSPConnection; class CHTSPDemuxer; class CHTSPVFS; class CHTSPResponse; +class CHTSPMessage; /* Typedefs */ typedef std::runtime_error AuthException; typedef std::map CHTSPResponseList; +typedef PLATFORM::SyncedBuffer CHTSPMessageQueue; /* * Global (TODO: might want to change this) @@ -110,6 +113,37 @@ private: }; /* + * HTSP Message + */ +class CHTSPMessage +{ +public: + CHTSPMessage(std::string method = "", htsmsg_t *msg = NULL) + : m_method(method), m_msg(msg) + { + } + ~CHTSPMessage() + { + if (m_msg) + htsmsg_destroy(m_msg); + } + CHTSPMessage& operator=(const CHTSPMessage &msg) + { + if (this != &msg) + { + if (m_msg) + htsmsg_destroy(m_msg); + m_method = msg.m_method; + m_msg = msg.m_msg; + msg.m_msg = NULL; // ownership is passed + } + return *this; + } + std::string m_method; + mutable htsmsg_t *m_msg; +}; + +/* * HTSP Connection registration thread */ class CHTSPRegister @@ -286,6 +320,7 @@ private: * Root object for Tvheadend connection */ class CTvheadend + : PLATFORM::CThread { public: CTvheadend(); @@ -331,6 +366,8 @@ private: CHTSPDemuxer 
m_dmx; CHTSPVFS m_vfs; + CHTSPMessageQueue m_queue; + SChannels m_channels; STags m_tags; SRecordings m_recordings; @@ -351,6 +388,11 @@ private: CStdString GetImageURL ( const char *str ); /* + * Message processing + */ + void *Process ( void ); + + /* * Event handling */ inline void TriggerChannelGroupsUpdate ( void )