线程安全 FIFO 队列/缓冲区

Threadsafe FIFO Queue/Buffer(线程安全 FIFO 队列/缓冲区)

本文介绍了线程安全 FIFO 队列/缓冲区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要实现一种任务缓冲区.基本要求是:

I need to implement a sort of task buffer. Basic requirements are:

  • 在单个后台线程中处理任务
  • 从多个线程接收任务
  • 处理所有接收到的任务,即确保在收到停止信号后缓冲区中的缓冲任务已耗尽
  • 必须保持每个线程接收到的任务的顺序

我正在考虑使用如下所示的队列来实现它.将不胜感激有关实施的反馈.还有其他更好的想法来实现这样的事情吗?

I was thinking of implementing it using a Queue like below. Would appreciate feedback on the implementation. Are there any other brighter ideas to implement such a thing?

public class TestBuffer
{
    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

    public void start()
    {
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {
        running = true;

        bool run = true;
        while(run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // If the queue is currently empty and it is still running
                // we need to wait until we're told something changed
                if (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }

            // If something was dequeued, handle it
            if (task != null)
            {
                handle(task);
            }

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is emtpy
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}

推荐答案

您实际上可以使用开箱即用的方式处理此问题 BlockingCollection.

You can actually handle this with the out-of-the-box BlockingCollection.

它被设计成有 1 个或多个生产者,以及 1 个或多个消费者.在您的情况下,您将有多个生产者和一个消费者.

It is designed to have 1 or more producers, and 1 or more consumers. In your case, you would have multiple producers and one consumer.

当您收到停止信号时,使用该信号处理程序

When you receive a stop signal, have that signal handler

  • 通知生产者线程停止
  • 在 BlockingCollection 实例上调用 CompleteAdding

消费者线程将继续运行,直到所有排队的项目都被移除并处理完毕,然后它会遇到BlockingCollection完成的条件.当线程遇到该条件时,它就会退出.

The consumer thread will continue to run until all queued items are removed and processed, then it will encounter the condition that the BlockingCollection is complete. When the thread encounters that condition, it just exits.

这篇关于线程安全 FIFO 队列/缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:线程安全 FIFO 队列/缓冲区