Contine on exception during scanning

This commit is contained in:
Serraniel 2018-01-22 20:07:10 +01:00
parent ee72907744
commit b5945036fd

View file

@ -91,73 +91,81 @@ namespace DML.AppCore.Classes
Logger.Info("Started JobScheduler...");
while (Run)
{
Logger.Debug("Entering job list handler loop...");
//foreach (var job in JobList)
for (var i = JobList.Count - 1; i >= 0; i--)
try
{
var job = JobList[i];
Logger.Debug($"Checking job {job}");
var hasJob = false;
Logger.Trace("Locking scheduler...");
lock (this)
Logger.Debug("Entering job list handler loop...");
//foreach (var job in JobList)
for (var i = JobList.Count - 1; i >= 0; i--)
{
Logger.Trace("Checking if job is already performed...");
hasJob = RunningJobs.ContainsKey(job.Id);
}
Logger.Trace("Unlocked scheduler.");
if (!hasJob)
{
Logger.Debug("Job is not performed yet...Performing job...");
var queue = new Queue<IMessage>();
var job = JobList[i];
Logger.Debug($"Checking job {job}");
var hasJob = false;
Logger.Trace("Locking scheduler...");
lock (this)
{
Logger.Trace("Adding job to running jobs.");
RunningJobs.Add(job.Id, queue);
Logger.Trace("Checking if job is already performed...");
hasJob = RunningJobs.ContainsKey(job.Id);
}
Logger.Trace("Unlocked scheduler.");
Logger.Trace("Issuing job message scan...");
var messages = await job.Scan();
if (messages == null)
continue;
Logger.Trace($"Adding {messages.Count} messages to queue...");
foreach (var msg in messages)
if (!hasJob)
{
queue.Enqueue(msg);
}
Logger.Trace($"Added {queue.Count} messages to queue.");
Logger.Debug("Job is not performed yet...Performing job...");
var queue = new Queue<IMessage>();
if (messages.Count != queue.Count)
Logger.Warn("Not all messages have been added into the queue.");
var startedDownload = false;
while (!startedDownload)
{
Logger.Debug("Entering loop to check thread availability");
Logger.Trace("Locking scheduler...");
lock (this)
{
Logger.Trace($"Checking thread limit. Running: {RunningThreads}, Max: {Core.Settings.ThreadLimit}");
if (RunningThreads >= Core.Settings.ThreadLimit)
continue;
RunningThreads++;
startedDownload = true;
Logger.Trace("Adding job to running jobs.");
RunningJobs.Add(job.Id, queue);
}
Logger.Trace("Unlocked scheduler.");
}
Logger.Trace("Start downloading job async.");
Task.Run(() => WorkQueue(job.Id)); // do not await to work parallel
Logger.Trace("Issuing job message scan...");
var messages = await job.Scan();
if (messages == null)
continue;
Logger.Trace($"Adding {messages.Count} messages to queue...");
foreach (var msg in messages)
{
queue.Enqueue(msg);
}
Logger.Trace($"Added {queue.Count} messages to queue.");
if (messages.Count != queue.Count)
Logger.Warn("Not all messages have been added into the queue.");
var startedDownload = false;
while (!startedDownload)
{
Logger.Debug("Entering loop to check thread availability");
Logger.Trace("Locking scheduler...");
lock (this)
{
Logger.Trace(
$"Checking thread limit. Running: {RunningThreads}, Max: {Core.Settings.ThreadLimit}");
if (RunningThreads >= Core.Settings.ThreadLimit)
continue;
RunningThreads++;
startedDownload = true;
}
Logger.Trace("Unlocked scheduler.");
}
Logger.Trace("Start downloading job async.");
Task.Run(() => WorkQueue(job.Id)); // do not await to work parallel
}
}
}
catch (Exception ex)
{
Logger.Error(ex.Message);
}
}
});
}