mirror of
https://github.com/Radarr/Radarr.git
synced 2024-09-17 15:02:34 +02:00
Fixed: Refreshing Plex Server series in high volume systems
This commit is contained in:
parent
026c34c3ba
commit
35532b6789
@ -8,6 +8,7 @@ public interface ICacheManager
|
||||
{
|
||||
ICached<T> GetCache<T>(Type host);
|
||||
ICached<T> GetCache<T>(Type host, string name);
|
||||
ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime);
|
||||
ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null);
|
||||
void Clear();
|
||||
ICollection<ICached> Caches { get; }
|
||||
@ -43,6 +44,14 @@ public ICached<T> GetCache<T>(Type host, string name)
|
||||
return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>());
|
||||
}
|
||||
|
||||
public ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime)
|
||||
{
|
||||
Ensure.That(host, () => host).IsNotNull();
|
||||
Ensure.That(name, () => name).IsNotNullOrWhiteSpace();
|
||||
|
||||
return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>(defaultLifeTime, true));
|
||||
}
|
||||
|
||||
public ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null)
|
||||
{
|
||||
Ensure.That(host, () => host).IsNotNull();
|
||||
|
@ -1,8 +1,9 @@
|
||||
using System;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using NzbDrone.Common.EnsureThat;
|
||||
using NzbDrone.Common.Extensions;
|
||||
|
||||
namespace NzbDrone.Common.Cache
|
||||
{
|
||||
@ -29,35 +30,49 @@ public bool IsExpired()
|
||||
}
|
||||
|
||||
private readonly ConcurrentDictionary<string, CacheItem> _store;
|
||||
private readonly TimeSpan? _defaultLifeTime;
|
||||
private readonly bool _rollingExpiry;
|
||||
|
||||
public Cached()
|
||||
public Cached(TimeSpan? defaultLifeTime = null, bool rollingExpiry = false)
|
||||
{
|
||||
_store = new ConcurrentDictionary<string, CacheItem>();
|
||||
_defaultLifeTime = defaultLifeTime;
|
||||
_rollingExpiry = rollingExpiry;
|
||||
}
|
||||
|
||||
public void Set(string key, T value, TimeSpan? lifetime = null)
|
||||
public void Set(string key, T value, TimeSpan? lifeTime = null)
|
||||
{
|
||||
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();
|
||||
_store[key] = new CacheItem(value, lifetime);
|
||||
_store[key] = new CacheItem(value, lifeTime ?? _defaultLifeTime);
|
||||
}
|
||||
|
||||
public T Find(string key)
|
||||
{
|
||||
CacheItem value;
|
||||
_store.TryGetValue(key, out value);
|
||||
|
||||
if (value == null)
|
||||
CacheItem cacheItem;
|
||||
if (!_store.TryGetValue(key, out cacheItem))
|
||||
{
|
||||
return default(T);
|
||||
}
|
||||
|
||||
if (value.IsExpired())
|
||||
if (cacheItem.IsExpired())
|
||||
{
|
||||
_store.TryRemove(key, out value);
|
||||
return default(T);
|
||||
if (TryRemove(key, cacheItem))
|
||||
{
|
||||
return default(T);
|
||||
}
|
||||
|
||||
if (!_store.TryGetValue(key, out cacheItem))
|
||||
{
|
||||
return default(T);
|
||||
}
|
||||
}
|
||||
|
||||
return value.Object;
|
||||
if (_rollingExpiry && _defaultLifeTime.HasValue)
|
||||
{
|
||||
_store.TryUpdate(key, new CacheItem(cacheItem.Object, _defaultLifeTime.Value), cacheItem);
|
||||
}
|
||||
|
||||
return cacheItem.Object;
|
||||
}
|
||||
|
||||
public void Remove(string key)
|
||||
@ -72,20 +87,31 @@ public T Get(string key, Func<T> function, TimeSpan? lifeTime = null)
|
||||
{
|
||||
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();
|
||||
|
||||
CacheItem cacheItem;
|
||||
T value;
|
||||
lifeTime = lifeTime ?? _defaultLifeTime;
|
||||
|
||||
if (!_store.TryGetValue(key, out cacheItem) || cacheItem.IsExpired())
|
||||
CacheItem cacheItem;
|
||||
|
||||
if (_store.TryGetValue(key, out cacheItem) && !cacheItem.IsExpired())
|
||||
{
|
||||
value = function();
|
||||
Set(key, value, lifeTime);
|
||||
if (_rollingExpiry && lifeTime.HasValue)
|
||||
{
|
||||
_store.TryUpdate(key, new CacheItem(cacheItem.Object, lifeTime), cacheItem);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
value = cacheItem.Object;
|
||||
var newCacheItem = new CacheItem(function(), lifeTime);
|
||||
if (cacheItem != null && _store.TryUpdate(key, newCacheItem, cacheItem))
|
||||
{
|
||||
cacheItem = newCacheItem;
|
||||
}
|
||||
else
|
||||
{
|
||||
cacheItem = _store.GetOrAdd(key, newCacheItem);
|
||||
}
|
||||
}
|
||||
|
||||
return value;
|
||||
return cacheItem.Object;
|
||||
}
|
||||
|
||||
public void Clear()
|
||||
@ -95,9 +121,11 @@ public void Clear()
|
||||
|
||||
public void ClearExpired()
|
||||
{
|
||||
foreach (var cached in _store.Where(c => c.Value.IsExpired()))
|
||||
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;
|
||||
|
||||
foreach (var cached in _store.Where(c => c.Value.IsExpired()).ToList())
|
||||
{
|
||||
Remove(cached.Key);
|
||||
collection.Remove(cached);
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,5 +136,12 @@ public ICollection<T> Values
|
||||
return _store.Values.Select(c => c.Object).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
private bool TryRemove(string key, CacheItem value)
|
||||
{
|
||||
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;
|
||||
|
||||
return collection.Remove(new KeyValuePair<string, CacheItem>(key, value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,8 @@
|
||||
using NzbDrone.Common.Messaging;
|
||||
|
||||
namespace NzbDrone.Core.MediaFiles.Events
|
||||
{
|
||||
public class RenameCompletedEvent : IEvent
|
||||
{
|
||||
}
|
||||
}
|
@ -127,6 +127,8 @@ public void Execute(RenameFilesCommand message)
|
||||
_logger.ProgressInfo("Renaming {0} files for {1}", movieFiles.Count, movie.Title);
|
||||
RenameFiles(movieFiles, movie);
|
||||
_logger.ProgressInfo("Selected movie files renamed for {0}", movie.Title);
|
||||
|
||||
_eventAggregator.PublishEvent(new RenameCompletedEvent());
|
||||
}
|
||||
|
||||
public void Execute(RenameMovieCommand message)
|
||||
@ -141,6 +143,8 @@ public void Execute(RenameMovieCommand message)
|
||||
RenameFiles(movieFiles, movie);
|
||||
_logger.ProgressInfo("All movie files renamed for {0}", movie.Title);
|
||||
}
|
||||
|
||||
_eventAggregator.PublishEvent(new RenameCompletedEvent());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
using NzbDrone.Core.Movies;
|
||||
using NzbDrone.Core.Movies;
|
||||
using NzbDrone.Core.ThingiProvider;
|
||||
|
||||
namespace NzbDrone.Core.Notifications
|
||||
@ -12,6 +12,7 @@ public interface INotification : IProvider
|
||||
void OnMovieRename(Movie movie);
|
||||
void OnHealthIssue(HealthCheck.HealthCheck healthCheck);
|
||||
void OnDelete(DeleteMessage deleteMessage);
|
||||
void ProcessQueue();
|
||||
bool SupportsOnGrab { get; }
|
||||
bool SupportsOnDownload { get; }
|
||||
bool SupportsOnUpgrade { get; }
|
||||
|
@ -50,6 +50,10 @@ public virtual void OnDelete(DeleteMessage deleteMessage)
|
||||
{
|
||||
}
|
||||
|
||||
public virtual void ProcessQueue()
|
||||
{
|
||||
}
|
||||
|
||||
public bool SupportsOnGrab => HasConcreteImplementation("OnGrab");
|
||||
public bool SupportsOnRename => HasConcreteImplementation("OnMovieRename");
|
||||
public bool SupportsOnDownload => HasConcreteImplementation("OnDownload");
|
||||
|
@ -17,7 +17,10 @@ public class NotificationService
|
||||
IHandle<MovieGrabbedEvent>,
|
||||
IHandle<MovieDownloadedEvent>,
|
||||
IHandle<HealthCheckFailedEvent>,
|
||||
IHandle<MovieFileDeletedEvent>
|
||||
IHandle<MovieFileDeletedEvent>,
|
||||
IHandleAsync<DownloadsProcessedEvent>,
|
||||
IHandleAsync<RenameCompletedEvent>,
|
||||
IHandleAsync<HealthCheckCompleteEvent>
|
||||
{
|
||||
private readonly INotificationFactory _notificationFactory;
|
||||
private readonly Logger _logger;
|
||||
@ -176,26 +179,56 @@ public void Handle(HealthCheckFailedEvent message)
|
||||
|
||||
public void Handle(MovieFileDeletedEvent message)
|
||||
{
|
||||
var deleteMessage = new DeleteMessage();
|
||||
deleteMessage.Message = GetMessage(message.MovieFile.Movie, message.MovieFile.Quality);
|
||||
deleteMessage.MovieFile = message.MovieFile;
|
||||
deleteMessage.Movie = message.MovieFile.Movie;
|
||||
deleteMessage.Reason = message.Reason;
|
||||
var deleteMessage = new DeleteMessage();
|
||||
deleteMessage.Message = GetMessage(message.MovieFile.Movie, message.MovieFile.Quality);
|
||||
deleteMessage.MovieFile = message.MovieFile;
|
||||
deleteMessage.Movie = message.MovieFile.Movie;
|
||||
deleteMessage.Reason = message.Reason;
|
||||
|
||||
foreach (var notification in _notificationFactory.OnDeleteEnabled())
|
||||
foreach (var notification in _notificationFactory.OnDeleteEnabled())
|
||||
{
|
||||
try
|
||||
{
|
||||
try
|
||||
if (ShouldHandleMovie(notification.Definition, message.MovieFile.Movie))
|
||||
{
|
||||
if (ShouldHandleMovie(notification.Definition, message.MovieFile.Movie))
|
||||
{
|
||||
notification.OnDelete(deleteMessage);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warn(ex, "Unable to send OnDelete notification to: " + notification.Definition.Name);
|
||||
notification.OnDelete(deleteMessage);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warn(ex, "Unable to send OnDelete notification to: " + notification.Definition.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void HandleAsync(DownloadsProcessedEvent message)
|
||||
{
|
||||
ProcessQueue();
|
||||
}
|
||||
|
||||
public void HandleAsync(RenameCompletedEvent message)
|
||||
{
|
||||
ProcessQueue();
|
||||
}
|
||||
|
||||
public void HandleAsync(HealthCheckCompleteEvent message)
|
||||
{
|
||||
ProcessQueue();
|
||||
}
|
||||
|
||||
private void ProcessQueue()
|
||||
{
|
||||
foreach (var notification in _notificationFactory.GetAvailableProviders())
|
||||
{
|
||||
try
|
||||
{
|
||||
notification.ProcessQueue();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warn(ex, "Unable to process notification queue for " + notification.Definition.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using FluentValidation.Results;
|
||||
using NLog;
|
||||
using NzbDrone.Common.Cache;
|
||||
using NzbDrone.Common.Extensions;
|
||||
using NzbDrone.Core.Exceptions;
|
||||
using NzbDrone.Core.Movies;
|
||||
@ -13,11 +16,23 @@ public class PlexServer : NotificationBase<PlexServerSettings>
|
||||
{
|
||||
private readonly IPlexServerService _plexServerService;
|
||||
private readonly IPlexTvService _plexTvService;
|
||||
private readonly Logger _logger;
|
||||
|
||||
public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService)
|
||||
private class PlexUpdateQueue
|
||||
{
|
||||
public Dictionary<int, Movie> Pending { get; } = new Dictionary<int, Movie>();
|
||||
public bool Refreshing { get; set; }
|
||||
}
|
||||
|
||||
private readonly ICached<PlexUpdateQueue> _pendingMoviesCache;
|
||||
|
||||
public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService, ICacheManager cacheManager, Logger logger)
|
||||
{
|
||||
_plexServerService = plexServerService;
|
||||
_plexTvService = plexTvService;
|
||||
_logger = logger;
|
||||
|
||||
_pendingMoviesCache = cacheManager.GetRollingCache<PlexUpdateQueue>(GetType(), "pendingSeries", TimeSpan.FromDays(1));
|
||||
}
|
||||
|
||||
public override string Link => "https://www.plex.tv/";
|
||||
@ -37,7 +52,65 @@ private void UpdateIfEnabled(Movie movie)
|
||||
{
|
||||
if (Settings.UpdateLibrary)
|
||||
{
|
||||
_plexServerService.UpdateLibrary(movie, Settings);
|
||||
_logger.Debug("Scheduling library update for movie {0} {1}", movie.Id, movie.Title);
|
||||
var queue = _pendingMoviesCache.Get(Settings.Host, () => new PlexUpdateQueue());
|
||||
lock (queue)
|
||||
{
|
||||
queue.Pending[movie.Id] = movie;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override void ProcessQueue()
|
||||
{
|
||||
PlexUpdateQueue queue = _pendingMoviesCache.Find(Settings.Host);
|
||||
if (queue == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
lock (queue)
|
||||
{
|
||||
if (queue.Refreshing)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
queue.Refreshing = true;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
List<Movie> refreshingMovies;
|
||||
lock (queue)
|
||||
{
|
||||
if (queue.Pending.Empty())
|
||||
{
|
||||
queue.Refreshing = false;
|
||||
return;
|
||||
}
|
||||
|
||||
refreshingMovies = queue.Pending.Values.ToList();
|
||||
queue.Pending.Clear();
|
||||
}
|
||||
|
||||
if (Settings.UpdateLibrary)
|
||||
{
|
||||
_logger.Debug("Performing library update for {0} movies", refreshingMovies.Count);
|
||||
_plexServerService.UpdateLibrary(refreshingMovies, Settings);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
lock (queue)
|
||||
{
|
||||
queue.Refreshing = false;
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Text.RegularExpressions;
|
||||
using FluentValidation.Results;
|
||||
@ -13,6 +14,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server
|
||||
public interface IPlexServerService
|
||||
{
|
||||
void UpdateLibrary(Movie movie, PlexServerSettings settings);
|
||||
void UpdateLibrary(IEnumerable<Movie> movie, PlexServerSettings settings);
|
||||
ValidationFailure Test(PlexServerSettings settings);
|
||||
}
|
||||
|
||||
@ -32,10 +34,16 @@ public PlexServerService(ICacheManager cacheManager, IPlexServerProxy plexServer
|
||||
}
|
||||
|
||||
public void UpdateLibrary(Movie movie, PlexServerSettings settings)
|
||||
{
|
||||
UpdateLibrary(new[] { movie }, settings);
|
||||
}
|
||||
|
||||
public void UpdateLibrary(IEnumerable<Movie> multipleMovies, PlexServerSettings settings)
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.Debug("Sending Update Request to Plex Server");
|
||||
var watch = Stopwatch.StartNew();
|
||||
|
||||
var version = _versionCache.Get(settings.Host, () => GetVersion(settings), TimeSpan.FromHours(2));
|
||||
ValidateVersion(version);
|
||||
@ -45,12 +53,31 @@ public void UpdateLibrary(Movie movie, PlexServerSettings settings)
|
||||
|
||||
if (partialUpdates)
|
||||
{
|
||||
UpdatePartialSection(movie, sections, settings);
|
||||
var partiallyUpdated = true;
|
||||
|
||||
foreach (var movie in multipleMovies)
|
||||
{
|
||||
partiallyUpdated &= UpdatePartialSection(movie, sections, settings);
|
||||
|
||||
if (!partiallyUpdated)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Only update complete sections if all partial updates failed
|
||||
if (!partiallyUpdated)
|
||||
{
|
||||
_logger.Debug("Unable to update partial section, updating all Movie sections");
|
||||
sections.ForEach(s => UpdateSection(s.Id, settings));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
sections.ForEach(s => UpdateSection(s.Id, settings));
|
||||
}
|
||||
|
||||
_logger.Debug("Finished sending Update Request to Plex Server (took {0} ms)", watch.ElapsedMilliseconds);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -123,7 +150,7 @@ private void UpdateSection(int sectionId, PlexServerSettings settings)
|
||||
_plexServerProxy.Update(sectionId, settings);
|
||||
}
|
||||
|
||||
private void UpdatePartialSection(Movie movie, List<PlexSection> sections, PlexServerSettings settings)
|
||||
private bool UpdatePartialSection(Movie movie, List<PlexSection> sections, PlexServerSettings settings)
|
||||
{
|
||||
var partiallyUpdated = false;
|
||||
|
||||
@ -140,12 +167,7 @@ private void UpdatePartialSection(Movie movie, List<PlexSection> sections, PlexS
|
||||
}
|
||||
}
|
||||
|
||||
// Only update complete sections if all partial updates failed
|
||||
if (!partiallyUpdated)
|
||||
{
|
||||
_logger.Debug("Unable to update partial section, updating all Movie sections");
|
||||
sections.ForEach(s => UpdateSection(s.Id, settings));
|
||||
}
|
||||
return partiallyUpdated;
|
||||
}
|
||||
|
||||
private int? GetMetadataId(int sectionId, Movie movie, string language, PlexServerSettings settings)
|
||||
@ -159,6 +181,8 @@ public ValidationFailure Test(PlexServerSettings settings)
|
||||
{
|
||||
try
|
||||
{
|
||||
_versionCache.Remove(settings.Host);
|
||||
_partialUpdateCache.Remove(settings.Host);
|
||||
var sections = GetSections(settings);
|
||||
|
||||
if (sections.Empty())
|
||||
|
Loading…
Reference in New Issue
Block a user