Highly performant JavaScript data stream ETL engine.
Bellboy streams input data row by row. Each row passes through a user-defined function where it can be transformed. When enough data has been collected in a batch, it is loaded into the destination.
Before installing, make sure you are using the latest version of Node.js.
npm install bellboy
If you will be using bellboy with the native msnodesqlv8 driver, add it as a dependency.
npm install msnodesqlv8
This example shows how bellboy can extract rows from an Excel file, modify them on the fly, load them into a Postgres database, move the processed file to another folder, and then process the remaining files. All in six simple steps.
const bellboy = require("bellboy");
const fs = require("fs");
const path = require("path");
(async () => {
const srcPath = `C:/source`;
// 1. create a processor which will process
// Excel files in the folder one by one
const processor = new bellboy.ExcelProcessor({
path: srcPath,
hasHeader: true,
});
// 2. create a destination which will add a new 'status'
// field to each row and load processed data into a Postgres database
const destination = new bellboy.PostgresDestination({
connection: {
user: "user",
password: "password",
host: "localhost",
database: "bellboy",
},
table: "stats",
recordGenerator: async function* (record) {
yield {
...record.raw.obj,
status: "done",
};
},
});
// 3. create a job which will glue the processor and the destination together
const job = new bellboy.Job(processor, [destination]);
// 4. tell bellboy to move the file away as soon as it was processed
job.on("endProcessingStream", async (file) => {
const filePath = path.join(srcPath, file);
const newFilePath = path.join(`./destination`, file);
fs.renameSync(filePath, newFilePath);
});
// 5. log all error events
job.onAny(async (eventName, ...args) => {
if (eventName.includes("Error")) {
console.log(args);
}
});
// 6. run your job
await job.run();
})();
A job in bellboy links a processor to its destinations. When the job is run, the data processing and loading mechanism is started. To initialize a Job instance, pass a processor and one or more destinations.
const job = new bellboy.Job(
processor_instance,
[destination_instance],
job_options = {}
);
- `reporters` `Reporter[]`: array of reporters.
- `jobName` `string`: optional user-defined name of the job. Can come in handy if used in combination with extended events to distinguish events from different jobs.
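For example, a job with a user-defined name and a custom reporter could be set up like this (a sketch; `processor`, `destination` and `MyReporter` are placeholders for your own instances):
const job = new bellboy.Job(processor, [destination], {
  jobName: "excel-to-postgres",
  reporters: [new MyReporter()],
});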
A job instance exposes the following methods:

- `run` `async function()`: starts processing data.
- `on` `function(event, async function listener)`: adds a listener for a specific event.
- `onAny` `function(async function listener)`: adds a listener for any event.
- `stop` `function(errorMessage?)`: stops job execution. If `errorMessage` is passed, the job will throw an error with this message.
Event listeners, which can be registered with the `job.on` or `job.onAny` methods, allow you to listen to specific events in the job lifecycle and to interact with them.

- When multiple listeners are registered for the same event, those added using `.on` will always be executed before those added using `.onAny`, regardless of registration order. This ensures that specific event listeners have priority over generic ones.
- When multiple listeners are registered for a single event, those added by reporters will be executed first, followed by the remaining listeners in their order of registration.
- The job always waits for the code inside a listener to complete.
- Any error thrown inside a listener will be ignored and a warning message will be printed out.

The `job.stop()` method can be used inside a listener to stop job execution and throw an error if needed.
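For example, a listener can abort the whole job when row generation fails (a sketch; the error message is arbitrary):
job.on("rowGenerationError", async (destinationIndex, row, error) => {
  // stop the job; job.run() will throw an error with this message
  job.stop(`Row generation failed: ${error}`);
});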
job.on(
"startProcessing",
async (processor: IProcessor, destinations: IDestination[]) => {
// Job has started execution.
}
);
job.on("startProcessingStream", async (...args: any) => {
// Stream processing has been started.
// Passed parameters may vary based on specific processor.
});
job.on("startProcessingRow", async (row: any) => {
// Row has been received and is about to be processed inside `recordGenerator` method.
});
job.on("rowGenerated", async (destinationIndex: number, generatedRow: any) => {
// Row has been generated using `recordGenerator` method.
});
job.on(
"rowGenerationError",
async (destinationIndex: number, row: any, error: any) => {
// Record generation (`recordGenerator` method) has thrown an error.
}
);
job.on("endProcessingRow", async () => {
// Row has been processed.
});
job.on("transformingBatch", async (destinationIndex: number, rows: any[]) => {
// Batch is about to be transformed inside `batchTransformer` method.
});
job.on(
"transformedBatch",
async (destinationIndex: number, transformedRows: any) => {
// Batch has been transformed using `batchTransformer` method.
}
);
job.on(
"transformingBatchError",
async (destinationIndex: number, rows: any[], error: any) => {
// Batch transformation (`batchTransformer` method) has thrown an error.
}
);
job.on("endTransformingBatch", async (destinationIndex: number) => {
// Batch has been transformed.
});
job.on("loadingBatch", async (destinationIndex: number, data: any[]) => {
// Batch is about to be loaded into destination.
});
job.on(
"loadedBatch",
async (destinationIndex: number, data: any[], result: any) => {
// Batch has been loaded into destination.
}
);
job.on(
"loadingBatchError",
async (destinationIndex: number, data: any[], error: any) => {
// Batch load has failed.
}
);
job.on("endLoadingBatch", async (destinationIndex: number) => {
// Batch load has finished.
});
job.on("endProcessingStream", async (...args: any) => {
// Stream processing has finished.
// Passed parameters may vary based on specific processor.
});
job.on("processingError", async (error: any) => {
// Unexpected error has occurred.
});
job.on("endProcessing", async () => {
// Job has finished execution.
});
A special listener can be registered using the `job.onAny` method, which will listen for any of the previously mentioned events.
job.onAny(async (eventName: string, ...args: any) => {
// An event has been fired.
});
Sometimes more information about an event is needed, especially if you are building a custom reporter to log or trace fired events. This information can be obtained by registering an async function as a third parameter with the `job.on` method or as a second parameter with the `job.onAny` method.
For example,
job.on("rowGenerated", undefined, async (event: IBellboyEvent) => {
// Row has been generated using `recordGenerator` method.
console.log(
`${event.jobName} has generated row for #${event.eventArguments.destinationIndex} destination`
);
});
or
job.onAny(undefined, async (event: IBellboyEvent) => {
console.log(`${event.jobName} has fired ${event.eventName}`);
});
- `eventName` `string`: name of the event.
- `eventArguments` `any`: arguments of the event.
- `jobName` `string?`: user-defined name of the job.
- `jobId` `string`: unique ID of the job.
- `eventId` `string`: unique ID of the event.
- `timestamp` `number`: high resolution timestamp of the event.
- `jobStopped` `boolean`: whether the job is stopped or not.
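As a sketch, a custom trace log could be built from these fields (the output format here is arbitrary):
job.onAny(undefined, async (event) => {
  console.log(
    `[${event.timestamp}] job ${event.jobName ?? event.jobId} ` +
      `fired ${event.eventName} (event ${event.eventId})`
  );
});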
Each processor in bellboy is a class with a single responsibility of processing data of a specific type:
- MqttProcessor processes MQTT protocol messages.
- HttpProcessor processes data received from an HTTP call.
- ExcelProcessor processes XLSX file data from the file system.
- JsonProcessor processes JSON file data from the file system.
- DelimitedProcessor processes files with delimited data from the file system.
- PostgresProcessor processes data received from a PostgreSQL SELECT.
- MySqlProcessor processes data received from a MySQL SELECT.
- MssqlProcessor processes data received from an MSSQL SELECT.
- FirebirdProcessor processes data received from a Firebird SELECT.
- DynamicProcessor processes dynamically generated data.
- TailProcessor processes new lines added to the file.
All processors share the following option:

- `rowLimit` `number`: number of records to be processed before stopping the processor. If not specified or `0` is passed, all records will be processed.
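For example, to stop after the first 100 records (sketched here with a DynamicProcessor):
const processor = new bellboy.DynamicProcessor({
  rowLimit: 100,
  generator: async function* () {
    // only the first 100 yielded records will be processed
    let i = 0;
    while (true) yield i++;
  },
});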
MqttProcessor listens for MQTT messages and processes them one by one. It also handles backpressure by queuing messages, so that all messages are eventually processed.

- Processor options
- `url` `string` required
- `topics` `string[]` required
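A minimal configuration could look like this (the broker URL and topic name are placeholders):
const processor = new bellboy.MqttProcessor({
  url: "mqtt://localhost",
  topics: ["sensors/temperature"],
});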
HttpProcessor processes data received from an HTTP call. It can process `json` and `xml` as well as `delimited` data, and can handle pagination by using the `nextRequest` function. For delimited data, it produces rows as described in the DelimitedProcessor section.
- Processor options
- `connection` `object` required: options from the axios library.
- `dataFormat` `delimited | json | xml` required
- `rowSeparator` `string` (required for `delimited`)
- `delimiter` `string` (only for `delimited`): a symbol separating fields of the row.
- `hasHeader` `boolean` (only for `delimited`): if `true`, the first row will be processed as a header.
- `qualifier` `string` (only for `delimited`): a symbol placed around a field to signify that it is the same field.
- `encoding` `string` (only for `delimited`)
- `jsonPath` `RegExp | string`: path to the array to be streamed. This option is described in detail in the JsonProcessor section.
- `saxOptions` `object` (only for `xml`): options for XML streaming as described in the sax-stream library.
- `authorizationRequest` `object`
  - `connection`: options from the axios library.
  - `applyTo`: where the extracted field should be applied, either `header` or `query`.
  - `sourceField`: name of the field from which the value of the authorization token will be extracted.
  - `destinationField`: name of the field which will be applied to `header` or `query` using the `applyTo` option.
  - `prefix`: custom prefix to apply to the token.
- `nextRequest` `async function(header)`: function which must return the `connection` for the next request or `null` if the next request is not needed.
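For example, streaming a remote CSV file could be sketched as follows (the URL and delimiter settings are assumptions for illustration):
const processor = new bellboy.HttpProcessor({
  connection: {
    method: "GET",
    url: "http://example.com/orders.csv",
  },
  dataFormat: "delimited",
  rowSeparator: "\n",
  delimiter: ",",
  hasHeader: true,
});
For paginated APIs, the `nextRequest` function returns the connection for the next page: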
const processor = new bellboy.HttpProcessor({
nextRequest: async function () {
if (currentPage < pageCount) {
return {
...connection,
url: `${url}&current_page=${currentPage + 1}`,
};
}
return null;
},
// ...
});
Directory processors are used for streaming text data from files in a directory. There are currently four types of directory processors: `ExcelProcessor`, `JsonProcessor`, `DelimitedProcessor` and `TailProcessor`. Such processors search for files in the source directory and process them one by one.
The file name (`file`) and full file path (`filePath`) parameters will be passed to the `startProcessingStream` event.
- Processor options
- `path` `string`: path to the directory where files are located. Current directory by default.
- `filePattern` `RegExp`: regex pattern for the files to be processed. If not specified, all files in the directory will be matched.
- `files` `string[]`: array of file names. If not specified, all files in the directory will be matched against the `filePattern` regex and processed in alphabetical order.
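For example, an Excel processor can be limited to specific files in a folder (the path and pattern are illustrative):
const processor = new bellboy.ExcelProcessor({
  // process report1.xlsx, report2.xlsx, ... one by one
  path: "./data",
  filePattern: /^report\d+\.xlsx$/,
});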
ExcelProcessor processes `XLSX` files in the directory.

- Directory processor options
- `hasHeader` `boolean | number`: whether the worksheet has a header or not, `false` by default. A 0-based row location can be passed if the header is not located on the first row.
- `fillMergedCells` `boolean`: if `true`, merged cells will have the same value (by default, only the first cell of merged cells is filled with a value). Warning! Enabling this feature may increase streaming time because the file must be processed to detect merged cells before the actual stream. `false` by default.
- `ignoreEmpty` `boolean`: whether to ignore empty rows or not, `true` by default.
- `sheets` `(string | number)[] | async function(sheets)`: array of sheet names and/or sheet indexes, or an async function which accepts an array of all sheets and must return another array of sheet names to be processed. If not specified, the first sheet will be processed.
- `encoding` `string`: XLSX file encoding.
const processor = new bellboy.ExcelProcessor({
// process last sheet
sheets: async (sheets) => {
const sheet = sheets[sheets.length - 1];
return [sheet.name];
},
// ...
});
To see what a processed row will look like, proceed to the xlstream library documentation, which is used for Excel processing.
JsonProcessor processes `JSON` files in the directory.

- Directory processor options
- `jsonPath` `RegExp | string`: path to the array to be streamed. Internally, when JSON is streamed, the current path is joined together using `.` as a separator and then tested against the provided regular expression. If not specified, a root array will be streamed.

As an example, if you have this JSON object:
{ "animals": { "dogs": [ "pug", "bulldog", "poodle" ] } }
and want to stream the `dogs` array, the path you will need to use is `/animals.dogs.(\d+)/` if a RegExp is used as `jsonPath`, and `animals.dogs.(\\d+)` if a string is used. `(\d+)` is used here because each index of the array is a number.
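Putting it together, streaming the `dogs` array from JSON files in a folder could be sketched as:
const processor = new bellboy.JsonProcessor({
  path: "./data",
  // matches paths like animals.dogs.0, animals.dogs.1, ...
  jsonPath: /animals.dogs.(\d+)/,
});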
DelimitedProcessor processes files with delimited data in the directory.

- Directory processor options
- `rowSeparator` `string` required
- `delimiter` `string`: a symbol separating fields of the row.
- `hasHeader` `boolean`: if `true`, the first row will be processed as a header.
- `qualifier` `string`: a symbol placed around a field to signify that it is the same field.
- `encoding` `string`

Produced row:

- `header` `string[]`: if `hasHeader` is `true`, the first row will appear here.
- `arr` `string[]`: the row split by `delimiter` and `qualifier`.
- `obj` `object`: if `hasHeader` is `true`, an object with header elements as keys will appear here.
- `row` `string`: the received raw row.
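A CSV-style configuration and the row shape it produces could be sketched as (values are illustrative):
const processor = new bellboy.DelimitedProcessor({
  path: "./data",
  rowSeparator: "\n",
  delimiter: ",",
  hasHeader: true,
  qualifier: '"',
});
// with header `id,name`, the row `1,"pug"` would produce roughly
// { header: ["id", "name"], arr: ["1", "pug"], obj: { id: "1", name: "pug" }, row: '1,"pug"' }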
TailProcessor watches for file changes and outputs the last part of a file as soon as new lines are added to it.

- Directory processor options
- `fromBeginning` `boolean`: in addition to emitting new lines, emits lines from the beginning of the file, `false` by default.

Produced row:

- `file` `string`: name of the file the data came from.
- `data` `string`
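For example, following a log file from its beginning could be sketched as (the path and file name are placeholders):
const processor = new bellboy.TailProcessor({
  path: "./logs",
  files: ["app.log"],
  fromBeginning: true,
});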
PostgresProcessor processes a PostgreSQL `SELECT` query row by row.

- Processor options
- `query` `string` required: query to execute.
- `connection` `object` required
  - `user`
  - `password`
  - `host`
  - `port`
  - `database`
  - `schema`
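A minimal configuration could look like this (credentials and query are placeholders); the MySqlProcessor and FirebirdProcessor below are configured the same way with their respective connection fields:
const processor = new bellboy.PostgresProcessor({
  query: "select * from orders",
  connection: {
    user: "user",
    password: "password",
    host: "localhost",
    database: "bellboy",
  },
});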
MySqlProcessor processes a MySQL `SELECT` query row by row.

- Processor options
- `query` `string` required: query to execute.
- `connection` `object` required
  - `user`
  - `password`
  - `host`
  - `port`
  - `database`
FirebirdProcessor processes a Firebird `SELECT` query row by row.

- Processor options
- `query` `string` required: query to execute.
- `connection` `object` required
  - `user`
  - `password`
  - `host`
  - `database`
MssqlProcessor processes an MSSQL `SELECT` query row by row.

- Processor options
- `query` `string` required: query to execute.
- `connection` `object` required
Here is an example of how to configure `MssqlProcessor` with a native TDS driver instead of the default pure JavaScript Tedious driver.
const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
user: "user",
password: "password",
server: "server",
database: "database",
driver: nativeDriver,
};
const source = new MssqlProcessor({
connection,
query: "select * from orders",
});
In previous versions of bellboy, `connection.driver` was a `string` parameter.
DynamicProcessor is a processor which generates records on the fly. It can be used to define custom data processors.

- Processor options
- `generator` `async generator function` required: generator function which must yield records to process.
// processor which generates 10 records dynamically
const processor = new bellboy.DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 10; i++) {
yield i;
}
},
});
Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log this data to stdout and post it over HTTP simultaneously.
- StdoutDestination logs data to console.
- HttpDestination executes HTTP request calls.
- PostgresDestination inserts/upserts data to PostgreSQL database.
- MySqlDestination inserts/upserts data to MySQL database.
- MssqlDestination inserts data to MSSQL database.
General destination options, shared by all destinations:

- `disableLoad` `boolean`: if `true`, no data will be loaded to the destination. In combination with reporters, this option can come in handy during testing.
- `batchSize` `number`: number of records to be processed before loading them to the destination. If not specified or `0` is passed, all records will be processed.
- `recordGenerator` `async generator function(row)`: function which receives a row produced by the processor and can apply transformations to it.
- `batchTransformer` `async function(rows)`: function which receives a whole batch of rows. This function is called after the row count reaches `batchSize`. Data is loaded to the destination immediately after this function has been executed.
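A sketch combining these options, using a StdoutDestination (described below) and assuming the returned array replaces the batch; the row fields are illustrative:
const destination = new bellboy.StdoutDestination({
  batchSize: 100,
  // reshape every row produced by the processor
  recordGenerator: async function* (row) {
    yield { ...row, processedAt: Date.now() };
  },
  // runs once per collected batch, right before the load
  batchTransformer: async (rows) => rows.filter((row) => row.amount > 0),
});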
StdoutDestination logs out all data to stdout (console).

- General destination options
- `asTable` `boolean`: if set to `true`, data will be printed as a table.
HttpDestination puts processed data one by one in `body` and executes the specified HTTP request.

- General destination options
- `request` required: options from the axios library.
- `authorizationRequest` `object`
  - `connection`: options from the axios library.
  - `applyTo`: where the extracted field should be applied, either `header` or `query`.
  - `sourceField`: name of the field from which the value of the authorization token will be extracted.
  - `destinationField`: name of the field which will be applied to `header` or `query` using the `applyTo` option.
  - `prefix`: custom prefix to apply to the token.
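A minimal sketch posting each record to an endpoint (the URL is a placeholder):
const destination = new bellboy.HttpDestination({
  request: {
    method: "POST",
    url: "http://localhost:3000/orders",
  },
});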
PostgresDestination inserts data to PostgreSQL.

- General destination options
- `table` `string` required: table name.
- `upsertConstraints` `string[]`: if specified, an `UPSERT` command will be executed based on the provided constraints.
- `connection` `object` required
  - `user`
  - `password`
  - `host`
  - `database`
  - `schema`
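For example, an upsert keyed on an `id` constraint could be sketched as (the constraint name and credentials are illustrative):
const destination = new bellboy.PostgresDestination({
  table: "stats",
  // executes UPSERT based on the provided constraint
  upsertConstraints: ["id"],
  connection: {
    user: "user",
    password: "password",
    host: "localhost",
    database: "bellboy",
  },
});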
MySqlDestination inserts data to MySQL.

- General destination options
- `table` `string` required: table name.
- `connection` `object` required
  - `user`
  - `password`
  - `host`
  - `database`
- `useSourceColumns` `boolean`: if `true`, only the columns in the source data will be used for the data load. Default is `false`, using all destination table columns.
- `postLoadQuery` `string`: a query which will be executed after the data load and before the connection is closed. The result will be available in the `result` object of the `loadedBatch` event.
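A sketch showing how a `postLoadQuery` result can be picked up in the `loadedBatch` event (query, table and credentials are illustrative):
const destination = new bellboy.MySqlDestination({
  table: "stats",
  connection: {
    user: "user",
    password: "password",
    host: "localhost",
    database: "bellboy",
  },
  postLoadQuery: "select count(*) as total from stats",
});
job.on("loadedBatch", async (destinationIndex, data, result) => {
  // result contains the output of postLoadQuery
  console.log(result);
});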
MssqlDestination inserts data to MSSQL.

- General destination options
- `table` `string` required: table name.
- `connection` `object` required
Here is an example of how to configure `MssqlDestination` with a native TDS driver instead of the default pure JavaScript Tedious driver.
const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
user: "user",
password: "password",
server: "server",
database: "database",
driver: nativeDriver,
};
const sink = new MssqlDestination({
connection,
table: "orders",
batchSize: 1000,
});
New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.
To create a new processor, you must extend the `Processor` class and implement the async `process` function. This function accepts one parameter:

- `processStream` `async function(readStream, ...args)` required: callback function which accepts a Readable stream. After calling this function, the `job` instance will handle the passed stream internally. The passed parameters (`args`) will be emitted with the `startProcessingStream` event during job execution.
class CustomProcessor extends bellboy.Processor {
async process(processStream) {
// await processStream(readStream, 'hello', 'world');
}
}
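For instance, a processor streaming records from an in-memory array might look like this (a minimal sketch, assuming the job accepts an object-mode Readable stream):
const { Readable } = require("stream");

class ArrayProcessor extends bellboy.Processor {
  async process(processStream) {
    // stream three records from memory; "in-memory" will be passed
    // to the startProcessingStream event
    const readStream = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
    await processStream(readStream, "in-memory");
  }
}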
To create a new destination, you must extend the `Destination` class and implement the async `loadBatch` function. This function accepts one parameter:

- `data` `any[]` required: array of processed data that needs to be loaded.
class CustomDestination extends bellboy.Destination {
async loadBatch(data) {
console.log(data);
}
}
A reporter is a job wrapper which can operate with the job instance (for example, listen to events using the job `on` method). To create a new reporter, you must extend the `Reporter` class and implement the `report` function, which will be executed during job instance initialization.
Reporter event listeners (`on`, `onAny`) are added before any other user-defined listeners.
This function accepts one parameter:

- `job` `Job` required: job instance.
class CustomReporter extends bellboy.Reporter {
report(job) {
job.on("startProcessing", undefined, async ({ jobName }) => {
console.log(`Job ${jobName} has been started.`);
});
}
}
Tests can be run by using the `docker compose up --abort-on-container-exit --exit-code-from test --build test` command.