Reusable multicasting with Subject factories

Instructor: André Staltz

The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.
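To make the difference concrete, here is a minimal sketch in plain JavaScript. It is a toy model, not RxJS itself: `ToySubject`, `toyMulticast`, and `run` are hypothetical names invented for this illustration, and the source is simply a function that pushes values into an observer. The only behavior it borrows from the lesson is that a subject which has completed no longer forwards values, so a fresh one is needed for each shared execution.

```javascript
// Toy model, NOT the RxJS implementation: a subject that stops for good once completed.
class ToySubject {
  constructor() { this.observers = []; this.isStopped = false; }
  subscribe(observer) {
    // A stopped subject only tells late subscribers that it is done.
    if (this.isStopped) { observer.complete(); return; }
    this.observers.push(observer);
  }
  next(value) {
    if (this.isStopped) return;
    this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical helper: accepts either a subject instance or a subject factory.
function toyMulticast(source, subjectOrFactory) {
  const factory = typeof subjectOrFactory === 'function'
    ? subjectOrFactory
    : () => subjectOrFactory; // an instance always yields the same subject
  let subject = null;
  const getSubject = () => {
    if (!subject || subject.isStopped) subject = factory();
    return subject;
  };
  return {
    subscribe(observer) { getSubject().subscribe(observer); },
    connect() { source(getSubject()); },
  };
}

// Assumed source: emits 1, 2 and then completes, synchronously.
const source = observer => { observer.next(1); observer.next(2); observer.complete(); };

function run(shared) {
  const log = [];
  const observer = { next: v => log.push(v), complete: () => log.push('done') };
  shared.subscribe(observer); shared.connect(); // first shared execution
  shared.subscribe(observer); shared.connect(); // attempt to reuse it
  return log;
}

const withInstance = run(toyMulticast(source, new ToySubject()));
const withFactory = run(toyMulticast(source, () => new ToySubject()));

console.log(withInstance); // [ 1, 2, 'done', 'done' ]
console.log(withFactory);  // [ 1, 2, 'done', 1, 2, 'done' ]
```

With the single instance, the second subscription only sees an immediate completion because the subject is already stopped. With the factory, the second connect() gets a new subject and the source runs again.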

Bogdan Nenu
~ 8 years ago

Hi. This topic of Subject factories looks very interesting, but I can't wrap my head around one problem. What if I have a Subject (actually a BehaviorSubject) that I want to use both to push values into the stream continuously and as the subject for multicast? The factory would seem to solve the problem of resubscription if the stream errors somehow. Basically, I want to use this construct for state shaped by actions, like this:

var actionSubject = new Rx.BehaviorSubject()
var reducer = (state, action) => ({...state, [action.type]: action.payload})
var state = Rx.Observable.from({})
  .multicast(actionSubject)
  .flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
  .scan(reducer)
  .refCount()

var actionStream = subject => action => { subject.next(action) }
var action = actionStream(actionSubject)

The idea is that if I swap the actionSubject with a factory in multicast I would have to find a way to push the actions to that newly created subject from that point on...correct? Or is there a better way to do this altogether?

André Staltz (instructor)
~ 8 years ago

Bogdan, if you already have a subject then you do not need to apply multicast. You can simply write:

var stateObservable = actionSubject
  .flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
  .scan(reducer)
  .refCount()

Multicast is useful for plain observables, not subjects. That's because multicast basically converts an observable into a subject, and you already have a subject.
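As a rough illustration of that answer, the sketch below derives state directly from an action subject with no multicast step. It is dependency-free and only imitates the idea: `createToySubject` and `toyScan` are hypothetical stand-ins for a subject and for scan, not RxJS APIs, and the action objects are made up for the example. The reducer is the one from the question.

```javascript
// Toy stand-in for a subject (an assumption for illustration, not the RxJS class).
function createToySubject() {
  const observers = [];
  return {
    subscribe: fn => { observers.push(fn); },
    next: value => { observers.forEach(fn => fn(value)); },
  };
}

// Hypothetical scan-like helper: emits the accumulated state after every action.
function toyScan(subject, reducer, seed) {
  return {
    subscribe: fn => {
      let state = seed; // each subscription accumulates its own state
      subject.subscribe(action => {
        state = reducer(state, action);
        fn(state);
      });
    },
  };
}

// Reducer taken from the question above.
const reducer = (state, action) => ({ ...state, [action.type]: action.payload });

const actionSubject = createToySubject();
const stateObservable = toyScan(actionSubject, reducer, {});

const states = [];
stateObservable.subscribe(s => states.push(s));

// Actions are pushed straight into the subject; no multicast is involved.
actionSubject.next({ type: 'user', payload: 'Bogdan' });
actionSubject.next({ type: 'count', payload: 1 });

console.log(states);
// [ { user: 'Bogdan' }, { user: 'Bogdan', count: 1 } ]
```

The subject is both the place where actions are pushed and the start of the state pipeline, which is why nothing needs to be converted first.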

Kevin Pinny
~ 7 years ago

Is the reason that passing an Rx.Subject() instance directly to multicast prevents 're-connecting' that multicast holds a reference to that one Subject, while a factory always generates a new Subject instance? Do I understand this correctly?

André Staltz (instructor)
~ 7 years ago

Yes, that's correct.

Byron McMullen
~ 6 years ago

This took me a while to figure out, so I'll leave it here:

If your source observable does not emit done before the refCount returns to 0, then when you re-subscribe to shared, observer A will receive each of the emitted values of the source again, starting from the first one.

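// Assumes RxJS 5 with Observable and Subject in scope.
// observerA was not shown in the original comment; this definition is
// an assumption inferred from the logged output below.
var observerA = {
  next: function (x) { console.log('A next ' + x); },
  error: function (err) { console.log('A error ' + err); },
  complete: function () { console.log('A done'); }
};

let shared = Observable.interval(1000).take(6).multicast(new Subject()).refCount();
let subA = shared.subscribe(observerA);

// Note: we are unsubscribing before the shared observable could emit
// all 6 values, which would take 6000ms.
setTimeout(function() {
  subA.unsubscribe();
}, 5000);

// Re-subscribe after the refCount has dropped to 0.
setTimeout(function() {
  subA = shared.subscribe(observerA);
}, 8000);
// After the re-subscription at 8000ms, this will emit
// A next 0
// A next 1
// ...
// A done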

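This confused me because, at first glance, it appears to behave as if you had passed a factory to multicast. The difference is that the Subject never completed here, so the same instance can still be reused for a new connection.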
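The distinction can be sketched without timers. The snippet below is a toy model in plain JavaScript, not RxJS: `ToySubject` is a hypothetical class, and the calls to next() and complete() stand in for what the source and refCount would do. It shows that unsubscribing early leaves the subject usable, while completion stops it for good.

```javascript
// Toy subject (illustrative assumption, not the RxJS class): it only stops on complete().
class ToySubject {
  constructor() { this.observers = new Set(); this.isStopped = false; }
  subscribe(observer) {
    if (this.isStopped) { observer.complete(); return () => {}; }
    this.observers.add(observer);
    return () => this.observers.delete(observer); // unsubscribe function
  }
  next(value) {
    if (!this.isStopped) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers.clear();
  }
}

const subject = new ToySubject();
const log = [];
const observerA = {
  next: x => log.push('A next ' + x),
  complete: () => log.push('A done'),
};

// First execution: interrupted by unsubscribing before the source completes.
const unsubscribe = subject.subscribe(observerA);
subject.next(0);
subject.next(1);
unsubscribe(); // complete() was never called, so the subject is not stopped

// Second execution through the SAME subject: values still flow.
subject.subscribe(observerA);
subject.next(0);
subject.complete();

// Third attempt: the subject is now stopped, so only completion is delivered.
subject.subscribe(observerA);
subject.next(0); // ignored

console.log(log);
// [ 'A next 0', 'A next 1', 'A next 0', 'A done', 'A done' ]
```

Only the third attempt is the case a subject factory is needed for, since by then the single subject instance can no longer carry values.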