001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.tck; 006 007import org.reactivestreams.Publisher; 008import org.reactivestreams.Subscriber; 009import org.reactivestreams.Subscription; 010import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport; 011import org.reactivestreams.tck.TestEnvironment.Latch; 012import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 013import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 014import org.reactivestreams.tck.flow.support.Function; 015import org.reactivestreams.tck.flow.support.Optional; 016import org.reactivestreams.tck.flow.support.PublisherVerificationRules; 017import org.testng.SkipException; 018import org.testng.annotations.BeforeMethod; 019import org.testng.annotations.Test; 020 021import java.lang.Override; 022import java.lang.ref.ReferenceQueue; 023import java.lang.ref.WeakReference; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.List; 028import java.util.Random; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.concurrent.atomic.AtomicReference; 031 032import static org.testng.Assert.assertEquals; 033import static org.testng.Assert.assertTrue; 034 035/** 036 * Provides tests for verifying {@code Publisher} specification rules. 037 * 038 * @see org.reactivestreams.Publisher 039 */ 040public abstract class PublisherVerification<T> implements PublisherVerificationRules { 041 042 private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; 043 private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; 044 045 private final TestEnvironment env; 046 047 /** 048 * The amount of time after which a cancelled Subscriber reference should be dropped. 049 * See Rule 3.13 for details. 050 */ 051 private final long publisherReferenceGCTimeoutMillis; 052 053 /** 054 * Constructs a new verification class using the given env and configuration. 055 * 056 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 057 */ 058 public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 059 this.env = env; 060 this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis; 061 } 062 063 /** 064 * Constructs a new verification class using the given env and configuration. 065 * 066 * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. 067 */ 068 public PublisherVerification(TestEnvironment env) { 069 this.env = env; 070 this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); 071 } 072 073 /** 074 * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, 075 * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). 076 * 077 * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 078 * 079 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 080 */ 081 public static long envPublisherReferenceGCTimeoutMillis() { 082 final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); 083 if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; 084 else try { 085 return Long.parseLong(envMillis); 086 } catch (NumberFormatException ex) { 087 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); 088 } 089 } 090 091 /** 092 * This is the main method you must implement in your test incarnation. 093 * It must create a Publisher for a stream with exactly the given number of elements. 094 * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite. 095 */ 096 public abstract Publisher<T> createPublisher(long elements); 097 098 /** 099 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 100 * 101 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 102 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 103 * 104 * If you ignore these additional tests, return {@code null} from this method. 105 */ 106 public abstract Publisher<T> createFailedPublisher(); 107 108 109 /** 110 * Override and return lower value if your Publisher is only able to produce a known number of elements. 111 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 112 * 113 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 114 * 115 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 116 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 117 */ 118 public long maxElementsFromPublisher() { 119 return Long.MAX_VALUE - 1; 120 } 121 122 /** 123 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 124 * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify— 125 * usually this means that this test case can yield false positives ("be green") even if for some case, 126 * the given implementation may violate the tested behaviour. 127 */ 128 public boolean skipStochasticTests() { 129 return false; 130 } 131 132 /** 133 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 134 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 135 * recursive calls to exceed the number returned by this method. 136 * 137 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 138 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 139 */ 140 public long boundedDepthOfOnNextAndRequestRecursion() { 141 return 1; 142 } 143 144 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 145 146 @BeforeMethod 147 public void setUp() throws Exception { 148 env.clearAsyncErrors(); 149 } 150 151 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 152 153 @Override @Test 154 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 155 activePublisherTest(1, true, new PublisherTestRun<T>() { 156 @Override 157 public void run(Publisher<T> pub) throws InterruptedException { 158 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 159 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 160 sub.requestEndOfStream(); 161 } 162 163 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 164 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 165 } 166 167 }); 168 } 169 170 @Override @Test 171 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 172 activePublisherTest(3, true, new PublisherTestRun<T>() { 173 @Override 174 public void run(Publisher<T> pub) throws InterruptedException { 175 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 176 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 177 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); 178 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub)); 179 sub.requestEndOfStream(); 180 } 181 182 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 183 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 184 } 185 186 }); 187 } 188 189 @Override @Test 190 public void required_validate_maxElementsFromPublisher() throws Exception { 191 assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0"); 192 } 193 194 @Override @Test 195 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 196 assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1"); 197 } 198 199 200 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 201 202 @Override @Test 203 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 204 activePublisherTest(5, false, new PublisherTestRun<T>() { 205 @Override 206 public void run(Publisher<T> pub) throws InterruptedException { 207 208 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 209 try { 210 sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); 211 sub.request(1); 212 sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); 213 sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); 214 215 sub.request(1); 216 sub.request(2); 217 sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); 218 219 sub.expectNone(String.format("Publisher %s produced unrequested ", pub)); 220 } finally { 221 sub.cancel(); 222 } 223 } 224 }); 225 } 226 227 @Override @Test 228 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 229 final int elements = 3; 230 final int requested = 10; 231 232 activePublisherTest(elements, true, new PublisherTestRun<T>() { 233 @Override 234 public void run(Publisher<T> pub) throws Throwable { 235 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 236 sub.request(requested); 237 sub.nextElements(elements); 238 sub.expectCompletion(); 239 } 240 }); 241 } 242 243 @Override @Test 244 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 245 final int iterations = 100; 246 final int elements = 10; 247 248 stochasticTest(iterations, new Function<Integer, Void>() { 249 @Override 250 public Void apply(final Integer runNumber) throws Throwable { 251 activePublisherTest(elements, true, new PublisherTestRun<T>() { 252 @Override 253 public void run(Publisher<T> pub) throws Throwable { 254 final Latch completionLatch = new Latch(env); 255 256 final AtomicInteger gotElements = new AtomicInteger(0); 257 pub.subscribe(new Subscriber<T>() { 258 private Subscription subs; 259 260 private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier(); 261 262 /** 263 * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect 264 * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness. 265 * 266 * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call 267 * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough 268 * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread 269 * to enter subsequent critical sections ("nesting" them effectively). 270 */ 271 final class ConcurrentAccessBarrier { 272 private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null); 273 private volatile String previousSignal = null; 274 275 public void enterSignal(String signalName) { 276 if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) { 277 env.flop(String.format( 278 "Illegal concurrent access detected (entering critical section)! " + 279 "%s emited %s signal, before %s finished its %s signal.", 280 Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal)); 281 } 282 this.previousSignal = signalName; 283 } 284 285 public void leaveSignal(String signalName) { 286 currentlySignallingThread.set(null); 287 this.previousSignal = signalName; 288 } 289 290 private boolean isSynchronousSignal() { 291 return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get()); 292 } 293 294 } 295 296 @Override 297 public void onSubscribe(Subscription s) { 298 final String signal = "onSubscribe()"; 299 concurrentAccessBarrier.enterSignal(signal); 300 301 subs = s; 302 subs.request(1); 303 304 concurrentAccessBarrier.leaveSignal(signal); 305 } 306 307 @Override 308 public void onNext(T ignore) { 309 final String signal = String.format("onNext(%s)", ignore); 310 concurrentAccessBarrier.enterSignal(signal); 311 312 if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this) 313 subs.request(1); 314 315 concurrentAccessBarrier.leaveSignal(signal); 316 } 317 318 @Override 319 public void onError(Throwable t) { 320 final String signal = String.format("onError(%s)", t.getMessage()); 321 concurrentAccessBarrier.enterSignal(signal); 322 323 // ignore value 324 325 concurrentAccessBarrier.leaveSignal(signal); 326 } 327 328 @Override 329 public void onComplete() { 330 final String signal = "onComplete()"; 331 concurrentAccessBarrier.enterSignal(signal); 332 333 // entering for completeness 334 335 concurrentAccessBarrier.leaveSignal(signal); 336 completionLatch.close(); 337 } 338 }); 339 340 completionLatch.expectClose( 341 elements * env.defaultTimeoutMillis(), 342 String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it", 343 runNumber, iterations, elements, gotElements.get())); 344 } 345 }); 346 return null; 347 } 348 }); 349 } 350 351 @Override @Test 352 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 353 try { 354 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 355 @Override 356 public void run(final Publisher<T> pub) throws InterruptedException { 357 final Latch onErrorlatch = new Latch(env); 358 final Latch onSubscribeLatch = new Latch(env); 359 pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { 360 @Override 361 public void onSubscribe(Subscription subs) { 362 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 363 onSubscribeLatch.close(); 364 } 365 @Override 366 public void onError(Throwable cause) { 367 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 368 onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); 369 onErrorlatch.close(); 370 } 371 }); 372 373 onSubscribeLatch.expectClose("Should have received onSubscribe"); 374 onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); 375 376 env.verifyNoAsyncErrors(); 377 } 378 }); 379 } catch (SkipException se) { 380 throw se; 381 } catch (Throwable ex) { 382 // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe 383 // which was wrong of him - he should have signalled on error using onError 384 throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex); 385 } 386 } 387 388 @Override @Test 389 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 390 activePublisherTest(3, true, new PublisherTestRun<T>() { 391 @Override 392 public void run(Publisher<T> pub) throws Throwable { 393 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 394 sub.requestNextElement(); 395 sub.requestNextElement(); 396 sub.requestNextElement(); 397 sub.requestEndOfStream(); 398 sub.expectNone(); 399 } 400 }); 401 } 402 403 @Override @Test 404 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 405 optionalActivePublisherTest(0, true, new PublisherTestRun<T>() { 406 @Override 407 public void run(Publisher<T> pub) throws Throwable { 408 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 409 sub.request(1); 410 sub.expectCompletion(); 411 sub.expectNone(); 412 } 413 }); 414 } 415 416 @Override @Test 417 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 418 notVerified(); // not really testable without more control over the Publisher 419 } 420 421 @Override @Test 422 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 423 activePublisherTest(1, true, new PublisherTestRun<T>() { 424 @Override 425 public void run(Publisher<T> pub) throws Throwable { 426 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 427 sub.request(10); 428 sub.nextElement(); 429 sub.expectCompletion(); 430 431 sub.request(10); 432 sub.expectNone(); 433 } 434 }); 435 } 436 437 @Override @Test 438 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 439 notVerified(); // can we meaningfully test this, without more control over the publisher? 440 } 441 442 @Override @Test 443 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 444 notVerified(); // can we meaningfully test this? 445 } 446 447 @Override @Test 448 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 449 notVerified(); // can we meaningfully test this? 450 } 451 452 @Override @Test 453 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 454 activePublisherTest(0, false, new PublisherTestRun<T>() { 455 @Override 456 public void run(Publisher<T> pub) throws Throwable { 457 try { 458 pub.subscribe(null); 459 env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); 460 } catch (NullPointerException ignored) { 461 // valid behaviour 462 } 463 env.verifyNoAsyncErrorsNoDelay(); 464 } 465 }); 466 } 467 468 @Override @Test 469 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 470 activePublisherTest(0, false, new PublisherTestRun<T>() { 471 @Override 472 public void run(Publisher<T> pub) throws Throwable { 473 final Latch onSubscribeLatch = new Latch(env); 474 final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>(); 475 try { 476 pub.subscribe(new Subscriber<T>() { 477 @Override 478 public void onError(Throwable cause) { 479 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 480 } 481 482 @Override 483 public void onSubscribe(Subscription subs) { 484 cancel.set(subs); 485 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 486 onSubscribeLatch.close(); 487 } 488 489 @Override 490 public void onNext(T elem) { 491 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); 492 } 493 494 @Override 495 public void onComplete() { 496 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); 497 } 498 }); 499 onSubscribeLatch.expectClose("Should have received onSubscribe"); 500 env.verifyNoAsyncErrorsNoDelay(); 501 } finally { 502 Subscription s = cancel.getAndSet(null); 503 if (s != null) { 504 s.cancel(); 505 } 506 } 507 } 508 }); 509 } 510 511 @Override @Test 512 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 513 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 514 @Override 515 public void run(Publisher<T> pub) throws Throwable { 516 final Latch onErrorLatch = new Latch(env); 517 final Latch onSubscribeLatch = new Latch(env); 518 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 519 @Override 520 public void onError(Throwable cause) { 521 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 522 onErrorLatch.assertOpen("Only one onError call expected"); 523 onErrorLatch.close(); 524 } 525 526 @Override 527 public void onSubscribe(Subscription subs) { 528 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 529 onSubscribeLatch.close(); 530 } 531 }; 532 pub.subscribe(sub); 533 onSubscribeLatch.expectClose("Should have received onSubscribe"); 534 onErrorLatch.expectClose("Should have received onError"); 535 536 env.verifyNoAsyncErrorsNoDelay(); 537 } 538 }); 539 } 540 541 @Override @Test 542 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 543 notVerified(); // can we meaningfully test this? 544 } 545 546 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 547 @Override @Test 548 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 549 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 550 @Override 551 public void run(Publisher<T> pub) throws Throwable { 552 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 553 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 554 555 try { 556 env.verifyNoAsyncErrors(); 557 } finally { 558 try { 559 sub1.cancel(); 560 } finally { 561 sub2.cancel(); 562 } 563 } 564 } 565 }); 566 } 567 568 @Override @Test 569 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 570 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 571 @Override 572 public void run(Publisher<T> pub) throws Throwable { 573 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 574 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 575 // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario, 576 // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an 577 // onNext (and optional onComplete) signal(s) and which just onComplete signal. 578 // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled 579 // with onNext. 580 sub1.requestNextElementOrEndOfStream(); 581 sub2.requestNextElementOrEndOfStream(); 582 try { 583 env.verifyNoAsyncErrors(); 584 } finally { 585 try { 586 sub1.cancel(); 587 } finally { 588 sub2.cancel(); 589 } 590 } 591 } 592 }); 593 } 594 595 @Override @Test 596 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 597 optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 598 @Override 599 public void run(Publisher<T> pub) throws InterruptedException { 600 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 601 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 602 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 603 604 sub1.request(1); 605 T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 606 sub2.request(2); 607 List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub)); 608 sub1.request(1); 609 T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 610 sub3.request(3); 611 List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub)); 612 sub3.request(1); 613 T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 614 sub3.request(1); 615 T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 616 sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub)); 617 sub2.request(3); 618 List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub)); 619 sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub)); 620 sub1.request(2); 621 List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub)); 622 sub1.request(1); 623 T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 624 sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub)); 625 626 @SuppressWarnings("unchecked") 627 List<T> r = new ArrayList<T>(Arrays.asList(x1, x2)); 628 r.addAll(x3); 629 r.addAll(Collections.singleton(x4)); 630 631 List<T> check1 = new ArrayList<T>(y1); 632 check1.addAll(y2); 633 634 //noinspection unchecked 635 List<T> check2 = new ArrayList<T>(z1); 636 check2.add(z2); 637 check2.add(z3); 638 639 assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub)); 640 assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub)); 641 } 642 }); 643 } 644 645 @Override @Test 646 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 647 optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements 648 @Override 649 public void run(Publisher<T> pub) throws Throwable { 650 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 651 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 652 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 653 654 List<T> received1 = new ArrayList<T>(); 655 List<T> received2 = new ArrayList<T>(); 656 List<T> received3 = new ArrayList<T>(); 657 658 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 659 // edgy edge case? 660 sub1.request(4); 661 sub2.request(4); 662 sub3.request(4); 663 664 received1.addAll(sub1.nextElements(3)); 665 received2.addAll(sub2.nextElements(3)); 666 received3.addAll(sub3.nextElements(3)); 667 668 // NOTE: can't check completion, the Publisher may not be able to signal it 669 // a similar test *with* completion checking is implemented 670 671 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 672 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 673 } 674 }); 675 } 676 677 @Override @Test 678 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 679 optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 680 @Override 681 public void run(Publisher<T> pub) throws Throwable { 682 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 683 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 684 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 685 686 List<T> received1 = new ArrayList<T>(); 687 List<T> received2 = new ArrayList<T>(); 688 List<T> received3 = new ArrayList<T>(); 689 690 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 691 // edgy edge case? 692 sub1.request(4); 693 sub2.request(4); 694 sub3.request(4); 695 696 received1.addAll(sub1.nextElements(3)); 697 received2.addAll(sub2.nextElements(3)); 698 received3.addAll(sub3.nextElements(3)); 699 700 sub1.expectCompletion(); 701 sub2.expectCompletion(); 702 sub3.expectCompletion(); 703 704 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 705 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 706 } 707 }); 708 } 709 710 ///////////////////// SUBSCRIPTION TESTS ////////////////////////////////// 711 712 @Override @Test 713 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 714 activePublisherTest(6, false, new PublisherTestRun<T>() { 715 @Override 716 public void run(Publisher<T> pub) throws Throwable { 717 ManualSubscriber<T> sub = new ManualSubscriber<T>(env) { 718 @Override 719 public void onSubscribe(Subscription subs) { 720 this.subscription.completeImmediatly(subs); 721 722 subs.request(1); 723 subs.request(1); 724 subs.request(1); 725 } 726 727 @Override 728 public void onNext(T element) { 729 Subscription subs = this.subscription.value(); 730 subs.request(1); 731 } 732 }; 733 734 env.subscribe(pub, sub); 735 736 env.verifyNoAsyncErrors(); 737 } 738 }); 739 } 740 741 @Override @Test 742 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 743 final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1; 744 745 activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() { 746 @Override 747 public void run(Publisher<T> pub) throws Throwable { 748 final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() { 749 @Override 750 protected Long initialValue() { 751 return 0L; 752 } 753 }; 754 755 final Latch runCompleted = new Latch(env); 756 757 final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 758 // counts the number of signals received, used to break out from possibly infinite request/onNext loops 759 long signalsReceived = 0L; 760 761 @Override 762 public void onNext(T element) { 763 // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements 764 // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing 765 766 signalsReceived += 1; 767 stackDepthCounter.set(stackDepthCounter.get() + 1); 768 if (env.debugEnabled()) { 769 env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element)); 770 } 771 772 final long callsUntilNow = stackDepthCounter.get(); 773 if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) { 774 env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", 775 callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion())); 776 777 // stop the recursive call chain 778 runCompleted.close(); 779 return; 780 } else if (signalsReceived >= oneMoreThanBoundedLimit) { 781 // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and 782 // stop the recursive call chain 783 runCompleted.close(); 784 return; 785 } 786 787 // request more right away, the Publisher must break the recursion 788 subscription.value().request(1); 789 790 stackDepthCounter.set(stackDepthCounter.get() - 1); 791 } 792 793 @Override 794 public void onComplete() { 795 super.onComplete(); 796 runCompleted.close(); 797 } 798 799 @Override 800 public void onError(Throwable cause) { 801 super.onError(cause); 802 runCompleted.close(); 803 } 804 }; 805 806 try { 807 env.subscribe(pub, sub); 808 809 sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` 810 811 final String msg = String.format("Unable to validate call stack depth safety, " + 812 "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion", 813 oneMoreThanBoundedLimit); 814 runCompleted.expectClose(env.defaultTimeoutMillis(), msg); 815 env.verifyNoAsyncErrorsNoDelay(); 816 } finally { 817 // since the request/onNext recursive calls may keep the publisher running "forever", 818 // we MUST cancel it manually before exiting this test case 819 sub.cancel(); 820 } 821 } 822 }); 823 } 824 825 @Override @Test 826 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 827 notVerified(); // cannot be meaningfully tested, or can it? 828 } 829 830 @Override @Test 831 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 832 notVerified(); // cannot be meaningfully tested, or can it? 833 } 834 835 @Override @Test 836 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 837 activePublisherTest(3, false, new PublisherTestRun<T>() { 838 @Override 839 public void run(Publisher<T> pub) throws Throwable { 840 841 // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the 842 // subscription once it's cancelled (as expected). 843 // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it. 844 ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 845 @Override 846 public void cancel() { 847 if (subscription.isCompleted()) { 848 subscription.value().cancel(); 849 } else { 850 env.flop("Cannot cancel a subscription before having received it"); 851 } 852 } 853 }; 854 855 env.subscribe(pub, sub); 856 857 sub.cancel(); 858 sub.request(1); 859 sub.request(1); 860 sub.request(1); 861 862 sub.expectNone(); 863 env.verifyNoAsyncErrorsNoDelay(); 864 } 865 }); 866 } 867 868 @Override @Test 869 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 870 activePublisherTest(1, false, new PublisherTestRun<T>() { 871 @Override 872 public void run(Publisher<T> pub) throws Throwable { 873 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 874 875 // leak the Subscription 876 final Subscription subs = sub.subscription.value(); 877 878 subs.cancel(); 879 subs.cancel(); 880 subs.cancel(); 881 882 sub.expectNone(); 883 env.verifyNoAsyncErrorsNoDelay(); 884 } 885 }); 886 } 887 888 @Override @Test 889 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 890 activePublisherTest(10, false, new PublisherTestRun<T>() { 891 @Override public void run(Publisher<T> pub) throws Throwable { 892 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 893 sub.request(0); 894 sub.expectError(IllegalArgumentException.class); 895 } 896 }); 897 } 898 899 @Override @Test 900 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 901 activePublisherTest(10, false, new PublisherTestRun<T>() { 902 @Override 903 public void run(Publisher<T> pub) throws Throwable { 904 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 905 final Random r = new Random(); 906 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 907 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 908 sub.expectError(IllegalArgumentException.class); 909 } 910 }); 911 } 912 913 @Override @Test 914 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 915 optionalActivePublisherTest(10, false, new PublisherTestRun<T>() { 916 @Override 917 public void run(Publisher<T> pub) throws Throwable { 918 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 919 final Random r = new Random(); 920 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 921 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 922 sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 923 } 924 }); 925 } 926 927 @Override @Test 928 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 929 // the publisher is able to signal more elements than the subscriber will be requesting in total 930 final int publisherElements = 20; 931 932 final int demand1 = 10; 933 final int demand2 = 5; 934 final int totalDemand = demand1 + demand2; 935 936 activePublisherTest(publisherElements, false, new PublisherTestRun<T>() { 937 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 938 public void run(Publisher<T> pub) throws Throwable { 939 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 940 941 sub.request(demand1); 942 sub.request(demand2); 943 944 /* 945 NOTE: The order of the nextElement/cancel calls below is very important (!) 946 947 If this ordering was reversed, given an asynchronous publisher, 948 the following scenario would be *legal* and would break this test: 949 950 > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous 951 > AsyncPublisher receives request(5) - demand is now 15 952 ! AsyncPublisher didn't emit any onNext yet (!) 953 > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example 954 ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data 955 ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher) 956 957 Which is why we must first expect an element, and then cancel, once the producing is "running". 958 */ 959 sub.nextElement(); 960 sub.cancel(); 961 962 int onNextsSignalled = 1; 963 964 boolean stillBeingSignalled; 965 do { 966 // put asyncError if onNext signal received 967 sub.expectNone(); 968 Throwable error = env.dropAsyncError(); 969 970 if (error == null) { 971 stillBeingSignalled = false; 972 } else { 973 onNextsSignalled += 1; 974 stillBeingSignalled = true; 975 } 976 977 // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw 978 assertTrue(onNextsSignalled <= totalDemand, 979 String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", 980 onNextsSignalled, totalDemand)); 981 982 } while (stillBeingSignalled); 983 } 984 }); 985 986 env.verifyNoAsyncErrorsNoDelay(); 987 } 988 989 @Override @Test 990 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 991 final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>(); 992 993 final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() { 994 @Override 995 public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception { 996 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 997 final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue); 998 999 sub.request(1); 1000 sub.nextElement(); 1001 sub.cancel(); 1002 1003 return ref; 1004 } 1005 }; 1006 1007 activePublisherTest(3, false, new PublisherTestRun<T>() { 1008 @Override 1009 public void run(Publisher<T> pub) throws Throwable { 1010 final WeakReference<ManualSubscriber<T>> ref = run.apply(pub); 1011 1012 // cancel may be run asynchronously so we add a sleep before running the GC 1013 // to "resolve" the race 1014 Thread.sleep(publisherReferenceGCTimeoutMillis); 1015 System.gc(); 1016 1017 if (!ref.equals(queue.remove(100))) { 1018 env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub)); 1019 } 1020 1021 env.verifyNoAsyncErrorsNoDelay(); 1022 } 1023 }); 1024 } 1025 1026 @Override @Test 1027 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 1028 final int totalElements = 3; 1029 1030 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1031 @Override 1032 public void run(Publisher<T> pub) throws Throwable { 1033 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1034 sub.request(Long.MAX_VALUE); 1035 1036 sub.nextElements(totalElements); 1037 sub.expectCompletion(); 1038 1039 env.verifyNoAsyncErrorsNoDelay(); 1040 } 1041 }); 1042 } 1043 1044 @Override @Test 1045 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 1046 final int totalElements = 3; 1047 1048 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1049 @Override 1050 public void run(Publisher<T> pub) throws Throwable { 1051 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1052 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2 1053 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1 1054 sub.request(1); // pending = Long.MAX_VALUE 1055 1056 sub.nextElements(totalElements); 1057 sub.expectCompletion(); 1058 1059 try { 1060 env.verifyNoAsyncErrorsNoDelay(); 1061 } finally { 1062 sub.cancel(); 1063 } 1064 1065 } 1066 }); 1067 } 1068 1069 @Override @Test 1070 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 1071 activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() { 1072 @Override public void run(Publisher<T> pub) throws Throwable { 1073 final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) { 1074 // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls, 1075 // so 10 is relatively high and safe even if arbitrarily chosen 1076 int callsCounter = 10; 1077 1078 @Override 1079 public void onNext(T element) { 1080 if (env.debugEnabled()) { 1081 env.debug(String.format("%s::onNext(%s)", this, element)); 1082 } 1083 if (subscription.isCompleted()) { 1084 if (callsCounter > 0) { 1085 subscription.value().request(Long.MAX_VALUE - 1); 1086 callsCounter--; 1087 } else { 1088 subscription.value().cancel(); 1089 } 1090 } else { 1091 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 1092 } 1093 } 1094 }; 1095 env.subscribe(pub, sub, env.defaultTimeoutMillis()); 1096 1097 // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)` 1098 // we're pretty sure to overflow from those 1099 sub.request(1); 1100 1101 // no onError should be signalled 1102 try { 1103 env.verifyNoAsyncErrors(); 1104 } finally { 1105 sub.cancel(); 1106 } 1107 } 1108 }); 1109 } 1110 1111 ///////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 1112 1113 ///////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 1114 1115 public interface PublisherTestRun<T> { 1116 public void run(Publisher<T> pub) throws Throwable; 1117 } 1118 1119 /** 1120 * Test for feature that SHOULD/MUST be implemented, using a live publisher. 1121 * 1122 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1123 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1124 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1125 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1126 */ 1127 public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1128 if (elements > maxElementsFromPublisher()) { 1129 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1130 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1131 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1132 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1133 } else { 1134 Publisher<T> pub = createPublisher(elements); 1135 body.run(pub); 1136 env.verifyNoAsyncErrorsNoDelay(); 1137 } 1138 } 1139 1140 /** 1141 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 1142 * 1143 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1144 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1145 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1146 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1147 */ 1148 public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1149 if (elements > maxElementsFromPublisher()) { 1150 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1151 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1152 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1153 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1154 } else { 1155 1156 final Publisher<T> pub = createPublisher(elements); 1157 final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement."; 1158 1159 try { 1160 potentiallyPendingTest(pub, body); 1161 } catch (Exception ex) { 1162 notVerified(skipMessage); 1163 } catch (AssertionError ex) { 1164 notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage()); 1165 } 1166 } 1167 } 1168 1169 public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE = 1170 "Skipping because no error state Publisher provided, and the test requires it. " + 1171 "Please implement PublisherVerification#createFailedPublisher to run this test."; 1172 1173 public static final String SKIPPING_OPTIONAL_TEST_FAILED = 1174 "Skipping, because provided Publisher does not pass this *additional* verification."; 1175 /** 1176 * Additional test for Publisher in error state 1177 */ 1178 public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable { 1179 potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); 1180 } 1181 1182 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable { 1183 potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED); 1184 } 1185 1186 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable { 1187 if (pub != null) { 1188 body.run(pub); 1189 } else { 1190 throw new SkipException(message); 1191 } 1192 } 1193 1194 /** 1195 * Executes a given test body {@code n} times. 1196 * All the test runs must pass in order for the stochastic test to pass. 1197 */ 1198 public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable { 1199 if (skipStochasticTests()) { 1200 notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!"); 1201 } 1202 1203 for (int i = 0; i < n; i++) { 1204 body.apply(i); 1205 } 1206 } 1207 1208 public void notVerified() { 1209 throw new SkipException("Not verified by this TCK."); 1210 } 1211 1212 /** 1213 * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher}, 1214 * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data. 1215 * 1216 * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b> 1217 */ 1218 public long publisherUnableToSignalOnComplete() { 1219 return Long.MAX_VALUE; 1220 } 1221 1222 public void notVerified(String message) { 1223 throw new SkipException(message); 1224 } 1225 1226}