从具有多个线程/任务的队列中消费

Consume from Queue with multiple threads/tasks(从具有多个线程/任务的队列中消费)

本文介绍了从具有多个线程/任务的队列中消费的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个生产者从资源中获取用户并将它们放入 ConcurrentQueue,然后我想要做的是使用多个消费者并处理所有用户并从另一个资源中获取他们的信息.

I have a producer that gets users from a resource and places them into a ConcurrentQueue, then What I want to do is using multiple consumers and process all users and get their information from another resource.

  public void Populate(IEnumerable<Users> users){
     _queue.Enqueue(users);
     // here single threaded
  }

  public void Process(){
     // here i want this to be processed by multiple consumers
     // say multiple threads so that I can finish processing them.
  }

我的问题是,我应该使用线程吗?任务?线程池?

My question is, should i use thread? task? ThreadPool?

我看过这个问题:C# 等价于Java ExecutorService.newSingleThreadExecutor(),或者:如何序列化对资源的多线程访问

推荐答案

既然你已经在使用队列机制,我建议你使用 BlockingCollection 而不是 ConcurrentQueue,以及 Parallel.Invoke().

Since you are using a queuing mechanism already, I suggest you use a BlockingCollection instead of ConcurrentQueue, along with Parallel.Invoke().

BlockingCollection 有一些重要的东西使它很好用.

There are some important things about BlockingCollection that make it nice to use.

  1. BlockingCollection 让消费线程使用 foreach 以线程安全和自然的方式从集合中获取项目.
  2. 当队列为空时,消耗 foreach 循环会自动阻塞,并在项目可用时继续.
  3. BlockingCollection 提供了一种易于使用的机制来表示数据结束.队列所有者只需调用 queue.CompleteAdding(),任何从队列中获取项目的 foreach 循环都会在队列完全为空时自动退出.
  1. BlockingCollection lets the consuming threads take items from the collection in a threadsafe and natural manner using foreach.
  2. The consuming foreach loop blocks automatically when the queue is empty, and continues when items become available.
  3. BlockingCollection provides an easy-to-use mechanism to signal the end of data. The queue owner simply calls queue.CompleteAdding() and any foreach loops taking items from the queue will exit automatically when the queue becomes completely empty.

您可以使用 Parallel.Invoke() 来启动多个线程,每个线程都使用 foreach 来遍历队列.(Parallel.Invoke() 可以让你给它一个并行运行的任务数组,这使得它使用起来非常简单.)

You can use Parallel.Invoke() to start a number of threads, each of which uses foreach to iterate over the queue. (Parallel.Invoke() lets you give it an array of tasks to run in parallel, which makes it quite simple to use.)

最好用一个示例程序来说明这一点:

This is best illustrated with a sample program:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue,
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

如果您不需要限制并发线程的数量并且很乐意让 .Net 为您决定(这不是一个坏主意),那么您可以通过删除 processQueue() 来简化代码 并将 process() 更改为:

If you don't need to limit the number of concurrent threads and are happy to let .Net decide for you (not a bad idea), then you can simplify the code quite a bit by removing processQueue() altogether and changing process() to:

void process() // Process the input queue,
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

但是,这比它需要的锁定更多,所以你最好只使用原始方法(不会遇到这个问题),或者使用这里描述的解决方案:http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

However, that does more locking than it needs to, so you're probably best off just using the original method (which doesn't suffer from that problem), or using the solution described here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

这篇关于从具有多个线程/任务的队列中消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:从具有多个线程/任务的队列中消费