Observable wrapping

Follow me on Twitter, happy to take your suggestions on topics or improvements /Chris

NOTE: this code does not use the new pipeable operator syntax and will need to be updated at some point. However, there is a great learning opportunity in working through a longer RxJS example.

We have just learned in Observable Anatomy that the key methods next() , error() and complete() are what make our Observable tick, if we define it ourselves. We have also learned that these methods trigger a corresponding callback on our subscription.

Wrapping something in an observable means we take something that is NOT an Observable and turn it into one, so it can play nice with other Observables. It also means that it can now use Operators.
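As a minimal sketch of the idea before the real examples (using a hand-rolled create helper instead of RxJS, so it runs standalone), wrapping a plain value-producing source looks like this:

```javascript
// Minimal stand-in for Observable.create: an object with a subscribe method
// that wires next/error/complete callbacks to whatever we wrap.
const create = subscribeFn => ({
  subscribe: (next, error, complete) =>
    subscribeFn({ next, error, complete })
});

// Wrap a plain synchronous source (here: an array) into a "stream".
const fromArray = values =>
  create(observer => {
    values.forEach(v => observer.next(v)); // emit each value
    observer.complete();                   // then close the stream
  });

const received = [];
fromArray([1, 2, 3]).subscribe(
  v => received.push(v),
  err => console.error(err),
  () => received.push('done')
);

console.log(received); // [1, 2, 3, 'done']
```

The create helper here is an assumption made for the sketch; in real RxJS you would use Observable.create (or the Observable constructor) exactly as in the examples below.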

Wrapping an ajax call

import { Observable } from 'rxjs';

let stream = Observable.create(observer => {
  let request = new XMLHttpRequest();

  request.open('GET', 'url');
  request.onload = () => {
    if (request.status === 200) {
      observer.next(request.response);
      observer.complete();
    } else {
      observer.error('error happened');
    }
  };

  request.onerror = () => {
    observer.error('error happened');
  };

  request.send();
});

stream.subscribe(data => console.log(data));

There are three things we need to do here: emit data, handle errors and close the stream.

Emit the data

if(request.status === 200) {
  observer.next( request.response ) // emit data
}

Handle potential errors

else {
  observer.error('error happened');
}

and

request.onerror = () => {
  observer.error('error happened')
}

Close the stream

if(request.status === 200) {
  observer.next( request.response )
  observer.complete() // close stream, as we don't expect more data
}

Wrapping a speech audio API

console.clear();

const { Observable } = Rx;

const speechRecognition$ = new Observable(observer => {
  const speech = new webkitSpeechRecognition();

  speech.onresult = event => {
    observer.next(event);
    observer.complete();
  };

  speech.start();

  return () => {
    speech.stop();
  };
});

const say = text =>
  new Observable(observer => {
    const utterance = new SpeechSynthesisUtterance(text);
    utterance.onend = e => {
      observer.next(e);
      observer.complete();
    };
    speechSynthesis.speak(utterance);
  });

const button = document.querySelector('button');

const heyClick$ = Observable.fromEvent(button, 'click');

heyClick$
  .switchMap(e => speechRecognition$)
  .map(e => e.results[0][0].transcript)
  .map(text => {
    switch (text) {
      case 'I want':
        return 'candy';
      case 'hi':
      case 'ice ice':
        return 'baby';
      case 'hello':
        return 'Is it me you are looking for';
      case 'make me a sandwich':
      case 'get me a sandwich':
        return 'do it yo damn self';
      case 'why are you being so sexist':
        return 'you made me that way';
      default:
        return `I don't understand: "${text}"`;
    }
  })
  .concatMap(say)
  .subscribe(e => console.log(e));


Speech recognition stream

This activates the microphone in the browser and records what we say

const speechRecognition$ = new Observable(observer => {
  const speech = new webkitSpeechRecognition();

  speech.onresult = event => {
    observer.next(event);
    observer.complete();
  };

  speech.start();

  return () => {
    speech.stop();
  };
});

This essentially sets up the speech recognition API. We wait for one response and after that we complete the stream, much like the first example with AJAX.

Note also that a function is defined for cleanup

return () => {
  speech.stop();
}

so that we can call unsubscribe() on the resulting subscription to clean up resources
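To see the teardown in action, here is a sketch with setInterval standing in for the speech API, and a mini create/unsubscribe implementation standing in for RxJS so it runs anywhere:

```javascript
// Mini Observable with unsubscribe support: subscribe returns a subscription
// whose unsubscribe() invokes the teardown function returned by the producer.
const create = subscribeFn => ({
  subscribe: next => {
    const teardown = subscribeFn({ next });
    return { unsubscribe: () => teardown && teardown() };
  }
});

let stopped = false;

const ticker$ = create(observer => {
  const id = setInterval(() => observer.next('tick'), 10);

  // cleanup, runs when the consumer calls unsubscribe()
  return () => {
    clearInterval(id);
    stopped = true;
  };
});

const subscription = ticker$.subscribe(() => {});
subscription.unsubscribe(); // triggers the teardown above

console.log(stopped); // true
```

This is the same mechanism that lets speech.stop() run when the speech recognition subscription is unsubscribed.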

Speech synthesis utterance, say

This is responsible for uttering what you want it to utter (say).

const say = (text) => new Observable(observer => {
  const utterance = new SpeechSynthesisUtterance(text);
  utterance.onend = (e) => {
    observer.next(e);
    observer.complete();
  };
  speechSynthesis.speak(utterance);
});


Main stream heyClick$

heyClick$
  .switchMap(e => speechRecognition$)
  .map(e => e.results[0][0].transcript)
  .map(text => {
    switch (text) {
      case 'I want':
        return 'candy';
      case 'hi':
      case 'ice ice':
        return 'baby';
      case 'hello':
        return 'Is it me you are looking for';
      case 'make me a sandwich':
      case 'get me a sandwich':
        return 'do it yo damn self';
      case 'why are you being so sexist':
        return 'you made me that way';
      default:
        return `I don't understand: "${text}"`;
    }
  })
  .concatMap(say)
  .subscribe(e => console.log(e));


Logic should be read as follows

  • heyClick$ is activated on a click on a button.
  • speechRecognition$ listens for what we say and sends the result back into heyClick$, where the switching logic determines an appropriate response that is then uttered by the say Observable.
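Because the response logic lives in a plain map, it is a pure function that can be pulled out and tested on its own (respond is a name invented here for the sketch, with the cases abbreviated):

```javascript
// Pure mapping from recognized text to a response, same idea as the
// switch statement inside .map(...) above (abbreviated).
const respond = text => {
  switch (text) {
    case 'I want':
      return 'candy';
    case 'hi':
    case 'ice ice':
      return 'baby';
    case 'hello':
      return 'Is it me you are looking for';
    default:
      return `I don't understand: "${text}"`;
  }
};

console.log(respond('ice ice')); // baby
console.log(respond('what'));    // I don't understand: "what"
```

Keeping side effects in the Observables and decision logic in pure functions is what makes streams like this easy to reason about.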

All credit due to @ladyleet and @benlesh.

Summary

We have wrapped two things into Observables: a simpler AJAX call and a slightly more advanced Speech API. The mechanics are still the same though:

  1. Where data is emitted, add a call to next()
  2. If there is NO more data to emit, call complete()
  3. If there is a need for it, define a function that can be called upon unsubscribe()
  4. Handle errors by calling error() in the appropriate place (only done in the first example)
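Put together, the four steps can be sketched as one reusable template (wrapCallback and the mini create helper are names invented here; in real RxJS you would use Observable.create):

```javascript
// Tiny stand-in for Observable.create, enough to demonstrate the mechanics.
const create = subscribeFn => ({
  subscribe: (next, error, complete) => {
    const teardown = subscribeFn({ next, error, complete });
    return { unsubscribe: () => teardown && teardown() };
  }
});

// Template: wrap a node-style callback API following the four steps above.
const wrapCallback = fn => (...args) =>
  create(observer => {
    let cancelled = false;

    fn(...args, (err, result) => {
      if (cancelled) return;
      if (err) {
        observer.error(err);    // 4. handle errors
      } else {
        observer.next(result);  // 1. emit data
        observer.complete();    // 2. no more data, so complete
      }
    });

    // 3. teardown, called upon unsubscribe()
    return () => { cancelled = true; };
  });

// Usage with a fake async API standing in for XHR / speech recognition
const fakeApi = (value, cb) => cb(null, value * 2);
const results = [];
wrapCallback(fakeApi)(21).subscribe(
  v => results.push(v),
  err => results.push(err),
  () => results.push('done')
);
console.log(results); // [42, 'done']
```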