diff --git a/NzbDrone.Common.Test/EventingTests/MessageAggregatorEventTests.cs b/NzbDrone.Common.Test/EventingTests/MessageAggregatorEventTests.cs index 52daf6148..b41f81124 100644 --- a/NzbDrone.Common.Test/EventingTests/MessageAggregatorEventTests.cs +++ b/NzbDrone.Common.Test/EventingTests/MessageAggregatorEventTests.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Generic; +using System.Threading; using Moq; using NUnit.Framework; using NzbDrone.Common.Messaging; using NzbDrone.Test.Common; +using FluentAssertions; namespace NzbDrone.Common.Test.EventingTests { @@ -16,6 +18,8 @@ public class MessageAggregatorEventTests : TestBase private Mock> HandlerB1; private Mock> HandlerB2; + private Mock> AsyncHandlerA1; + [SetUp] public void Setup() @@ -25,6 +29,8 @@ public void Setup() HandlerB1 = new Mock>(); HandlerB2 = new Mock>(); + AsyncHandlerA1 = new Mock>(); + Mocker.GetMock() .Setup(c => c.BuildAll>()) .Returns(new List> { HandlerA1.Object, HandlerA2.Object }); @@ -79,6 +85,50 @@ public void broken_handler_should_not_effect_others_handler() ExceptionVerification.ExpectedErrors(1); } + + [Test] + public void should_queue_multiple_async_events() + { + var eventA = new EventA(); + + + + var handlers = new List> + { + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + AsyncHandlerA1.Object, + }; + + Mocker.GetMock() + .Setup(c => c.BuildAll>()) + .Returns(new List>()); + + Mocker.GetMock() + .Setup(c => c.BuildAll>()) + .Returns(handlers); + + var counter = new ConcurrencyCounter(handlers.Count); + + + AsyncHandlerA1.Setup(c => c.HandleAsync(It.IsAny())) + .Callback(c => + { + var id = counter.Start(); + Thread.Sleep(1000); + counter.Stop(id); + }); + + Subject.PublishEvent(eventA); + + counter.WaitForAllItems(); + + counter.MaxThreads.Should().Be(2); + } } diff --git a/NzbDrone.Common/Messaging/LimitedConcurrencyLevelTaskScheduler.cs b/NzbDrone.Common/Messaging/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 000000000..a8d9238ab --- /dev/null +++ b/NzbDrone.Common/Messaging/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,133 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace NzbDrone.Common.Messaging +{ + public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler + { + /// Whether the current thread is processing work items. + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + /// The list of tasks to be executed. + private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) + /// The maximum concurrency level allowed by this scheduler. + private readonly int _maxDegreeOfParallelism; + /// Whether the scheduler is currently processing work items. + private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) + + /// + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. + /// + /// The maximum degree of parallelism provided by this scheduler. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + { + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + _maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + /// Queues a task to the scheduler. + /// The task to be queued. + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (_tasks) + { + _tasks.AddLast(task); + if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) + { + ++_delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + _currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (_tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) + { + --_delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = _tasks.First.Value; + _tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + base.TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally { _currentThreadIsProcessingItems = false; } + }, null); + } + + /// Attempts to execute the specified task on the current thread. + /// The task to be executed. + /// + /// Whether the task could be executed on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) TryDequeue(task); + + // Try to run the task. + return base.TryExecuteTask(task); + } + + /// Attempts to remove a previously scheduled task from the scheduler. + /// The task to be removed. + /// Whether the task could be found and removed. + protected sealed override bool TryDequeue(Task task) + { + lock (_tasks) return _tasks.Remove(task); + } + + /// Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } + + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// An enumerable of the tasks currently scheduled. + protected sealed override IEnumerable GetScheduledTasks() + { + bool lockTaken = false; + try + { + Monitor.TryEnter(_tasks, ref lockTaken); + if (lockTaken) return _tasks.ToArray(); + else throw new NotSupportedException(); + } + finally + { + if (lockTaken) Monitor.Exit(_tasks); + } + } + } +} \ No newline at end of file diff --git a/NzbDrone.Common/Messaging/MessageAggregator.cs b/NzbDrone.Common/Messaging/MessageAggregator.cs index 2a325e177..73f9eb335 100644 --- a/NzbDrone.Common/Messaging/MessageAggregator.cs +++ b/NzbDrone.Common/Messaging/MessageAggregator.cs @@ -1,6 +1,5 @@ using System; using System.Linq; -using System.Reflection; using System.Threading.Tasks; using NLog; using NzbDrone.Common.EnsureThat; @@ -12,11 +11,14 @@ public class MessageAggregator : IMessageAggregator { private readonly Logger _logger; private readonly IServiceFactory _serviceFactory; + private readonly TaskFactory _taskFactory; public MessageAggregator(Logger logger, IServiceFactory serviceFactory) { _logger = logger; _serviceFactory = serviceFactory; + var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); + _taskFactory = new TaskFactory(scheduler); } public void PublishEvent(TEvent @event) where TEvent : class ,IEvent @@ -45,12 +47,13 @@ public void PublishEvent(TEvent @event) where TEvent : class ,IEvent foreach (var handler in _serviceFactory.BuildAll>()) { var handlerLocal = handler; - Task.Factory.StartNew(() => + + _taskFactory.StartNew(() => { _logger.Debug("{0} ~> {1}", eventName, handlerLocal.GetType().Name); handlerLocal.HandleAsync(@event); _logger.Debug("{0} <~ {1}", eventName, handlerLocal.GetType().Name); - }); + }, TaskCreationOptions.PreferFairness); } } diff --git a/NzbDrone.Common/NzbDrone.Common.csproj b/NzbDrone.Common/NzbDrone.Common.csproj index 51955f90b..7410fc382 100644 --- a/NzbDrone.Common/NzbDrone.Common.csproj +++ b/NzbDrone.Common/NzbDrone.Common.csproj @@ -102,6 +102,7 @@ + diff --git a/NzbDrone.Core.Test/IndexerSearchTests/FetchAndParseRssServiceFixture.cs b/NzbDrone.Core.Test/IndexerSearchTests/FetchAndParseRssServiceFixture.cs new file mode 100644 index 000000000..7b96b7690 --- /dev/null +++ b/NzbDrone.Core.Test/IndexerSearchTests/FetchAndParseRssServiceFixture.cs @@ -0,0 +1,78 @@ +using System.Collections.Generic; +using FizzWare.NBuilder; +using FluentAssertions; +using Moq; +using NUnit.Framework; +using NzbDrone.Core.DecisionEngine; +using NzbDrone.Core.IndexerSearch; +using NzbDrone.Core.IndexerSearch.Definitions; +using NzbDrone.Core.Indexers; +using NzbDrone.Core.Indexers.Newznab; +using NzbDrone.Core.Parser.Model; +using NzbDrone.Core.Test.Framework; +using NzbDrone.Core.Tv; +using NzbDrone.Test.Common; + +namespace NzbDrone.Core.Test.IndexerSearchTests +{ + public class NzbSearchServiceFixture : CoreTest + { + private List _indexers; + + private Series _searchTargetSeries; + + [SetUp] + public void Setup() + { + + _searchTargetSeries = Builder.CreateNew().BuildNew(); + + _indexers = new List(); + + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + _indexers.Add(new Newznab()); + + Mocker.SetConstant>(_indexers); + + Mocker.GetMock().Setup(c => c.GetSeries(It.IsAny())) + .Returns(_searchTargetSeries); + + } + + [Test] + public void should_call_fetch_on_all_indexers_at_the_same_time() + { + + var counter = new ConcurrencyCounter(_indexers.Count); + + Mocker.GetMock().Setup(c => c.Fetch(It.IsAny(), It.IsAny())) + .Returns(new List()) + .Callback((() => counter.SimulateWork(500))); + + Mocker.GetMock().Setup(c => c.GetAvailableIndexers()).Returns(_indexers); + + Mocker.GetMock() + .Setup(c => c.GetSearchDecision(It.IsAny>(), It.IsAny())) + .Returns(new List()); + + Subject.SearchSingle(0, 0, 0); + + counter.WaitForAllItems(); + + counter.MaxThreads.Should().Be(_indexers.Count); + } + } +} \ No newline at end of file diff --git a/NzbDrone.Core.Test/IndexerTests/FetchAndParseRssServiceFixture.cs b/NzbDrone.Core.Test/IndexerTests/FetchAndParseRssServiceFixture.cs index 0b9298d83..6dd685d9e 100644 --- a/NzbDrone.Core.Test/IndexerTests/FetchAndParseRssServiceFixture.cs +++ b/NzbDrone.Core.Test/IndexerTests/FetchAndParseRssServiceFixture.cs @@ -9,6 +9,7 @@ using NzbDrone.Core.Indexers.Newznab; using NzbDrone.Core.Parser.Model; using NzbDrone.Core.Test.Framework; +using NzbDrone.Test.Common; namespace NzbDrone.Core.Test.IndexerTests { @@ -45,26 +46,20 @@ public void Setup() [Explicit] public void should_call_fetch_on_all_indexers_at_the_same_time() { - var callsToFetch = new List(); + + var counter = new ConcurrencyCounter(_indexers.Count); Mocker.GetMock().Setup(c => c.FetchRss(It.IsAny())) .Returns(new List()) - .Callback((() => - { - Thread.Sleep(2000); - Console.WriteLine(DateTime.Now); - callsToFetch.Add(DateTime.Now); - })); + .Callback((() => counter.SimulateWork(500))); Mocker.GetMock().Setup(c => c.GetAvailableIndexers()).Returns(_indexers); Subject.Fetch(); + counter.WaitForAllItems(); - var first = callsToFetch.Min(); - var last = callsToFetch.Max(); - - (last - first).Should().BeLessThan(TimeSpan.FromSeconds(1)); + counter.MaxThreads.Should().Be(_indexers.Count); } } } \ No newline at end of file diff --git a/NzbDrone.Core.Test/NzbDrone.Core.Test.csproj b/NzbDrone.Core.Test/NzbDrone.Core.Test.csproj index 421c85a12..5ce1f8026 100644 --- a/NzbDrone.Core.Test/NzbDrone.Core.Test.csproj +++ b/NzbDrone.Core.Test/NzbDrone.Core.Test.csproj @@ -133,6 +133,7 @@ + diff --git a/NzbDrone.Core/IndexerSearch/NzbSearchService.cs b/NzbDrone.Core/IndexerSearch/NzbSearchService.cs index caa4a689a..8ea7cb727 100644 --- a/NzbDrone.Core/IndexerSearch/NzbSearchService.cs +++ b/NzbDrone.Core/IndexerSearch/NzbSearchService.cs @@ -112,21 +112,33 @@ private List Dispatch(Func> var indexers = _indexerService.GetAvailableIndexers().ToList(); var reports = new List(); - Parallel.ForEach(indexers, indexer => + + var taskList = new List(); + var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); + + foreach (var indexer in indexers) { - try + var indexerLocal = indexer; + + taskList.Add(taskFactory.StartNew(() => { - var indexerReports = searchAction(indexer); - lock (indexer) + try { - reports.AddRange(indexerReports); + var indexerReports = searchAction(indexerLocal); + + lock (reports) + { + reports.AddRange(indexerReports); + } } - } - catch (Exception e) - { - _logger.ErrorException(String.Format("An error has occurred while searching for {0} from: {1}", definitionBase, indexer.Name), e); - } - }); + catch (Exception e) + { + _logger.ErrorException("Error while searching for " + definitionBase, e); + } + })); + } + + Task.WaitAll(taskList.ToArray()); _logger.Debug("Total of {0} reports were found for {1} in {2} indexers", reports.Count, definitionBase, indexers.Count); diff --git a/NzbDrone.Core/Indexers/FetchAndParseRssService.cs b/NzbDrone.Core/Indexers/FetchAndParseRssService.cs index bd1ca797f..5eef13992 100644 --- a/NzbDrone.Core/Indexers/FetchAndParseRssService.cs +++ b/NzbDrone.Core/Indexers/FetchAndParseRssService.cs @@ -38,15 +38,28 @@ public List Fetch() _logger.Debug("Available indexers {0}", indexers.Count); - Parallel.ForEach(indexers, new ParallelOptions { MaxDegreeOfParallelism = 10 }, indexer => - { - var indexerFeed = _feedFetcher.FetchRss(indexer); - lock (result) - { - result.AddRange(indexerFeed); - } - }); + var taskList = new List(); + var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); + + foreach (var indexer in indexers) + { + var indexerLocal = indexer; + + var task = taskFactory.StartNew(() => + { + var indexerFeed = _feedFetcher.FetchRss(indexerLocal); + + lock (result) + { + result.AddRange(indexerFeed); + } + }); + + taskList.Add(task); + } + + Task.WaitAll(taskList.ToArray()); _logger.Debug("Found {0} reports", result.Count); diff --git a/NzbDrone.Core/Tv/SeriesService.cs b/NzbDrone.Core/Tv/SeriesService.cs index 3ad7ad42a..dd991718d 100644 --- a/NzbDrone.Core/Tv/SeriesService.cs +++ b/NzbDrone.Core/Tv/SeriesService.cs @@ -2,20 +2,16 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using Marr.Data; using NLog; using NzbDrone.Common; using NzbDrone.Common.EnsureThat; using NzbDrone.Common.Messaging; using NzbDrone.Core.Configuration; using NzbDrone.Core.DataAugmentation.Scene; -using NzbDrone.Core.Datastore; using NzbDrone.Core.MetadataSource; using NzbDrone.Core.Model; using NzbDrone.Core.Organizer; -using NzbDrone.Core.Parser; using NzbDrone.Core.RootFolders; -using NzbDrone.Core.SeriesStats; using NzbDrone.Core.Tv.Events; namespace NzbDrone.Core.Tv diff --git a/NzbDrone.Test.Common/ConcurrencyCounter.cs b/NzbDrone.Test.Common/ConcurrencyCounter.cs new file mode 100644 index 000000000..2cc1f08ad --- /dev/null +++ b/NzbDrone.Test.Common/ConcurrencyCounter.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +namespace NzbDrone.Test.Common +{ + public class ConcurrencyCounter + { + private int _items; + readonly object _mutex = new object(); + readonly Dictionary _threads = new Dictionary(); + + public int MaxThreads { get { return _threads.Count; } } + + public ConcurrencyCounter(int items) + { + _items = items; + } + + public void WaitForAllItems() + { + while (_items != 0) + { + Thread.Sleep(500); + } + } + + public int Start() + { + int threadId = Thread.CurrentThread.ManagedThreadId; + lock (_mutex) + { + + _threads[threadId] = 1; + } + + Console.WriteLine("Starting " + threadId); + return threadId; + } + + public void SimulateWork(int sleepInMs) + { + var id = Start(); + Thread.Sleep(sleepInMs); + Stop(id); + } + + public void Stop(int id) + { + Console.WriteLine("Finished " + id); + lock (_mutex) + { + _items--; + } + } + } +} \ No newline at end of file diff --git a/NzbDrone.Test.Common/NzbDrone.Test.Common.csproj b/NzbDrone.Test.Common/NzbDrone.Test.Common.csproj index 45b364f23..f7bf56f61 100644 --- a/NzbDrone.Test.Common/NzbDrone.Test.Common.csproj +++ b/NzbDrone.Test.Common/NzbDrone.Test.Common.csproj @@ -90,6 +90,7 @@ +