/***************************************************
 * Licensed under MIT No Attribution (SPDX: MIT-0) *
 ***************************************************/

package org.reactivestreams.tck;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
import org.reactivestreams.tck.TestEnvironment.Promise;
import org.reactivestreams.tck.flow.support.Function;
import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.Set;

/**
 * Abstract TCK verification for {@link Processor} implementations.
 *
 * <p>The class implements both {@code SubscriberWhiteboxVerificationRules} and
 * {@code PublisherVerificationRules}, and keeps one {@code SubscriberWhiteboxVerification} and one
 * {@code PublisherVerification} instance that are used for delegating tests.
 *
 * @param <T> element type shared by the helper publisher and by both delegate verifications
 */
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {

  // test environment handed in through the constructor
  private final TestEnvironment env;

  ////////////////////// DELEGATED TO SPECS //////////////////////

  // for delegating tests
  private final SubscriberWhiteboxVerification<T> subscriberVerification;

  // for delegating tests
  private final PublisherVerification<T> publisherVerification;

  ////////////////// END OF DELEGATED TO SPECS //////////////////

  // number of elements the processor under test must be able to buffer,
  // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`.
  private final int processorBufferSize;

  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
*
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * This overload passes {@code PublisherVerification.envPublisherReferenceGCTimeoutMillis()} as the
   * reference GC timeout.
   *
   * @param env the test environment, forwarded to the three-argument constructor.
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env) {
    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
  }

  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
   *
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * @param env the test environment, forwarded to the three-argument constructor.
   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
  }

  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
   *
   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
   * @param processorBufferSize number of elements the processor is required to be able to buffer.
073 */ 074 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { 075 this.env = env; 076 this.processorBufferSize = processorBufferSize; 077 078 this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) { 079 @Override 080 public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) { 081 return IdentityProcessorVerification.this.createSubscriber(probe); 082 } 083 084 @Override public T createElement(int element) { 085 return IdentityProcessorVerification.this.createElement(element); 086 } 087 088 @Override 089 public Publisher<T> createHelperPublisher(long elements) { 090 return IdentityProcessorVerification.this.createHelperPublisher(elements); 091 } 092 }; 093 094 publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) { 095 @Override 096 public Publisher<T> createPublisher(long elements) { 097 return IdentityProcessorVerification.this.createPublisher(elements); 098 } 099 100 @Override 101 public Publisher<T> createFailedPublisher() { 102 return IdentityProcessorVerification.this.createFailedPublisher(); 103 } 104 105 @Override 106 public long maxElementsFromPublisher() { 107 return IdentityProcessorVerification.this.maxElementsFromPublisher(); 108 } 109 110 @Override 111 public long boundedDepthOfOnNextAndRequestRecursion() { 112 return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion(); 113 } 114 115 @Override 116 public boolean skipStochasticTests() { 117 return IdentityProcessorVerification.this.skipStochasticTests(); 118 } 119 }; 120 } 121 122 /** 123 * This is the main method you must implement in your test incarnation. 124 * It must create a {@link Processor}, which simply forwards all stream elements from its upstream 125 * to its downstream. It must be able to internally buffer the given number of elements. 
*
   * @param bufferSize number of elements the processor is required to be able to buffer.
   */
  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);

  /**
   * By implementing this method, additional TCK tests concerning a "failed" publisher will be run.
   *
   * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
   *
   * If you want to ignore these additional tests, return {@code null} from this method.
   */
  public abstract Publisher<T> createFailedPublisher();

  /**
   * Override and return a lower value if your Publisher is only able to produce a known number of elements.
   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
   *
   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can produce a huge but NOT an unbounded number of elements.
   *
   * To mark that your Publisher will *never* signal an {@code onComplete}, override this method and return {@code Long.MAX_VALUE},
   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
   *
   * @return {@code Long.MAX_VALUE - 1} unless overridden
   */
  public long maxElementsFromPublisher() {
    return Long.MAX_VALUE - 1;
  }

  /**
   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
   * recursive calls to exceed the number returned by this method.
*
   * @return {@code 1} unless overridden
   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
   */
  public long boundedDepthOfOnNextAndRequestRecursion() {
    return 1;
  }

  /**
   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
   * usually this means that this test case can yield false positives ("be green") even if for some case,
   * the given implementation may violate the tested behaviour.
   *
   * @return {@code false} unless overridden
   */
  public boolean skipStochasticTests() {
    return false;
  }

  /**
   * Describes the tested implementation in terms of how many subscribers they can support.
   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
   * you MUST return that number from this method by overriding it.
   *
   * @return {@code Long.MAX_VALUE} unless overridden
   */
  public long maxSupportedSubscribers() {
    return Long.MAX_VALUE;
  }

  /**
   * Override this method and return {@code true} if the {@link Processor} returned by the
   * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
   * request amounts and only delivers onNext signals if all Subscribers have
   * indicated (via their Subscription#request(long)) they are ready to receive elements.
192 */ 193 public boolean doesCoordinatedEmission() { 194 return false; 195 } 196 197 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 198 199 @BeforeMethod 200 public void setUp() throws Exception { 201 publisherVerification.setUp(); 202 subscriberVerification.setUp(); 203 } 204 205 ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// 206 207 // A Processor 208 // must obey all Publisher rules on its publishing side 209 public Publisher<T> createPublisher(long elements) { 210 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 211 final Publisher<T> pub = createHelperPublisher(elements); 212 pub.subscribe(processor); 213 return processor; // we run the PublisherVerification against this 214 } 215 216 @Override @Test 217 public void required_validate_maxElementsFromPublisher() throws Exception { 218 publisherVerification.required_validate_maxElementsFromPublisher(); 219 } 220 221 @Override @Test 222 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 223 publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion(); 224 } 225 226 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER ////////////////////// 227 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 228 229 @Test 230 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 231 publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element(); 232 } 233 234 @Test 235 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 236 publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements(); 237 } 238 239 @Override @Test 240 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 241 
publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(); 242 } 243 244 @Override @Test 245 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 246 publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription(); 247 } 248 249 @Override @Test 250 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 251 publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially(); 252 } 253 254 @Override @Test 255 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 256 publisherVerification.optional_spec104_mustSignalOnErrorWhenFails(); 257 } 258 259 @Override @Test 260 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 261 publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates(); 262 } 263 264 @Override @Test 265 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 266 publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete(); 267 } 268 269 @Override @Test 270 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 271 publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled(); 272 } 273 274 @Override @Test 275 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 276 publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled(); 277 } 278 279 @Override @Test 280 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 281 publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled(); 282 } 283 284 @Override @Test 285 public void 
untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 286 publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals(); 287 } 288 289 @Override @Test 290 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 291 publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable(); 292 } 293 294 @Override @Test 295 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 296 publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(); 297 } 298 299 @Override @Test 300 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 301 publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); 302 } 303 304 @Override @Test 305 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 306 publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); 307 } 308 309 @Override @Test 310 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 311 publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); 312 } 313 314 @Override @Test 315 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 316 publisherVerification.optional_spec111_maySupportMultiSubscribe(); 317 } 318 319 @Override @Test 320 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 321 publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals(); 322 } 323 324 @Override @Test 325 public void 
optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 326 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); 327 } 328 329 @Override @Test 330 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 331 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); 332 } 333 334 @Override @Test 335 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 336 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); 337 } 338 339 @Override @Test 340 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 341 publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe(); 342 } 343 344 @Override @Test 345 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 346 publisherVerification.required_spec303_mustNotAllowUnboundedRecursion(); 347 } 348 349 @Override @Test 350 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 351 publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations(); 352 } 353 354 @Override @Test 355 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 356 publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation(); 357 } 358 359 @Override @Test 360 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 361 
publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops(); 362 } 363 364 @Override @Test 365 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 366 publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops(); 367 } 368 369 @Override @Test 370 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 371 publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(); 372 } 373 374 @Override @Test 375 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 376 publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(); 377 } 378 379 @Override @Test 380 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 381 publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage(); 382 } 383 384 @Override @Test 385 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 386 publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(); 387 } 388 389 @Override @Test 390 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 391 publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(); 392 } 393 394 @Override @Test 395 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 396 publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue(); 397 } 398 399 @Override @Test 400 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 401 
publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(); 402 } 403 404 @Override @Test 405 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 406 publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); 407 } 408 409 410 /** 411 * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s 412 * receive the same items and a terminal {@code Exception}. 413 * <p> 414 * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, 415 * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. 416 * <p> 417 * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple 418 * {@code Subscriber}s. 419 * <p> 420 * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. 
421 * <p> 422 * If this test fails, the following could be checked within the {@code Processor} implementation: 423 * <ul> 424 * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> 425 * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or 426 * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s 427 * both have to request first.</li> 428 * </ul> 429 */ 430 @Test 431 public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { 432 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 433 @Override 434 public TestSetup apply(Long aLong) throws Throwable { 435 return new TestSetup(env, processorBufferSize) {{ 436 final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env); 437 env.subscribe(processor, sub1); 438 439 final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env); 440 env.subscribe(processor, sub2); 441 442 final Exception ex = new RuntimeException("Test exception"); 443 444 if (doesCoordinatedEmission()) { 445 sub1.request(1); 446 sub2.request(1); 447 448 expectRequest(); 449 450 final T x = sendNextTFromUpstream(); 451 452 expectNextElement(sub1, x); 453 expectNextElement(sub2, x); 454 455 sub1.request(1); 456 sub2.request(1); 457 } else { 458 sub1.request(1); 459 460 expectRequest(env.defaultTimeoutMillis(), 461 "If the Processor coordinates requests/emissions when having multiple Subscribers" 462 + " at once, please override doesCoordinatedEmission() to return true in this " 463 + "IdentityProcessorVerification to allow this test to pass."); 464 465 final T x = sendNextTFromUpstream(); 466 expectNextElement(sub1, x, 467 "If the Processor coordinates requests/emissions when having multiple 
Subscribers" 468 + " at once, please override doesCoordinatedEmission() to return true in this " 469 + "IdentityProcessorVerification to allow this test to pass."); 470 471 sub1.request(1); 472 473 // sub1 has received one element, and has one demand pending 474 // sub2 has not yet requested anything 475 } 476 sendError(ex); 477 478 sub1.expectError(ex); 479 sub2.expectError(ex); 480 481 env.verifyNoAsyncErrorsNoDelay(); 482 }}; 483 } 484 }); 485 } 486 487 ////////////////////// SUBSCRIBER RULES VERIFICATION /////////////////////////// 488 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 489 490 // A Processor 491 // must obey all Subscriber rules on its consuming side 492 public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) { 493 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 494 processor.subscribe( 495 new Subscriber<T>() { 496 private final Promise<Subscription> subs = new Promise<Subscription>(env); 497 498 @Override 499 public void onSubscribe(final Subscription subscription) { 500 if (env.debugEnabled()) { 501 env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription)); 502 } 503 if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification 504 505 probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() { 506 507 @Override 508 public void triggerRequest(long elements) { 509 subscription.request(elements); 510 } 511 512 @Override 513 public void signalCancel() { 514 subscription.cancel(); 515 } 516 }); 517 } 518 519 @Override 520 public void onNext(T element) { 521 if (env.debugEnabled()) { 522 env.debug(String.format("whiteboxSubscriber::onNext(%s)", element)); 523 } 524 probe.registerOnNext(element); 525 } 526 527 @Override 528 public void onComplete() { 529 if (env.debugEnabled()) { 530 env.debug("whiteboxSubscriber::onComplete()"); 531 } 532 
probe.registerOnComplete(); 533 } 534 535 @Override 536 public void onError(Throwable cause) { 537 if (env.debugEnabled()) { 538 env.debug(String.format("whiteboxSubscriber::onError(%s)", cause)); 539 } 540 probe.registerOnError(cause); 541 } 542 }); 543 544 return processor; // we run the SubscriberVerification against this 545 } 546 547 ////////////////////// OTHER RULE VERIFICATION /////////////////////////// 548 549 // A Processor 550 // must immediately pass on `onError` events received from its upstream to its downstream 551 @Test 552 public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { 553 new TestSetup(env, processorBufferSize) {{ 554 final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env); 555 env.subscribe(processor, sub); 556 557 final Exception ex = new RuntimeException("Test exception"); 558 sendError(ex); 559 sub.expectError(ex); // "immediately", i.e. without a preceding request 560 561 env.verifyNoAsyncErrorsNoDelay(); 562 }}; 563 } 564 565 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER ////////////////////// 566 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 567 568 @Test 569 public void required_exerciseWhiteboxHappyPath() throws Throwable { 570 subscriberVerification.required_exerciseWhiteboxHappyPath(); 571 } 572 573 @Override @Test 574 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 575 subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest(); 576 } 577 578 @Override @Test 579 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 580 subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch(); 581 } 582 583 @Override @Test 584 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 585 
subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete(); 586 } 587 588 @Override @Test 589 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 590 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError(); 591 } 592 593 @Override @Test 594 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 595 subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError(); 596 } 597 598 @Override @Test 599 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 600 subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); 601 } 602 603 @Override @Test 604 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 605 subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid(); 606 } 607 608 @Override @Test 609 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 610 subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization(); 611 } 612 613 @Override @Test 614 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 615 subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(); 616 } 617 618 @Override @Test 619 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 620 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall(); 621 
} 622 623 @Override @Test 624 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 625 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall(); 626 } 627 628 @Override @Test 629 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 630 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall(); 631 } 632 633 @Override @Test 634 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 635 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall(); 636 } 637 638 @Override @Test 639 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 640 subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(); 641 } 642 643 @Override @Test 644 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 645 subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation(); 646 } 647 648 @Override @Test 649 public void untested_spec213_failingOnSignalInvocation() throws Exception { 650 subscriberVerification.untested_spec213_failingOnSignalInvocation(); 651 } 652 653 @Override @Test 654 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 655 subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull(); 656 } 657 @Override @Test 658 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 659 
subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull(); 660 } 661 @Override @Test 662 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 663 subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull(); 664 } 665 666 @Override @Test 667 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 668 subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext(); 669 } 670 671 @Override @Test 672 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 673 subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); 674 } 675 676 @Override @Test 677 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 678 subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber(); 679 } 680 681 @Override @Test 682 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 683 subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError(); 684 } 685 686 @Override @Test 687 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 688 subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(); 689 } 690 691 @Override @Test 692 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 693 subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError(); 694 } 695 696 @Override @Test 697 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 698 subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber(); 699 } 700 701 
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

  /**
   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
   * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
   * <p>
   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
   * <p>
   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#2.1'>2.1</a> with multiple
   * {@code Subscriber}s.
   * <p>
   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
   * <p>
   * If this test fails, the following could be checked within the {@code Processor} implementation:
   * <ul>
   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
   * both have to request first.</li>
   * </ul>
   */
  @Test
  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
      @Override
      public TestSetup apply(Long subscribers) throws Throwable {
        // The whole scenario runs inside the instance initializer of an anonymous TestSetup subclass.
        return new TestSetup(env, processorBufferSize) {{
          ManualSubscriber<T> sub1 = newSubscriber();
          sub1.request(20);

          // The processor may forward demand upstream in small steps; keep a running total of what
          // has been observed so that further upstream requests are only awaited when needed.
          long totalRequests = expectRequest();
          final T x = sendNextTFromUpstream();
          expectNextElement(sub1, x);

          if (totalRequests == 1) {
            totalRequests += expectRequest();
          }
          final T y = sendNextTFromUpstream();
          expectNextElement(sub1, y);

          if (totalRequests == 2) {
            totalRequests += expectRequest();
          }

          final ManualSubscriber<T> sub2 = newSubscriber();

          // sub1 now has 18 pending
          // sub2 has 0 pending

          if (doesCoordinatedEmission()) {
            sub2.expectNone(); // since sub2 hasn't requested anything yet

            sub2.request(1);

            final T z = sendNextTFromUpstream();
            expectNextElement(sub1, z);
            expectNextElement(sub2, z);
          } else {
            final T z = sendNextTFromUpstream();
            expectNextElement(sub1, z,
                "If the Processor coordinates requests/emissions when having multiple Subscribers"
                + " at once, please override doesCoordinatedEmission() to return true in this "
                + "IdentityProcessorVerification to allow this test to pass.");
            sub2.expectNone(); // since sub2 hasn't requested anything yet

            sub2.request(1);
            expectNextElement(sub2, z);
          }
          if (totalRequests == 3) {
            expectRequest();
          }

          // to avoid error messages during test harness shutdown
          sendCompletion();
          sub1.expectCompletion(env.defaultTimeoutMillis());
          sub2.expectCompletion(env.defaultTimeoutMillis());

          env.verifyNoAsyncErrorsNoDelay();
        }};
      }
    });
  }

  /////////////////////// TEST INFRASTRUCTURE //////////////////////

  /**
   * Forwards to {@code publisherVerification.notVerified()}.
   */
  public void notVerified() {
    publisherVerification.notVerified();
  }

  /**
   * Forwards to {@code publisherVerification.notVerified(String)}.
   *
   * @param message passed through unchanged to the delegate
   */
  public void notVerified(String message) {
    publisherVerification.notVerified(message);
  }

  /**
   * Runs a test that REQUIRES multiple subscribers to be supported by the implementation under test.
   * <p>
   * When {@code requiredSubscribersSupport} exceeds {@link #maxSupportedSubscribers()} the body is not
   * invoked and {@link #notVerified(String)} is called instead; otherwise {@code body} is applied to
   * {@code requiredSubscribersSupport}.
   *
   * @param requiredSubscribersSupport number of concurrent subscribers the test body needs
   * @param body the test itself; its returned {@code TestSetup} is not used by this method
   * @throws Throwable whatever {@code body} (or {@code notVerified}) throws
   */
  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
    if (requiredSubscribersSupport > maxSupportedSubscribers()) {
      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
          maxSupportedSubscribers(), requiredSubscribersSupport));
    } else {
      body.apply(requiredSubscribersSupport);
    }
  }

  /**
   * Test fixture acting as the upstream of a freshly created identity processor.
   * <p>
   * The fixture is itself a {@code ManualPublisher}; on construction it creates the processor via
   * {@code createIdentityProcessor(testBufferSize)} and subscribes the processor to itself. Elements
   * to send are drawn from a helper publisher obtained through {@code createHelperPublisher(Long.MAX_VALUE)}.
   */
  public abstract class TestSetup extends ManualPublisher<T> {
    final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
    private final Set<T> seenTees = new HashSet<T>(); // every element handed out by nextT() so far

    final Processor<T, T> processor;

    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
      super(env);
      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
      processor = createIdentityProcessor(testBufferSize);
      subscribe(processor);
    }

    /**
     * Subscribes a new {@code ManualSubscriber} to the processor under test and returns it.
     */
    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
      return env.newManualSubscriber(processor);
    }

    /**
     * Pulls the next element from the helper publisher.
     * <p>
     * Flops the test if the helper publisher yields an element that was already handed out before,
     * since the element comparisons in this fixture rely on elements being distinct.
     *
     * @return the freshly obtained element
     */
    public T nextT() throws InterruptedException {
      final T t = tees.requestNextElement();
      // Set.add reports false when the element was already present, i.e. a duplicate.
      if (!seenTees.add(t)) {
        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
      }
      return t;
    }

    /**
     * Awaits the next element on {@code sub} and flops the test if it does not equal {@code expected}.
     *
     * @param sub downstream subscriber to read from
     * @param expected element that must arrive next
     */
    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
      expectNextElementOrFlop(sub, expected, String.format("timeout while awaiting %s", expected));
    }

    /**
     * Same as {@link #expectNextElement(ManualSubscriber, Object)}, but appends
     * {@code errorMessageAddendum} to the message used when waiting for the element times out.
     *
     * @param sub downstream subscriber to read from
     * @param expected element that must arrive next
     * @param errorMessageAddendum extra hint added to the timeout message
     */
    public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
      expectNextElementOrFlop(sub, expected,
          String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
    }

    // Shared implementation of both expectNextElement overloads: receive one element, compare, flop on mismatch.
    private void expectNextElementOrFlop(ManualSubscriber<T> sub, T expected, String timeoutMessage) throws InterruptedException {
      final T elem = sub.nextElement(timeoutMessage);
      if (!elem.equals(expected)) {
        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
      }
    }

    /**
     * Obtains a new element via {@link #nextT()}, sends it with {@code sendNext} and returns it.
     */
    public T sendNextTFromUpstream() throws InterruptedException {
      final T x = nextT();
      sendNext(x);
      return x;
    }
  }

  /**
   * Manual subscriber that records the {@code Throwable} passed to {@code onError} in a {@code Promise}
   * so that tests can assert on it afterwards.
   */
  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
    Promise<Throwable> error;

    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
      super(env);
      error = new Promise<Throwable>(env);
    }

    @Override
    public void onError(Throwable cause) {
      error.complete(cause);
    }

    /**
     * Expects {@code cause} to be signalled, waiting up to {@code env.defaultTimeoutMillis()}.
     */
    public void expectError(Throwable cause) throws InterruptedException {
      expectError(cause, env.defaultTimeoutMillis());
    }

    /**
     * Waits for the error promise to complete and flops the test if the recorded error does not
     * equal {@code cause}.
     *
     * @param cause the error expected on downstream; must not be {@code null}
     * @param timeoutMillis how long to wait for the error signal
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
      if (!cause.equals(error.value())) {
        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
      }
    }
  }
}