Restructed code to make the jobs save the media data and don´t run in a queue any more

This commit is contained in:
Serraniel 2018-02-01 21:17:12 +01:00
parent b022306cfd
commit fb364f154d
2 changed files with 44 additions and 107 deletions

View file

@ -60,7 +60,7 @@ namespace DML.AppCore.Classes
return (from c in server.TextChannels where c.Id == id select c).FirstOrDefault(); return (from c in server.TextChannels where c.Id == id select c).FirstOrDefault();
} }
internal async Task<List<IMessage>> Scan() internal async Task Scan()
{ {
Debug($"Starting scan of guild {GuildId} channel {ChannelId}..."); Debug($"Starting scan of guild {GuildId} channel {ChannelId}...");
var result = new List<IMessage>(); var result = new List<IMessage>();
@ -74,11 +74,10 @@ namespace DML.AppCore.Classes
var channel = FindChannelById(guild, ChannelId); var channel = FindChannelById(guild, ChannelId);
Debug("Checking channel access"); Debug("Checking channel access");
//channel.GetUser(channel.Guild.CurrentUser.Id);
if (channel.GetUser(channel.Guild.CurrentUser.Id) == null) if (channel.GetUser(channel.Guild.CurrentUser.Id) == null)
{ {
Info("Skipping channel without access"); Info("Skipping channel without access");
return result; return;
} }
if (Math.Abs(StopTimestamp) < 0.4) if (Math.Abs(StopTimestamp) < 0.4)
@ -93,39 +92,24 @@ namespace DML.AppCore.Classes
Trace($"Downloading next {limit} messages..."); Trace($"Downloading next {limit} messages...");
if (isFirst) if (isFirst)
{ {
//messages = await channel.GetMessagesAsync(limit).ToArray() as SocketMessage[];
var realMessages = await channel.GetMessagesAsync(limit).ToArray(); var realMessages = await channel.GetMessagesAsync(limit).ToArray();
foreach (var realMessageArray in realMessages) messages.AddRange(realMessages.SelectMany(realMessageArray => realMessageArray));
{
foreach (var realMessage in realMessageArray)
{
messages.Add(realMessage);
}
}
} }
else else
{ {
var realMessages = await channel.GetMessagesAsync(lastId, Direction.Before, limit).ToArray(); var realMessages = await channel.GetMessagesAsync(lastId, Direction.Before, limit).ToArray();
foreach (var realMessageArray in realMessages) messages.AddRange(realMessages.SelectMany(realMessageArray => realMessageArray));
{
foreach (var realMessage in realMessageArray)
{
messages.Add(realMessage);
}
} }
//messages = await channel.GetMessagesAsync(lastId, Direction.Before, limit).ToArray() as SocketMessage[];
}
Trace($"Downloaded {messages.Count} messages."); Trace($"Downloaded {messages.Count} messages.");
isFirst = false; isFirst = false;
foreach (var m in messages) foreach (var m in messages)
{ {
if (!IsValid) if (!IsValid)
return null; return;
Core.Scheduler.MessagesScanned++; Core.Scheduler.MessagesScanned++;
@ -155,11 +139,29 @@ namespace DML.AppCore.Classes
finished = finished || messages.Count < limit; finished = finished || messages.Count < limit;
} }
Trace($"Downloaded all messages for guild {GuildId} channel {ChannelId}."); Trace($"Downloaded all messages for guild {GuildId} channel {ChannelId}.");
Trace("Sorting messages..."); Trace("Sorting messages...");
result.Sort((a, b) => DateTime.Compare(a.CreatedAt.UtcDateTime, b.CreatedAt.UtcDateTime)); result.Sort((a, b) => DateTime.Compare(a.CreatedAt.UtcDateTime, b.CreatedAt.UtcDateTime));
foreach (var r in result)
{
foreach (var a in r.Attachments)
{
var mediaData = new MediaData
{
Id = a.Id,
GuildId = (r.Channel as SocketTextChannel)?.Guild?.Id ?? 0,
ChannelId = r.Channel.Id,
DownloadSource = a.Url,
Filename = a.Filename,
TimeStamp = SweetUtils.DateTimeToUnixTimeStamp(r.CreatedAt.UtcDateTime)
};
mediaData.Store();
}
}
if (result.Count > 0) if (result.Count > 0)
{ {
Trace("Updating StopTimestamp for next scan..."); Trace("Updating StopTimestamp for next scan...");
@ -167,8 +169,6 @@ namespace DML.AppCore.Classes
} }
Debug($"Fisnished scan of guild {GuildId} channel {ChannelId}."); Debug($"Fisnished scan of guild {GuildId} channel {ChannelId}.");
return result;
} }
public void Stop() public void Stop()

View file

@ -82,99 +82,36 @@ namespace DML.AppCore.Classes
Run = false; Run = false;
} }
public void Start() public void StartAll()
{
Run = true;
Task.Run(async () =>
{ {
Logger.Info("Started JobScheduler..."); Logger.Info("Started JobScheduler...");
while (Run)
{
try
{
Logger.Debug("Entering job list handler loop..."); Logger.Debug("Entering job list handler loop...");
//foreach (var job in JobList) //foreach (var job in JobList)
for (var i = JobList.Count - 1; i >= 0; i--) for (var i = JobList.Count - 1; i >= 0; i--)
{
try
{ {
var job = JobList[i]; var job = JobList[i];
Logger.Debug($"Checking job {job}"); Logger.Debug($"Checking job {job}");
var hasJob = false;
Logger.Trace("Locking scheduler..."); Task.Run(async () =>
lock (this)
{ {
Logger.Trace("Checking if job is already performed..."); await job.Scan();
hasJob = RunningJobs.ContainsKey(job.Id); });
}
Logger.Trace("Unlocked scheduler.");
if (!hasJob)
{
Logger.Debug("Job is not performed yet...Performing job...");
var queue = new Queue<IMessage>();
Logger.Trace("Locking scheduler...");
lock (this)
{
Logger.Trace("Adding job to running jobs.");
RunningJobs.Add(job.Id, queue);
}
Logger.Trace("Unlocked scheduler.");
Logger.Trace("Issuing job message scan...");
var messages = await job.Scan();
if (messages == null)
continue;
Logger.Trace($"Adding {messages.Count} messages to queue...");
foreach (var msg in messages)
{
queue.Enqueue(msg);
}
Logger.Trace($"Added {queue.Count} messages to queue.");
if (messages.Count != queue.Count)
Logger.Warn("Not all messages have been added into the queue.");
var startedDownload = false;
while (!startedDownload)
{
Logger.Debug("Entering loop to check thread availability");
Logger.Trace("Locking scheduler...");
lock (this)
{
Logger.Trace(
$"Checking thread limit. Running: {RunningThreads}, Max: {Core.Settings.ThreadLimit}");
if (RunningThreads >= Core.Settings.ThreadLimit)
continue;
RunningThreads++;
startedDownload = true;
}
Logger.Trace("Unlocked scheduler.");
}
Logger.Trace("Start downloading job async.");
Task.Run(() => WorkQueue(job.Id)); // do not await to work parallel
}
}
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.Error(ex.Message); Logger.Error(ex.Message);
} }
} }
});
} }
private void WorkQueue(int jobId) private void WorkQueue(int jobId)
{ {
try try
{ {
Logger.Debug("Beginning job download..."); Logger.Debug("Beginning attachment download...");
Logger.Trace("Finding job..."); Logger.Trace("Finding job...");
var job = (from j in JobList where j.Id == jobId select j).FirstOrDefault(); var job = (from j in JobList where j.Id == jobId select j).FirstOrDefault();