
Automate ETL tasks with Workflows

10 min read

ETL workflow

A common use case for Retool Workflows is to automate ETL tasks. Retool apps can query data from a variety of sources, but it's much more efficient to prepare data outside of the frontend application.

This tutorial explains how to build an ETL workflow that:

  • Extracts account data from Salesforce and transforms it using JavaScript.
  • Upserts the transformed data into a PostgreSQL database.
  • Reads behavioral event data from a Google Sheet.
  • Aggregates event data at an account level.
  • Upserts aggregate event counts into a PostgreSQL database alongside account data.

The transformed data is then available to all Retool apps without the need for complex queries that run every time the app launches.

Considerations

This tutorial uses the following resources to demonstrate a real-world use case of Retool Workflows:

  • Salesforce
  • PostgreSQL
  • Google Sheets

Much of what you'll learn can also apply to other resources and data.

You can generate your own test API and sample data to try out Retool Workflows without using any data sources of your own.

1. Create a new workflow

Sign in to Retool, select the Workflows tab in the navigation bar, then click Create new. Set the name to ETL workflow.

Create a new workflow

2. Configure Start block

The workflow initially contains two blocks: Start and Resource query. This workflow should run every day at 9:00 AM.

To configure a daily schedule:

  1. Click on the Start block to expand its settings.
  2. Set the Trigger to Schedule.
  3. Set the Schedule type to Interval.
  4. Update the schedule to run every day at 9:00 AM.
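This tutorial uses the Interval schedule type, which lets you pick a daily run time directly. The Start block also offers a Cron schedule type; if you use it instead, the equivalent Cron expression for 9:00 AM every day would be 0 9 * * *.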

Configure a schedule

The connecting lines between blocks in a workflow represent the control flow. Workflow blocks connect together to perform specific actions in sequential order, beginning with the Start block. Once a block has completed its action, it triggers the next block in the chain, and so on.

3. Retrieve Salesforce account data

The first Resource query block in this workflow retrieves all Salesforce accounts with an account number, which excludes any test accounts from the resulting data set.

Select your Salesforce resource and update the query with the following SOQL statement:

getSfdcAccounts
select
  Name,
  AccountNumber,
  Industry
from account
where
  AccountNumber != null

Click ▸ to run the query and rename the block to getSfdcAccounts.

Salesforce SOQL query
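For reference, each record in getSfdcAccounts.data resembles the sketch below. The values are hypothetical, but the attributes object is real: Salesforce includes it with every record returned by a SOQL query, and the next step strips it out.

{
  attributes: { type: "Account", url: "/services/data/v55.0/sobjects/Account/001..." },
  Name: "Acme Corp",
  AccountNumber: "CD736025",
  Industry: "Manufacturing"
}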

4. Transform account records

Now that the workflow has a list of accounts to start from, it can pass this data to a block and transform it further. Click and drag ⦿ to create a connected Query block and set its name to formatAccounts. This also displays a label with the connected block's name. You can preview the data passed into the block by hovering the cursor over the label.

Transform the array of account records with JavaScript so that it conforms to the table schema of the PostgreSQL database. The transformation also removes the attributes field, which Salesforce returns but which is not required.

SOQL field        PostgreSQL field
AccountNumber     account_number
Name              name
Industry          industry
attributes        Removed

Select Run JS Code as the resource and then insert the following JavaScript:

const data = getSfdcAccounts.data;

return data.map((account) => ({
  name: account.Name,
  account_number: account.AccountNumber,
  industry: account.Industry,
}));

Transform data
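Given the hypothetical Salesforce record shown earlier, formatAccounts would return:

[
  {
    name: "Acme Corp",
    account_number: "CD736025",
    industry: "Manufacturing",
  },
]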

5. Upsert account records into the database

The formatAccounts Query block returns the transformed account records to upsert into the PostgreSQL database. Click and drag ⦿ to create a connected Query block named upsertAccounts and select the write-enabled postgresql resource. You then construct a query in GUI mode for this use case:

  1. Select the required table.
  2. Set Action type to Bulk upsert via a primary key.
  3. Set Primary key column to account_number.
  4. Set Array of records to update to {{formatAccounts.data}}.

Upsert data

An alternative method would be to use a Loop block and iterate through each account to upsert them individually. In this case, transforming the data and performing a bulk upsert is more efficient as it reduces the number of database operations required.
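Bulk upsert via a primary key means that rows whose account_number already exists in the table are updated in place, while unseen account numbers are inserted as new rows, so the scheduled run can process the full account list every day without creating duplicates.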

6. Retrieve behavioral event data from Google Sheets

In this use case, behavioral event data is being stored in Google Sheets. Click and drag ⦿ to create a connected Query block and select the Google Sheets resource, then rename the block to getEvents.

Query data from Google Sheets

Click ▸ to run the query.

Although the getEvents block is connected, it does not rely on the response from the upsertAccounts block. It is connected so that it runs only after the formatted account data has been upserted to the database; the rest of the workflow requires the account data to be available before continuing.
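The aggregation code in the next step reads four columns from each row: account_number, user_id, event_type, and timestamp (a Unix epoch in milliseconds). A hypothetical row from getEvents.data would look like this:

{
  account_number: "CD736025",
  user_id: "75762915-55cc-4709-a49d-38c41ddcf446",
  event_type: "Task Created",
  timestamp: 1656666000000,
}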

7. Aggregate event records and calculate metrics

With account and behavioral event data now available, the next step in the workflow is to process events and aggregate them by account and date. Similar to the earlier transformation of account data, the resulting data must conform to the database table schema, which contains the following columns:

  • id (primary key)
  • date
  • account_number
  • tasks_completed_total
  • tasks_completed_unique
  • tasks_created_total
  • tasks_created_unique
  • tasks_shared_total
  • tasks_shared_unique
  • task_list_views_total
  • task_list_views_unique

Click and drag ⦿ to create a connected Query block named aggregateLoop and select the Run JS Code resource. The following JavaScript includes comments to describe each step in the process.

aggregateLoop
// Get event data from the previous Google Sheets query response
const events = getEvents.data;

// Map to convert sentence-cased event names into Postgres field names
const eventTypeMap = {
  "Task Created": "tasks_created",
  "Task Completed": "tasks_completed",
  "Task Shared": "tasks_shared",
  "Task List Viewed": "task_list_views",
};

/* raw object structure:
{
  "CD736025" (SFDC account number): {
    "2022-07-01" (date of events): {
      "tasks_created" (name of event): [
        "75762915-55cc-4709-a49d-38c41ddcf446" (id of user who triggered event, one per event),
        ...
      ],
      ...
    },
    ...
  },
  ...
}
*/
const raw = {};

// Unix timestamp for 24 hours prior to the start of the last successful workflow run,
// used to exclude events which have already been aggregated.
const cutoff =
  workflowContext.lastSuccessfulRun.startedAtEpochMs - 24 * 60 * 60 * 1000;

// Iterate through each event
for (const event of events) {
  // Continue to the next iteration of the event loop if the event timestamp is before the cutoff time.
  if (event.timestamp < cutoff) {
    continue;
  }

  // Check if the raw object has a key for the event's account, and create it if not
  if (raw[event.account_number] === undefined) {
    raw[event.account_number] = {};
  }

  // Get the event's calendar date, using Moment.js (preloaded in Workflows by default)
  const date = moment(event.timestamp).format("YYYY-MM-DD");

  // Check if the raw[account_number] object has a key for the event's calendar date, and create it if not
  if (raw[event.account_number][date] === undefined) {
    raw[event.account_number][date] = {};
  }

  // Get the database field name for the event from the eventTypeMap
  const eventType = eventTypeMap[event.event_type];

  // Check if the raw[account_number][date] object has a key for the event's event type, and create it if not
  if (raw[event.account_number][date][eventType] === undefined) {
    raw[event.account_number][date][eventType] = [];
  }

  // Push the event's user_id into the raw[account_number][date][eventType] array
  raw[event.account_number][date][eventType].push(event.user_id);
}

// Get an array of account keys from the raw object
const accounts = Object.keys(raw);

/* metrics array structure. Each element is a metricRecord object to be inserted into Postgres
[
  {
    account_number: "CD736025" (SFDC account number),
    date: "2022-07-01" (date of events),
    id: "CD736025|2022-07-01" (composite record key, concatenating account_number and date),
    tasks_created_unique: 5 (count of unique account users triggering the tasks_created event on the specified date),
    tasks_created_total: 10 (count of tasks_created events triggered by account users on the specified date),
    tasks_completed_unique: 5,
    tasks_completed_total: 10,
    tasks_shared_unique: 5,
    tasks_shared_total: 10,
    task_list_views_unique: 5,
    task_list_views_total: 10,
  },
  ...
]
*/
const metrics = [];

// Iterate through each account
for (const account of accounts) {
  // Get an array of date keys from the raw[account_number] object
  const dates = Object.keys(raw[account]);

  // Iterate through each date
  for (const date of dates) {
    // Create metricRecord object
    const metricRecord = {
      account_number: account,
      date: date,
      id: account + "|" + date,
    };

    // Get an array of event keys from the raw[account_number][date] object
    const events = Object.keys(raw[account][date]);

    // Iterate through each event
    for (const event of events) {
      // Get an array of account user IDs (one per event) who triggered the event on a given date
      const idArray = raw[account][date][event];

      // Calculate the number of events triggered by account users on the specified date
      metricRecord[event + "_total"] = idArray.length;

      // Calculate the number of unique account users triggering the event on the specified date
      metricRecord[event + "_unique"] = new Set(idArray).size;
    }

    // Push metricRecord into the metrics array
    metrics.push(metricRecord);
  }
}

return metrics;


8. Check for any returned metric records

The {{ workflowContext }} property contains information about the current workflow. The aggregateLoop block uses workflowContext.lastSuccessfulRun.startedAtEpochMs to exclude events that occurred more than 24 hours before the last successful run started, since this workflow is scheduled to run daily. If no events were received during this window, no results are returned.
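For example, here is the cutoff arithmetic with a hypothetical run time:

// Hypothetical: the last successful run started at 2022-07-02T09:00:00Z
const startedAtEpochMs = 1656752400000;
// 24 hours earlier: 2022-07-01T09:00:00Z
const cutoff = startedAtEpochMs - 24 * 60 * 60 * 1000; // 1656666000000

Any event with a timestamp earlier than the cutoff is skipped.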

This workflow uses the Branch block to check if any metric records were returned. Branch blocks enable you to visually construct if...else statements that evaluate data and perform conditional actions. If the condition evaluates as a truthy value, it triggers the block to which it's connected. If not, it triggers another block.

Click and drag ⦿ to create a new Branch block and set the If condition to aggregateLoop.data.length > 0. If the number of items in aggregateLoop.data is greater than zero, this evaluates as true.

Perform conditional action with a Branch block

Click ▸ to run the code. As you test your workflow, the condition that evaluates as true is highlighted in green.

9. Upsert metric records

The final step is to upsert the aggregated metric records into the PostgreSQL database. Click and drag ⦿ to create a connected Query block named insertMetrics, then select the write-enabled postgresql resource.

  1. Select the required table.
  2. Set Action type to Bulk upsert via a primary key.
  3. Set Primary key column to id.
  4. Set Array of records to update to {{aggregateLoop.data}}.

Upsert metrics records
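Because the id column concatenates account_number and date, re-running the workflow for the same day updates the existing row for each account-date combination rather than inserting duplicates.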

10. Test and enable the workflow

Now that the workflow is complete, run it manually by clicking Run on the right of the Workflow editor toolbar.

Workflows are not triggered automatically by default. After verifying that the workflow runs as expected, toggle Enable. This activates the Start block's trigger so that it runs on its configured schedule automatically.

Wrap up

Using Retool Workflows, you have now fully automated a complex ETL task that interacts with multiple database tables, transforms data, and aggregates the results.

By applying the lessons learned here and following the same patterns, you can extend the workflow's functionality further, such as sending conditional notifications or using integrations like Slack.