001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.tck; 006 007import org.reactivestreams.Publisher; 008import org.reactivestreams.Subscriber; 009import org.reactivestreams.Subscription; 010import org.reactivestreams.tck.flow.support.Optional; 011import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; 012 013import java.util.Collections; 014import java.util.LinkedList; 015import java.util.List; 016import java.util.concurrent.ArrayBlockingQueue; 017import java.util.concurrent.CopyOnWriteArrayList; 018import java.util.concurrent.CountDownLatch; 019import java.util.concurrent.TimeUnit; 020import java.util.concurrent.atomic.AtomicReference; 021 022import static java.util.concurrent.TimeUnit.MILLISECONDS; 023import static java.util.concurrent.TimeUnit.NANOSECONDS; 024import static org.testng.Assert.assertTrue; 025import static org.testng.Assert.fail; 026 027public class TestEnvironment { 028 public static final int TEST_BUFFER_SIZE = 16; 029 030 private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; 031 private static final long DEFAULT_TIMEOUT_MILLIS = 100; 032 033 private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS"; 034 private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV"; 035 036 private final long defaultTimeoutMillis; 037 private final long defaultPollTimeoutMillis; 038 private final long defaultNoSignalsTimeoutMillis; 039 private final boolean printlnDebug; 040 041 private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>(); 042 043 /** 044 * Tests must specify the timeout for expected outcome of asynchronous 045 * interactions. Longer timeout does not invalidate the correctness of 046 * the implementation, but can in some cases result in longer time to 047 * run the tests. 048 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 049 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 050 * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't 051 * preempted by an asynchronous event. 052 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 053 */ 054 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis, 055 boolean printlnDebug) { 056 this.defaultTimeoutMillis = defaultTimeoutMillis; 057 this.defaultPollTimeoutMillis = defaultPollTimeoutMillis; 058 this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis; 059 this.printlnDebug = printlnDebug; 060 } 061 062 /** 063 * Tests must specify the timeout for expected outcome of asynchronous 064 * interactions. Longer timeout does not invalidate the correctness of 065 * the implementation, but can in some cases result in longer time to 066 * run the tests. 067 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 068 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 069 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 070 */ 071 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { 072 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug); 073 } 074 075 /** 076 * Tests must specify the timeout for expected outcome of asynchronous 077 * interactions. Longer timeout does not invalidate the correctness of 078 * the implementation, but can in some cases result in longer time to 079 * run the tests. 080 * 081 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 082 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 083 * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't 084 * preempted by an asynchronous event. 085 */ 086 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) { 087 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false); 088 } 089 090 /** 091 * Tests must specify the timeout for expected outcome of asynchronous 092 * interactions. Longer timeout does not invalidate the correctness of 093 * the implementation, but can in some cases result in longer time to 094 * run the tests. 095 * 096 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 097 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 098 */ 099 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) { 100 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis); 101 } 102 103 /** 104 * Tests must specify the timeout for expected outcome of asynchronous 105 * interactions. Longer timeout does not invalidate the correctness of 106 * the implementation, but can in some cases result in longer time to 107 * run the tests. 108 * 109 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 110 */ 111 public TestEnvironment(long defaultTimeoutMillis) { 112 this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis); 113 } 114 115 /** 116 * Tests must specify the timeout for expected outcome of asynchronous 117 * interactions. Longer timeout does not invalidate the correctness of 118 * the implementation, but can in some cases result in longer time to 119 * run the tests. 120 * 121 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 122 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 123 * 124 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 125 * often helpful to pinpoint simple race conditions etc. 126 */ 127 public TestEnvironment(boolean printlnDebug) { 128 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug); 129 } 130 131 /** 132 * Tests must specify the timeout for expected outcome of asynchronous 133 * interactions. Longer timeout does not invalidate the correctness of 134 * the implementation, but can in some cases result in longer time to 135 * run the tests. 136 * 137 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 138 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 139 */ 140 public TestEnvironment() { 141 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis()); 142 } 143 144 /** This timeout is used when waiting for a signal to arrive. */ 145 public long defaultTimeoutMillis() { 146 return defaultTimeoutMillis; 147 } 148 149 /** 150 * This timeout is used when asserting that no further signals are emitted. 151 * Note that this timeout default 152 */ 153 public long defaultNoSignalsTimeoutMillis() { 154 return defaultNoSignalsTimeoutMillis; 155 } 156 157 /** 158 * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous 159 * event. 160 */ 161 public long defaultPollTimeoutMillis() { 162 return defaultPollTimeoutMillis; 163 } 164 165 /** 166 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 167 * 168 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 169 */ 170 public static long envDefaultTimeoutMillis() { 171 final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); 172 if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; 173 else try { 174 return Long.parseLong(envMillis); 175 } catch (NumberFormatException ex) { 176 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); 177 } 178 } 179 180 /** 181 * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 182 * 183 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 184 */ 185 public static long envDefaultNoSignalsTimeoutMillis() { 186 final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV); 187 if (envMillis == null) return envDefaultTimeoutMillis(); 188 else try { 189 return Long.parseLong(envMillis); 190 } catch (NumberFormatException ex) { 191 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex); 192 } 193 } 194 195 /** 196 * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value. 197 * 198 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 199 */ 200 public static long envDefaultPollTimeoutMillis() { 201 final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV); 202 if (envMillis == null) return envDefaultTimeoutMillis(); 203 else try { 204 return Long.parseLong(envMillis); 205 } catch (NumberFormatException ex) { 206 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex); 207 } 208 } 209 210 /** 211 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 212 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 213 * 214 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 215 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 216 * from the environment using {@code env.dropAsyncError()}. 217 * 218 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 219 */ 220 public void flop(String msg) { 221 try { 222 fail(msg); 223 } catch (Throwable t) { 224 asyncErrors.add(t); 225 } 226 } 227 228 /** 229 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 230 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 231 * 232 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 233 * 234 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 235 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 236 * from the environment using {@code env.dropAsyncError()}. 237 * 238 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 239 */ 240 public void flop(Throwable thr, String msg) { 241 try { 242 fail(msg, thr); 243 } catch (Throwable t) { 244 asyncErrors.add(thr); 245 } 246 } 247 248 /** 249 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 250 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 251 * 252 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 253 * 254 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 255 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 256 * from the environment using {@code env.dropAsyncError()}. 257 * 258 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 259 */ 260 public void flop(Throwable thr) { 261 try { 262 fail(thr.getMessage(), thr); 263 } catch (Throwable t) { 264 asyncErrors.add(thr); 265 } 266 } 267 268 /** 269 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 270 * 271 * This method DOES fail the test right away (it tries to, by throwing an AssertionException), 272 * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error. 273 * 274 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 275 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 276 * from the environment using {@code env.dropAsyncError()}. 277 * 278 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 279 */ 280 public <T> T flopAndFail(String msg) { 281 try { 282 fail(msg); 283 } catch (Throwable t) { 284 asyncErrors.add(t); 285 fail(msg, t); 286 } 287 return null; // unreachable, the previous block will always exit by throwing 288 } 289 290 291 292 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException { 293 subscribe(pub, sub, defaultTimeoutMillis); 294 } 295 296 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException { 297 pub.subscribe(sub); 298 sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); 299 verifyNoAsyncErrorsNoDelay(); 300 } 301 302 public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException { 303 ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this); 304 subscribe(pub, sub, defaultTimeoutMillis()); 305 return sub; 306 } 307 308 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException { 309 return newManualSubscriber(pub, defaultTimeoutMillis()); 310 } 311 312 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException { 313 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this); 314 subscribe(pub, sub, timeoutMillis); 315 return sub; 316 } 317 318 public void clearAsyncErrors() { 319 asyncErrors.clear(); 320 } 321 322 public Throwable dropAsyncError() { 323 try { 324 return asyncErrors.remove(0); 325 } catch (IndexOutOfBoundsException ex) { 326 return null; 327 } 328 } 329 330 /** 331 * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors 332 * were signalled pior to, or during that time (by calling {@code flop()}). 333 */ 334 public void verifyNoAsyncErrors() { 335 verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis()); 336 } 337 338 /** 339 * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled 340 * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time. 341 * <p></p> 342 * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)}, 343 * and if no errors encountered wait for another default timeout as the errors may yet be signalled. 344 * The initial check is performed in order to fail-fast in case of an already failed test. 345 */ 346 public void verifyNoAsyncErrors(long delay) { 347 try { 348 verifyNoAsyncErrorsNoDelay(); 349 350 Thread.sleep(delay); 351 verifyNoAsyncErrorsNoDelay(); 352 } catch (InterruptedException e) { 353 throw new RuntimeException(e); 354 } 355 } 356 357 /** 358 * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}). 359 * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used 360 * for example in tight loops etc. 361 */ 362 public void verifyNoAsyncErrorsNoDelay() { 363 for (Throwable e : asyncErrors) { 364 if (e instanceof AssertionError) { 365 throw (AssertionError) e; 366 } else { 367 fail(String.format("Async error during test execution: %s", e.getMessage()), e); 368 } 369 } 370 } 371 372 /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */ 373 public void debug(String msg) { 374 if (debugEnabled()) { 375 System.out.printf("[TCK-DEBUG] %s%n", msg); 376 } 377 } 378 379 public final boolean debugEnabled() { 380 return printlnDebug; 381 } 382 383 /** 384 * Looks for given {@code method} method in stack trace. 385 * Can be used to answer questions like "was this method called from onComplete?". 386 * 387 * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise 388 */ 389 public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) { 390 final Throwable thr = new Throwable(); // gets the stacktrace 391 392 for (StackTraceElement stackElement : thr.getStackTrace()) { 393 if (stackElement.getMethodName().equals(method)) { 394 return Optional.of(stackElement); 395 } 396 } 397 return Optional.empty(); 398 } 399 400 // ---- classes ---- 401 402 /** 403 * {@link Subscriber} implementation which can be steered by test code and asserted on. 404 */ 405 public static class ManualSubscriber<T> extends TestSubscriber<T> { 406 Receptacle<T> received; 407 408 public ManualSubscriber(TestEnvironment env) { 409 super(env); 410 received = new Receptacle<T>(this.env); 411 } 412 413 @Override 414 public void onNext(T element) { 415 try { 416 received.add(element); 417 } catch (IllegalStateException ex) { 418 // error message refinement 419 throw new SubscriberBufferOverflowException( 420 String.format("Received more than bufferSize (%d) onNext signals. " + 421 "The Publisher probably emited more signals than expected!", 422 received.QUEUE_SIZE), ex); 423 } 424 } 425 426 @Override 427 public void onComplete() { 428 received.complete(); 429 } 430 431 public void request(long elements) { 432 subscription.value().request(elements); 433 } 434 435 public T requestNextElement() throws InterruptedException { 436 return requestNextElement(env.defaultTimeoutMillis()); 437 } 438 439 public T requestNextElement(long timeoutMillis) throws InterruptedException { 440 return requestNextElement(timeoutMillis, "Did not receive expected element"); 441 } 442 443 public T requestNextElement(String errorMsg) throws InterruptedException { 444 return requestNextElement(env.defaultTimeoutMillis(), errorMsg); 445 } 446 447 public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 448 request(1); 449 return nextElement(timeoutMillis, errorMsg); 450 } 451 452 public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException { 453 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 454 } 455 456 public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException { 457 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg); 458 } 459 460 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 461 return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 462 } 463 464 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 465 request(1); 466 return nextElementOrEndOfStream(timeoutMillis, errorMsg); 467 } 468 469 public void requestEndOfStream() throws InterruptedException { 470 requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 471 } 472 473 public void requestEndOfStream(long timeoutMillis) throws InterruptedException { 474 requestEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 475 } 476 477 public void requestEndOfStream(String errorMsg) throws InterruptedException { 478 requestEndOfStream(env.defaultTimeoutMillis(), errorMsg); 479 } 480 481 public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 482 request(1); 483 expectCompletion(timeoutMillis, errorMsg); 484 } 485 486 public List<T> requestNextElements(long elements) throws InterruptedException { 487 request(elements); 488 return nextElements(elements, env.defaultTimeoutMillis()); 489 } 490 491 public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException { 492 request(elements); 493 return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements)); 494 } 495 496 public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 497 request(elements); 498 return nextElements(elements, timeoutMillis, errorMsg); 499 } 500 501 public T nextElement() throws InterruptedException { 502 return nextElement(env.defaultTimeoutMillis()); 503 } 504 505 public T nextElement(long timeoutMillis) throws InterruptedException { 506 return nextElement(timeoutMillis, "Did not receive expected element"); 507 } 508 509 public T nextElement(String errorMsg) throws InterruptedException { 510 return nextElement(env.defaultTimeoutMillis(), errorMsg); 511 } 512 513 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 514 return received.next(timeoutMillis, errorMsg); 515 } 516 517 public Optional<T> nextElementOrEndOfStream() throws InterruptedException { 518 return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 519 } 520 521 public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 522 return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 523 } 524 525 public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 526 return received.nextOrEndOfStream(timeoutMillis, errorMsg); 527 } 528 529 public List<T> nextElements(long elements) throws InterruptedException { 530 return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion"); 531 } 532 533 public List<T> nextElements(long elements, String errorMsg) throws InterruptedException { 534 return nextElements(elements, env.defaultTimeoutMillis(), errorMsg); 535 } 536 537 public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException { 538 return nextElements(elements, timeoutMillis, "Did not receive expected element or completion"); 539 } 540 541 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 542 return received.nextN(elements, timeoutMillis, errorMsg); 543 } 544 545 public void expectNext(T expected) throws InterruptedException { 546 expectNext(expected, env.defaultTimeoutMillis()); 547 } 548 549 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 550 T received = nextElement(timeoutMillis, "Did not receive expected element on downstream"); 551 if (!received.equals(expected)) { 552 env.flop(String.format("Expected element %s on downstream but received %s", expected, received)); 553 } 554 } 555 556 public void expectCompletion() throws InterruptedException { 557 expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 558 } 559 560 public void expectCompletion(long timeoutMillis) throws InterruptedException { 561 expectCompletion(timeoutMillis, "Did not receive expected stream completion"); 562 } 563 564 public void expectCompletion(String errorMsg) throws InterruptedException { 565 expectCompletion(env.defaultTimeoutMillis(), errorMsg); 566 } 567 568 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 569 received.expectCompletion(timeoutMillis, errorMsg); 570 } 571 572 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception { 573 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); 574 } 575 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception { 576 expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); 577 } 578 579 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 580 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception { 581 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis); 582 } 583 584 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception { 585 expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis); 586 } 587 588 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, 589 long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { 590 final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis); 591 final String message = err.getMessage(); 592 593 boolean contains = false; 594 for (String requiredMessagePart : requiredMessagePartAlternatives) 595 if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to 596 assertTrue(contains, 597 String.format("Got expected exception [%s] but missing message part [%s], was: %s", 598 err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); 599 } 600 601 public <E extends Throwable> E expectError(Class<E> expected) throws Exception { 602 return expectError(expected, env.defaultTimeoutMillis()); 603 } 604 605 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception { 606 return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis()); 607 } 608 609 public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception { 610 return expectError(expected, env.defaultTimeoutMillis(), errorMsg); 611 } 612 613 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception { 614 return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg); 615 } 616 617 public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { 618 return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName())); 619 } 620 621 public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis, 622 String errorMsg) throws Exception { 623 return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg); 624 } 625 626 public void expectNone() throws InterruptedException { 627 expectNone(env.defaultNoSignalsTimeoutMillis()); 628 } 629 630 public void expectNone(String errMsgPrefix) throws InterruptedException { 631 expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix); 632 } 633 634 public void expectNone(long withinMillis) throws InterruptedException { 635 expectNone(withinMillis, "Did not expect an element but got element"); 636 } 637 638 public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException { 639 received.expectNone(withinMillis, errMsgPrefix); 640 } 641 642 } 643 644 public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> { 645 646 public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) { 647 super(env); 648 } 649 650 @Override 651 public void onNext(T element) { 652 if (env.debugEnabled()) { 653 env.debug(String.format("%s::onNext(%s)", this, element)); 654 } 655 if (subscription.isCompleted()) { 656 super.onNext(element); 657 } else { 658 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 659 } 660 } 661 662 @Override 663 public void onComplete() { 664 if (env.debugEnabled()) { 665 env.debug(this + "::onComplete()"); 666 } 667 if (subscription.isCompleted()) { 668 super.onComplete(); 669 } else { 670 env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe"); 671 } 672 } 673 674 @Override 675 public void onSubscribe(Subscription s) { 676 if (env.debugEnabled()) { 677 env.debug(String.format("%s::onSubscribe(%s)", this, s)); 678 } 679 if (!subscription.isCompleted()) { 680 subscription.complete(s); 681 } else { 682 env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber"); 683 } 684 } 685 686 @Override 687 public void onError(Throwable cause) { 688 if (env.debugEnabled()) { 689 env.debug(String.format("%s::onError(%s)", this, cause)); 690 } 691 if (subscription.isCompleted()) { 692 super.onError(cause); 693 } else { 694 env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause)); 695 } 696 } 697 } 698 699 /** 700 * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport} 701 * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert 702 * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher. 703 */ 704 public static class BlackholeSubscriberWithSubscriptionSupport<T> 705 extends ManualSubscriberWithSubscriptionSupport<T> { 706 707 public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) { 708 super(env); 709 } 710 711 @Override 712 public void onNext(T element) { 713 if (env.debugEnabled()) { 714 env.debug(String.format("%s::onNext(%s)", this, element)); 715 } 716 if (!subscription.isCompleted()) { 717 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 718 } 719 } 720 721 @Override 722 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 723 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 724 } 725 726 @Override 727 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 728 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 729 } 730 } 731 732 public static class TestSubscriber<T> implements Subscriber<T> { 733 final Promise<Subscription> subscription; 734 735 protected final TestEnvironment env; 736 737 public TestSubscriber(TestEnvironment env) { 738 this.env = env; 739 subscription = new Promise<Subscription>(env); 740 } 741 742 @Override 743 public void onError(Throwable cause) { 744 env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause)); 745 } 746 747 @Override 748 public void onComplete() { 749 env.flop("Unexpected Subscriber::onComplete()"); 750 } 751 752 @Override 753 public void onNext(T element) { 754 env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); 755 } 756 757 @Override 758 public void onSubscribe(Subscription subscription) { 759 env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); 760 } 761 762 public void cancel() { 763 if (subscription.isCompleted()) { 764 subscription.value().cancel(); 765 } else { 766 env.flop("Cannot cancel a subscription before having received it"); 767 } 768 } 769 } 770 771 public static class ManualPublisher<T> implements Publisher<T> { 772 protected final TestEnvironment env; 773 774 protected long pendingDemand = 0L; 775 protected Promise<Subscriber<? super T>> subscriber; 776 777 protected final Receptacle<Long> requests; 778 779 protected final Latch cancelled; 780 781 public ManualPublisher(TestEnvironment env) { 782 this.env = env; 783 requests = new Receptacle<Long>(env); 784 cancelled = new Latch(env); 785 subscriber = new Promise<Subscriber<? super T>>(this.env); 786 } 787 788 @Override 789 public void subscribe(Subscriber<? super T> s) { 790 if (!subscriber.isCompleted()) { 791 subscriber.completeImmediatly(s); 792 793 Subscription subs = new Subscription() { 794 @Override 795 public void request(long elements) { 796 requests.add(elements); 797 } 798 799 @Override 800 public void cancel() { 801 cancelled.close(); 802 } 803 }; 804 s.onSubscribe(subs); 805 806 } else { 807 env.flop("TestPublisher doesn't support more than one Subscriber"); 808 } 809 } 810 811 public void sendNext(T element) { 812 if (subscriber.isCompleted()) { 813 subscriber.value().onNext(element); 814 } else { 815 env.flop("Cannot sendNext before having a Subscriber"); 816 } 817 } 818 819 public void sendCompletion() { 820 if (subscriber.isCompleted()) { 821 subscriber.value().onComplete(); 822 } else { 823 env.flop("Cannot sendCompletion before having a Subscriber"); 824 } 825 } 826 827 public void sendError(Throwable cause) { 828 if (subscriber.isCompleted()) { 829 subscriber.value().onError(cause); 830 } else { 831 env.flop("Cannot sendError before having a Subscriber"); 832 } 833 } 834 835 public long expectRequest() throws InterruptedException { 836 return expectRequest(env.defaultTimeoutMillis()); 837 } 838 839 public long expectRequest(long timeoutMillis) throws InterruptedException { 840 long requested = requests.next(timeoutMillis, "Did not receive expected `request` call"); 841 if (requested <= 0) { 842 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 843 } else { 844 pendingDemand += requested; 845 return requested; 846 } 847 } 848 849 850 public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException { 851 long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum)); 852 if (requested <= 0) { 853 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 854 } else { 855 pendingDemand += requested; 856 return requested; 857 } 858 } 859 860 public void expectExactRequest(long expected) throws InterruptedException { 861 expectExactRequest(expected, env.defaultTimeoutMillis()); 862 } 863 864 public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException { 865 long requested = expectRequest(timeoutMillis); 866 if (requested != expected) { 867 env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); 868 } 869 pendingDemand += requested; 870 } 871 872 public void expectNoRequest() throws InterruptedException { 873 expectNoRequest(env.defaultTimeoutMillis()); 874 } 875 876 public void expectNoRequest(long timeoutMillis) throws InterruptedException { 877 requests.expectNone(timeoutMillis, "Received an unexpected call to: request: "); 878 } 879 880 public void expectCancelling() throws InterruptedException { 881 expectCancelling(env.defaultTimeoutMillis()); 882 } 883 884 public void expectCancelling(long timeoutMillis) throws InterruptedException { 885 cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription"); 886 } 887 888 public boolean isCancelled() throws InterruptedException { 889 return cancelled.isClosed(); 890 } 891 } 892 893 /** 894 * Like a CountDownLatch, but resettable and with some convenience methods 895 */ 896 public static class Latch { 897 private final TestEnvironment env; 898 volatile private CountDownLatch countDownLatch = new CountDownLatch(1); 899 900 public Latch(TestEnvironment env) { 901 this.env = env; 902 } 903 904 public void reOpen() { 905 countDownLatch = new CountDownLatch(1); 906 } 907 908 public boolean isClosed() { 909 return countDownLatch.getCount() == 0; 910 } 911 912 public void close() { 913 countDownLatch.countDown(); 914 } 915 916 public void assertClosed(String openErrorMsg) { 917 if (!isClosed()) { 918 env.flop(new ExpectedClosedLatchException(openErrorMsg)); 919 } 920 } 921 922 public void assertOpen(String closedErrorMsg) { 923 if (isClosed()) { 924 env.flop(new ExpectedOpenLatchException(closedErrorMsg)); 925 } 926 } 927 928 public void expectClose(String notClosedErrorMsg) throws InterruptedException { 929 expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg); 930 } 931 932 public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException { 933 countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); 934 if (countDownLatch.getCount() > 0) { 935 env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis)); 936 } 937 } 938 939 static final class ExpectedOpenLatchException extends RuntimeException { 940 public ExpectedOpenLatchException(String message) { 941 super(message); 942 } 943 } 944 945 static final class ExpectedClosedLatchException extends RuntimeException { 946 public ExpectedClosedLatchException(String message) { 947 super(message); 948 } 949 } 950 951 } 952 953 // simple promise for *one* value, which cannot be reset 954 public static class Promise<T> { 955 private final TestEnvironment env; 956 957 public static <T> Promise<T> completed(TestEnvironment env, T value) { 958 Promise<T> promise = new Promise<T>(env); 959 promise.completeImmediatly(value); 960 return promise; 961 } 962 963 public Promise(TestEnvironment env) { 964 this.env = env; 965 } 966 967 private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1); 968 private AtomicReference<T> _value = new AtomicReference<T>(); 969 970 public T value() { 971 final T value = _value.get(); 972 if (value != null) { 973 return value; 974 } else { 975 env.flop("Cannot access promise value before completion"); 976 return null; 977 } 978 } 979 980 public boolean isCompleted() { 981 return _value.get() != null; 982 } 983 984 /** 985 * Allows using expectCompletion to await for completion of the value and complete it _then_ 986 */ 987 public void complete(T value) { 988 if (_value.compareAndSet(null, value)) { 989 // we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called 990 abq.add(value); 991 } else { 992 env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value)); 993 } 994 } 995 996 /** 997 * Same as complete. 998 * 999 * Keeping this method for binary compatibility. 1000 */ 1001 public void completeImmediatly(T value) { 1002 complete(value); 1003 } 1004 1005 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 1006 if (!isCompleted()) { 1007 T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1008 1009 if (val == null) { 1010 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1011 } 1012 } 1013 } 1014 } 1015 1016 // a "Promise" for multiple values, which also supports "end-of-stream reached" 1017 public static class Receptacle<T> { 1018 final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE; 1019 private final TestEnvironment env; 1020 1021 private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE); 1022 1023 private final Latch completedLatch; 1024 1025 Receptacle(TestEnvironment env) { 1026 this.env = env; 1027 this.completedLatch = new Latch(env); 1028 } 1029 1030 public void add(T value) { 1031 completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value)); 1032 1033 abq.add(Optional.of(value)); 1034 } 1035 1036 public void complete() { 1037 completedLatch.assertOpen("Unexpected additional complete signal received!"); 1038 completedLatch.close(); 1039 1040 abq.add(Optional.<T>empty()); 1041 } 1042 1043 public T next(long timeoutMillis, String errorMsg) throws InterruptedException { 1044 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1045 1046 if (value == null) { 1047 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1048 } else if (value.isDefined()) { 1049 return value.get(); 1050 } else { 1051 return env.flopAndFail("Expected element but got end-of-stream"); 1052 } 1053 } 1054 1055 public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 1056 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1057 1058 if (value == null) { 1059 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1060 return Optional.empty(); 1061 } 1062 1063 return value; 1064 } 1065 1066 /** 1067 * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements 1068 */ 1069 public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 1070 List<T> result = new LinkedList<T>(); 1071 long remaining = elements; 1072 long deadline = System.currentTimeMillis() + timeoutMillis; 1073 while (remaining > 0) { 1074 long remainingMillis = deadline - System.currentTimeMillis(); 1075 1076 result.add(next(remainingMillis, errorMsg)); 1077 remaining--; 1078 } 1079 1080 return result; 1081 } 1082 1083 1084 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 1085 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1086 1087 if (value == null) { 1088 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1089 } else if (value.isDefined()) { 1090 env.flop(String.format("Expected end-of-stream but got element [%s]", value.get())); 1091 } // else, ok 1092 } 1093 1094 /** 1095 * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}. 1096 */ 1097 @Deprecated 1098 public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception { 1099 return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg); 1100 } 1101 1102 @SuppressWarnings("unchecked") 1103 final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeoutMillis, 1104 long pollTimeoutMillis, 1105 String errorMsg) throws Exception { 1106 long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis); 1107 long timeStampANs = System.nanoTime(); 1108 long timeStampBNs; 1109 1110 for (;;) { 1111 Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs))); 1112 1113 if (env.asyncErrors.isEmpty()) { 1114 timeStampBNs = System.nanoTime(); 1115 totalTimeoutRemainingNs =- timeStampBNs - timeStampANs; 1116 timeStampANs = timeStampBNs; 1117 1118 if (totalTimeoutRemainingNs <= 0) { 1119 return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis)); 1120 } 1121 } else { 1122 // ok, there was an expected error 1123 Throwable thrown = env.asyncErrors.remove(0); 1124 1125 if (clazz.isInstance(thrown)) { 1126 return (E) thrown; 1127 } else { 1128 1129 return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", 1130 errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); 1131 } 1132 } 1133 } 1134 } 1135 1136 public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { 1137 Thread.sleep(withinMillis); 1138 Optional<T> value = abq.poll(); 1139 1140 if (value == null) { 1141 // ok 1142 } else if (value.isDefined()) { 1143 env.flop(String.format("%s [%s]", errorMsgPrefix, value.get())); 1144 } else { 1145 env.flop("Expected no element but got end-of-stream"); 1146 } 1147 } 1148 } 1149} 1150