001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.example.unicast; 006 007import org.reactivestreams.*; 008 009import java.util.concurrent.atomic.AtomicLong; 010 011/** 012 * A synchronous implementation of the {@link Publisher} that can 013 * be subscribed to multiple times and each individual subscription 014 * will receive range of monotonically increasing integer values on demand. 015 */ 016public final class RangePublisher implements Publisher<Integer> { 017 018 /** The starting value of the range. */ 019 final int start; 020 021 /** The number of items to emit. */ 022 final int count; 023 024 /** 025 * Constructs a RangePublisher instance with the given start and count values 026 * that yields a sequence of [start, start + count). 027 * @param start the starting value of the range 028 * @param count the number of items to emit 029 */ 030 public RangePublisher(int start, int count) { 031 this.start = start; 032 this.count = count; 033 } 034 035 @Override 036 public void subscribe(Subscriber<? super Integer> subscriber) { 037 // As per rule 1.11, we have decided to support multiple subscribers 038 // in a unicast configuration for this `Publisher` implementation. 039 040 // As per rule 1.09, we need to throw a `java.lang.NullPointerException` 041 // if the `Subscriber` is `null` 042 if (subscriber == null) throw null; 043 044 // As per 2.13, this method must return normally (i.e. not throw). 045 try { 046 subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count)); 047 } catch (Throwable ex) { 048 new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " + 049 "by throwing an exception from onSubscribe.", ex) 050 // When onSubscribe fails this way, we don't know what state the 051 // subscriber is thus calling onError may cause more crashes. 052 .printStackTrace(); 053 } 054 } 055 056 /** 057 * A Subscription implementation that holds the current downstream 058 * requested amount and responds to the downstream's request() and 059 * cancel() calls. 060 */ 061 static final class RangeSubscription 062 // We are using this `AtomicLong` to make sure that this `Subscription` 063 // doesn't run concurrently with itself, which would violate rule 1.3 064 // among others (no concurrent notifications). 065 // The atomic transition from 0L to N > 0L will ensure this. 066 extends AtomicLong implements Subscription { 067 068 private static final long serialVersionUID = -9000845542177067735L; 069 070 /** The Subscriber we are emitting integer values to. */ 071 final Subscriber<? super Integer> downstream; 072 073 /** The end index (exclusive). */ 074 final int end; 075 076 /** 077 * The current index and within the [start, start + count) range that 078 * will be emitted as downstream.onNext(). 079 */ 080 int index; 081 082 /** 083 * Indicates the emission should stop. 084 */ 085 volatile boolean cancelled; 086 087 /** 088 * Holds onto the IllegalArgumentException (containing the offending stacktrace) 089 * indicating there was a non-positive request() call from the downstream. 090 */ 091 volatile Throwable invalidRequest; 092 093 /** 094 * Constructs a stateful RangeSubscription that emits signals to the given 095 * downstream from an integer range of [start, end). 096 * @param downstream the Subscriber receiving the integer values and the completion signal. 097 * @param start the first integer value emitted, start of the range 098 * @param end the end of the range, exclusive 099 */ 100 RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) { 101 this.downstream = downstream; 102 this.index = start; 103 this.end = end; 104 } 105 106 // This method will register inbound demand from our `Subscriber` and 107 // validate it against rule 3.9 and rule 3.17 108 @Override 109 public void request(long n) { 110 // Non-positive requests should be honored with IllegalArgumentException 111 if (n <= 0L) { 112 invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!"); 113 n = 1; 114 } 115 // Downstream requests are cumulative and may come from any thread 116 for (;;) { 117 long requested = get(); 118 long update = requested + n; 119 // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` 120 // we treat the signalled demand as "effectively unbounded" 121 if (update < 0L) { 122 update = Long.MAX_VALUE; 123 } 124 // atomically update the current requested amount 125 if (compareAndSet(requested, update)) { 126 // if there was no prior request amount, we start the emission loop 127 if (requested == 0L) { 128 emit(update); 129 } 130 break; 131 } 132 } 133 } 134 135 // This handles cancellation requests, and is idempotent, thread-safe and not 136 // synchronously performing heavy computations as specified in rule 3.5 137 @Override 138 public void cancel() { 139 // Indicate to the emission loop it should stop. 140 cancelled = true; 141 } 142 143 void emit(long currentRequested) { 144 // Load fields to avoid re-reading them from memory due to volatile accesses in the loop. 145 Subscriber<? super Integer> downstream = this.downstream; 146 int index = this.index; 147 int end = this.end; 148 int emitted = 0; 149 150 try { 151 for (; ; ) { 152 // Check if there was an invalid request and then report its exception 153 // as mandated by rule 3.9. The stacktrace in it should 154 // help locate the faulty logic in the Subscriber. 155 Throwable invalidRequest = this.invalidRequest; 156 if (invalidRequest != null) { 157 // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 158 cancelled = true; 159 160 downstream.onError(invalidRequest); 161 return; 162 } 163 164 // Loop while the index hasn't reached the end and we haven't 165 // emitted all that's been requested 166 while (index != end && emitted != currentRequested) { 167 // to make sure that we follow rule 1.8, 3.6 and 3.7 168 // We stop if cancellation was requested. 169 if (cancelled) { 170 return; 171 } 172 173 downstream.onNext(index); 174 175 // Increment the index for the next possible emission. 176 index++; 177 // Increment the emitted count to prevent overflowing the downstream. 178 emitted++; 179 } 180 181 // If the index reached the end, we complete the downstream. 182 if (index == end) { 183 // to make sure that we follow rule 1.8, 3.6 and 3.7 184 // Unless cancellation was requested by the last onNext. 185 if (!cancelled) { 186 // We need to consider this `Subscription` as cancelled as per rule 1.6 187 // Note, however, that this state is not observable from the outside 188 // world and since we leave the loop with requested > 0L, any 189 // further request() will never trigger the loop. 190 cancelled = true; 191 192 downstream.onComplete(); 193 } 194 return; 195 } 196 197 // Did the requested amount change while we were looping? 198 long freshRequested = get(); 199 if (freshRequested == currentRequested) { 200 // Save where the loop has left off: the next value to be emitted 201 this.index = index; 202 // Atomically subtract the previously requested (also emitted) amount 203 currentRequested = addAndGet(-currentRequested); 204 // If there was no new request in between get() and addAndGet(), we simply quit 205 // The next 0 to N transition in request() will trigger the next emission loop. 206 if (currentRequested == 0L) { 207 break; 208 } 209 // Looks like there were more async requests, reset the emitted count and continue. 210 emitted = 0; 211 } else { 212 // Yes, avoid the atomic subtraction and resume. 213 // emitted != currentRequest in this case and index 214 // still points to the next value to be emitted 215 currentRequested = freshRequested; 216 } 217 } 218 } catch (Throwable ex) { 219 // We can only get here if `onNext`, `onError` or `onComplete` threw, and they 220 // are not allowed to according to 2.13, so we can only cancel and log here. 221 // If `onError` throws an exception, this is a spec violation according to rule 1.9, 222 // and all we can do is to log it. 223 224 // Make sure that we are cancelled, since we cannot do anything else 225 // since the `Subscriber` is faulty. 226 cancelled = true; 227 228 // We can't report the failure to onError as the Subscriber is unreliable. 229 (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " + 230 "throwing an exception from onNext, onError or onComplete.", ex)) 231 .printStackTrace(); 232 } 233 } 234 } 235}