001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams; 006 007import java.util.concurrent.Flow; 008import static java.util.Objects.requireNonNull; 009 010/** 011 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. 012 */ 013public final class FlowAdapters { 014 /** Utility class. */ 015 private FlowAdapters() { 016 throw new IllegalStateException("No instances!"); 017 } 018 019 /** 020 * Converts a Flow Publisher into a Reactive Streams Publisher. 021 * @param <T> the element type 022 * @param flowPublisher the source Flow Publisher to convert 023 * @return the equivalent Reactive Streams Publisher 024 */ 025 @SuppressWarnings("unchecked") 026 public static <T> org.reactivestreams.Publisher<T> toPublisher( 027 Flow.Publisher<? extends T> flowPublisher) { 028 requireNonNull(flowPublisher, "flowPublisher"); 029 final org.reactivestreams.Publisher<T> publisher; 030 if (flowPublisher instanceof FlowPublisherFromReactive) { 031 publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams); 032 } else if (flowPublisher instanceof org.reactivestreams.Publisher) { 033 publisher = (org.reactivestreams.Publisher<T>)flowPublisher; 034 } else { 035 publisher = new ReactivePublisherFromFlow<T>(flowPublisher); 036 } 037 return publisher; 038 } 039 040 /** 041 * Converts a Reactive Streams Publisher into a Flow Publisher. 042 * @param <T> the element type 043 * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert 044 * @return the equivalent Flow Publisher 045 */ 046 @SuppressWarnings("unchecked") 047 public static <T> Flow.Publisher<T> toFlowPublisher( 048 org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher 049 ) { 050 requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher"); 051 final Flow.Publisher<T> flowPublisher; 052 if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { 053 flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow); 054 } else if (reactiveStreamsPublisher instanceof Flow.Publisher) { 055 flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher; 056 } else { 057 flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher); 058 } 059 return flowPublisher; 060 } 061 062 /** 063 * Converts a Flow Processor into a Reactive Streams Processor. 064 * @param <T> the input value type 065 * @param <U> the output value type 066 * @param flowProcessor the source Flow Processor to convert 067 * @return the equivalent Reactive Streams Processor 068 */ 069 @SuppressWarnings("unchecked") 070 public static <T, U> org.reactivestreams.Processor<T, U> toProcessor( 071 Flow.Processor<? super T, ? extends U> flowProcessor 072 ) { 073 requireNonNull(flowProcessor, "flowProcessor"); 074 final org.reactivestreams.Processor<T, U> processor; 075 if (flowProcessor instanceof FlowToReactiveProcessor) { 076 processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams); 077 } else if (flowProcessor instanceof org.reactivestreams.Processor) { 078 processor = (org.reactivestreams.Processor<T, U>)flowProcessor; 079 } else { 080 processor = new ReactiveToFlowProcessor<T, U>(flowProcessor); 081 } 082 return processor; 083 } 084 085 /** 086 * Converts a Reactive Streams Processor into a Flow Processor. 087 * @param <T> the input value type 088 * @param <U> the output value type 089 * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert 090 * @return the equivalent Flow Processor 091 */ 092 @SuppressWarnings("unchecked") 093 public static <T, U> Flow.Processor<T, U> toFlowProcessor( 094 org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor 095 ) { 096 requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor"); 097 final Flow.Processor<T, U> flowProcessor; 098 if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { 099 flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow); 100 } else if (reactiveStreamsProcessor instanceof Flow.Processor) { 101 flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor; 102 } else { 103 flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor); 104 } 105 return flowProcessor; 106 } 107 108 /** 109 * Converts a Reactive Streams Subscriber into a Flow Subscriber. 110 * @param <T> the input and output value type 111 * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert 112 * @return the equivalent Flow Subscriber 113 */ 114 @SuppressWarnings("unchecked") 115 public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) { 116 requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber"); 117 final Flow.Subscriber<T> flowSubscriber; 118 if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { 119 flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow; 120 } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { 121 flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber; 122 } else { 123 flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber); 124 } 125 return flowSubscriber; 126 } 127 128 /** 129 * Converts a Flow Subscriber into a Reactive Streams Subscriber. 130 * @param <T> the input and output value type 131 * @param flowSubscriber the Flow Subscriber instance to convert 132 * @return the equivalent Reactive Streams Subscriber 133 */ 134 @SuppressWarnings("unchecked") 135 public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) { 136 requireNonNull(flowSubscriber, "flowSubscriber"); 137 final org.reactivestreams.Subscriber<T> subscriber; 138 if (flowSubscriber instanceof FlowToReactiveSubscriber) { 139 subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams; 140 } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) { 141 subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber; 142 } else { 143 subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber); 144 } 145 return subscriber; 146 } 147 148 /** 149 * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. 150 */ 151 static final class FlowToReactiveSubscription implements Flow.Subscription { 152 final org.reactivestreams.Subscription reactiveStreams; 153 154 public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { 155 this.reactiveStreams = reactive; 156 } 157 158 @Override 159 public void request(long n) { 160 reactiveStreams.request(n); 161 } 162 163 @Override 164 public void cancel() { 165 reactiveStreams.cancel(); 166 } 167 168 } 169 170 /** 171 * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. 172 */ 173 static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { 174 final Flow.Subscription flow; 175 176 public ReactiveToFlowSubscription(Flow.Subscription flow) { 177 this.flow = flow; 178 } 179 180 @Override 181 public void request(long n) { 182 flow.request(n); 183 } 184 185 @Override 186 public void cancel() { 187 flow.cancel(); 188 } 189 190 191 } 192 193 /** 194 * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. 195 * @param <T> the element type 196 */ 197 static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> { 198 final org.reactivestreams.Subscriber<? super T> reactiveStreams; 199 200 public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) { 201 this.reactiveStreams = reactive; 202 } 203 204 @Override 205 public void onSubscribe(Flow.Subscription subscription) { 206 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 207 } 208 209 @Override 210 public void onNext(T item) { 211 reactiveStreams.onNext(item); 212 } 213 214 @Override 215 public void onError(Throwable throwable) { 216 reactiveStreams.onError(throwable); 217 } 218 219 @Override 220 public void onComplete() { 221 reactiveStreams.onComplete(); 222 } 223 224 } 225 226 /** 227 * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it. 228 * @param <T> the element type 229 */ 230 static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> { 231 final Flow.Subscriber<? super T> flow; 232 233 public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) { 234 this.flow = flow; 235 } 236 237 @Override 238 public void onSubscribe(org.reactivestreams.Subscription subscription) { 239 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 240 } 241 242 @Override 243 public void onNext(T item) { 244 flow.onNext(item); 245 } 246 247 @Override 248 public void onError(Throwable throwable) { 249 flow.onError(throwable); 250 } 251 252 @Override 253 public void onComplete() { 254 flow.onComplete(); 255 } 256 257 } 258 259 /** 260 * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. 261 * @param <T> the input type 262 * @param <U> the output type 263 */ 264 static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> { 265 final Flow.Processor<? super T, ? extends U> flow; 266 267 public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) { 268 this.flow = flow; 269 } 270 271 @Override 272 public void onSubscribe(org.reactivestreams.Subscription subscription) { 273 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 274 } 275 276 @Override 277 public void onNext(T t) { 278 flow.onNext(t); 279 } 280 281 @Override 282 public void onError(Throwable t) { 283 flow.onError(t); 284 } 285 286 @Override 287 public void onComplete() { 288 flow.onComplete(); 289 } 290 291 @Override 292 public void subscribe(org.reactivestreams.Subscriber<? super U> s) { 293 flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s)); 294 } 295 } 296 297 /** 298 * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. 299 * @param <T> the input type 300 * @param <U> the output type 301 */ 302 static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> { 303 final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams; 304 305 public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) { 306 this.reactiveStreams = reactive; 307 } 308 309 @Override 310 public void onSubscribe(Flow.Subscription subscription) { 311 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 312 } 313 314 @Override 315 public void onNext(T t) { 316 reactiveStreams.onNext(t); 317 } 318 319 @Override 320 public void onError(Throwable t) { 321 reactiveStreams.onError(t); 322 } 323 324 @Override 325 public void onComplete() { 326 reactiveStreams.onComplete(); 327 } 328 329 @Override 330 public void subscribe(Flow.Subscriber<? super U> s) { 331 reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s)); 332 } 333 } 334 335 /** 336 * Reactive Streams Publisher that wraps a Flow Publisher. 337 * @param <T> the element type 338 */ 339 static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> { 340 final Flow.Publisher<? extends T> flow; 341 342 public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) { 343 this.flow = flowPublisher; 344 } 345 346 @Override 347 public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) { 348 flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive)); 349 } 350 } 351 352 /** 353 * Flow Publisher that wraps a Reactive Streams Publisher. 354 * @param <T> the element type 355 */ 356 static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> { 357 358 final org.reactivestreams.Publisher<? extends T> reactiveStreams; 359 360 public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) { 361 this.reactiveStreams = reactivePublisher; 362 } 363 364 @Override 365 public void subscribe(Flow.Subscriber<? super T> flow) { 366 reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow)); 367 } 368 } 369 370}