Notice: This article had been created some time ago - before promises and the async/await-concept was supported by node. The code itself is still valid, but with today's technology, SQL database consumption would be solved differently. In one sentence: This article is depreciated. We will keep it for some time and removed without further notice later on.
When it comes to connecting MySQL databases with node applications, the NPM module "mysql" is a frequent choice for database access. "mysql" - as a typical node module - implements asynchronous communications via callbacks. In front-end development, callbacks have been replaced by promises. This makes the code simpler. Therefore also for node a number of modules provide promise based access to MySQL.
Another approach for asynchronous structures is RxJS. It provides a number of benefits when it comes to re-usability of components and meshing information elements from different sources. Here we depict to use the NPM module "mysql" without additional extension with pure RxJS in TypeScript.
Concept
With RxJS we have an "Observable" emitting items. These items are transformed in a "Pipeline". Finally an "Observer" subscribes to the pipeline. Until the subscription takes place, nothing happens. No items are emitted and no data progresses through the pipeline. In this case the "Observable" is a MySQL query which emits either all query results (rows) in a single item or emits a number of items, one for each selected row. The pipeline may filter items, map or project items to a different presentation, split up one item (e.g. complete query result into a number of items), limit the number of items etc. In most real-world applications the Observer will take the results from the pipeline and report them to a client, being part of a express.js or other web application server middleware.
Getting started
It is assumed that you have running a node environment with TypeScript. For the demo we create a single TypeScript file importing neceessray components and defining some functions. All the patterns necessary for a real world application are left out. In order to start, some modules need to be installed via NPM in your project.
$ npm install --save mysql @reactivex/rxjs @types/mysql
This installs the "mysql" module, reactive extensions for JS, and TypeScript typings for "mysql"-module. Next create a new TypeScript file in the "src"-tree, let's say "mysql-rx-demo.ts". Let's start with imports and definitions:
// import necessary libraries and definitions
import mysql = require('mysql'); // MySQL module
import { Observable } from '@reactivex/rxjs'; // Observable from reactive extensions for JS
import { IConnection, IError, IFieldInfo } from '@types/mysql'; // Necessary mysql module type definitions
We import the "mysql" module, the definition for the Observable class from reactive extensions for JS and some types we will use later from typings module for "mysql". Next we connect to a demo database using "mysql".
// create connection to some SQL host
let connection = mysql.createConnection({
host: '192.168.1.1',
user: 'demo',
password: 'secret',
database: 'demo_db'
});
The database we connect to has a table named "users" we want to query. It contains user details such as username, first name, last name etc.
mysql> describe users;
+-------------------+------------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+------------------+------+-----+---------------------+----------------+
| userId | int(10) unsigned | NO | PRI | NULL | auto_increment |
| name | varchar(40) | YES | | | |
| firstName | varchar(40) | YES | | | |
| lastName | varchar(40) | YES | | | |
| email | varchar(80) | YES | | | |
| locale | varchar(8) | YES | | de | |
+-------------------+------------------+------+-----+---------------------+----------------+
In scope of this demo we want to access user information from this table. To query the database and emit selection items we create an observable.
function mysqlQueryObservable (con:IConnection, queryString:string, values?:Array<any>) : Observable<{results?:Array<any>, fields?: IFieldInfo[]}> {
return Observable.create(observer => {
con.query (queryString, values, (err: IError, results?: any, fields?: IFieldInfo[]) => {
if (err) {
observer.error(err);
} else {
observer.next ({results: results, fields: fields});
}
observer.complete();
});
});
}
The function "mysqlQueryObservable" accepts the database connection created before, a query string an array of value as parameter. The mysql module provides a syntax allowing to provide values for queries in a simple way we are going to use here.
The function returns an Observable emitting items as objects with properties results (our query results) and field definitions found in MySQL for the results object. Results provided are an array of objects.
On subscription the Observable queries MySQL via the existing connection with provided query string and values. Values ar optional. Results found in the database will be handed over to a callback-function accepting up to tree parameters. First parameter is an error. If it is defined, an error happened and has type IError. Results and fields have been described already.
The callback function checks on whether "err" is defined. If so, it calls the error-function of the observer to report the issue.
When no error occured, the observers's next-function is called. The object handed over comprises query results and field definitions. In case of this observable the next-function of the observer will be called once with all query results. Finally the observable calls the complete-function of the observer. No more data follows.
A call to "mysqlQueryObservable" returns an observable being the start of a pipeline. Different pipelines may be created for different purposes. In this case we want to search for a user by username:
// build pipeline and subscribe.
mysqlQueryObservable (connection,'SELECT userId, name, firstName,lastName, email FROM `users` WHERE `name` = ?',['admin'])
.map (data => data.results[0]) // map result, so that only the first user is handed over.
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete)');}
);
The SQL query string is simple. We select from "users" the elements "userId", "name", "firstName", "lastName" and "email" for user with name "admin". The username is provided as array element in values.
The observable provides one item with the query result and the field definitions. But we are interested only in query results. The first "map"-operator maps from complete data received to first entry (index 0) of the results array. So only the first entry of the results array continues to progress through the remaining pipeline.
With the second "map"-Operator we map from user's first name and last name to a new greeting string. Only the newly aggregated greeting string continues to progress through the remaining pipeline.
Finally we subscribe to the pipeline. This starts the query and emission of items down the pipeline. We do this with 3 functions. The first one is the "next" function. It is called my the second "map"-operator with the full name of the user as string once. The second ("error"-function) is called on error. Here we just log the error to console. The third function "complete" is called after the obersver finalized to emit items. We just log a notice.
When the user we search exists, we receive the console output:
(next) user: The Admin
(complete)
Emitting mutiple items
So far we have one query, one result and one user record logged to console. Next we select all entries from table "users", emit each row as single item, take two of them and log them. This requires to change the observable to emit one item per row instead of one item for the result. The function "mysqlQueryObservableSingleRow" depicts this.
function mysqlQueryObservableSingleRow (con:IConnection, queryString:string, values?:Array<any>) : Observable<{row?:any, fields?: IFieldInfo[]}> {
return Observable.create(observer => {
con.query (queryString, values, (err: IError, results?: any, fields?: IFieldInfo[]) => {
if (err) {
observer.error(err);
} else {
results.forEach(row => {observer.next ({row: row, fields: fields});});
}
observer.complete();
});
});
}
With "mysqlQueryObservableSingleRow" and a modified pipeline regarding query and some name changes:
mysqlQueryObservableSingleRow (connection,'SELECT userId, name, firstName,lastName, email FROM `users`')
.take(2)
.map (data => data.row) // map result, so that only the first user is handed over.
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.delay (1000)
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete)');}
);
We receive:
(next) user: The Manager
(next) user: The Admin
(complete)
Please note that we inserted the "take"-operator. It limits the number of items forwarded downwards to two. The "delay"-operator delays further processing of items by one second.
RxJS flexibility
Was it really necessary to create this new observable?
No, it was not. With the next code piece we show how the same result could have been achieved with the observable we created first:
mysqlQueryObservable (connection,'SELECT userId, name, firstName,lastName, email FROM `users`')
.map (data => data.results) // map result, so that only the first user is handed over.
.flatMap (row => Observable.from (row))
.take(2)
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.delay (2000)
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete) terminate connection'); connection.destroy();}
);
The query emits a single results item. With the first "map"-operator we keep the results element (containing all rows) in the pipeline. With the "flatMap"-operator we emit a single item for each row in results. With the "take"-operator we limit the items to be processed by the remaining pipeline to two. Then we wait 2 seconds. Finally we report - as before - to the consle. Console output is:
(next) user: The Manager
(next) user: The Admin
(complete) terminate connection
The only difference to the second case is that we terminate the MySQL connection on complete.
Summary
In this short demo we depicted how to use with MySQL with RxJS on Node.JS. The examples are simple and use one information source only. RxJS provides the ability to join information from different information sources in a single pipeline and subscription. This allows to join results form database querys with information retrieved from other services such as CRM, REST APIs, Elastic-Search, Wikipedia, ... And all in a well defined, well documented and reliable way.
Please find below the complete source code.
// import necessary libraries and definitions
import mysql = require('mysql'); // MySQL module
import { Observable } from '@reactivex/rxjs'; // Observable from reactive extensions for JS
import { IConnection, IError, IFieldInfo } from '@types/mysql'; // Necessary mysql module type definitions
// create connection to some SQL host
let connection = mysql.createConnection({
host: '192.168.1.1',
user: 'demo',
password: 'secret',
database: 'demo_db'
});
function mysqlQueryObservable (con:IConnection, queryString:string, values?:Array<any>) : Observable<{results?:Array<any>, fields?: IFieldInfo[]}> {
return Observable.create(observer => {
con.query (queryString, values, (err: IError, results?: any, fields?: IFieldInfo[]) => {
if (err) {
observer.error(err);
} else {
observer.next ({results: results, fields: fields});
}
observer.complete();
});
});
}
// build pipeline and subscribe.
mysqlQueryObservable (connection,'SELECT userId, name, firstName,lastName, email FROM `users` WHERE `name` = ?',['admin'])
.map (data => data.results[0]) // map result, so that only the first user is handed over.
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete)');}
);
function mysqlQueryObservableSingleRow (con:IConnection, queryString:string, values?:Array<any>) : Observable<{row?:any, fields?: IFieldInfo[]}> {
return Observable.create(observer => {
con.query (queryString, values, (err: IError, results?: any, fields?: IFieldInfo[]) => {
if (err) {
observer.error(err);
} else {
results.forEach(row => {observer.next ({row: row, fields: fields});});
}
observer.complete();
});
});
}
mysqlQueryObservableSingleRow (connection,'SELECT userId, name, firstName,lastName, email FROM `users`')
.take(2)
.map (data => data.row) // map result, so that only the first user is handed over.
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.delay (1000)
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete)');}
);
mysqlQueryObservable (connection,'SELECT userId, name, firstName,lastName, email FROM `users`')
.map (data => data.results) // map result, so that only the first user is handed over.
.flatMap (row => Observable.from (row))
.take(2)
.map (user => user.firstName+' '+user.lastName) // another map to aggregate a nice username
.delay (2000)
.subscribe (
(user) => {console.log ('(next) user: '+user)},
(error) => {console.log ('(error) error: '+error)},
() => {console.log ('(complete) terminate connection'); connection.destroy();}
);