我想等待一个任务<T>完成一些特殊的规则: 如果在X毫秒后还没有完成,我希望向用户显示一条消息。 如果在Y毫秒后还没有完成,我想自动请求取消。

我可以使用Task。ContinueWith异步等待任务完成(即计划在任务完成时执行一个操作),但不允许指定超时。 我可以使用Task。等待同步等待任务超时完成,但这会阻塞我的线程。 我如何异步等待任务超时完成?


当前回答

上面@Kevan的答案的通用版本,使用响应式扩展。

public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, IScheduler scheduler)
{
    return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

可选调度器:

public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, Scheduler scheduler = null)
{
    return scheduler is null 
       ? task.ToObservable().Timeout(timeout).ToTask() 
       : task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

BTW:当Timeout发生时,将抛出一个超时异常

其他回答

为了它的乐趣,我做了一个“OnTimeout”扩展任务。超时时Task执行所需的内联lambda Action()并返回true,否则返回false。

public static async Task<bool> OnTimeout<T>(this T t, Action<T> action, int waitms) where T : Task
{
    if (!(await Task.WhenAny(t, Task.Delay(waitms)) == t))
    {
        action(t);
        return true;
    } else {
        return false;
    }
}

OnTimeout扩展返回一个bool结果,可以分配给一个变量,就像这个例子调用UDP套接字Async:

var t = UdpSocket.ReceiveAsync();

var timeout = await t.OnTimeout(task => {
    Console.WriteLine("No Response");
}, 5000);

在timeout lambda中可以访问“task”变量以进行更多处理。

Action接收对象的使用可能会启发其他各种扩展设计。

这个怎么样:

int timeout = 1000;
var task = SomeOperationAsync();
if (await Task.WhenAny(task, Task.Delay(timeout)) == task) {
    // task completed within timeout
} else { 
    // timeout logic
}

这里有一篇很棒的博客文章“制定任务”。TimeoutAfter Method”(来自MS并行库团队)提供了关于这类事情的更多信息。

补充:在我的回答的评论请求,这里是一个扩展的解决方案,包括取消处理。请注意,将取消传递给任务和计时器意味着在代码中可以经历多种取消方式,您应该确保测试并确信您正确地处理了所有这些方法。不要让各种组合的机会,并希望您的计算机在运行时做正确的事情。

int timeout = 1000;
var task = SomeOperationAsync(cancellationToken);
if (await Task.WhenAny(task, Task.Delay(timeout, cancellationToken)) == task)
{
    // Task completed within timeout.
    // Consider that the task may have faulted or been canceled.
    // We re-await the task so that any exceptions/cancellation is rethrown.
    await task;

}
else
{
    // timeout/cancellation logic
}

解决这个问题的另一种方法是使用响应式扩展:

public static Task TimeoutAfter(this Task task, TimeSpan timeout, IScheduler scheduler)
{
        return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

测试上面使用下面的代码在你的单元测试,它为我工作

TestScheduler scheduler = new TestScheduler();
Task task = Task.Run(() =>
                {
                    int i = 0;
                    while (i < 5)
                    {
                        Console.WriteLine(i);
                        i++;
                        Thread.Sleep(1000);
                    }
                })
                .TimeoutAfter(TimeSpan.FromSeconds(5), scheduler)
                .ContinueWith(t => { }, TaskContinuationOptions.OnlyOnFaulted);

scheduler.AdvanceBy(TimeSpan.FromSeconds(6).Ticks);

您可能需要以下命名空间:

using System.Threading.Tasks;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using Microsoft.Reactive.Testing;
using System.Threading;
using System.Reactive.Concurrency;

我觉得task . delay()任务和CancellationTokenSource在另一个紧密的网络循环中回答了我的用例。

尽管乔·霍格的《制作任务》MSDN博客上的TimeoutAfter方法是鼓舞人心的,出于同样的原因,我对使用TimeoutException进行流控制有点厌倦,因为超时比不超时更频繁。

所以我使用了这个,它也处理了博客中提到的优化:

public static async Task<bool> BeforeTimeout(this Task task, int millisecondsTimeout)
{
    if (task.IsCompleted) return true;
    if (millisecondsTimeout == 0) return false;

    if (millisecondsTimeout == Timeout.Infinite)
    {
        await Task.WhenAll(task);
        return true;
    }

    var tcs = new TaskCompletionSource<object>();

    using (var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), tcs,
        millisecondsTimeout, Timeout.Infinite))
    {
        return await Task.WhenAny(task, tcs.Task) == task;
    }
}

一个示例用例如下:

var receivingTask = conn.ReceiveAsync(ct);

while (!await receivingTask.BeforeTimeout(keepAliveMilliseconds))
{
    // Send keep-alive
}

// Read and do something with data
var data = await receivingTask;

使用定时器处理消息并自动取消。当Task完成时,对计时器调用Dispose,以便它们永远不会触发。这里有一个例子;将taskDelay改为500、1500或2500来查看不同的情况:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private static Task CreateTaskWithTimeout(
            int xDelay, int yDelay, int taskDelay)
        {
            var cts = new CancellationTokenSource();
            var token = cts.Token;
            var task = Task.Factory.StartNew(() =>
            {
                // Do some work, but fail if cancellation was requested
                token.WaitHandle.WaitOne(taskDelay);
                token.ThrowIfCancellationRequested();
                Console.WriteLine("Task complete");
            });
            var messageTimer = new Timer(state =>
            {
                // Display message at first timeout
                Console.WriteLine("X milliseconds elapsed");
            }, null, xDelay, -1);
            var cancelTimer = new Timer(state =>
            {
                // Display message and cancel task at second timeout
                Console.WriteLine("Y milliseconds elapsed");
                cts.Cancel();
            }
                , null, yDelay, -1);
            task.ContinueWith(t =>
            {
                // Dispose the timers when the task completes
                // This will prevent the message from being displayed
                // if the task completes before the timeout
                messageTimer.Dispose();
                cancelTimer.Dispose();
            });
            return task;
        }

        static void Main(string[] args)
        {
            var task = CreateTaskWithTimeout(1000, 2000, 2500);
            // The task has been started and will display a message after
            // one timeout and then cancel itself after the second
            // You can add continuations to the task
            // or wait for the result as needed
            try
            {
                task.Wait();
                Console.WriteLine("Done waiting for task");
            }
            catch (AggregateException ex)
            {
                Console.WriteLine("Error waiting for task:");
                foreach (var e in ex.InnerExceptions)
                {
                    Console.WriteLine(e);
                }
            }
        }
    }
}

此外,Async CTP提供了一个TaskEx。Delay方法,它将为您在任务中包装计时器。这可以给你更多的控制来做一些事情,比如设置TaskScheduler为Timer触发时的延续。

private static Task CreateTaskWithTimeout(
    int xDelay, int yDelay, int taskDelay)
{
    var cts = new CancellationTokenSource();
    var token = cts.Token;
    var task = Task.Factory.StartNew(() =>
    {
        // Do some work, but fail if cancellation was requested
        token.WaitHandle.WaitOne(taskDelay);
        token.ThrowIfCancellationRequested();
        Console.WriteLine("Task complete");
    });

    var timerCts = new CancellationTokenSource();

    var messageTask = TaskEx.Delay(xDelay, timerCts.Token);
    messageTask.ContinueWith(t =>
    {
        // Display message at first timeout
        Console.WriteLine("X milliseconds elapsed");
    }, TaskContinuationOptions.OnlyOnRanToCompletion);

    var cancelTask = TaskEx.Delay(yDelay, timerCts.Token);
    cancelTask.ContinueWith(t =>
    {
        // Display message and cancel task at second timeout
        Console.WriteLine("Y milliseconds elapsed");
        cts.Cancel();
    }, TaskContinuationOptions.OnlyOnRanToCompletion);

    task.ContinueWith(t =>
    {
        timerCts.Cancel();
    });

    return task;
}