001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams; 013 014import java.util.concurrent.Flow; 015import static java.util.Objects.requireNonNull; 016 017/** 018 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. 019 */ 020public final class FlowAdapters { 021 /** Utility class. */ 022 private FlowAdapters() { 023 throw new IllegalStateException("No instances!"); 024 } 025 026 /** 027 * Converts a Flow Publisher into a Reactive Streams Publisher. 028 * @param <T> the element type 029 * @param flowPublisher the source Flow Publisher to convert 030 * @return the equivalent Reactive Streams Publisher 031 */ 032 @SuppressWarnings("unchecked") 033 public static <T> org.reactivestreams.Publisher<T> toPublisher( 034 Flow.Publisher<? extends T> flowPublisher) { 035 requireNonNull(flowPublisher, "flowPublisher"); 036 final org.reactivestreams.Publisher<T> publisher; 037 if (flowPublisher instanceof FlowPublisherFromReactive) { 038 publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams); 039 } else if (flowPublisher instanceof org.reactivestreams.Publisher) { 040 publisher = (org.reactivestreams.Publisher<T>)flowPublisher; 041 } else { 042 publisher = new ReactivePublisherFromFlow<T>(flowPublisher); 043 } 044 return publisher; 045 } 046 047 /** 048 * Converts a Reactive Streams Publisher into a Flow Publisher. 049 * @param <T> the element type 050 * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert 051 * @return the equivalent Flow Publisher 052 */ 053 @SuppressWarnings("unchecked") 054 public static <T> Flow.Publisher<T> toFlowPublisher( 055 org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher 056 ) { 057 requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher"); 058 final Flow.Publisher<T> flowPublisher; 059 if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { 060 flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow); 061 } else if (reactiveStreamsPublisher instanceof Flow.Publisher) { 062 flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher; 063 } else { 064 flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher); 065 } 066 return flowPublisher; 067 } 068 069 /** 070 * Converts a Flow Processor into a Reactive Streams Processor. 071 * @param <T> the input value type 072 * @param <U> the output value type 073 * @param flowProcessor the source Flow Processor to convert 074 * @return the equivalent Reactive Streams Processor 075 */ 076 @SuppressWarnings("unchecked") 077 public static <T, U> org.reactivestreams.Processor<T, U> toProcessor( 078 Flow.Processor<? super T, ? extends U> flowProcessor 079 ) { 080 requireNonNull(flowProcessor, "flowProcessor"); 081 final org.reactivestreams.Processor<T, U> processor; 082 if (flowProcessor instanceof FlowToReactiveProcessor) { 083 processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams); 084 } else if (flowProcessor instanceof org.reactivestreams.Processor) { 085 processor = (org.reactivestreams.Processor<T, U>)flowProcessor; 086 } else { 087 processor = new ReactiveToFlowProcessor<T, U>(flowProcessor); 088 } 089 return processor; 090 } 091 092 /** 093 * Converts a Reactive Streams Processor into a Flow Processor. 094 * @param <T> the input value type 095 * @param <U> the output value type 096 * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert 097 * @return the equivalent Flow Processor 098 */ 099 @SuppressWarnings("unchecked") 100 public static <T, U> Flow.Processor<T, U> toFlowProcessor( 101 org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor 102 ) { 103 requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor"); 104 final Flow.Processor<T, U> flowProcessor; 105 if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { 106 flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow); 107 } else if (reactiveStreamsProcessor instanceof Flow.Processor) { 108 flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor; 109 } else { 110 flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor); 111 } 112 return flowProcessor; 113 } 114 115 /** 116 * Converts a Reactive Streams Subscriber into a Flow Subscriber. 117 * @param <T> the input and output value type 118 * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert 119 * @return the equivalent Flow Subscriber 120 */ 121 @SuppressWarnings("unchecked") 122 public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) { 123 requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber"); 124 final Flow.Subscriber<T> flowSubscriber; 125 if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { 126 flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow; 127 } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { 128 flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber; 129 } else { 130 flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber); 131 } 132 return flowSubscriber; 133 } 134 135 /** 136 * Converts a Flow Subscriber into a Reactive Streams Subscriber. 137 * @param <T> the input and output value type 138 * @param flowSubscriber the Flow Subscriber instance to convert 139 * @return the equivalent Reactive Streams Subscriber 140 */ 141 @SuppressWarnings("unchecked") 142 public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) { 143 requireNonNull(flowSubscriber, "flowSubscriber"); 144 final org.reactivestreams.Subscriber<T> subscriber; 145 if (flowSubscriber instanceof FlowToReactiveSubscriber) { 146 subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams; 147 } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) { 148 subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber; 149 } else { 150 subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber); 151 } 152 return subscriber; 153 } 154 155 /** 156 * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. 157 */ 158 static final class FlowToReactiveSubscription implements Flow.Subscription { 159 final org.reactivestreams.Subscription reactiveStreams; 160 161 public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { 162 this.reactiveStreams = reactive; 163 } 164 165 @Override 166 public void request(long n) { 167 reactiveStreams.request(n); 168 } 169 170 @Override 171 public void cancel() { 172 reactiveStreams.cancel(); 173 } 174 175 } 176 177 /** 178 * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. 179 */ 180 static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { 181 final Flow.Subscription flow; 182 183 public ReactiveToFlowSubscription(Flow.Subscription flow) { 184 this.flow = flow; 185 } 186 187 @Override 188 public void request(long n) { 189 flow.request(n); 190 } 191 192 @Override 193 public void cancel() { 194 flow.cancel(); 195 } 196 197 198 } 199 200 /** 201 * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. 202 * @param <T> the element type 203 */ 204 static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> { 205 final org.reactivestreams.Subscriber<? super T> reactiveStreams; 206 207 public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) { 208 this.reactiveStreams = reactive; 209 } 210 211 @Override 212 public void onSubscribe(Flow.Subscription subscription) { 213 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 214 } 215 216 @Override 217 public void onNext(T item) { 218 reactiveStreams.onNext(item); 219 } 220 221 @Override 222 public void onError(Throwable throwable) { 223 reactiveStreams.onError(throwable); 224 } 225 226 @Override 227 public void onComplete() { 228 reactiveStreams.onComplete(); 229 } 230 231 } 232 233 /** 234 * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it. 235 * @param <T> the element type 236 */ 237 static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> { 238 final Flow.Subscriber<? super T> flow; 239 240 public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) { 241 this.flow = flow; 242 } 243 244 @Override 245 public void onSubscribe(org.reactivestreams.Subscription subscription) { 246 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 247 } 248 249 @Override 250 public void onNext(T item) { 251 flow.onNext(item); 252 } 253 254 @Override 255 public void onError(Throwable throwable) { 256 flow.onError(throwable); 257 } 258 259 @Override 260 public void onComplete() { 261 flow.onComplete(); 262 } 263 264 } 265 266 /** 267 * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. 268 * @param <T> the input type 269 * @param <U> the output type 270 */ 271 static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> { 272 final Flow.Processor<? super T, ? extends U> flow; 273 274 public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) { 275 this.flow = flow; 276 } 277 278 @Override 279 public void onSubscribe(org.reactivestreams.Subscription subscription) { 280 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 281 } 282 283 @Override 284 public void onNext(T t) { 285 flow.onNext(t); 286 } 287 288 @Override 289 public void onError(Throwable t) { 290 flow.onError(t); 291 } 292 293 @Override 294 public void onComplete() { 295 flow.onComplete(); 296 } 297 298 @Override 299 public void subscribe(org.reactivestreams.Subscriber<? super U> s) { 300 flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s)); 301 } 302 } 303 304 /** 305 * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. 306 * @param <T> the input type 307 * @param <U> the output type 308 */ 309 static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> { 310 final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams; 311 312 public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) { 313 this.reactiveStreams = reactive; 314 } 315 316 @Override 317 public void onSubscribe(Flow.Subscription subscription) { 318 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 319 } 320 321 @Override 322 public void onNext(T t) { 323 reactiveStreams.onNext(t); 324 } 325 326 @Override 327 public void onError(Throwable t) { 328 reactiveStreams.onError(t); 329 } 330 331 @Override 332 public void onComplete() { 333 reactiveStreams.onComplete(); 334 } 335 336 @Override 337 public void subscribe(Flow.Subscriber<? super U> s) { 338 reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s)); 339 } 340 } 341 342 /** 343 * Reactive Streams Publisher that wraps a Flow Publisher. 344 * @param <T> the element type 345 */ 346 static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> { 347 final Flow.Publisher<? extends T> flow; 348 349 public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) { 350 this.flow = flowPublisher; 351 } 352 353 @Override 354 public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) { 355 flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive)); 356 } 357 } 358 359 /** 360 * Flow Publisher that wraps a Reactive Streams Publisher. 361 * @param <T> the element type 362 */ 363 static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> { 364 365 final org.reactivestreams.Publisher<? extends T> reactiveStreams; 366 367 public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) { 368 this.reactiveStreams = reactivePublisher; 369 } 370 371 @Override 372 public void subscribe(Flow.Subscriber<? super T> flow) { 373 reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow)); 374 } 375 } 376 377}