Measure RxJS Observable Performance
Monday, March 8, 2021
Measure RxJS Observable Performance
The User Timing API provides developers with tooling to precisely measure blocks of code within our applications. This is important for two reasons:
- We can measure the runtime performance of our applications
- We can monitor the runtime performance of our applications over time
Let's start by looking at the Performance API and then create a custom RxJS operator to measure the performance of an Observable.
User Timing API
The User Timing API provides methods to create performance marks and measurements. A simple definition of each of these is:
- A Performance Mark notes when a specific "thing" has occurred in our application's timeline.
- A Performance Measurement determines the time that has elapsed between two marks.
I should also note that the User Timings API is broadly supported by modern browsers..
Create a Performance Mark
In order to create a performance mark we use the mark()
method provided by the globally available performance
object. The method has a single required argument name
, which is a DOMString
that represents the mark.
Here is an example of creating a performance mark.
if ('performance' in window) {
window.performance.mark('start');
}
In the code example above we are checking that the performance
object exists in the global window
object, and then we are subsequently invoking the mark()
method, providing the name
argument.
Using the developer tools in Chrome navigate to the "Performance" tab. This is where we can observe the performance marks for our application. If you want to learn more about using this tool, I suggest the article entitled Getting Started with Analyzing Runtime Performance by Kayce Basques.
Measuring
Using the measure()
method we can determine the runtime performance of a block of code In our application. In most instances, this should be used to measure critical aspects of your application, such as the time required to bootstrap your application. This can also be helpful for measuring the runtime performance of a block of code during development, as well as monitoring the runtime performance of a critical block of code in our application over time.
Let's look at an example of using the measure()
method.
function fibonacci() {
let prev = 0;
let current = 1;
let next = -1;
while (next < Infinity) {
next = prev + current;
prev = current;
current = next;
}
}
if ('performance' in window) {
window.performance.mark('fibonacci:start');
fibonacci();
window.performance.mark('fibonacci:end');
window.performance.measure('fibonacci', 'fibonacci:start', 'fibonacci:end');
}
Let's quickly review the code above:
- First, I've created a
fibonacci()
function. This function should take some time to complete. We'll use this measure how long it takes for the runtime engine to compute the Fibonacci value ofInfinity
. - After checking if the
performance
property exists in thewindow
object we create a starting mark named "fibonacci:start". - We then execute the
fibonacci()
function. - After the "critical" code has completed we create an ending mark named "fibonacci:end".
- Finally, we invoke the
measure()
method to measure the time between our two marks. The first argument is thename
of our measurement, followed by the starting and ending mark names.
Measure RxJS Observable
Now that we have an understanding of the User Timings API we can use this knowledge to begin measuring the runtime performance of an Observable in our application.
There are several points in time of an Observable that we are interested in:
- When a new Observer subscribes to the Observable.
- When the Observable emits a next notification.
- When the Observable emits an error notification.
- When the Observable terminates via either an error notification or a completion notification.
Let's define a custom measure()
operator to meet these requirements.
export const measure = function <T>(
nativeWindow: Window,
name: string,
prefix = "app:"
) {
return (source: Observable<T>) => {
if (
"performance" in nativeWindow &&
nativeWindow.performance !== undefined
) {
return defer(() => {
nativeWindow.performance.mark(`${prefix}${name}:subscribe`);
return source.pipe(
tap(() => nativeWindow.performance.mark(`${prefix}${name}:next`)),
catchError((error) => {
nativeWindow.performance.mark(`${prefix}${name}:error`);
return throwError(error);
}),
finalize(() => {
nativeWindow.performance.mark(`${prefix}${name}:complete`);
nativeWindow.performance.measure(
`${prefix}${name}`,
`${prefix}${name}:subscribe`,
`${prefix}${name}:complete`
);
})
);
});
}
return source;
};
};
Let's review the code above:
- First, we define a new higher-order function named
measure
that returns a function whose single argument is anObservable
. The higher-order function has two required arguments: the nativeWindow
object and thename
for the measurement. The reason that we require thenativeWindow
argument is to allow for isomorphic rendering of our application; for example, for server-side rendering within the context of Node.js. - All RxJS operators are functions that accept a source Observable and return a new Observable. We'll use the
defer()
operator to create a performance mark when a new Observer subscribes to the source Observable, and return the Observable created by thedefer()
operator. - We then use the
pipe()
method on thesource
Observable along with thetap()
operator to create a performance mark when a next notification is emitted by the source Observable. - Further, we use the
catchError()
operator to create a performance mark when the source Observable emits an error notification. - Finally, we use the
finalize()
operator to create a performance mark when the source Observable terminates due to either an error or complete notification. Then, we use themeasure()
method to create a runtime performance measurement of the time from the Observer's subscription to the termination of the Observable.
Here is an example implementation of the measure()
operator:
interval(1000)
.pipe(
switchMap((value) => (value === 2 ? throwError("error") : of(value))),
measure(window, "test"),
take(5)
)
.subscribe({
next: console.log,
error: console.error,
complete: () => console.log("complete")
});
Instructions
Use performance marks to measure an Observable's notifications from subscription to termination
Code Examples
measure.ts
/**
* Creates an Observable that mirrors the source Observable and adds performance marks for:
* - subscribe: when an Observer subscribes to the source Observable.
* - next: when the source Observable emits a next notification.
* - error: when the source Observable emits an error notification.
* - finalize: when the source Observer terminates on complete or error.
*
* ## Examples
*
* For each of the following examples assume the constant Observer is defined as follows:
* ```
* const observer = {
* next: console.log,
* error: console.error,
* complete: () => console.log('complete');
* } as Observer;
* ```
*
* ### Example 1
*
* ```
* interval(1000)
* .pipe(
* measure(window, "test"),
* take(5)
* )
* .subscribe(observer);
* ```
*
* Every 1000 milliseconds the source Observable emits a next notification of incrementing integers
* from `0` to `5`. On frame 5000 the source Observable emits a complete notification.
*
* In the "Timings" section within the "Performance" tab of Chrome DevTools you should observe a
* mark for each next notification, a mark for the finalize (complete/error notifications), and
* a measurement for 5.0 seconds for the source Observable.
*
* ### Example 2
*
* ```
* interval(1000)
* .pipe(
* switchMap((value) => (value === 2 ? throwError("error") : of(value))),
* measure(window, "test"),
* take(5)
* )
* .subscribe(observer);
* ```
*
* Every 1000 milliseconds the source Observable emits a next notification of incrementing integers
* from `0` to `1`. On frame 3000 the source Observable emits an error notification, immediately
* followed by a completion notification.
*
* In the "Timings" section within the "Performance" tab of Chrome DevTools you should observe a
* mark for each next notification, a mark for the error notification, and (almost) immediately afterwards,
* a mark for the completion notification. Finally, you will see a measurement for 3.01 seconds for
* the source Observable.
*
* @param {Window} nativeWindow The global `Window` object.
* @param {string} name The name for the marks and measurement.
* @param {string} prefix An optional prefix to prepend to each mark and measurement.
*/
export const measure = function <T>(
nativeWindow: Window,
name: string,
prefix = "app:"
) {
return (source: Observable<T>) => {
if (
"performance" in nativeWindow &&
nativeWindow.performance !== undefined
) {
return defer(() => {
nativeWindow.performance.mark(`${prefix}${name}:subscribe`);
return source.pipe(
tap(() => nativeWindow.performance.mark(`${prefix}${name}:next`)),
catchError((error) => {
nativeWindow.performance.mark(`${prefix}${name}:error`);
return throwError(error);
}),
finalize(() => {
nativeWindow.performance.mark(`${prefix}${name}:complete`);
nativeWindow.performance.measure(
`${prefix}${name}`,
`${prefix}${name}:subscribe`,
`${prefix}${name}:complete`
);
})
);
});
}
return source;
};
};
Have a question or comment?