到目前为止,我已经避免了测试多线程代码的噩梦,因为它似乎是一个太大的雷区。我想知道人们是如何测试依赖于线程的代码以获得成功执行的,或者人们是如何测试那些仅在两个线程以给定方式交互时才会出现的问题的?
对于今天的程序员来说,这似乎是一个非常关键的问题,恕我直言,将我们的知识集中在这个问题上是很有用的。
到目前为止,我已经避免了测试多线程代码的噩梦,因为它似乎是一个太大的雷区。我想知道人们是如何测试依赖于线程的代码以获得成功执行的,或者人们是如何测试那些仅在两个线程以给定方式交互时才会出现的问题的?
对于今天的程序员来说,这似乎是一个非常关键的问题,恕我直言,将我们的知识集中在这个问题上是很有用的。
当前回答
我做过很多这样的事,的确很糟糕。
一些建议:
GroboUtils for running multiple test threads alphaWorks ConTest to instrument classes to cause interleavings to vary between iterations Create a throwable field and check it in tearDown (see Listing 1). If you catch a bad exception in another thread, just assign it to throwable. I created the utils class in Listing 2 and have found it invaluable, especially waitForVerify and waitForCondition, which will greatly increase the performance of your tests. Make good use of AtomicBoolean in your tests. It is thread safe, and you'll often need a final reference type to store values from callback classes and suchlike. See example in Listing 3. Make sure to always give your test a timeout (e.g., @Test(timeout=60*1000)), as concurrency tests can sometimes hang forever when they're broken.
清单1:
@After
public void tearDown() {
if ( throwable != null )
throw throwable;
}
清单2:
import static org.junit.Assert.fail;
import java.io.File;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Random;
import org.apache.commons.collections.Closure;
import org.apache.commons.collections.Predicate;
import org.apache.commons.lang.time.StopWatch;
import org.easymock.EasyMock;
import org.easymock.classextension.internal.ClassExtensionHelper;
import static org.easymock.classextension.EasyMock.*;
import ca.digitalrapids.io.DRFileUtils;
/**
* Various utilities for testing
*/
public abstract class DRTestUtils
{
static private Random random = new Random();
/** Calls {@link #waitForCondition(Integer, Integer, Predicate, String)} with
* default max wait and check period values.
*/
static public void waitForCondition(Predicate predicate, String errorMessage)
throws Throwable
{
waitForCondition(null, null, predicate, errorMessage);
}
/** Blocks until a condition is true, throwing an {@link AssertionError} if
* it does not become true during a given max time.
* @param maxWait_ms max time to wait for true condition. Optional; defaults
* to 30 * 1000 ms (30 seconds).
* @param checkPeriod_ms period at which to try the condition. Optional; defaults
* to 100 ms.
* @param predicate the condition
* @param errorMessage message use in the {@link AssertionError}
* @throws Throwable on {@link AssertionError} or any other exception/error
*/
static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,
Predicate predicate, String errorMessage) throws Throwable
{
waitForCondition(maxWait_ms, checkPeriod_ms, predicate, new Closure() {
public void execute(Object errorMessage)
{
fail((String)errorMessage);
}
}, errorMessage);
}
/** Blocks until a condition is true, running a closure if
* it does not become true during a given max time.
* @param maxWait_ms max time to wait for true condition. Optional; defaults
* to 30 * 1000 ms (30 seconds).
* @param checkPeriod_ms period at which to try the condition. Optional; defaults
* to 100 ms.
* @param predicate the condition
* @param closure closure to run
* @param argument argument for closure
* @throws Throwable on {@link AssertionError} or any other exception/error
*/
static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,
Predicate predicate, Closure closure, Object argument) throws Throwable
{
if ( maxWait_ms == null )
maxWait_ms = 30 * 1000;
if ( checkPeriod_ms == null )
checkPeriod_ms = 100;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
while ( !predicate.evaluate(null) ) {
Thread.sleep(checkPeriod_ms);
if ( stopWatch.getTime() > maxWait_ms ) {
closure.execute(argument);
}
}
}
/** Calls {@link #waitForVerify(Integer, Object)} with <code>null</code>
* for {@code maxWait_ms}
*/
static public void waitForVerify(Object easyMockProxy)
throws Throwable
{
waitForVerify(null, easyMockProxy);
}
/** Repeatedly calls {@link EasyMock#verify(Object[])} until it succeeds, or a
* max wait time has elapsed.
* @param maxWait_ms Max wait time. <code>null</code> defaults to 30s.
* @param easyMockProxy Proxy to call verify on
* @throws Throwable
*/
static public void waitForVerify(Integer maxWait_ms, Object easyMockProxy)
throws Throwable
{
if ( maxWait_ms == null )
maxWait_ms = 30 * 1000;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for(;;) {
try
{
verify(easyMockProxy);
break;
}
catch (AssertionError e)
{
if ( stopWatch.getTime() > maxWait_ms )
throw e;
Thread.sleep(100);
}
}
}
/** Returns a path to a directory in the temp dir with the name of the given
* class. This is useful for temporary test files.
* @param aClass test class for which to create dir
* @return the path
*/
static public String getTestDirPathForTestClass(Object object)
{
String filename = object instanceof Class ?
((Class)object).getName() :
object.getClass().getName();
return DRFileUtils.getTempDir() + File.separator +
filename;
}
static public byte[] createRandomByteArray(int bytesLength)
{
byte[] sourceBytes = new byte[bytesLength];
random.nextBytes(sourceBytes);
return sourceBytes;
}
/** Returns <code>true</code> if the given object is an EasyMock mock object
*/
static public boolean isEasyMockMock(Object object) {
try {
InvocationHandler invocationHandler = Proxy
.getInvocationHandler(object);
return invocationHandler.getClass().getName().contains("easymock");
} catch (IllegalArgumentException e) {
return false;
}
}
}
清单3:
@Test
public void testSomething() {
final AtomicBoolean called = new AtomicBoolean(false);
subject.setCallback(new SomeCallback() {
public void callback(Object arg) {
// check arg here
called.set(true);
}
});
subject.run();
assertTrue(called.get());
}
其他回答
我曾经有过测试线程代码的不幸任务,这绝对是我写过的最难的测试。
在编写测试时,我使用委托和事件的组合。基本上,它都是关于使用PropertyNotifyChanged事件和WaitCallback或某种轮询的ConditionalWaiter。
我不确定这是否是最好的方法,但它对我来说是有效的。
确实很难!在我的(c++)单元测试中,我按照使用的并发模式将其分解为几个类别:
Unit tests for classes that operate in a single thread and aren't thread aware -- easy, test as usual. Unit tests for Monitor objects (those that execute synchronized methods in the callers' thread of control) that expose a synchronized public API -- instantiate multiple mock threads that exercise the API. Construct scenarios that exercise internal conditions of the passive object. Include one longer running test that basically beats the heck out of it from multiple threads for a long period of time. This is unscientific I know but it does build confidence. Unit tests for Active objects (those that encapsulate their own thread or threads of control) -- similar to #2 above with variations depending on the class design. Public API may be blocking or non-blocking, callers may obtain futures, data may arrive at queues or need to be dequeued. There are many combinations possible here; white box away. Still requires multiple mock threads to make calls to the object under test.
题外话:
在我所做的内部开发人员培训中,我教授了并发的支柱和这两种模式,作为思考和分解并发问题的主要框架。显然还有更先进的概念,但我发现这组基础知识可以帮助工程师摆脱困境。正如上面所描述的,它还会导致代码更具单元可测试性。
它并不完美,但我用c#写了这个帮助程序:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Proto.Promises.Tests.Threading
{
public class ThreadHelper
{
public static readonly int multiThreadCount = Environment.ProcessorCount * 100;
private static readonly int[] offsets = new int[] { 0, 10, 100, 1000 };
private readonly Stack<Task> _executingTasks = new Stack<Task>(multiThreadCount);
private readonly Barrier _barrier = new Barrier(1);
private int _currentParticipants = 0;
private readonly TimeSpan _timeout;
public ThreadHelper() : this(TimeSpan.FromSeconds(10)) { } // 10 second timeout should be enough for most cases.
public ThreadHelper(TimeSpan timeout)
{
_timeout = timeout;
}
/// <summary>
/// Execute the action multiple times in parallel threads.
/// </summary>
public void ExecuteMultiActionParallel(Action action)
{
for (int i = 0; i < multiThreadCount; ++i)
{
AddParallelAction(action);
}
ExecutePendingParallelActions();
}
/// <summary>
/// Execute the action once in a separate thread.
/// </summary>
public void ExecuteSingleAction(Action action)
{
AddParallelAction(action);
ExecutePendingParallelActions();
}
/// <summary>
/// Add an action to be run in parallel.
/// </summary>
public void AddParallelAction(Action action)
{
var taskSource = new TaskCompletionSource<bool>();
lock (_executingTasks)
{
++_currentParticipants;
_barrier.AddParticipant();
_executingTasks.Push(taskSource.Task);
}
new Thread(() =>
{
try
{
_barrier.SignalAndWait(); // Try to make actions run in lock-step to increase likelihood of breaking race conditions.
action.Invoke();
taskSource.SetResult(true);
}
catch (Exception e)
{
taskSource.SetException(e);
}
}).Start();
}
/// <summary>
/// Runs the pending actions in parallel, attempting to run them in lock-step.
/// </summary>
public void ExecutePendingParallelActions()
{
Task[] tasks;
lock (_executingTasks)
{
_barrier.SignalAndWait();
_barrier.RemoveParticipants(_currentParticipants);
_currentParticipants = 0;
tasks = _executingTasks.ToArray();
_executingTasks.Clear();
}
try
{
if (!Task.WaitAll(tasks, _timeout))
{
throw new TimeoutException($"Action(s) timed out after {_timeout}, there may be a deadlock.");
}
}
catch (AggregateException e)
{
// Only throw one exception instead of aggregate to try to avoid overloading the test error output.
throw e.Flatten().InnerException;
}
}
/// <summary>
/// Run each action in parallel multiple times with differing offsets for each run.
/// <para/>The number of runs is 4^actions.Length, so be careful if you don't want the test to run too long.
/// </summary>
/// <param name="expandToProcessorCount">If true, copies each action on additional threads up to the processor count. This can help test more without increasing the time it takes to complete.
/// <para/>Example: 2 actions with 6 processors, runs each action 3 times in parallel.</param>
/// <param name="setup">The action to run before each parallel run.</param>
/// <param name="teardown">The action to run after each parallel run.</param>
/// <param name="actions">The actions to run in parallel.</param>
public void ExecuteParallelActionsWithOffsets(bool expandToProcessorCount, Action setup, Action teardown, params Action[] actions)
{
setup += () => { };
teardown += () => { };
int actionCount = actions.Length;
int expandCount = expandToProcessorCount ? Math.Max(Environment.ProcessorCount / actionCount, 1) : 1;
foreach (var combo in GenerateCombinations(offsets, actionCount))
{
setup.Invoke();
for (int k = 0; k < expandCount; ++k)
{
for (int i = 0; i < actionCount; ++i)
{
int offset = combo[i];
Action action = actions[i];
AddParallelAction(() =>
{
for (int j = offset; j > 0; --j) { } // Just spin in a loop for the offset.
action.Invoke();
});
}
}
ExecutePendingParallelActions();
teardown.Invoke();
}
}
// Input: [1, 2, 3], 3
// Ouput: [
// [1, 1, 1],
// [2, 1, 1],
// [3, 1, 1],
// [1, 2, 1],
// [2, 2, 1],
// [3, 2, 1],
// [1, 3, 1],
// [2, 3, 1],
// [3, 3, 1],
// [1, 1, 2],
// [2, 1, 2],
// [3, 1, 2],
// [1, 2, 2],
// [2, 2, 2],
// [3, 2, 2],
// [1, 3, 2],
// [2, 3, 2],
// [3, 3, 2],
// [1, 1, 3],
// [2, 1, 3],
// [3, 1, 3],
// [1, 2, 3],
// [2, 2, 3],
// [3, 2, 3],
// [1, 3, 3],
// [2, 3, 3],
// [3, 3, 3]
// ]
private static IEnumerable<int[]> GenerateCombinations(int[] options, int count)
{
int[] indexTracker = new int[count];
int[] combo = new int[count];
for (int i = 0; i < count; ++i)
{
combo[i] = options[0];
}
// Same algorithm as picking a combination lock.
int rollovers = 0;
while (rollovers < count)
{
yield return combo; // No need to duplicate the array since we're just reading it.
for (int i = 0; i < count; ++i)
{
int index = ++indexTracker[i];
if (index == options.Length)
{
indexTracker[i] = 0;
combo[i] = options[0];
if (i == rollovers)
{
++rollovers;
}
}
else
{
combo[i] = options[index];
break;
}
}
}
}
}
}
使用示例:
[Test]
public void DeferredMayBeBeResolvedAndPromiseAwaitedConcurrently_void0()
{
Promise.Deferred deferred = default(Promise.Deferred);
Promise promise = default(Promise);
int invokedCount = 0;
var threadHelper = new ThreadHelper();
threadHelper.ExecuteParallelActionsWithOffsets(false,
// Setup
() =>
{
invokedCount = 0;
deferred = Promise.NewDeferred();
promise = deferred.Promise;
},
// Teardown
() => Assert.AreEqual(1, invokedCount),
// Parallel Actions
() => deferred.Resolve(),
() => promise.Then(() => { Interlocked.Increment(ref invokedCount); }).Forget()
);
}
并发是内存模型、硬件、缓存和代码之间复杂的相互作用。在Java的情况下,至少这样的测试主要由jcstress部分解决。众所周知,该库的创建者是许多JVM、GC和Java并发特性的作者。
但是即使是这个库也需要对Java内存模型规范有很好的了解,这样我们才能确切地知道我们在测试什么。但我认为这项工作的重点是微基准测试。不是庞大的业务应用。
如果你正在测试简单的new Thread(runnable).run() 您可以模拟Thread来按顺序运行可运行对象
例如,如果被测试对象的代码像这样调用一个新线程
Class TestedClass {
public void doAsychOp() {
new Thread(new myRunnable()).start();
}
}
然后模拟new Threads并按顺序运行runable参数会有所帮助
@Mock
private Thread threadMock;
@Test
public void myTest() throws Exception {
PowerMockito.mockStatic(Thread.class);
//when new thread is created execute runnable immediately
PowerMockito.whenNew(Thread.class).withAnyArguments().then(new Answer<Thread>() {
@Override
public Thread answer(InvocationOnMock invocation) throws Throwable {
// immediately run the runnable
Runnable runnable = invocation.getArgumentAt(0, Runnable.class);
if(runnable != null) {
runnable.run();
}
return threadMock;//return a mock so Thread.start() will do nothing
}
});
TestedClass testcls = new TestedClass()
testcls.doAsychOp(); //will invoke myRunnable.run in current thread
//.... check expected
}