001/*************************************************** 002 * Licensed under MIT No Attribution (SPDX: MIT-0) * 003 ***************************************************/ 004 005package org.reactivestreams.example.unicast; 006 007import org.reactivestreams.Subscriber; 008import org.reactivestreams.Subscription; 009 010/** 011 * SyncSubscriber is an implementation of Reactive Streams `Subscriber`, 012 * it runs synchronously (on the Publisher's thread) and requests one element 013 * at a time and invokes a user-defined method to process each element. 014 * 015 * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. 016 */ 017public abstract class SyncSubscriber<T> implements Subscriber<T> { 018 private Subscription subscription; // Obeying rule 3.1, we make this private! 019 private boolean done = false; 020 021 @Override public void onSubscribe(final Subscription s) { 022 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` 023 if (s == null) throw null; 024 025 if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully 026 try { 027 s.cancel(); // Cancel the additional subscription 028 } catch(final Throwable t) { 029 //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 030 (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); 031 } 032 } else { 033 // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber` 034 // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request` 035 subscription = s; 036 try { 037 // If we want elements, according to rule 2.1 we need to call `request` 038 // And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method 039 s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time 040 } catch(final Throwable t) { 041 // Subscription.request is not allowed to throw according to rule 3.16 042 (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); 043 } 044 } 045 } 046 047 @Override public void onNext(final T element) { 048 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 049 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); 050 } else { 051 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` 052 if (element == null) throw null; 053 054 if (!done) { // If we aren't already done 055 try { 056 if (whenNext(element)) { 057 try { 058 subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time 059 } catch (final Throwable t) { 060 // Subscription.request is not allowed to throw according to rule 3.16 061 (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); 062 } 063 } else { 064 done(); 065 } 066 } catch (final Throwable t) { 067 done(); 068 try { 069 onError(t); 070 } catch (final Throwable t2) { 071 //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 072 (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); 073 } 074 } 075 } 076 } 077 } 078 079 // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements 080 // herefor we also need to cancel our `Subscription`. 081 private void done() { 082 //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to. 083 done = true; // If we `whenNext` throws an exception, let's consider ourselves done (not accepting more elements) 084 try { 085 subscription.cancel(); // Cancel the subscription 086 } catch(final Throwable t) { 087 //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 088 (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); 089 } 090 } 091 092 // This method is left as an exercise to the reader/extension point 093 // Returns whether more elements are desired or not, and if no more elements are desired 094 protected abstract boolean whenNext(final T element); 095 096 @Override public void onError(final Throwable t) { 097 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 098 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); 099 } else { 100 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` 101 if (t == null) throw null; 102 // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 103 // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 104 } 105 } 106 107 @Override public void onComplete() { 108 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 109 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); 110 } else { 111 // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 112 // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 113 } 114 } 115}