001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.tck; 006 007import org.reactivestreams.Publisher; 008import org.reactivestreams.Subscriber; 009import org.reactivestreams.Subscription; 010import org.reactivestreams.tck.TestEnvironment.*; 011import org.reactivestreams.tck.flow.support.Optional; 012import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; 013import org.reactivestreams.tck.flow.support.TestException; 014import org.testng.SkipException; 015import org.testng.annotations.AfterClass; 016import org.testng.annotations.BeforeClass; 017import org.testng.annotations.BeforeMethod; 018import org.testng.annotations.Test; 019 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.Executors; 022 023import static org.testng.Assert.assertTrue; 024 025/** 026 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber} 027 * and {@link org.reactivestreams.Subscription} specification rules. 028 * 029 * @see org.reactivestreams.Subscriber 030 * @see org.reactivestreams.Subscription 031 */ 032public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T> 033 implements SubscriberWhiteboxVerificationRules { 034 035 private final TestEnvironment env; 036 037 protected SubscriberWhiteboxVerification(TestEnvironment env) { 038 this.env = env; 039 } 040 041 // USER API 042 043 /** 044 * This is the main method you must implement in your test incarnation. 045 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. 046 * 047 * In order to be meaningfully testable your Subscriber must inform the given 048 * `WhiteboxSubscriberProbe` of the respective events having been received. 049 */ 050 public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe); 051 052 // ENV SETUP 053 054 /** 055 * Executor service used by the default provided asynchronous Publisher. 056 * @see #createHelperPublisher(long) 057 */ 058 private ExecutorService publisherExecutor; 059 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } 060 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } 061 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } 062 063 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 064 065 @BeforeMethod 066 public void setUp() throws Exception { 067 env.clearAsyncErrors(); 068 } 069 070 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 071 072 @Test 073 public void required_exerciseWhiteboxHappyPath() throws Throwable { 074 subscriberTest(new TestStageTestRun() { 075 @Override 076 public void run(WhiteboxTestStage stage) throws InterruptedException { 077 stage.puppet().triggerRequest(1); 078 stage.puppet().triggerRequest(1); 079 080 long receivedRequests = stage.expectRequest(); 081 082 stage.signalNext(); 083 stage.probe.expectNext(stage.lastT); 084 085 stage.puppet().triggerRequest(1); 086 if (receivedRequests == 1) { 087 stage.expectRequest(); 088 } 089 090 stage.signalNext(); 091 stage.probe.expectNext(stage.lastT); 092 093 stage.puppet().signalCancel(); 094 stage.expectCancelling(); 095 096 stage.verifyNoAsyncErrors(); 097 } 098 }); 099 } 100 101 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 102 103 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1 104 @Override @Test 105 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 106 subscriberTest(new TestStageTestRun() { 107 @Override 108 public void run(WhiteboxTestStage stage) throws InterruptedException { 109 stage.puppet().triggerRequest(1); 110 stage.expectRequest(); 111 112 stage.signalNext(); 113 } 114 }); 115 } 116 117 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2 118 @Override @Test 119 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 120 notVerified(); // cannot be meaningfully tested, or can it? 121 } 122 123 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 124 @Override @Test 125 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 126 subscriberTestWithoutSetup(new TestStageTestRun() { 127 @Override 128 public void run(WhiteboxTestStage stage) throws Throwable { 129 final String onCompleteMethod = "required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call"; 130 131 final Subscription subs = new Subscription() { 132 @Override 133 public void request(long n) { 134 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace(onCompleteMethod); 135 if (onCompleteStackTraceElement.isDefined()) { 136 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); 137 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 138 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 139 } 140 } 141 142 @Override 143 public void cancel() { 144 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onCompleteMethod); 145 if (onCompleteStackElement.isDefined()) { 146 final StackTraceElement stackElem = onCompleteStackElement.get(); 147 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 148 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 149 } 150 } 151 }; 152 153 stage.probe = stage.createWhiteboxSubscriberProbe(env); 154 final Subscriber<T> sub = createSubscriber(stage.probe); 155 156 sub.onSubscribe(subs); 157 required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(sub); 158 159 env.verifyNoAsyncErrorsNoDelay(); 160 } 161 162 /** Makes sure the onComplete is initiated with a recognizable stacktrace element on the current thread. */ 163 void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(Subscriber<?> sub) { 164 sub.onComplete(); 165 } 166 }); 167 } 168 169 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 170 @Override @Test 171 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 172 subscriberTestWithoutSetup(new TestStageTestRun() { 173 @Override 174 public void run(WhiteboxTestStage stage) throws Throwable { 175 final String onErrorMethod = "required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call"; 176 177 final Subscription subs = new Subscription() { 178 @Override 179 public void request(long n) { 180 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod); 181 if (onCompleteStackElement.isDefined()) { 182 final StackTraceElement stackElem = onCompleteStackElement.get(); 183 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 184 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 185 } 186 } 187 188 @Override 189 public void cancel() { 190 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod); 191 if (onCompleteStackElement.isDefined()) { 192 final StackTraceElement stackElem = onCompleteStackElement.get(); 193 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 194 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 195 } 196 } 197 }; 198 199 stage.probe = stage.createWhiteboxSubscriberProbe(env); 200 final Subscriber<T> sub = createSubscriber(stage.probe); 201 202 sub.onSubscribe(subs); 203 required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(sub); 204 205 env.verifyNoAsyncErrorsNoDelay(); 206 } 207 208 /** Makes sure the onError is initiated with a recognizable stacktrace element on the current thread. */ 209 void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(Subscriber<?> sub) { 210 sub.onError(new TestException()); 211 } 212 }); 213 } 214 215 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4 216 @Override @Test 217 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 218 notVerified(); // cannot be meaningfully tested, or can it? 219 } 220 221 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5 222 @Override @Test 223 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 224 subscriberTest(new TestStageTestRun() { 225 @Override 226 public void run(WhiteboxTestStage stage) throws Throwable { 227 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail 228 final Latch secondSubscriptionCancelled = new Latch(env); 229 final Subscriber<? super T> sub = stage.sub(); 230 final Subscription subscription = new Subscription() { 231 @Override 232 public void request(long elements) { 233 // ignore... 234 } 235 236 @Override 237 public void cancel() { 238 secondSubscriptionCancelled.close(); 239 } 240 241 @Override 242 public String toString() { 243 return "SecondSubscription(should get cancelled)"; 244 } 245 }; 246 sub.onSubscribe(subscription); 247 248 secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called"); 249 env.verifyNoAsyncErrors(); 250 } 251 }); 252 } 253 254 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6 255 @Override @Test 256 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 257 notVerified(); // cannot be meaningfully tested, or can it? 258 } 259 260 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7 261 @Override @Test 262 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 263 notVerified(); // cannot be meaningfully tested, or can it? 264 // the same thread part of the clause can be verified but that is not very useful, or is it? 265 } 266 267 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8 268 @Override @Test 269 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 270 subscriberTest(new TestStageTestRun() { 271 @Override 272 public void run(WhiteboxTestStage stage) throws InterruptedException { 273 stage.puppet().triggerRequest(1); 274 stage.expectRequest(); 275 stage.puppet().signalCancel(); 276 stage.expectCancelling(); 277 stage.signalNext(); 278 279 stage.puppet().triggerRequest(1); 280 stage.puppet().triggerRequest(1); 281 282 stage.verifyNoAsyncErrors(); 283 } 284 }); 285 } 286 287 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 288 @Override @Test 289 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 290 subscriberTest(new TestStageTestRun() { 291 @Override 292 public void run(WhiteboxTestStage stage) throws InterruptedException { 293 stage.puppet().triggerRequest(1); 294 stage.sendCompletion(); 295 stage.probe.expectCompletion(); 296 297 stage.verifyNoAsyncErrors(); 298 } 299 }); 300 } 301 302 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 303 @Override @Test 304 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 305 subscriberTest(new TestStageTestRun() { 306 @Override 307 public void run(WhiteboxTestStage stage) throws InterruptedException { 308 stage.sendCompletion(); 309 stage.probe.expectCompletion(); 310 311 stage.verifyNoAsyncErrors(); 312 } 313 }); 314 } 315 316 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 317 @Override @Test 318 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 319 subscriberTest(new TestStageTestRun() { 320 @Override 321 public void run(WhiteboxTestStage stage) throws InterruptedException { 322 stage.puppet().triggerRequest(1); 323 stage.puppet().triggerRequest(1); 324 325 Exception ex = new TestException(); 326 stage.sendError(ex); 327 stage.probe.expectError(ex); 328 329 env.verifyNoAsyncErrorsNoDelay(); 330 } 331 }); 332 } 333 334 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 335 @Override @Test 336 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 337 subscriberTest(new TestStageTestRun() { 338 @Override 339 public void run(WhiteboxTestStage stage) throws InterruptedException { 340 Exception ex = new TestException(); 341 stage.sendError(ex); 342 stage.probe.expectError(ex); 343 344 env.verifyNoAsyncErrorsNoDelay(); 345 } 346 }); 347 } 348 349 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11 350 @Override @Test 351 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 352 notVerified(); // cannot be meaningfully tested, or can it? 353 } 354 355 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12 356 @Override @Test 357 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 358 notVerified(); // cannot be meaningfully tested, or can it? 359 } 360 361 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 362 @Override @Test 363 public void untested_spec213_failingOnSignalInvocation() throws Exception { 364 notVerified(); // cannot be meaningfully tested, or can it? 365 } 366 367 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 368 @Override @Test 369 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 370 subscriberTest(new TestStageTestRun() { 371 @Override 372 public void run(WhiteboxTestStage stage) throws Throwable { 373 374 final Subscriber<? super T> sub = stage.sub(); 375 boolean gotNPE = false; 376 try { 377 sub.onSubscribe(null); 378 } catch (final NullPointerException expected) { 379 gotNPE = true; 380 } 381 382 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); 383 env.verifyNoAsyncErrorsNoDelay(); 384 } 385 }); 386 } 387 388 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 389 @Override @Test 390 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 391 subscriberTest(new TestStageTestRun() { 392 @Override 393 public void run(WhiteboxTestStage stage) throws Throwable { 394 395 final Subscriber<? super T> sub = stage.sub(); 396 boolean gotNPE = false; 397 try { 398 sub.onNext(null); 399 } catch (final NullPointerException expected) { 400 gotNPE = true; 401 } 402 403 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); 404 env.verifyNoAsyncErrorsNoDelay(); 405 } 406 }); 407 } 408 409 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 410 @Override @Test 411 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 412 subscriberTest(new TestStageTestRun() { 413 @Override 414 public void run(WhiteboxTestStage stage) throws Throwable { 415 416 final Subscriber<? super T> sub = stage.sub(); 417 boolean gotNPE = false; 418 try { 419 sub.onError(null); 420 } catch (final NullPointerException expected) { 421 gotNPE = true; 422 } finally { 423 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); 424 } 425 426 env.verifyNoAsyncErrorsNoDelay(); 427 } 428 }); 429 } 430 431 432 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// 433 434 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1 435 @Override @Test 436 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 437 notVerified(); // cannot be meaningfully tested, or can it? 438 } 439 440 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8 441 @Override @Test 442 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 443 subscriberTest(new TestStageTestRun() { 444 @Override 445 public void run(WhiteboxTestStage stage) throws InterruptedException { 446 stage.puppet().triggerRequest(2); 447 long requestedElements = stage.expectRequest(); 448 stage.probe.expectNext(stage.signalNext()); 449 // Some subscribers may only request one element at a time. 450 if (requestedElements < 2) { 451 stage.expectRequest(); 452 } 453 stage.probe.expectNext(stage.signalNext()); 454 455 stage.probe.expectNone(); 456 stage.puppet().triggerRequest(3); 457 458 stage.verifyNoAsyncErrors(); 459 } 460 }); 461 } 462 463 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10 464 @Override @Test 465 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 466 notVerified(); // cannot be meaningfully tested, or can it? 467 } 468 469 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11 470 @Override @Test 471 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 472 notVerified(); // cannot be meaningfully tested, or can it? 473 } 474 475 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14 476 @Override @Test 477 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 478 notVerified(); // cannot be meaningfully tested, or can it? 479 } 480 481 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15 482 @Override @Test 483 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 484 notVerified(); // cannot be meaningfully tested, or can it? 485 } 486 487 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16 488 @Override @Test 489 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 490 notVerified(); // cannot be meaningfully tested, or can it? 491 } 492 493 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 494 495 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 496 497 abstract class TestStageTestRun { 498 public abstract void run(WhiteboxTestStage stage) throws Throwable; 499 } 500 501 /** 502 * Prepares subscriber and publisher pair (by subscribing the first to the latter), 503 * and then hands over the tests {@link WhiteboxTestStage} over to the test. 504 * 505 * The test stage is, like in a puppet show, used to orchestrate what each participant should do. 506 * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals. 507 */ 508 public void subscriberTest(TestStageTestRun body) throws Throwable { 509 WhiteboxTestStage stage = new WhiteboxTestStage(env, true); 510 body.run(stage); 511 } 512 513 /** 514 * Provides a {@link WhiteboxTestStage} without performing any additional setup, 515 * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would. 516 * 517 * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled. 518 */ 519 public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 520 WhiteboxTestStage stage = new WhiteboxTestStage(env, false); 521 body.run(stage); 522 } 523 524 /** 525 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 526 */ 527 public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 528 try { 529 subscriberTestWithoutSetup(body); 530 } catch (Exception ex) { 531 notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement."); 532 } 533 } 534 535 public class WhiteboxTestStage extends ManualPublisher<T> { 536 public Publisher<T> pub; 537 public ManualSubscriber<T> tees; // gives us access to a stream T values 538 public WhiteboxSubscriberProbe<T> probe; 539 540 public T lastT = null; 541 542 public WhiteboxTestStage(TestEnvironment env) throws InterruptedException { 543 this(env, true); 544 } 545 546 public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { 547 super(env); 548 if (runDefaultInit) { 549 pub = this.createHelperPublisher(Long.MAX_VALUE); 550 tees = env.newManualSubscriber(pub); 551 probe = new WhiteboxSubscriberProbe<T>(env, subscriber); 552 subscribe(createSubscriber(probe)); 553 probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub())); 554 env.verifyNoAsyncErrorsNoDelay(); 555 } 556 } 557 558 public Subscriber<? super T> sub() { 559 return subscriber.value(); 560 } 561 562 public SubscriberPuppet puppet() { 563 return probe.puppet(); 564 } 565 566 public WhiteboxSubscriberProbe<T> probe() { 567 return probe; 568 } 569 570 public Publisher<T> createHelperPublisher(long elements) { 571 return SubscriberWhiteboxVerification.this.createHelperPublisher(elements); 572 } 573 574 public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) { 575 return new WhiteboxSubscriberProbe<T>(env, subscriber); 576 } 577 578 public T signalNext() throws InterruptedException { 579 return signalNext(nextT()); 580 } 581 582 private T signalNext(T element) throws InterruptedException { 583 sendNext(element); 584 return element; 585 } 586 587 public T nextT() throws InterruptedException { 588 lastT = tees.requestNextElement(); 589 return lastT; 590 } 591 592 public void verifyNoAsyncErrors() { 593 env.verifyNoAsyncErrors(); 594 } 595 } 596 597 /** 598 * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls, 599 * in order to allow intercepting calls on the underlying {@code Subscriber}. 600 * This delegation allows the proxy to implement {@link BlackboxProbe} assertions. 601 */ 602 public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> { 603 604 public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) { 605 super(env, Promise.<Subscriber<? super T>>completed(env, subscriber)); 606 } 607 608 @Override 609 public void onSubscribe(Subscription s) { 610 sub().onSubscribe(s); 611 } 612 613 @Override 614 public void onNext(T t) { 615 registerOnNext(t); 616 sub().onNext(t); 617 } 618 619 @Override 620 public void onError(Throwable cause) { 621 registerOnError(cause); 622 sub().onError(cause); 623 } 624 625 @Override 626 public void onComplete() { 627 registerOnComplete(); 628 sub().onComplete(); 629 } 630 } 631 632 public static class BlackboxProbe<T> implements SubscriberProbe<T> { 633 protected final TestEnvironment env; 634 protected final Promise<Subscriber<? super T>> subscriber; 635 636 protected final Receptacle<T> elements; 637 protected final Promise<Throwable> error; 638 639 public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 640 this.env = env; 641 this.subscriber = subscriber; 642 elements = new Receptacle<T>(env); 643 error = new Promise<Throwable>(env); 644 } 645 646 @Override 647 public void registerOnNext(T element) { 648 elements.add(element); 649 } 650 651 @Override 652 public void registerOnComplete() { 653 try { 654 elements.complete(); 655 } catch (IllegalStateException ex) { 656 // "Queue full", onComplete was already called 657 env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7"); 658 } 659 } 660 661 @Override 662 public void registerOnError(Throwable cause) { 663 try { 664 error.complete(cause); 665 } catch (IllegalStateException ex) { 666 // "Queue full", onError was already called 667 env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7"); 668 } 669 } 670 671 public T expectNext() throws InterruptedException { 672 return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub())); 673 } 674 675 public void expectNext(T expected) throws InterruptedException { 676 expectNext(expected, env.defaultTimeoutMillis()); 677 } 678 679 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 680 T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected)); 681 if (!received.equals(expected)) { 682 env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected)); 683 } 684 } 685 686 public Subscriber<? super T> sub() { 687 return subscriber.value(); 688 } 689 690 public void expectCompletion() throws InterruptedException { 691 expectCompletion(env.defaultTimeoutMillis()); 692 } 693 694 public void expectCompletion(long timeoutMillis) throws InterruptedException { 695 expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub())); 696 } 697 698 public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException { 699 elements.expectCompletion(timeoutMillis, msg); 700 } 701 702 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 703 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException { 704 final E err = expectError(expected); 705 String message = err.getMessage(); 706 assertTrue(message.contains(requiredMessagePart), 707 String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected)); 708 } 709 710 public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException { 711 return expectError(expected, env.defaultTimeoutMillis()); 712 } 713 714 @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) 715 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException { 716 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 717 if (error.value() == null) { 718 return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 719 } else if (expected.isInstance(error.value())) { 720 return (E) error.value(); 721 } else { 722 return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 723 } 724 } 725 726 public void expectError(Throwable expected) throws InterruptedException { 727 expectError(expected, env.defaultTimeoutMillis()); 728 } 729 730 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 731 public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException { 732 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 733 if (error.value() != expected) { 734 env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 735 } 736 } 737 738 public void expectNone() throws InterruptedException { 739 expectNone(env.defaultNoSignalsTimeoutMillis()); 740 } 741 742 public void expectNone(long withinMillis) throws InterruptedException { 743 elements.expectNone(withinMillis, "Expected nothing"); 744 } 745 746 } 747 748 public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer { 749 protected Promise<SubscriberPuppet> puppet; 750 751 public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 752 super(env, subscriber); 753 puppet = new Promise<SubscriberPuppet>(env); 754 } 755 756 private SubscriberPuppet puppet() { 757 return puppet.value(); 758 } 759 760 @Override 761 public void registerOnSubscribe(SubscriberPuppet p) { 762 if (!puppet.isCompleted()) { 763 puppet.complete(p); 764 } 765 } 766 767 } 768 769 public interface SubscriberPuppeteer { 770 771 /** 772 * Must be called by the test subscriber when it has successfully registered a subscription 773 * inside the `onSubscribe` method. 774 */ 775 void registerOnSubscribe(SubscriberPuppet puppet); 776 } 777 778 public interface SubscriberProbe<T> { 779 780 /** 781 * Must be called by the test subscriber when it has received an`onNext` event. 782 */ 783 void registerOnNext(T element); 784 785 /** 786 * Must be called by the test subscriber when it has received an `onComplete` event. 787 */ 788 void registerOnComplete(); 789 790 /** 791 * Must be called by the test subscriber when it has received an `onError` event. 792 */ 793 void registerOnError(Throwable cause); 794 795 } 796 797 /** 798 * Implement this puppet in your Whitebox style tests. 799 * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action. 800 * Since this is a whitebox style test, you're allowed and expected to use knowladge about your implementation to 801 * make implement these calls. 802 */ 803 public interface SubscriberPuppet { 804 805 /** 806 * Ensure that at least {@code elements} are eventually requested by your {@link Subscriber}, if it hasn't already 807 * requested that many elements. 808 * <p> 809 * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum 810 * of the elements requested by your {@code Subscriber} must eventually be at least the the sum of the elements 811 * triggered to be requested by all the invocations of this method. 812 * <p> 813 * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have 814 * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made 815 * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the 816 * next request is made. 817 * <p> 818 * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and 819 * must be prepared for the subscriber to only request one element at a time, it is not enough for the TCK to 820 * simply invoke this method before sending elements. 821 * <p> 822 * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested, 823 * such that only a cancel signal is emitted. 824 */ 825 void triggerRequest(long elements); 826 827 /** 828 * Trigger {@code cancel()} on your {@link Subscriber}. 829 * <p> 830 * An invocation of this method may be coalesced into any outstanding requests, as requested by 831 * {@link #triggerRequest(long)}, such that only a cancel signal is emitted. 832 */ 833 void signalCancel(); 834 } 835 836 public void notVerified() { 837 throw new SkipException("Not verified using this TCK."); 838 } 839 840 public void notVerified(String msg) { 841 throw new SkipException(msg); 842 } 843}