使用 TcpClient 和 Reactive Extensions 从 Stream 读取连续

Read continous bytestream from Stream using TcpClient and Reactive Extensions(使用 TcpClient 和 Reactive Extensions 从 Stream 读取连续字节流)

本文介绍了使用 TcpClient 和 Reactive Extensions 从 Stream 读取连续字节流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑以下代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length);
            return buffer.Take(readBytes).ToObservable();
        });
    }
}

基本上我想解析我用 Reactive Extensions 收到的消息.使用 Buffer(4) 正确解析了消息的标头,我得到了消息其余部分的长度.出现的问题是,当我执行 stream.Take(headerLength) 时,代码会重新评估整个链"并尝试从流中获取新消息,而不是返回已经从流中读取的其余字节.更准确地说,第一个 ReadAsync(...) 返回 38 个字节,Buffer(4) 返回其中的前 4 个,observable.Take(headerLength) 不返回剩余的 34 个字节而是尝试读取一个新的带有 ReadAsync 的消息.

Basically I want to parse the messages I receive with Reactive Extensions. The header of the message is parsed correctly using the Buffer(4) and I get the length of the remainder of the message. The problem that arises is that when I do stream.Take(headerLength), the code reevaluates the whole "chain" and tries to get a new message from the stream instead of returning the rest of the bytes which already has been read from the stream. To be more exact, the first ReadAsync(...) returns 38 bytes, the Buffer(4) returns the first 4 of those, the observable.Take(headerLength) does not return the remainding 34 bytes but instead tries to read a new message with ReadAsync.

问题是,我如何确保 observable.Take(headerLength) 接收到已经读取的 34 个字节而不尝试从流中读取新消息?我已经四处寻找解决方案,但我真的不知道如何实现这一点.

The question is, how can I make sure the observable.Take(headerLength) receives the already read 34 bytes and not try to read a new message from the stream? I've searched around for a solution, but I can't really figure out how to achieve this.

此解决方案(使用反应式扩展(Rx) 用于套接字编程实用吗?) 不是我正在寻找的.这不是读取流中可用的所有内容(直到缓冲区大小)并从中生成连续的字节流.对我来说,这个解决方案似乎不是从流中读取的一种非常有效的方式,因此我的问题是.

This solution (Using Reactive Extensions (Rx) for socket programming practical?) is not what I'm looking for. This isn't reading everything available in the stream (up to buffersize) and makes a continous bytestream out of it. To me this solution doesn't seem like a very efficient way to read from a stream, hence my question.

推荐答案

这种方法行不通.问题在于您使用 observable 的方式.Buffer 不会读取 4 个字节并退出,它会继续读取 4 个字节的块.Take 形成将读取重叠字节的第二个订阅.您会发现将流直接解析为消息要容易得多.

This approach isn't going to work. The problem is the way you are using the observable. Buffer will not read 4 bytes and quit, it will continually read 4 byte chunks. The Take forms a second subscription that will read overlapping bytes. You'll find it much easier to parse the stream directly into messages.

下面的代码也做了很多努力来正确清理.

The following code makes a good deal of effort to clean up properly as well.

假设您的 Message 就是这样,(为测试添加了 ToString):

Assuming your Message is just this, (ToString added for testing):

public class Message
{
    public byte[] PayLoad;

    public override string ToString()
    {
        return Encoding.UTF8.GetString(PayLoad);
    }
}

并且您获得了一个 Stream 然后您可以按如下方式解析它.首先,一种从流中读取确切字节数的方法:

And you have acquired a Stream then you can parse it as follows. First, a method to read an exact number of bytes from a stream:

public async static Task ReadExactBytesAsync(
    Stream stream, byte[] buffer, CancellationToken ct)
{
    var count = buffer.Length;
    var totalBytesRemaining = count;
    var totalBytesRead = 0;
    while (totalBytesRemaining != 0)
    {
        var bytesRead = await stream.ReadAsync(
            buffer, totalBytesRead, totalBytesRemaining, ct);
        ct.ThrowIfCancellationRequested();
        totalBytesRead += bytesRead;
        totalBytesRemaining -= bytesRead;
    }
}

然后将流转换为IObservable:

public static IObservable<Message> ReadMessages(
    Stream sourceStream,
    IScheduler scheduler = null)
{
    int subscribed = 0;
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<Message>(o =>
    {
        // first check there is only one subscriber
        // (multiple stream readers would cause havoc)
        int previous = Interlocked.CompareExchange(ref subscribed, 1, 0);

        if (previous != 0)
            o.OnError(new Exception(
                "Only one subscriber is allowed for each stream."));

        // we will return a disposable that cleans
        // up both the scheduled task below and
        // the source stream
        var dispose = new CompositeDisposable
        {
            Disposable.Create(sourceStream.Dispose)
        };

        // use async scheduling to get nice imperative code
        var schedule = scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            // store the header here each time
            var header = new byte[4];

            // loop until cancellation requested
            while (!ct.IsCancellationRequested)
            {                        
                try
                {
                    // read the exact number of bytes for a header
                    await ReadExactBytesAsync(sourceStream, header, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // pass through any problem in the stream and quit
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }                   
                ct.ThrowIfCancellationRequested();

                var bodyLength = IPAddress.NetworkToHostOrder(
                    BitConverter.ToInt16(header, 2));
                // create buffer to read the message
                var payload = new byte[bodyLength];

                // read exact bytes as before
                try
                {
                    await ReadExactBytesAsync(sourceStream, payload, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }

                // create a new message and send it to client
                var message = new Message { PayLoad = payload };
                o.OnNext(message);
            }
            // wrap things up
            ct.ThrowIfCancellationRequested();
            o.OnCompleted();
        });

        // return the suscription handle
        dispose.Add(schedule);
        return dispose;
    });
}

编辑 - 我使用的非常hacky的测试代码:

EDIT - Very hacky test code I used:

private static void Main(string[] args)
{
    var listener = new TcpListener(IPAddress.Any, 12873);
    listener.Start();

    var listenTask = listener.AcceptTcpClientAsync();
    listenTask.ContinueWith((Task<TcpClient> t) =>
    {
        var client = t.Result;
        var stream = client.GetStream();
        const string messageText = "Hello World!";                
        var body = Encoding.UTF8.GetBytes(messageText);                
        var header = BitConverter.GetBytes(
            IPAddress.HostToNetworkOrder(body.Length));
        for (int i = 0; i < 5; i++)
        {
            stream.Write(header, 0, 4);
            stream.Write(body, 0, 4);
            stream.Flush();
            // deliberate nasty delay
            Thread.Sleep(2000);
            stream.Write(body, 4, body.Length - 4);
            stream.Flush();
        }
        stream.Close();
        listener.Stop();
    });


    var tcpClient = new TcpClient();
    tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, 12873));
    var clientStream = tcpClient.GetStream();

    ReadMessages(clientStream).Subscribe(
        Console.WriteLine,
        ex => Console.WriteLine("Error: " + ex.Message),
        () => Console.WriteLine("Done!"));

    Console.ReadLine();
}

总结

您需要考虑设置读取超时,以防服务器死机,服务器应发送某种结束消息".目前,这种方法只会不断尝试接收字节.由于您没有指定它,因此我没有包含这样的内容 - 但是如果您这样做了,那么正如我所写的那样,只要 break 退出 while 循环就会导致 OnCompleted 待发送.

Wrapping up

You need to think about setting a timeout for reads, in case the server dies, and some kind of "end message" should be sent by the server. Currently this method will just continually tries to receive bytes. As you haven't specced it, I haven't included anything like this - but if you do, then as I've written it just breaking out of the while loop will cause OnCompleted to be sent.

这篇关于使用 TcpClient 和 Reactive Extensions 从 Stream 读取连续字节流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:使用 TcpClient 和 Reactive Extensions 从 Stream 读取连续