001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.tck; 006 007import org.reactivestreams.Publisher; 008import org.reactivestreams.Subscriber; 009import org.reactivestreams.Subscription; 010import org.reactivestreams.tck.TestEnvironment.ManualPublisher; 011import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 012import org.reactivestreams.tck.flow.support.Optional; 013import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules; 014import org.reactivestreams.tck.flow.support.TestException; 015import org.testng.SkipException; 016import org.testng.annotations.AfterClass; 017import org.testng.annotations.BeforeClass; 018import org.testng.annotations.BeforeMethod; 019import org.testng.annotations.Test; 020 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Executors; 023 024import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; 025import static org.testng.Assert.assertTrue; 026 027/** 028 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} 029 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing). 030 * 031 * This verification is NOT able to check many of the rules of the spec, and if you want more 032 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification} 033 * instead. 034 * 035 * @see org.reactivestreams.Subscriber 036 * @see org.reactivestreams.Subscription 037 */ 038public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> 039 implements SubscriberBlackboxVerificationRules { 040 041 protected final TestEnvironment env; 042 043 protected SubscriberBlackboxVerification(TestEnvironment env) { 044 this.env = env; 045 } 046 047 // USER API 048 049 /** 050 * This is the main method you must implement in your test incarnation. 051 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. 052 */ 053 public abstract Subscriber<T> createSubscriber(); 054 055 /** 056 * Override this method if the Subscriber implementation you are verifying 057 * needs an external signal before it signals demand to its Publisher. 058 * 059 * By default this method does nothing. 060 */ 061 public void triggerRequest(final Subscriber<? super T> subscriber) { 062 // this method is intentionally left blank 063 } 064 065 // ENV SETUP 066 067 /** 068 * Executor service used by the default provided asynchronous Publisher. 069 * @see #createHelperPublisher(long) 070 */ 071 private ExecutorService publisherExecutor; 072 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } 073 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } 074 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } 075 076 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 077 078 @BeforeMethod 079 public void setUp() throws Exception { 080 env.clearAsyncErrors(); 081 } 082 083 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 084 085 @Override @Test 086 public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable { 087 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 088 @Override 089 public void run(BlackboxTestStage stage) throws InterruptedException { 090 triggerRequest(stage.subProxy().sub()); 091 final long requested = stage.expectRequest();// assuming subscriber wants to consume elements... 092 final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand 093 094 // should cope with up to requested number of elements 095 for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++) 096 stage.signalNext(); 097 098 // we complete after `signalsToEmit` (which can be less than `requested`), 099 // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2 100 stage.sendCompletion(); 101 } 102 103 /** 104 * In order to allow some "skid" and not check state on each iteration, 105 * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration. 106 */ 107 private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException { 108 if (i % checkInterval == 0) return stage.isCancelled(); 109 else return false; 110 } 111 }); 112 } 113 114 @Override @Test 115 public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception { 116 notVerified(); // cannot be meaningfully tested, or can it? 117 } 118 119 @Override @Test 120 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 121 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 122 @Override 123 public void run(BlackboxTestStage stage) throws Throwable { 124 final String onCompleteMethod = "required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call"; 125 126 final Subscription subs = new Subscription() { 127 @Override 128 public void request(long n) { 129 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace(onCompleteMethod); 130 if (onCompleteStackTraceElement.isDefined()) { 131 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); 132 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 133 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 134 } 135 } 136 137 @Override 138 public void cancel() { 139 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onCompleteMethod); 140 if (onCompleteStackElement.isDefined()) { 141 final StackTraceElement stackElem = onCompleteStackElement.get(); 142 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 143 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 144 } 145 } 146 }; 147 148 final Subscriber<T> sub = createSubscriber(); 149 sub.onSubscribe(subs); 150 required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(sub); 151 152 env.verifyNoAsyncErrorsNoDelay(); 153 } 154 155 /** Makes sure the onComplete is initiated with a recognizable stacktrace element on the current thread. */ 156 void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(Subscriber<?> sub) { 157 sub.onComplete(); 158 } 159 }); 160 } 161 162 @Override @Test 163 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 164 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 165 @Override 166 public void run(BlackboxTestStage stage) throws Throwable { 167 final String onErrorMethod = "required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call"; 168 169 final Subscription subs = new Subscription() { 170 @Override 171 public void request(long n) { 172 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod); 173 if (onCompleteStackElement.isDefined()) { 174 final StackTraceElement stackElem = onCompleteStackElement.get(); 175 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 176 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 177 } 178 } 179 180 @Override 181 public void cancel() { 182 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod); 183 if (onCompleteStackElement.isDefined()) { 184 final StackTraceElement stackElem = onCompleteStackElement.get(); 185 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 186 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 187 } 188 } 189 }; 190 191 final Subscriber<T> sub = createSubscriber(); 192 sub.onSubscribe(subs); 193 required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(sub); 194 195 env.verifyNoAsyncErrorsNoDelay(); 196 } 197 198 /** Makes sure the onError is initiated with a recognizable stacktrace element on the current thread. */ 199 void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(Subscriber<?> sub) { 200 sub.onError(new TestException()); 201 } 202 }); 203 } 204 205 @Override @Test 206 public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 207 notVerified(); // cannot be meaningfully tested, or can it? 208 } 209 210 @Override @Test 211 public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception { 212 new BlackboxTestStage(env) {{ 213 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail 214 final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env); 215 sub().onSubscribe( 216 new Subscription() { 217 @Override 218 public void request(long elements) { 219 env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements)); 220 } 221 222 @Override 223 public void cancel() { 224 secondSubscriptionCancelled.close(); 225 } 226 227 @Override 228 public String toString() { 229 return "SecondSubscription(should get cancelled)"; 230 } 231 }); 232 233 secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called."); 234 env.verifyNoAsyncErrorsNoDelay(); 235 sendCompletion(); // we're done, complete the subscriber under test 236 }}; 237 } 238 239 @Override @Test 240 public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 241 notVerified(); // cannot be meaningfully tested, or can it? 242 } 243 244 @Override @Test 245 public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 246 notVerified(); // cannot be meaningfully tested, or can it? 247 // the same thread part of the clause can be verified but that is not very useful, or is it? 248 } 249 250 @Override @Test 251 public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 252 notVerified(); // cannot be meaningfully tested as black box, or can it? 253 } 254 255 @Override @Test 256 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 257 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 258 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 259 public void run(BlackboxTestStage stage) throws Throwable { 260 triggerRequest(stage.subProxy().sub()); 261 final long notUsed = stage.expectRequest(); // received request signal 262 stage.sub().onComplete(); 263 stage.subProxy().expectCompletion(); 264 } 265 }); 266 } 267 268 @Override @Test 269 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 270 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 271 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 272 public void run(BlackboxTestStage stage) throws Throwable { 273 final Subscriber<? super T> sub = stage.sub(); 274 sub.onComplete(); 275 stage.subProxy().expectCompletion(); 276 } 277 }); 278 } 279 280 @Override @Test 281 public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 282 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 283 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 284 public void run(BlackboxTestStage stage) throws Throwable { 285 triggerRequest(stage.subProxy().sub()); 286 final long notUsed = stage.expectRequest(); // received request signal 287 stage.sub().onError(new TestException()); // in response to that, we fail 288 stage.subProxy().expectError(Throwable.class); 289 } 290 }); 291 } 292 293 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 294 @Override @Test 295 public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 296 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 297 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 298 public void run(BlackboxTestStage stage) throws Throwable { 299 300 stage.sub().onError(new TestException()); 301 stage.subProxy().expectError(Throwable.class); 302 } 303 }); 304 } 305 306 @Override @Test 307 public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 308 notVerified(); // cannot be meaningfully tested, or can it? 309 } 310 311 @Override @Test 312 public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable { 313 notVerified(); // cannot be meaningfully tested as black box, or can it? 314 } 315 316 @Override @Test 317 public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception { 318 notVerified(); // cannot be meaningfully tested, or can it? 319 } 320 321 @Override @Test 322 public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 323 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 324 @Override 325 public void run(BlackboxTestStage stage) throws Throwable { 326 327 { 328 final Subscriber<T> sub = createSubscriber(); 329 boolean gotNPE = false; 330 try { 331 sub.onSubscribe(null); 332 } catch(final NullPointerException expected) { 333 gotNPE = true; 334 } 335 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); 336 } 337 338 env.verifyNoAsyncErrorsNoDelay(); 339 } 340 }); 341 } 342 343 @Override @Test 344 public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 345 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 346 @Override 347 public void run(BlackboxTestStage stage) throws Throwable { 348 final Subscription subscription = new Subscription() { 349 @Override public void request(final long elements) {} 350 @Override public void cancel() {} 351 }; 352 353 { 354 final Subscriber<T> sub = createSubscriber(); 355 boolean gotNPE = false; 356 sub.onSubscribe(subscription); 357 try { 358 sub.onNext(null); 359 } catch(final NullPointerException expected) { 360 gotNPE = true; 361 } 362 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); 363 } 364 365 env.verifyNoAsyncErrorsNoDelay(); 366 } 367 }); 368 } 369 370 @Override @Test 371 public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 372 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 373 @Override 374 public void run(BlackboxTestStage stage) throws Throwable { 375 final Subscription subscription = new Subscription() { 376 @Override public void request(final long elements) {} 377 @Override public void cancel() {} 378 }; 379 380 { 381 final Subscriber<T> sub = createSubscriber(); 382 boolean gotNPE = false; 383 sub.onSubscribe(subscription); 384 try { 385 sub.onError(null); 386 } catch(final NullPointerException expected) { 387 gotNPE = true; 388 } 389 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); 390 } 391 392 env.verifyNoAsyncErrorsNoDelay(); 393 } 394 }); 395 } 396 397 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// 398 399 @Override @Test 400 public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception { 401 notVerified(); // cannot be meaningfully tested, or can it? 402 } 403 404 @Override @Test 405 public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 406 notVerified(); // cannot be meaningfully tested as black box, or can it? 407 } 408 409 @Override @Test 410 public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 411 notVerified(); // cannot be meaningfully tested, or can it? 412 } 413 414 @Override @Test 415 public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 416 notVerified(); // cannot be meaningfully tested, or can it? 417 } 418 419 @Override @Test 420 public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 421 notVerified(); // cannot be meaningfully tested, or can it? 422 } 423 424 @Override @Test 425 public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 426 notVerified(); // cannot be meaningfully tested, or can it? 427 } 428 429 @Override @Test 430 public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 431 notVerified(); // cannot be meaningfully tested, or can it? 432 } 433 434 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 435 436 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 437 438 abstract class BlackboxTestStageTestRun { 439 public abstract void run(BlackboxTestStage stage) throws Throwable; 440 } 441 442 public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable { 443 BlackboxTestStage stage = new BlackboxTestStage(env, true); 444 body.run(stage); 445 } 446 447 public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable { 448 BlackboxTestStage stage = new BlackboxTestStage(env, false); 449 body.run(stage); 450 } 451 452 public class BlackboxTestStage extends ManualPublisher<T> { 453 public Publisher<T> pub; 454 public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values 455 456 public T lastT = null; 457 private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty(); 458 459 public BlackboxTestStage(TestEnvironment env) throws InterruptedException { 460 this(env, true); 461 } 462 463 public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { 464 super(env); 465 if (runDefaultInit) { 466 pub = this.createHelperPublisher(Long.MAX_VALUE); 467 tees = env.newManualSubscriber(pub); 468 Subscriber<T> sub = createSubscriber(); 469 subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub)); 470 subscribe(subProxy.get()); 471 } 472 } 473 474 public Subscriber<? super T> sub() { 475 return subscriber.value(); 476 } 477 478 /** 479 * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber. 480 */ 481 public BlackboxSubscriberProxy<T> subProxy() { 482 return subProxy.get(); 483 } 484 485 public Publisher<T> createHelperPublisher(long elements) { 486 return SubscriberBlackboxVerification.this.createHelperPublisher(elements); 487 } 488 489 public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) { 490 return new BlackboxSubscriberProxy<T>(env, sub); 491 } 492 493 public T signalNext() throws InterruptedException { 494 T element = nextT(); 495 sendNext(element); 496 return element; 497 } 498 499 public T nextT() throws InterruptedException { 500 lastT = tees.requestNextElement(); 501 return lastT; 502 } 503 504 } 505 506 public void notVerified() { 507 throw new SkipException("Not verified using this TCK."); 508 } 509}