001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams; 013 014import java.util.concurrent.Flow; 015 016/** 017 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. 018 */ 019public final class FlowAdapters { 020 /** Utility class. */ 021 private FlowAdapters() { 022 throw new IllegalStateException("No instances!"); 023 } 024 025 /** 026 * Converts a Flow Publisher into a Reactive Streams Publisher. 027 * @param <T> the element type 028 * @param flowPublisher the source Flow Publisher to convert 029 * @return the equivalent Reactive Streams Publisher 030 */ 031 @SuppressWarnings("unchecked") 032 public static <T> org.reactivestreams.Publisher<T> toPublisher( 033 Flow.Publisher<? extends T> flowPublisher) { 034 if (flowPublisher == null) { 035 throw new NullPointerException("flowPublisher"); 036 } 037 if (flowPublisher instanceof FlowPublisherFromReactive) { 038 return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams); 039 } 040 if (flowPublisher instanceof org.reactivestreams.Publisher) { 041 return (org.reactivestreams.Publisher<T>)flowPublisher; 042 } 043 return new ReactivePublisherFromFlow<T>(flowPublisher); 044 } 045 046 /** 047 * Converts a Reactive Streams Publisher into a Flow Publisher. 048 * @param <T> the element type 049 * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert 050 * @return the equivalent Flow Publisher 051 */ 052 @SuppressWarnings("unchecked") 053 public static <T> Flow.Publisher<T> toFlowPublisher( 054 org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher 055 ) { 056 if (reactiveStreamsPublisher == null) { 057 throw new NullPointerException("reactiveStreamsPublisher"); 058 } 059 if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { 060 return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow); 061 } 062 if (reactiveStreamsPublisher instanceof Flow.Publisher) { 063 return (Flow.Publisher<T>)reactiveStreamsPublisher; 064 } 065 return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher); 066 } 067 068 /** 069 * Converts a Flow Processor into a Reactive Streams Processor. 070 * @param <T> the input value type 071 * @param <U> the output value type 072 * @param flowProcessor the source Flow Processor to convert 073 * @return the equivalent Reactive Streams Processor 074 */ 075 @SuppressWarnings("unchecked") 076 public static <T, U> org.reactivestreams.Processor<T, U> toProcessor( 077 Flow.Processor<? super T, ? extends U> flowProcessor 078 ) { 079 if (flowProcessor == null) { 080 throw new NullPointerException("flowProcessor"); 081 } 082 if (flowProcessor instanceof FlowToReactiveProcessor) { 083 return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams); 084 } 085 if (flowProcessor instanceof org.reactivestreams.Processor) { 086 return (org.reactivestreams.Processor<T, U>)flowProcessor; 087 } 088 return new ReactiveToFlowProcessor<T, U>(flowProcessor); 089 } 090 091 /** 092 * Converts a Reactive Streams Processor into a Flow Processor. 093 * @param <T> the input value type 094 * @param <U> the output value type 095 * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert 096 * @return the equivalent Flow Processor 097 */ 098 @SuppressWarnings("unchecked") 099 public static <T, U> Flow.Processor<T, U> toFlowProcessor( 100 org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor 101 ) { 102 if (reactiveStreamsProcessor == null) { 103 throw new NullPointerException("reactiveStreamsProcessor"); 104 } 105 if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { 106 return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow); 107 } 108 if (reactiveStreamsProcessor instanceof Flow.Processor) { 109 return (Flow.Processor<T, U>)reactiveStreamsProcessor; 110 } 111 return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor); 112 } 113 114 /** 115 * Converts a Reactive Streams Subscriber into a Flow Subscriber. 116 * @param <T> the input and output value type 117 * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert 118 * @return the equivalent Flow Subscriber 119 */ 120 @SuppressWarnings("unchecked") 121 public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) { 122 if (reactiveStreamsSubscriber == null) { 123 throw new NullPointerException("reactiveStreamsSubscriber"); 124 } 125 if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { 126 return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow; 127 } 128 if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { 129 return (Flow.Subscriber<T>)reactiveStreamsSubscriber; 130 } 131 return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber); 132 } 133 134 /** 135 * Converts a Flow Subscriber into a Reactive Streams Subscriber. 136 * @param <T> the input and output value type 137 * @param flowSubscriber the Flow Subscriber instance to convert 138 * @return the equivalent Reactive Streams Subscriber 139 */ 140 @SuppressWarnings("unchecked") 141 public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) { 142 if (flowSubscriber == null) { 143 throw new NullPointerException("flowSubscriber"); 144 } 145 if (flowSubscriber instanceof FlowToReactiveSubscriber) { 146 return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams; 147 } 148 if (flowSubscriber instanceof org.reactivestreams.Subscriber) { 149 return (org.reactivestreams.Subscriber<T>)flowSubscriber; 150 } 151 return new ReactiveToFlowSubscriber<T>(flowSubscriber); 152 } 153 154 /** 155 * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. 156 */ 157 static final class FlowToReactiveSubscription implements Flow.Subscription { 158 final org.reactivestreams.Subscription reactiveStreams; 159 160 public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { 161 this.reactiveStreams = reactive; 162 } 163 164 @Override 165 public void request(long n) { 166 reactiveStreams.request(n); 167 } 168 169 @Override 170 public void cancel() { 171 reactiveStreams.cancel(); 172 } 173 174 } 175 176 /** 177 * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. 178 */ 179 static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { 180 final Flow.Subscription flow; 181 182 public ReactiveToFlowSubscription(Flow.Subscription flow) { 183 this.flow = flow; 184 } 185 186 @Override 187 public void request(long n) { 188 flow.request(n); 189 } 190 191 @Override 192 public void cancel() { 193 flow.cancel(); 194 } 195 196 197 } 198 199 /** 200 * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. 201 * @param <T> the element type 202 */ 203 static final class FlowToReactiveSubscriber<T> 204 implements Flow.Subscriber<T> { 205 final org.reactivestreams.Subscriber<? super T> reactiveStreams; 206 207 public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) { 208 this.reactiveStreams = reactive; 209 } 210 211 @Override 212 public void onSubscribe(Flow.Subscription subscription) { 213 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 214 } 215 216 @Override 217 public void onNext(T item) { 218 reactiveStreams.onNext(item); 219 } 220 221 @Override 222 public void onError(Throwable throwable) { 223 reactiveStreams.onError(throwable); 224 } 225 226 @Override 227 public void onComplete() { 228 reactiveStreams.onComplete(); 229 } 230 231 } 232 233 /** 234 * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. 235 * @param <T> the element type 236 */ 237 static final class ReactiveToFlowSubscriber<T> 238 implements org.reactivestreams.Subscriber<T> { 239 final Flow.Subscriber<? super T> flow; 240 241 public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) { 242 this.flow = flow; 243 } 244 245 @Override 246 public void onSubscribe(org.reactivestreams.Subscription subscription) { 247 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 248 } 249 250 @Override 251 public void onNext(T item) { 252 flow.onNext(item); 253 } 254 255 @Override 256 public void onError(Throwable throwable) { 257 flow.onError(throwable); 258 } 259 260 @Override 261 public void onComplete() { 262 flow.onComplete(); 263 } 264 265 } 266 267 /** 268 * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. 269 * @param <T> the input type 270 * @param <U> the output type 271 */ 272 static final class ReactiveToFlowProcessor<T, U> 273 implements org.reactivestreams.Processor<T, U> { 274 final Flow.Processor<? super T, ? extends U> flow; 275 276 public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) { 277 this.flow = flow; 278 } 279 280 @Override 281 public void onSubscribe(org.reactivestreams.Subscription subscription) { 282 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 283 } 284 285 @Override 286 public void onNext(T t) { 287 flow.onNext(t); 288 } 289 290 @Override 291 public void onError(Throwable t) { 292 flow.onError(t); 293 } 294 295 @Override 296 public void onComplete() { 297 flow.onComplete(); 298 } 299 300 @Override 301 public void subscribe(org.reactivestreams.Subscriber<? super U> s) { 302 flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s)); 303 } 304 } 305 306 /** 307 * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. 308 * @param <T> the input type 309 * @param <U> the output type 310 */ 311 static final class FlowToReactiveProcessor<T, U> 312 implements Flow.Processor<T, U> { 313 final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams; 314 315 public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) { 316 this.reactiveStreams = reactive; 317 } 318 319 @Override 320 public void onSubscribe(Flow.Subscription subscription) { 321 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 322 } 323 324 @Override 325 public void onNext(T t) { 326 reactiveStreams.onNext(t); 327 } 328 329 @Override 330 public void onError(Throwable t) { 331 reactiveStreams.onError(t); 332 } 333 334 @Override 335 public void onComplete() { 336 reactiveStreams.onComplete(); 337 } 338 339 @Override 340 public void subscribe(Flow.Subscriber<? super U> s) { 341 reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s)); 342 } 343 } 344 345 /** 346 * Reactive Streams Publisher that wraps a Flow Publisher. 347 * @param <T> the element type 348 */ 349 static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> { 350 351 final Flow.Publisher<? extends T> flow; 352 353 public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) { 354 this.flow = flowPublisher; 355 } 356 357 @Override 358 public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) { 359 flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive)); 360 } 361 } 362 363 /** 364 * Flow Publisher that wraps a Reactive Streams Publisher. 365 * @param <T> the element type 366 */ 367 static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> { 368 369 final org.reactivestreams.Publisher<? extends T> reactiveStreams; 370 371 public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) { 372 this.reactiveStreams = reactivePublisher; 373 } 374 375 @Override 376 public void subscribe(Flow.Subscriber<? super T> flow) { 377 reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow)); 378 } 379 } 380 381}