001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Publisher;
008import org.reactivestreams.Subscriber;
009import org.reactivestreams.Subscription;
010import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport;
011import org.reactivestreams.tck.TestEnvironment.Latch;
012import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
013import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
014import org.reactivestreams.tck.flow.support.Function;
015import org.reactivestreams.tck.flow.support.Optional;
016import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
017import org.testng.SkipException;
018import org.testng.annotations.BeforeMethod;
019import org.testng.annotations.Test;
020
021import java.lang.Override;
022import java.lang.ref.ReferenceQueue;
023import java.lang.ref.WeakReference;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.List;
028import java.util.Random;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.atomic.AtomicReference;
031
032import static org.testng.Assert.assertEquals;
033import static org.testng.Assert.assertTrue;
034
035/**
036 * Provides tests for verifying {@code Publisher} specification rules.
037 *
038 * @see org.reactivestreams.Publisher
039 */
040public abstract class PublisherVerification<T> implements PublisherVerificationRules {
041
042  private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS";
043  private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L;
044
045  private final TestEnvironment env;
046
047  /**
048   * The amount of time after which a cancelled Subscriber reference should be dropped.
049   * See Rule 3.13 for details.
050   */
051  private final long publisherReferenceGCTimeoutMillis;
052
053  /**
054   * Constructs a new verification class using the given env and configuration.
055   *
056   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
057   */
058  public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
059    this.env = env;
060    this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
061  }
062
063  /**
064   * Constructs a new verification class using the given env and configuration.
065   *
066   * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}.
067   */
068  public PublisherVerification(TestEnvironment env) {
069    this.env = env;
070    this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis();
071  }
072
073  /**
074   * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present,
075   * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}).
076   *
077   * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
078   *
079   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
080   */
081  public static long envPublisherReferenceGCTimeoutMillis() {
082    final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV);
083    if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS;
084    else try {
085      return Long.parseLong(envMillis);
086    } catch (NumberFormatException ex) {
087      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex);
088    }
089  }
090
091  /**
092   * This is the main method you must implement in your test incarnation.
093   * It must create a Publisher for a stream with exactly the given number of elements.
094   * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite.
095   */
096  public abstract Publisher<T> createPublisher(long elements);
097
098  /**
099   * By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
100   *
101   * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
102   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
103   *
104   * If you ignore these additional tests, return {@code null} from this method.
105   */
106  public abstract Publisher<T> createFailedPublisher();
107
108
109  /**
110   * Override and return lower value if your Publisher is only able to produce a known number of elements.
111   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
112   *
113   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
114   *
115   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
116   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
117   */
118  public long maxElementsFromPublisher() {
119    return Long.MAX_VALUE - 1;
120  }
121
122  /**
123   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
124   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
125   * usually this means that this test case can yield false positives ("be green") even if for some case,
126   * the given implementation may violate the tested behaviour.
127   */
128  public boolean skipStochasticTests() {
129    return false;
130  }
131
132  /**
133   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
134   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
135   * recursive calls to exceed the number returned by this method.
136   *
137   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
138   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
139   */
140  public long boundedDepthOfOnNextAndRequestRecursion() {
141    return 1;
142  }
143
144  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
145
146  @BeforeMethod
147  public void setUp() throws Exception {
148    env.clearAsyncErrors();
149  }
150
151  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
152
153  @Override @Test
154  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
155    activePublisherTest(1, true, new PublisherTestRun<T>() {
156      @Override
157      public void run(Publisher<T> pub) throws InterruptedException {
158        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
159        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
160        sub.requestEndOfStream();
161      }
162
163      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
164        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
165      }
166
167    });
168  }
169
170  @Override @Test
171  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
172    activePublisherTest(3, true, new PublisherTestRun<T>() {
173      @Override
174      public void run(Publisher<T> pub) throws InterruptedException {
175        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
176        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
177        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub));
178        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub));
179        sub.requestEndOfStream();
180      }
181
182      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
183        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
184      }
185
186    });
187  }
188
189  @Override @Test
190  public void required_validate_maxElementsFromPublisher() throws Exception {
191    assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
192  }
193
194  @Override @Test
195  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
196    assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
197  }
198
199
200  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
201
202  @Override @Test
203  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
204    activePublisherTest(5, false, new PublisherTestRun<T>() {
205      @Override
206      public void run(Publisher<T> pub) throws InterruptedException {
207
208        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
209        try {
210            sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
211            sub.request(1);
212            sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
213            sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
214    
215            sub.request(1);
216            sub.request(2);
217            sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
218
219            sub.expectNone(String.format("Publisher %s produced unrequested ", pub));
220        } finally {
221            sub.cancel();
222        }
223      }
224    });
225  }
226
227  @Override @Test
228  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
229    final int elements = 3;
230    final int requested = 10;
231
232    activePublisherTest(elements, true, new PublisherTestRun<T>() {
233      @Override
234      public void run(Publisher<T> pub) throws Throwable {
235        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
236        sub.request(requested);
237        sub.nextElements(elements);
238        sub.expectCompletion();
239      }
240    });
241  }
242
243  @Override @Test
244  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
245    final int iterations = 100;
246    final int elements = 10;
247
248    stochasticTest(iterations, new Function<Integer, Void>() {
249      @Override
250      public Void apply(final Integer runNumber) throws Throwable {
251        activePublisherTest(elements, true, new PublisherTestRun<T>() {
252          @Override
253          public void run(Publisher<T> pub) throws Throwable {
254            final Latch completionLatch = new Latch(env);
255
256            final AtomicInteger gotElements = new AtomicInteger(0);
257            pub.subscribe(new Subscriber<T>() {
258              private Subscription subs;
259
260              private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
261
262              /**
263               * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
264               * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
265               *
266               * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
267               * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
268               * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
269               * to enter subsequent critical sections ("nesting" them effectively).
270               */
271              final class ConcurrentAccessBarrier {
272                private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
273                private volatile String previousSignal = null;
274
275                public void enterSignal(String signalName) {
276                  if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
277                    env.flop(String.format(
278                      "Illegal concurrent access detected (entering critical section)! " +
279                        "%s emited %s signal, before %s finished its %s signal.",
280                        Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
281                  }
282                  this.previousSignal = signalName;
283                }
284
285                public void leaveSignal(String signalName) {
286                  currentlySignallingThread.set(null);
287                  this.previousSignal = signalName;
288                }
289
290                private boolean isSynchronousSignal() {
291                  return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
292                }
293
294              }
295
296              @Override
297              public void onSubscribe(Subscription s) {
298                final String signal = "onSubscribe()";
299                concurrentAccessBarrier.enterSignal(signal);
300
301                subs = s;
302                subs.request(1);
303
304                concurrentAccessBarrier.leaveSignal(signal);
305              }
306
307              @Override
308              public void onNext(T ignore) {
309                final String signal = String.format("onNext(%s)", ignore);
310                concurrentAccessBarrier.enterSignal(signal);
311
312                if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this)
313                  subs.request(1);
314
315                concurrentAccessBarrier.leaveSignal(signal);
316              }
317
318              @Override
319              public void onError(Throwable t) {
320                final String signal = String.format("onError(%s)", t.getMessage());
321                concurrentAccessBarrier.enterSignal(signal);
322
323                // ignore value
324
325                concurrentAccessBarrier.leaveSignal(signal);
326              }
327
328              @Override
329              public void onComplete() {
330                final String signal = "onComplete()";
331                concurrentAccessBarrier.enterSignal(signal);
332
333                // entering for completeness
334
335                concurrentAccessBarrier.leaveSignal(signal);
336                completionLatch.close();
337              }
338            });
339
340            completionLatch.expectClose(
341              elements * env.defaultTimeoutMillis(),
342              String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it",
343                            runNumber, iterations, elements, gotElements.get()));
344          }
345        });
346        return null;
347      }
348    });
349  }
350
351  @Override @Test
352  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
353    try {
354      whenHasErrorPublisherTest(new PublisherTestRun<T>() {
355        @Override
356        public void run(final Publisher<T> pub) throws InterruptedException {
357          final Latch onErrorlatch = new Latch(env);
358          final Latch onSubscribeLatch = new Latch(env);
359          pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
360            @Override
361            public void onSubscribe(Subscription subs) {
362              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
363              onSubscribeLatch.close();
364            }
365            @Override
366            public void onError(Throwable cause) {
367              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
368              onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
369              onErrorlatch.close();
370            }
371          });
372
373          onSubscribeLatch.expectClose("Should have received onSubscribe");
374          onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
375
376          env.verifyNoAsyncErrors();
377          }
378      });
379    } catch (SkipException se) {
380      throw se;
381    } catch (Throwable ex) {
382      // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe
383      // which was wrong of him - he should have signalled on error using onError
384      throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex);
385    }
386  }
387
388  @Override @Test
389  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
390    activePublisherTest(3, true, new PublisherTestRun<T>() {
391      @Override
392      public void run(Publisher<T> pub) throws Throwable {
393        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
394        sub.requestNextElement();
395        sub.requestNextElement();
396        sub.requestNextElement();
397        sub.requestEndOfStream();
398        sub.expectNone();
399      }
400    });
401  }
402
403  @Override @Test
404  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
405    optionalActivePublisherTest(0, true, new PublisherTestRun<T>() {
406      @Override
407      public void run(Publisher<T> pub) throws Throwable {
408        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
409        sub.request(1);
410        sub.expectCompletion();
411        sub.expectNone();
412      }
413    });
414  }
415
416  @Override @Test
417  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
418    notVerified(); // not really testable without more control over the Publisher
419  }
420
421  @Override @Test
422  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
423    activePublisherTest(1, true, new PublisherTestRun<T>() {
424      @Override
425      public void run(Publisher<T> pub) throws Throwable {
426        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
427        sub.request(10);
428        sub.nextElement();
429        sub.expectCompletion();
430
431        sub.request(10);
432        sub.expectNone();
433      }
434    });
435  }
436
437  @Override @Test
438  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
439    notVerified(); // can we meaningfully test this, without more control over the publisher?
440  }
441
442  @Override @Test
443  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
444    notVerified(); // can we meaningfully test this?
445  }
446
447  @Override @Test
448  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
449    notVerified(); // can we meaningfully test this?
450  }
451
452  @Override @Test
453  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
454    activePublisherTest(0, false, new PublisherTestRun<T>() {
455      @Override
456      public void run(Publisher<T> pub) throws Throwable {
457        try {
458            pub.subscribe(null);
459            env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
460        } catch (NullPointerException ignored) {
461          // valid behaviour
462        }
463        env.verifyNoAsyncErrorsNoDelay();
464      }
465    });
466  }
467
468  @Override @Test
469  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
470    activePublisherTest(0, false, new PublisherTestRun<T>() {
471      @Override
472      public void run(Publisher<T> pub) throws Throwable {
473        final Latch onSubscribeLatch = new Latch(env);
474        final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
475        try {
476          pub.subscribe(new Subscriber<T>() {
477            @Override
478            public void onError(Throwable cause) {
479              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
480            }
481    
482            @Override
483            public void onSubscribe(Subscription subs) {
484              cancel.set(subs);
485              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
486              onSubscribeLatch.close();
487            }
488    
489            @Override
490            public void onNext(T elem) {
491              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
492            }
493    
494            @Override
495            public void onComplete() {
496              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
497            }
498          });
499          onSubscribeLatch.expectClose("Should have received onSubscribe");
500          env.verifyNoAsyncErrorsNoDelay();
501        } finally {
502          Subscription s = cancel.getAndSet(null);
503          if (s != null) {
504            s.cancel();
505          }
506        }
507      }
508    });
509  }
510
511  @Override @Test
512  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
513    whenHasErrorPublisherTest(new PublisherTestRun<T>() {
514      @Override
515      public void run(Publisher<T> pub) throws Throwable {
516        final Latch onErrorLatch = new Latch(env);
517        final Latch onSubscribeLatch = new Latch(env);
518        ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
519          @Override
520          public void onError(Throwable cause) {
521            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
522            onErrorLatch.assertOpen("Only one onError call expected");
523            onErrorLatch.close();
524          }
525
526          @Override
527          public void onSubscribe(Subscription subs) {
528            onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
529            onSubscribeLatch.close();
530          }
531        };
532        pub.subscribe(sub);
533        onSubscribeLatch.expectClose("Should have received onSubscribe");
534        onErrorLatch.expectClose("Should have received onError");
535
536        env.verifyNoAsyncErrorsNoDelay();
537      }
538    });
539  }
540
541  @Override @Test
542  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
543    notVerified(); // can we meaningfully test this?
544  }
545
546  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
547  @Override @Test
548  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
549    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
550      @Override
551      public void run(Publisher<T> pub) throws Throwable {
552        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
553        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
554
555        try {
556          env.verifyNoAsyncErrors();
557        } finally {
558          try {
559            sub1.cancel();
560          } finally {
561            sub2.cancel();
562          }
563        }
564      }
565    });
566  }
567
568  @Override @Test
569  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
570    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
571      @Override
572      public void run(Publisher<T> pub) throws Throwable {
573        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
574        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
575        // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario,
576        // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an
577        // onNext (and optional onComplete) signal(s) and which just onComplete signal.
578        // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled
579        // with onNext.
580        sub1.requestNextElementOrEndOfStream();
581        sub2.requestNextElementOrEndOfStream();
582        try {
583            env.verifyNoAsyncErrors();
584        } finally {
585            try {
586                sub1.cancel();
587            } finally {
588                sub2.cancel();
589            }
590        }
591      }
592    });
593  }
594
595  @Override @Test
596  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
597    optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
598      @Override
599      public void run(Publisher<T> pub) throws InterruptedException {
600        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
601        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
602        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
603
604        sub1.request(1);
605        T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
606        sub2.request(2);
607        List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
608        sub1.request(1);
609        T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
610        sub3.request(3);
611        List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
612        sub3.request(1);
613        T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
614        sub3.request(1);
615        T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
616        sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));
617        sub2.request(3);
618        List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
619        sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));
620        sub1.request(2);
621        List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
622        sub1.request(1);
623        T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
624        sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));
625
626        @SuppressWarnings("unchecked")
627        List<T> r = new ArrayList<T>(Arrays.asList(x1, x2));
628        r.addAll(x3);
629        r.addAll(Collections.singleton(x4));
630
631        List<T> check1 = new ArrayList<T>(y1);
632        check1.addAll(y2);
633
634        //noinspection unchecked
635        List<T> check2 = new ArrayList<T>(z1);
636        check2.add(z2);
637        check2.add(z3);
638
639        assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
640        assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
641      }
642    });
643  }
644
645  @Override @Test
646  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
647    optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
648      @Override
649      public void run(Publisher<T> pub) throws Throwable {
650        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
651        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
652        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
653
654        List<T> received1 = new ArrayList<T>();
655        List<T> received2 = new ArrayList<T>();
656        List<T> received3 = new ArrayList<T>();
657
658        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
659        // edgy edge case?
660        sub1.request(4);
661        sub2.request(4);
662        sub3.request(4);
663
664        received1.addAll(sub1.nextElements(3));
665        received2.addAll(sub2.nextElements(3));
666        received3.addAll(sub3.nextElements(3));
667
668        // NOTE: can't check completion, the Publisher may not be able to signal it
669        //       a similar test *with* completion checking is implemented
670
671        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
672        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
673      }
674    });
675  }
676
677  @Override @Test
678  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
679    optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
680      @Override
681      public void run(Publisher<T> pub) throws Throwable {
682        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
683        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
684        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
685
686        List<T> received1 = new ArrayList<T>();
687        List<T> received2 = new ArrayList<T>();
688        List<T> received3 = new ArrayList<T>();
689
690        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
691        // edgy edge case?
692        sub1.request(4);
693        sub2.request(4);
694        sub3.request(4);
695
696        received1.addAll(sub1.nextElements(3));
697        received2.addAll(sub2.nextElements(3));
698        received3.addAll(sub3.nextElements(3));
699
700        sub1.expectCompletion();
701        sub2.expectCompletion();
702        sub3.expectCompletion();
703
704        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
705        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
706      }
707    });
708  }
709
710  ///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
711
712  @Override @Test
713  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
714    activePublisherTest(6, false, new PublisherTestRun<T>() {
715      @Override
716      public void run(Publisher<T> pub) throws Throwable {
717        ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
718          @Override
719          public void onSubscribe(Subscription subs) {
720            this.subscription.completeImmediatly(subs);
721
722            subs.request(1);
723            subs.request(1);
724            subs.request(1);
725          }
726
727          @Override
728          public void onNext(T element) {
729            Subscription subs = this.subscription.value();
730            subs.request(1);
731          }
732        };
733
734        env.subscribe(pub, sub);
735
736        env.verifyNoAsyncErrors();
737      }
738    });
739  }
740
741  @Override @Test
742  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
743    final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
744
745    activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() {
746      @Override
747      public void run(Publisher<T> pub) throws Throwable {
748        final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() {
749          @Override
750          protected Long initialValue() {
751            return 0L;
752          }
753        };
754
755        final Latch runCompleted = new Latch(env);
756
757        final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
758          // counts the number of signals received, used to break out from possibly infinite request/onNext loops
759          long signalsReceived = 0L;
760
761          @Override
762          public void onNext(T element) {
763            // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
764            // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
765
766            signalsReceived += 1;
767            stackDepthCounter.set(stackDepthCounter.get() + 1);
768            if (env.debugEnabled()) {
769              env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
770            }
771
772            final long callsUntilNow = stackDepthCounter.get();
773            if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
774              env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
775                                     callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
776
777              // stop the recursive call chain
778              runCompleted.close();
779              return;
780            } else if (signalsReceived >= oneMoreThanBoundedLimit) {
781              // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
782              // stop the recursive call chain
783              runCompleted.close();
784              return;
785            }
786
787            // request more right away, the Publisher must break the recursion
788            subscription.value().request(1);
789
790            stackDepthCounter.set(stackDepthCounter.get() - 1);
791          }
792
793          @Override
794          public void onComplete() {
795            super.onComplete();
796            runCompleted.close();
797          }
798
799          @Override
800          public void onError(Throwable cause) {
801            super.onError(cause);
802            runCompleted.close();
803          }
804        };
805
806        try {
807          env.subscribe(pub, sub);
808
809          sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
810
811          final String msg = String.format("Unable to validate call stack depth safety, " +
812                                               "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
813                                           oneMoreThanBoundedLimit);
814          runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
815          env.verifyNoAsyncErrorsNoDelay();
816        } finally {
817          // since the request/onNext recursive calls may keep the publisher running "forever",
818          // we MUST cancel it manually before exiting this test case
819          sub.cancel();
820        }
821      }
822    });
823  }
824
825  @Override @Test
826  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
827    notVerified(); // cannot be meaningfully tested, or can it?
828  }
829
830  @Override @Test
831  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
832    notVerified(); // cannot be meaningfully tested, or can it?
833  }
834
835  @Override @Test
836  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
837    activePublisherTest(3, false, new PublisherTestRun<T>() {
838      @Override
839      public void run(Publisher<T> pub) throws Throwable {
840
841        // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the
842        // subscription once it's cancelled (as expected).
843        // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it.
844        ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
845          @Override
846          public void cancel() {
847            if (subscription.isCompleted()) {
848              subscription.value().cancel();
849            } else {
850              env.flop("Cannot cancel a subscription before having received it");
851            }
852          }
853        };
854
855        env.subscribe(pub, sub);
856
857        sub.cancel();
858        sub.request(1);
859        sub.request(1);
860        sub.request(1);
861
862        sub.expectNone();
863        env.verifyNoAsyncErrorsNoDelay();
864      }
865    });
866  }
867
868  @Override @Test
869  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
870    activePublisherTest(1, false, new PublisherTestRun<T>() {
871      @Override
872      public void run(Publisher<T> pub) throws Throwable {
873        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
874
875        // leak the Subscription
876        final Subscription subs = sub.subscription.value();
877
878        subs.cancel();
879        subs.cancel();
880        subs.cancel();
881
882        sub.expectNone();
883        env.verifyNoAsyncErrorsNoDelay();
884      }
885    });
886  }
887
888  @Override @Test
889  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
890    activePublisherTest(10, false, new PublisherTestRun<T>() {
891      @Override public void run(Publisher<T> pub) throws Throwable {
892        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
893        sub.request(0);
894        sub.expectError(IllegalArgumentException.class);
895      }
896    });
897  }
898
899  @Override @Test
900  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
901    activePublisherTest(10, false, new PublisherTestRun<T>() {
902      @Override
903      public void run(Publisher<T> pub) throws Throwable {
904        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
905        final Random r = new Random();
906        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
907        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
908        sub.expectError(IllegalArgumentException.class); 
909      }
910    });
911  }
912
913  @Override @Test
914  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
915    optionalActivePublisherTest(10, false, new PublisherTestRun<T>() {
916      @Override
917      public void run(Publisher<T> pub) throws Throwable {
918        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
919        final Random r = new Random();
920        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
921        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
922        sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 
923      }
924    });
925  }
926
927  @Override @Test
928  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
929    // the publisher is able to signal more elements than the subscriber will be requesting in total
930    final int publisherElements = 20;
931
932    final int demand1 = 10;
933    final int demand2 = 5;
934    final int totalDemand = demand1 + demand2;
935
936    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
937      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
938      public void run(Publisher<T> pub) throws Throwable {
939        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
940
941        sub.request(demand1);
942        sub.request(demand2);
943
944        /*
945          NOTE: The order of the nextElement/cancel calls below is very important (!)
946
947          If this ordering was reversed, given an asynchronous publisher,
948          the following scenario would be *legal* and would break this test:
949
950          > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous
951          > AsyncPublisher receives request(5) - demand is now 15
952          ! AsyncPublisher didn't emit any onNext yet (!)
953          > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example
954          ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data
955          ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher)
956
957          Which is why we must first expect an element, and then cancel, once the producing is "running".
958         */
959        sub.nextElement();
960        sub.cancel();
961
962        int onNextsSignalled = 1;
963
964        boolean stillBeingSignalled;
965        do {
966          // put asyncError if onNext signal received
967          sub.expectNone();
968          Throwable error = env.dropAsyncError();
969
970          if (error == null) {
971            stillBeingSignalled = false;
972          } else {
973            onNextsSignalled += 1;
974            stillBeingSignalled = true;
975          }
976
977          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
978          assertTrue(onNextsSignalled <= totalDemand,
979                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
980                                   onNextsSignalled, totalDemand));
981
982        } while (stillBeingSignalled);
983      }
984    });
985
986    env.verifyNoAsyncErrorsNoDelay();
987  }
988
989  @Override @Test
990  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
991    final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>();
992
993    final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() {
994      @Override
995      public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
996        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
997        final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue);
998
999        sub.request(1);
1000        sub.nextElement();
1001        sub.cancel();
1002
1003        return ref;
1004      }
1005    };
1006
1007    activePublisherTest(3, false, new PublisherTestRun<T>() {
1008      @Override
1009      public void run(Publisher<T> pub) throws Throwable {
1010        final WeakReference<ManualSubscriber<T>> ref = run.apply(pub);
1011
1012        // cancel may be run asynchronously so we add a sleep before running the GC
1013        // to "resolve" the race
1014        Thread.sleep(publisherReferenceGCTimeoutMillis);
1015        System.gc();
1016
1017        if (!ref.equals(queue.remove(100))) {
1018          env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
1019        }
1020
1021        env.verifyNoAsyncErrorsNoDelay();
1022      }
1023    });
1024  }
1025
1026  @Override @Test
1027  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
1028    final int totalElements = 3;
1029
1030    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1031      @Override
1032      public void run(Publisher<T> pub) throws Throwable {
1033        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1034        sub.request(Long.MAX_VALUE);
1035
1036        sub.nextElements(totalElements);
1037        sub.expectCompletion();
1038
1039        env.verifyNoAsyncErrorsNoDelay();
1040      }
1041    });
1042  }
1043
1044  @Override @Test
1045  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
1046    final int totalElements = 3;
1047
1048    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1049      @Override
1050      public void run(Publisher<T> pub) throws Throwable {
1051        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1052        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
1053        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
1054        sub.request(1); // pending = Long.MAX_VALUE
1055
1056        sub.nextElements(totalElements);
1057        sub.expectCompletion();
1058
1059        try {
1060          env.verifyNoAsyncErrorsNoDelay();
1061        } finally {
1062          sub.cancel();
1063        }
1064        
1065      }
1066    });
1067  }
1068
1069  @Override @Test
1070  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
1071    activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
1072      @Override public void run(Publisher<T> pub) throws Throwable {
1073        final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
1074           // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
1075           // so 10 is relatively high and safe even if arbitrarily chosen
1076          int callsCounter = 10;
1077
1078          @Override
1079          public void onNext(T element) {
1080            if (env.debugEnabled()) {
1081              env.debug(String.format("%s::onNext(%s)", this, element));
1082            }
1083            if (subscription.isCompleted()) {
1084              if (callsCounter > 0) {
1085                subscription.value().request(Long.MAX_VALUE - 1);
1086                callsCounter--;
1087              } else {
1088                  subscription.value().cancel();
1089              }
1090            } else {
1091              env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
1092            }
1093          }
1094        };
1095        env.subscribe(pub, sub, env.defaultTimeoutMillis());
1096
1097        // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
1098        // we're pretty sure to overflow from those
1099        sub.request(1);
1100
1101        // no onError should be signalled
1102        try {
1103          env.verifyNoAsyncErrors();
1104        } finally {
1105          sub.cancel();
1106        }
1107      }
1108    });
1109  }
1110
1111  ///////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
1112
1113  ///////////////////// TEST INFRASTRUCTURE /////////////////////////////////
1114
1115  public interface PublisherTestRun<T> {
1116    public void run(Publisher<T> pub) throws Throwable;
1117  }
1118
1119  /**
1120   * Test for feature that SHOULD/MUST be implemented, using a live publisher.
1121   *
1122   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1123   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1124   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1125   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1126   */
1127  public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1128    if (elements > maxElementsFromPublisher()) {
1129      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1130    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1131      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1132                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1133    } else {
1134      Publisher<T> pub = createPublisher(elements);
1135      body.run(pub);
1136      env.verifyNoAsyncErrorsNoDelay();
1137    }
1138  }
1139
1140  /**
1141   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
1142   *
1143   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1144   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1145   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1146   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1147   */
1148  public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1149    if (elements > maxElementsFromPublisher()) {
1150      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1151    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1152      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1153                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1154    } else {
1155
1156      final Publisher<T> pub = createPublisher(elements);
1157      final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
1158
1159      try {
1160        potentiallyPendingTest(pub, body);
1161      } catch (Exception ex) {
1162        notVerified(skipMessage);
1163      } catch (AssertionError ex) {
1164        notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
1165      }
1166    }
1167  }
1168
1169  public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE =
1170    "Skipping because no error state Publisher provided, and the test requires it. " +
1171          "Please implement PublisherVerification#createFailedPublisher to run this test.";
1172
1173  public static final String SKIPPING_OPTIONAL_TEST_FAILED =
1174    "Skipping, because provided Publisher does not pass this *additional* verification.";
1175  /**
1176   * Additional test for Publisher in error state
1177   */
1178  public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable {
1179    potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
1180  }
1181
1182  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
1183    potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED);
1184  }
1185
1186  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
1187    if (pub != null) {
1188      body.run(pub);
1189    } else {
1190      throw new SkipException(message);
1191    }
1192  }
1193
1194  /**
1195   * Executes a given test body {@code n} times.
1196   * All the test runs must pass in order for the stochastic test to pass.
1197   */
1198  public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
1199    if (skipStochasticTests()) {
1200      notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
1201    }
1202
1203    for (int i = 0; i < n; i++) {
1204      body.apply(i);
1205    }
1206  }
1207
1208  public void notVerified() {
1209    throw new SkipException("Not verified by this TCK.");
1210  }
1211
1212  /**
1213   * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher},
1214   * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data.
1215   *
1216   * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
1217   */
1218  public long publisherUnableToSignalOnComplete() {
1219    return Long.MAX_VALUE;
1220  }
1221
1222  public void notVerified(String message) {
1223    throw new SkipException(message);
1224  }
1225
1226}