RxJS version 2.3.23
This release has a number of new features as well as improvements to the existing codebase.
Of note, there are the following items changed:
- New Operators
- Performance Enhancements
- Bug Fixes
- Documentation fixes
New Operators
In this version, a number of new operators are now available
Observable.pairs
One question that was posed to the team, what if we could take an object and turn it into an observable of key/value pairs? To that end, we have implemented Observable.pairs
, which is an implementation which mirrors Lo-Dash and Underscore.js but instead of returning an array of key/value arrays, it returns an Observable
of key/value arrays.
Using this together with ES6 produces quite a nice result with destructuring of the arrays.
let obj = {
foo: 42,
bar: 56,
baz: 78
};
let source = Rx.Observable.pairs(obj);
let subscription = source.subscribe(
[key, value] => {
console.log('Key:', key, 'Value:', value);
},
err => {
console.log('Error: %s', err);
},
=> () {
console.log('Completed');
});
// => Key: 'foo' Value: 42
// => Key: 'bar' Value: 56
// => Key: 'baz' Value: 78
// => Completed
This is a great alternative to using a much longer form of:
var obj = {
foo: 42,
bar: 56,
baz: 78
};
var source = Observable.from(Object.keys(obj))
.map(function (x) { return [x, obj[x]]; });
Observable.prototype.retryWhen
In previous releases of RxJS, for retrying behavior, we had a single operator of Observable.prototype.retry
which would try running the observable the specified number of times. This is acceptable behavior for immediate retries, but for more complex scenarios, we want this behavior to be more configurable. To that end, we have implemented Observable.prototype.retryWhen
which comes from RxJava.
The retryWhen
operator is similar to retry
but decides whether or not to resubscribe to and mirror the source Observable
by passing the Error
from the onError
notification to a function that generates a second Observable
, and observes its result to determine what to do. If that result is an emitted item, retryWhen
resubscribes to and mirrors the source and the process repeats; if that result is an onError
notification, retryWhen
passes this notification on to its observers and terminates.
This allows for an eventual back-off strategy to handle failures such as the following:
var source = Observable.create(function (o) {
console.log('subscribing');
o.onError(new Error('always fails'));
})
.retryWhen(function(attempts) {
return attempts
.zip(Observable.range(1, 3), function (n, i) { return i; })
.flatMap(function(i) {
console.log('delay retry by', i, 'second(s)');
return Observable.timer(i * 1000 /*ms*/);
});
});
source.subscribe();
/*
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
*/
Many thanks to @Blesh for the implementation!
Observable.prototype.withLatestFrom
RxJS has many ways of combining observable sequences whether it is zip
which waits for pairs, which could penalize the faster of the observables which produce values. RxJS also has combineLatest
which allows you to combine the latest value from each of the observable sequences which allows you to no longer be penalized by a slower observable sequence, instead, you will get each one in turn, as part of the pair.
There may be times, however, when you only want the latest values from the other sequences, produced with the source sequence. To that end, we have introduced the Observable.prototype.withLatestFrom
method which merges the specified observable sequences into one observable sequence by using the selector function only when the source observable sequence (the instance) produces an element.
/* Have staggering intervals */
var source1 = Rx.Observable.interval(140)
.map(function (i) { return 'First: ' + i; });
var source2 = Rx.Observable.interval(50)
.map(function (i) { return 'Second: ' + i; });
// When source1 emits a value, combine it with the latest emission from source2.
var source = source1.withLatestFrom(
source2,
function (s1, s2) { return s1 + ', ' + s2; }
).take(4);
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: First: 0, Second: 1
// => Next: First: 1, Second: 4
// => Next: First: 2, Second: 7
// => Next: First: 3, Second: 10
// => Completed
Many thanks to @staltz for the implementation!
Performance Improvements
RxJS is committed to being as high performance as possible. To fix some issues, we have reverted the usage of Function.prototype.bind
which is incredibly slow as of this date. In addition, we have removed some composition from some operators and implementing them as standalone operators.
The following operators should have much better performance including:
Observable.prototype.concatAll
Observable.prototype.concatMap
Observable.prototype.filter
Observable.prototype.flatMap
Observable.prototype.map
Observable.prototype.merge
Observable.prototype.mergeAll
Observable.prototype.reduce
In the coming point releases, we will be continuously striving for better performance.
Bug Fixes
The following bugs were fixed:
- #484 - Fix Long Stack Traces with Subjects - @trxcllnt
- #485 - Add current time to Immediate Scheduler loop - @trxcllnt
Documentation Fixes
As always, we are adding more documentation for RxJS to help you better understand the library and why you should use RxJS versus other libraries, as well as the mapping concepts between the libraries.
Going forward, you will be able to find the RxJS documentation along with all other languages supported on the reactivex.io home page, which is the home for all implementations of Rx.
We'd like to thank the following for submitting bug fixes for our documentation: @scouten, @artemyarulin, @lazaruslarue, @andrewk, @adm72, @eladb, @YuvalZiegler, @jca02266, @erykpiast, @saraid, @paddlefish, @kt3k, and @38elements! You are what make RxJS so awesome!