Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/api/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN

/// Destroys the logger without emitting any log output.
Logger::~Logger() noexcept
{
    // Intentionally empty — logging here triggers a static-destruction-order
    // crash on iOS simulator (recursive_mutex used after teardown).
    // Do not add LOG_* calls to this destructor.
}

ISemanticContext* Logger::GetSemanticContext() const
Expand Down
4 changes: 0 additions & 4 deletions lib/http/HttpResponseDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN {
break;

case HttpResult_Aborted:
ctx->httpResponse = nullptr;
outcome = Abort;
break;

case HttpResult_LocalFailure:
case HttpResult_NetworkFailure:
ctx->httpResponse = nullptr;
outcome = RetryNetwork;
break;
}
Expand Down Expand Up @@ -129,7 +127,6 @@ namespace MAT_NS_BEGIN {
evt.param1 = 0; // response.GetStatusCode();
DispatchEvent(evt);
}
ctx->httpResponse = nullptr;
// eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected
requestAborted(ctx);
break;
Expand Down Expand Up @@ -253,4 +250,3 @@ namespace MAT_NS_BEGIN {
}

} MAT_NS_END

65 changes: 49 additions & 16 deletions lib/pal/WorkerThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "pal/WorkerThread.hpp"
#include "pal/PAL.hpp"

#include <system_error>

#if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32)

/* Maximum scheduler interval for the SDK is 1 hour; required for clamping in case of monotonic clock drift. */
Expand Down Expand Up @@ -35,7 +37,7 @@ namespace PAL_NS_BEGIN {
std::list<MAT::Task*> m_timerQueue;
Event m_event;
MAT::Task* m_itemInProgress;
int count = 0;
bool m_shuttingDown = false;

public:

Expand All @@ -53,32 +55,65 @@ namespace PAL_NS_BEGIN {

void Join() final
{
auto item = new WorkerThreadShutdownItem();
Queue(item);
std::thread::id this_id = std::this_thread::get_id();
bool joined = false;
{
LOCKGUARD(m_lock);
if (!m_shuttingDown) {
m_shuttingDown = true;
m_queue.push_back(new WorkerThreadShutdownItem());
m_event.post();
}
}
try {
if (m_hThread.joinable() && (m_hThread.get_id() != this_id))
if (!m_hThread.joinable()) {
return;
}
if (m_hThread.get_id() != this_id) {
m_hThread.join();
else
joined = true;
} else {
m_hThread.detach();
}
}
catch (const std::system_error& e) {
LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what());
}
catch (const std::exception& e) {
LOG_ERROR("Thread join/detach failed: %s", e.what());
}
catch (...) {};

// TODO: [MG] - investigate if we ever drop work items on shutdown.
if (!m_queue.empty())
{
LOG_WARN("m_queue is not empty!");
// Log pending work in both paths so operators can see if
// shutdown is dropping tasks.
LOCKGUARD(m_lock);
if (!m_queue.empty()) {
LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size());
}
if (!m_timerQueue.empty())
{
LOG_WARN("m_timerQueue is not empty!");
if (!m_timerQueue.empty()) {
LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size());
}

// Clean up any tasks remaining in the queues after shutdown.
// Only safe after join() — the thread has fully exited.
// After detach(), the thread still needs the shutdown item
// and may still be accessing the queues.
if (joined) {
for (auto task : m_queue) { delete task; }
m_queue.clear();
for (auto task : m_timerQueue) { delete task; }
Comment thread
bmehta001 marked this conversation as resolved.
m_timerQueue.clear();
}
}

void Queue(MAT::Task* item) final
{
LOG_INFO("queue item=%p", &item);
LOG_INFO("queue item=%p", static_cast<void*>(item));
LOCKGUARD(m_lock);
if (m_shuttingDown) {
LOG_WARN("Dropping queued task %p during shutdown", static_cast<void*>(item));
delete item;
return;
Comment on lines +112 to +115
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pointer is only used for checking the identity of the task or queue removal, not for accessing the actual task.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if Cancel() only compares the pointer today, the handle still points to a task that Queue() already deleted. That feels unsafe because the caller gets a normal-looking handle with no signal that scheduling failed; can we return an invalid/no-op handle instead?

}
if (item->Type == MAT::Task::TimedCall) {
auto it = m_timerQueue.begin();
while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) {
Expand All @@ -89,7 +124,6 @@ namespace PAL_NS_BEGIN {
else {
m_queue.push_back(item);
}
count++;
m_event.post();
}

Expand Down Expand Up @@ -261,4 +295,3 @@ namespace PAL_NS_BEGIN {
} PAL_NS_END

#endif

132 changes: 96 additions & 36 deletions lib/tpm/TransmissionPolicyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,36 @@ namespace MAT_NS_BEGIN {
LOG_TRACE("Collector URL is not set, no upload.");
return;
}
LOCKGUARD(m_scheduledUploadMutex);
if (delay.count() < 0 || m_timerdelay.count() < 0)
{
LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count());
return;
}
if (m_scheduledUploadAborted)
{
LOG_TRACE("Scheduled upload aborted, no upload.");
return;
}
if (uploadCount() >= static_cast<uint32_t>(m_config[CFG_INT_MAX_PENDING_REQ]) )
auto shouldSkipScheduling = [&delay, this]() -> bool
{
LOG_TRACE("Maximum number of HTTP requests reached");
return;
}
if (delay.count() < 0 || m_timerdelay.count() < 0)
{
LOG_TRACE("Negative delay(%lld) or m_timerdelay(%lld), no upload",
delay.count(), m_timerdelay.count());
return true;
}
if (m_scheduledUploadAborted)
{
LOG_TRACE("Scheduled upload aborted, no upload.");
return true;
}
if (uploadCount() >= static_cast<uint32_t>(m_config[CFG_INT_MAX_PENDING_REQ]))
{
LOG_TRACE("Maximum number of HTTP requests reached");
return true;
}
if (m_isPaused)
{
LOG_TRACE("Paused, not uploading anything until resumed");
return true;
}

return false;
};

if (m_isPaused)
LOCKGUARD(m_scheduledUploadMutex);
if (shouldSkipScheduling())
{
LOG_TRACE("Paused, not uploading anything until resumed");
return;
}

Expand All @@ -151,29 +161,45 @@ namespace MAT_NS_BEGIN {
if (delta <= static_cast<uint64_t>(delay.count()))
{
// Don't need to cancel and reschedule if it's about to happen now anyways.
// m_isUploadScheduled check does not have to be strictly atomic because
// the completion of upload will schedule more uploads as-needed, we only
// want to avoid the unnecessary wasteful rescheduling.
LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency);
LOG_TRACE("WAIT upload %llu ms for lat=%d", delta, m_runningLatency);
return;
}
}

// Cancel upload if already scheduled.
if (force || delay.count() == 0)
{
if (!cancelUploadTask())
if (!cancelUploadTaskNoWaitLocked())
{
LOG_TRACE("Upload either hasn't been scheduled or already done.");
// Cancel can return false when the previous upload task is
// currently executing on the worker. If uploadAsync hasn't
// yet entered its own LOCKGUARD (m_isUploadScheduled is
// still set under the mutex we hold), propagate the
// requested latency so the running task picks it up when
// it acquires m_scheduledUploadMutex. Otherwise the
// running task has already cleared the flag and the
// schedule below will queue a fresh task.
if (m_isUploadScheduled)
{
m_runningLatency = latency;
}
}
if (shouldSkipScheduling())
{
return;
}
}

// Schedule new upload
if (!m_isUploadScheduled.exchange(true))
if (!m_isUploadScheduled)
{
m_isUploadScheduled = true;
m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count();
m_runningLatency = latency;
LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency);
LOG_TRACE("SCHED upload %lld ms for lat=%d", delay.count(), m_runningLatency);
m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast<unsigned>(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency);
}
}
Expand All @@ -184,16 +210,15 @@ namespace MAT_NS_BEGIN {
if (guard.isPaused()) {
return;
}
m_runningLatency = latency;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();

EventLatency requestedLatency = latency;
{
LOCKGUARD(m_scheduledUploadMutex);
requestedLatency = m_runningLatency;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();
m_isUploadScheduled = false; // Allow to schedule another uploadAsync
if ((m_isPaused) || (m_scheduledUploadAborted))
{
LOG_TRACE("Paused or upload aborted: cancel pending upload task.");
cancelUploadTask(); // If there is a pending upload task, kill it
LOG_TRACE("Paused or upload aborted: skip upload.");
return;
}
}
Expand All @@ -210,14 +235,14 @@ namespace MAT_NS_BEGIN {
unsigned delayMs = 1000;
LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later",
proposedBandwidthBps, minimumBandwidthBps, delayMs);
scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later
scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later
return;
}
}
#endif

auto ctx = m_system.createEventsUploadContext();
ctx->requestedMinLatency = m_runningLatency;
ctx->requestedMinLatency = requestedLatency;
addUpload(ctx);
initiateUpload(ctx);
}
Expand All @@ -238,7 +263,7 @@ namespace MAT_NS_BEGIN {
// Rescheduling upload
if (nextUpload.count() >= 0)
{
LOG_TRACE("Scheduling upload in %d ms", nextUpload.count());
LOG_TRACE("Scheduling upload in %lld ms", nextUpload.count());
EventLatency proposed = calculateNewPriority();
scheduleUpload(nextUpload, proposed); // reschedule uploadAsync again
}
Expand Down Expand Up @@ -284,9 +309,9 @@ namespace MAT_NS_BEGIN {
LOCKGUARD(m_scheduledUploadMutex);
// Prevent execution of all upload tasks
m_scheduledUploadAborted = true;
// Make sure we wait for completion of the upload scheduling task that may be running
cancelUploadTask();
}
// Make sure we wait for completion of the upload scheduling task that may be running
cancelUploadTask();

// Make sure we wait for all active upload callbacks to finish
while (uploadCount() > 0)
Expand Down Expand Up @@ -342,7 +367,12 @@ namespace MAT_NS_BEGIN {
}

// Schedule async upload if not scheduled yet
if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
bool isUploadScheduled = false;
{
LOCKGUARD(m_scheduledUploadMutex);
isUploadScheduled = m_isUploadScheduled;
}
if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
{
if (updateTimersIfNecessary())
{
Expand Down Expand Up @@ -374,7 +404,13 @@ namespace MAT_NS_BEGIN {
return EventLatency_RealTime;
}

if (m_runningLatency == EventLatency_RealTime)
EventLatency runningLatency = EventLatency_RealTime;
{
LOCKGUARD(m_scheduledUploadMutex);
runningLatency = m_runningLatency;
}

if (runningLatency == EventLatency_RealTime)
{
return EventLatency_Normal;
}
Expand Down Expand Up @@ -453,16 +489,39 @@ namespace MAT_NS_BEGIN {
return (m_scheduledUploadAborted) ? DefaultTaskCancelTime : std::chrono::milliseconds {};
}

/// Attempts to cancel the currently scheduled upload task with a zero wait time.
///
/// This function acquires no lock itself; the callers visible in this file
/// invoke it while already holding m_scheduledUploadMutex.
///
/// @return true if the task was cancelled, in which case the scheduling state
///         (m_isUploadScheduled, m_scheduledUploadTime) is reset; false
///         otherwise, in which case that state is left untouched.
bool TransmissionPolicyManager::cancelUploadTaskNoWaitLocked()
{
    // A default-constructed duration is zero, i.e. do not wait for the task.
    const auto noWaitMs = std::chrono::milliseconds {}.count();

    if (!m_scheduledUpload.Cancel(noWaitMs))
    {
        return false;
    }

    // Cancellation succeeded: clear the flag and reset the scheduled time.
    m_isUploadScheduled = false;
    m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();
    return true;
}

bool TransmissionPolicyManager::cancelUploadTask()
{
bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count());
auto waitTime = std::chrono::milliseconds{};
{
LOCKGUARD(m_scheduledUploadMutex);
waitTime = getCancelWaitTime();
if (waitTime.count() == 0)
{
return cancelUploadTaskNoWaitLocked();
}
}
bool result = m_scheduledUpload.Cancel(waitTime.count());

// TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for.
// We either need a stronger guarantee here (could impact SDK performance), or a mechanism to
// ensure those tasks are canceled when the log manager is destroyed. Issue 388
if (result)
{
m_isUploadScheduled.exchange(false);
LOCKGUARD(m_scheduledUploadMutex);
m_isUploadScheduled = false;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();
}
return result;
}
Expand All @@ -476,6 +535,7 @@ namespace MAT_NS_BEGIN {
/// Reports whether any upload work is still outstanding.
///
/// @return true if there are unfinished uploads whose callbacks have not been
///         processed yet (uploadCount() > 0), or if an upload task is
///         currently marked as scheduled; false otherwise.
bool TransmissionPolicyManager::isUploadInProgress() const noexcept
{
    // Both checks are made while holding the scheduling mutex.
    LOCKGUARD(m_scheduledUploadMutex);

    if (uploadCount() > 0)
    {
        return true;
    }
    return m_isUploadScheduled;
}

Expand Down
Loading
Loading