HYPERFOCUSData

Stateless Distributed ETL Pipelines: Using Message Brokers for Horizontal Scaling

Introduction

Creating a stateless distributed ETL (Extract, Transform, Load) pipeline can be a daunting task, especially if you’re new to the concept. While there are many approaches to building such pipelines, one simple and effective method is to use message brokers. Tools like Kafka Streams, RabbitMQ, and even Redis Streams can simplify the process of constructing a stateless distributed pipeline.

The key to this setup is treating the message broker as a job or task server. In this article, I will illustrate this approach through an example scenario, explain how to achieve a stateless distributed pipeline, and provide advice on important features to consider when choosing a message broker.

Example Scenario

To demonstrate how message brokers can facilitate a distributed pipeline, let’s consider a practical example. This scenario is designed to illustrate the workflow when using a message broker as a distributed pipeline facilitator.

Imagine you’re tasked with extracting several Salesforce tables (objects) and storing the data in Snowflake for easy access by your Business Intelligence (BI) tools and Machine Learning (ML) pipelines. The pipeline needs to run multiple times a day, as tables like “ACCOUNT” update frequently.

General Flow

The general flow of the pipeline involves the following steps:

  1. Data Extraction: Use the Salesforce Bulk API to extract data, as you’re dealing with more rows than the REST API is recommended for. The data comes in as compressed CSV files.
  2. Data Storage: Send the CSV data to an Amazon S3 bucket.
  3. Data Loading: Perform a “MERGE INTO” operation for each target table in Snowflake to update or insert new records.

However, several factors could cause bottlenecks in this pipeline, such as variable extraction times due to differing table sizes, resource constraints with Snowflake, and high data volumes leading to slowdowns or “clogs.”

While vertical scalability (adding more resources to a single node) can address some issues, it has limits due to diminishing returns and increased costs. Therefore, incorporating horizontal scaling (adding more nodes) becomes essential. The challenge is to achieve this without adding complex clustering libraries or dealing with data duplication issues.

Leveraging Message Brokers for Horizontal Scaling

Message brokers provide an almost drop-in solution for smooth horizontal scaling without worrying about state management or data duplication. They are designed to handle the distribution of tasks and messages across multiple consumers efficiently.

By treating the message broker as a job server and keeping your task list and state in the form of messages in the queue, you can achieve a stateless distributed pipeline. Let’s explore how this works in the context of our example scenario.

Pipeline Setup

To address the challenges outlined earlier, we can set up the pipeline using three queues:

  1. Data Extraction Queue: data_extraction_queue
  2. S3 Upload Queue: store_in_s3_queue
  3. Snowflake Merge Queue: merge_into_snowflake_queue

Pipeline Components

The pipeline consists of the following components:

1. Job Loader

  • Function: Initializes the pipeline by sending messages to data_extraction_queue.
  • Process:
    • Creates a message for each Salesforce object/table that needs to be extracted.
    • Each message contains information about the specific table/object, such as the object name and any parameters required for extraction.
    • Sends these messages to data_extraction_queue, effectively queuing up the extraction tasks.
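
The Job Loader can be sketched in a few lines. This is a minimal illustration that uses Python's in-process queue.Queue as a stand-in for a real broker queue; the function name load_jobs and the message fields object_name and params are assumptions made for the example, not part of any broker's API.

```python
import json
import queue


def load_jobs(extraction_queue, object_names, params=None):
    """Enqueue one JSON-encoded extraction task per Salesforce object."""
    for name in object_names:
        # Hypothetical message shape: the object to extract plus optional parameters.
        message = {"object_name": name, "params": params or {}}
        extraction_queue.put(json.dumps(message))
    return len(object_names)


# queue.Queue stands in for data_extraction_queue on a real broker.
data_extraction_queue = queue.Queue()
queued = load_jobs(data_extraction_queue, ["Account", "Contact", "Opportunity"])
print(f"queued {queued} extraction tasks")
```

With a real broker, only the put call would change; the loader itself keeps no state between runs.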

2. Data Extraction Module

  • Function: Processes extraction tasks from data_extraction_queue.
  • Process:
    • Subscription: Listens to data_extraction_queue.
    • Task Execution:
      • Retrieves a message containing details about a specific table/object to extract.
      • Uses the Salesforce Bulk API to extract data for that table/object.
      • Stores the extracted data as a gzipped CSV file.
    • Completion Notification:
      • Upon successful extraction, creates a new message containing information about the extracted data, such as the file location and metadata.
      • Sends this message to store_in_s3_queue, indicating that the extraction job for that table/object is complete and triggering the next step.

3. S3 Upload Module

  • Function: Handles uploading of extracted data to S3.
  • Process:
    • Subscription: Listens to store_in_s3_queue.
    • Task Execution:
      • Retrieves a message with details about the extracted data file to upload.
      • Uploads the gzipped CSV file to the specified location in the Amazon S3 bucket.
    • Completion Notification:
      • Upon successful upload, creates a new message containing information necessary for the Snowflake merge step, such as the S3 file path and any required credentials.
      • Sends this message to merge_into_snowflake_queue, indicating that the upload job for that table/object is complete and triggering the next step.

4. Snowflake Merge Module

  • Function: Performs the merge operation in Snowflake.
  • Process:
    • Subscription: Listens to merge_into_snowflake_queue.
    • Task Execution:
      • Retrieves a message with details about the data file in S3 to merge.
      • Executes a “MERGE INTO” SQL statement on Snowflake to update or insert records based on the new data.
    • Completion Notification:
      • Optionally, upon completion, can log success or send a confirmation message to another queue or monitoring system.
      • If an error occurs, can send a message to a dead-letter queue or retry mechanism.
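
The merge step can be sketched as a small SQL builder. The table names, key column, and column list below are hypothetical, and the statement is only assembled as a string here, not executed against Snowflake. It follows the general MERGE INTO ... USING ... ON ... WHEN MATCHED / WHEN NOT MATCHED form.

```python
def build_merge_sql(target, staging, key, columns):
    """Assemble a MERGE statement that upserts staging rows into target.

    Identifiers are interpolated directly, so they must come from trusted
    configuration, never from message content supplied by outside parties.
    """
    updates = ", ".join(
        f"{target}.{col} = {staging}.{col}" for col in columns if col != key
    )
    insert_columns = ", ".join(columns)
    insert_values = ", ".join(f"{staging}.{col}" for col in columns)
    return (
        f"MERGE INTO {target} USING {staging} "
        f"ON {target}.{key} = {staging}.{key} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_columns}) VALUES ({insert_values})"
    )


# Hypothetical target and staging tables for the ACCOUNT object.
merge_sql = build_merge_sql("ACCOUNT", "ACCOUNT_STAGE", "ID", ["ID", "NAME"])
print(merge_sql)
```

Because a merge keyed on the record ID can be repeated without creating duplicate rows, this step tolerates a redelivered message better than a plain insert would.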

Workflow Summary

  • Step 1: The Job Loader queues extraction tasks in data_extraction_queue for each table/object.
  • Step 2: The Data Extraction Module processes each task, extracts the data, and upon completion, sends a message to store_in_s3_queue to trigger the upload step for that specific table/object.
  • Step 3: The S3 Upload Module uploads the data to S3 and, upon completion, sends a message to merge_into_snowflake_queue to trigger the merge step.
  • Step 4: The Snowflake Merge Module performs the merge operation in Snowflake for the specific table/object.
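
The four steps can be sketched end to end with in-memory queues. This is a simplified illustration, not a production worker: queue.Queue stands in for the broker, the three handlers are placeholders that only build the next message, and run_stage is a hypothetical helper that drains its queue and exits, whereas a real worker would block and wait for new messages.

```python
import queue


def run_stage(in_queue, handler, out_queue=None):
    """Drain in_queue, apply handler to each message, forward results downstream."""
    processed = 0
    while True:
        try:
            message = in_queue.get_nowait()
        except queue.Empty:
            return processed
        result = handler(message)
        if out_queue is not None:
            out_queue.put(result)  # Completion message triggers the next step.
        processed += 1


# Placeholder handlers: each returns the message the next stage needs.
def extract(msg):
    return {"object_name": msg["object_name"], "local_file": f"/tmp/{msg['object_name']}.csv.gz"}


def upload(msg):
    return {"object_name": msg["object_name"], "s3_key": f"salesforce/{msg['object_name']}.csv.gz"}


merged = []


def merge(msg):
    merged.append(msg["object_name"])
    return msg


data_extraction_queue = queue.Queue()
store_in_s3_queue = queue.Queue()
merge_into_snowflake_queue = queue.Queue()

for name in ("Account", "Contact"):
    data_extraction_queue.put({"object_name": name})

run_stage(data_extraction_queue, extract, store_in_s3_queue)
run_stage(store_in_s3_queue, upload, merge_into_snowflake_queue)
run_stage(merge_into_snowflake_queue, merge)
print(merged)
```

Each stage only knows its input queue and its output queue, which is what lets you run more copies of a slow stage without touching the others.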

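A visual flowchart for the pipeline:

Job Loader -> data_extraction_queue -> Data Extraction Module -> store_in_s3_queue -> S3 Upload Module -> merge_into_snowflake_queue -> Snowflake Merge Module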

This design ensures that each step in the pipeline is decoupled and that the completion of one task explicitly triggers the next step for that particular table/object. It allows for fine-grained control and monitoring of each individual data processing task.

Important Features and Considerations

While message brokers offer significant advantages, it’s important to choose the right one and understand their features. Here are some key considerations:

Exactly-Once Delivery

One of the main features to look for is exactly-once delivery. This ensures that each message is processed only once, mitigating the risk of data duplication and reducing the complexity of your code.

  • Why It Matters: In an ETL pipeline, processing the same data multiple times can lead to inconsistencies and incorrect results.
  • How to Achieve It: Some message brokers provide exactly-once delivery guarantees, while others require idempotent consumers or additional mechanisms to prevent duplicate processing.
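
When the broker only guarantees at-least-once delivery, an idempotent consumer is one way to get the same effect. The sketch below assumes every message carries a unique id field and keeps the set of processed IDs in memory; both are simplifications, since workers on separate machines would need a shared, durable store for that set.

```python
class IdempotentConsumer:
    """Skips messages whose ID has already been processed."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # Assumption: in production this is a durable, shared store.

    def handle(self, message):
        message_id = message["id"]
        if message_id in self._seen:
            return False  # Duplicate delivery: do nothing.
        self._handler(message)
        self._seen.add(message_id)  # Record the ID only after the handler succeeds.
        return True


loaded = []
consumer = IdempotentConsumer(lambda m: loaded.append(m["table"]))
first_result = consumer.handle({"id": "job-1", "table": "ACCOUNT"})
second_result = consumer.handle({"id": "job-1", "table": "ACCOUNT"})  # Redelivered message.
print(loaded)
```

The second call is ignored, so a redelivered message does not trigger a second load.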

FIFO Ordering

Some use cases require FIFO (First-In, First-Out) ordering to maintain the sequence of message processing.

  • Challenges:
    • Implementing ordered processing in a distributed system is complex.
    • It may limit asynchronous execution and scalability.
  • Solutions:
    • Use message brokers that support FIFO queues.
    • Implement ordering logic in your application or at the database layer.
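
One way to keep ordering where it matters without giving up parallelism is to route every message with the same key to the same partition or queue, so that order is preserved per key rather than globally. The sketch below is an illustration of that routing idea only; the function name partition_for and the use of the table name as the key are assumptions.

```python
import zlib


def partition_for(key, partition_count):
    """Map a key to a stable partition index."""
    # crc32 is deterministic across processes, unlike Python's built-in hash() for str.
    return zlib.crc32(key.encode("utf-8")) % partition_count


partitions = [[] for _ in range(3)]
events = [("ACCOUNT", 1), ("CONTACT", 1), ("ACCOUNT", 2), ("ACCOUNT", 3)]
for table, sequence in events:
    partitions[partition_for(table, 3)].append((table, sequence))

account_partition = partitions[partition_for("ACCOUNT", 3)]
account_sequence = [seq for table, seq in account_partition if table == "ACCOUNT"]
print(account_sequence)
```

All ACCOUNT messages land in one partition in the order they were produced, while other tables can be processed in parallel on other partitions.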

Language and Framework Compatibility

The choice of message broker may depend on the programming language you’re using.

  • Considerations:
    • Integration: Some languages have better support or client libraries for certain message brokers.
    • Tooling: Availability of tools for monitoring and managing the message broker can vary.

Scalability Limits

Keep in mind that the pipeline’s horizontal scalability is now partially dependent on the message broker framework.

  • Recommendations:
    • Evaluate the performance characteristics of the message broker.
    • Ensure it can handle your expected message volumes and throughput.
    • Consider the ease of scaling the message broker itself.

Abstraction Layer

To prepare for potential changes, abstract the queue interface in your pipeline.

  • Benefits:
    • Flexibility to switch message brokers with minimal code changes.
    • Easier maintenance and upgrades.
  • Implementation:
    • Create a wrapper or interface for queue operations (enqueue, dequeue, acknowledge).
    • Use dependency injection or configuration to select the message broker implementation.
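
A minimal sketch of such an abstraction is shown below. The class names MessageQueue and InMemoryQueue, the receipt-based acknowledge call, and the build_queue factory are all hypothetical; a real adapter for a specific broker would implement the same three methods using that broker's client library.

```python
from abc import ABC, abstractmethod
from collections import deque


class MessageQueue(ABC):
    """The only queue operations the pipeline modules are allowed to use."""

    @abstractmethod
    def enqueue(self, message): ...

    @abstractmethod
    def dequeue(self): ...

    @abstractmethod
    def acknowledge(self, receipt): ...


class InMemoryQueue(MessageQueue):
    """In-process implementation, handy for tests and local development."""

    def __init__(self):
        self._pending = deque()
        self._in_flight = {}
        self._next_receipt = 0

    def enqueue(self, message):
        self._pending.append(message)

    def dequeue(self):
        if not self._pending:
            return None
        message = self._pending.popleft()
        self._next_receipt += 1
        self._in_flight[self._next_receipt] = message  # Held until acknowledged.
        return self._next_receipt, message

    def acknowledge(self, receipt):
        self._in_flight.pop(receipt, None)


def build_queue(backend):
    """Select an implementation by name, for example from configuration."""
    implementations = {"memory": InMemoryQueue}
    return implementations[backend]()


work_queue = build_queue("memory")
work_queue.enqueue({"object_name": "Account"})
receipt, message = work_queue.dequeue()
work_queue.acknowledge(receipt)
print(message)
```

Switching brokers then means adding one more entry to the implementations mapping and changing a configuration value, with no changes to the pipeline modules.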

Conclusion

In this article, we’ve explored how using message brokers can simplify the creation of a stateless distributed ETL pipeline. By treating the message broker as a job server and structuring the pipeline into independent, stateless modules, you can achieve horizontal scalability and robustness.

Key Takeaways:

  • Message Brokers Enable Scalability: They allow for smooth horizontal scaling without complex state management.
  • Decoupled Modules Enhance Flexibility: Each module operates independently and communicates progress through messages, triggering the next step upon completion.
  • Stateless Modules Simplify Scaling: Independent modules can be scaled up or down based on workload.
  • Careful Selection Is Crucial: Choose a message broker with features that suit your needs, such as exactly-once delivery and compatibility with your programming language.
  • Plan for the Future: Abstract interfaces and consider future expansion to ensure the longevity of your pipeline.

Message brokers have more utility than many realize, but they are not a silver bullet. Proper planning, research, and understanding of your specific use case are essential for building an effective and reliable data pipeline.

Introduction

Welcome back to the second installment of our series on building a Salesforce extraction pipeline from scratch, without relying on special libraries or SaaS services. In the first part, we covered how to authenticate with Salesforce using the SOAP API. Now that we can securely connect to Salesforce, it’s time to fetch some data!

In this article, we’ll focus on retrieving records from Salesforce using the REST API. We’ll delve into query generation, handling fields, and dealing with Salesforce’s pagination structure. A key aspect we’ll explore is how to manage field selections in SOQL queries, especially considering the limitations around unbounded queries and the use of FIELDS(ALL). By the end of this article, you’ll be able to extract data efficiently and prepare it for further processing or storage.

As before, all the code discussed here is available in the GitHub repository for this series. Each part of the series has its own branch, with each successive branch building upon the previous one.

You can find the repository here:

[GitHub Repository Link]

Understanding the Salesforce REST API

The Salesforce REST API provides a powerful, convenient, and simple Web services interface for interacting with Salesforce. It allows you to perform various operations such as querying data, updating records, and accessing metadata.

One of the key features we’ll utilize is the ability to execute SOQL (Salesforce Object Query Language) queries via the REST API. SOQL is similar to SQL but is designed specifically for Salesforce data structures.

Retrieving Records: The salesforce_rest_api.py Module

Let’s dive into the salesforce_rest_api.py module, which handles querying Salesforce data using the REST API.

Imports and Dependencies

				
import requests
import os
from salesforce_authentication import get_session_id
from helper_functions import get_credentials, get_nested_values
				
			
  • requests: Used for making HTTP requests to the Salesforce REST API.
  • os: Used to access environment variables.
  • salesforce_authentication.get_session_id: Reuses the authentication function we created in Part 1 to obtain a session ID.
  • helper_functions.get_credentials: Retrieves stored Salesforce credentials.
  • helper_functions.get_nested_values: A utility function to extract values from nested data structures.

 

Main Function: get_records_list

				
def get_records_list(object_name, fields=None, limit=None):
    """
    Retrieves a list of records based on the specified object name, fields, and limit.

    :param object_name: The name of the object to query.
    :param fields: A list of fields to include in the results. If None, all fields are retrieved.
    :param limit: The maximum number of records to retrieve. If None, no limit is applied.
    :return: A list of dictionaries representing the filtered records.
    """
    if not fields and not limit:
        fields = get_field_names(object_name)
    query = get_query(object_name, fields, limit)
    raw_results = get_rest_query_results(query)
    return [
        {k: v for k, v in record.items() if k != "attributes"}
        for record in raw_results
    ]
				
			

This is the primary function you’ll use to retrieve records:

Parameters:

  • object_name: The Salesforce object you want to query (e.g., Account, Contact).
  • fields: A list of specific fields to retrieve. If None, all fields are retrieved.
  • limit: The maximum number of records to return. If None, all records are retrieved.

Process:

  • Checks if both fields and limit are None. If so, it fetches all field names for the object using get_field_names.
  • Constructs a SOQL query using get_query.
  • Executes the query using get_rest_query_results.
  • Cleans up the results by removing the attributes metadata from each record.
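
To make the clean-up step concrete, here is the same comprehension applied to a hand-written sample. The sample mimics the general shape of a REST query result, where each record carries an attributes entry alongside the requested fields; the record values and the helper name strip_attributes are made up for the illustration.

```python
# Hand-written sample in the shape of a REST query result (values are invented).
raw_results = [
    {
        "attributes": {
            "type": "Account",
            "url": "/services/data/v60.0/sobjects/Account/001EXAMPLE",
        },
        "Id": "001EXAMPLE",
        "Name": "Acme",
    }
]


def strip_attributes(records):
    """Drop the 'attributes' metadata entry from every record."""
    return [
        {k: v for k, v in record.items() if k != "attributes"}
        for record in records
    ]


cleaned = strip_attributes(raw_results)
print(cleaned)
```

Only the attributes key is removed; every queried field is passed through unchanged.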

 

Handling Fields in SOQL Queries

Before we delve into the helper functions, it’s important to understand how the fields parameter is handled and why.

The Challenge with FIELDS(ALL) and Unbounded Queries

In SOQL, you might be tempted to use SELECT FIELDS(ALL) FROM ObjectName to retrieve all fields, similar to SELECT * in SQL. However, Salesforce imposes limitations on the use of FIELDS(ALL):

  • Unbounded Queries: When you do not specify a LIMIT clause, the query is considered unbounded.
  • Restriction: Salesforce does not allow the use of FIELDS(ALL) in unbounded queries; it is accepted only when the result rows are limited, for example with a LIMIT of at most 200. This is to prevent performance issues that could arise from retrieving all fields for a large number of records.

Therefore, if you want to fetch all fields without specifying a limit, you cannot use FIELDS(ALL). Instead, you must explicitly list all field names in the SELECT clause.

Our Solution

To handle this, our get_records_list function checks if both fields and limit are None. If so, it proceeds to fetch all field names for the specified object using the get_field_names function. This list of field names is then used to construct the SOQL query.

By explicitly listing all field names, we comply with Salesforce’s requirements and avoid the limitations associated with FIELDS(ALL) in unbounded queries.

 

Helper Functions

Let’s explore the helper functions used within get_records_list.

Constructing the SOQL Query: get_query
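
What follows is a minimal sketch of what get_query might look like, not the repository's code. It assumes the function returns the query?q=... path fragment that get_rest_query_results appends to the versioned base URL, that the SOQL text is URL-encoded, and that FIELDS(ALL) is used only when no fields are given and the limit is 200 or less. The constant name FIELDS_ALL_MAX_LIMIT is hypothetical.

```python
from urllib.parse import quote_plus

FIELDS_ALL_MAX_LIMIT = 200  # Assumed cap for queries that use FIELDS(ALL).


def get_query(object_name, fields=None, limit=None):
    """Build the 'query?q=...' path fragment for a SOQL SELECT."""
    if fields:
        select_clause = ", ".join(fields)
    elif limit is not None and limit <= FIELDS_ALL_MAX_LIMIT:
        # No explicit fields, but the query is bounded, so FIELDS(ALL) is allowed.
        select_clause = "FIELDS(ALL)"
    else:
        raise ValueError(
            "FIELDS(ALL) needs a LIMIT of 200 or less; pass explicit fields instead"
        )
    soql = f"SELECT {select_clause} FROM {object_name}"
    if limit is not None:
        soql += f" LIMIT {int(limit)}"
    return f"query?q={quote_plus(soql)}"


print(get_query("Contact", ["FirstName", "LastName"], 100))
```

Raising early on an unbounded FIELDS(ALL) query keeps the error local instead of waiting for Salesforce to reject the request.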

 
 
				
Executing the Query: get_rest_query_results

def get_rest_query_results(query, next_url=None):
    """
    Executes a REST API GET request to Salesforce with the given query.
    Handles pagination recursively by checking for 'nextRecordsUrl' in the response.

    :param query: The SOQL query string to execute.
    :param next_url: The nextRecordsUrl for pagination (used internally during recursion).
    :return: A list of all records retrieved from Salesforce.
    :raises Exception: If the REST query fails with a non-200 status code.
    """
    if next_url is None:
        url = f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/{query}"
    else:
        url = f"{os.environ['SALESFORCE_URL']}{next_url}"
    response = requests.get(
        url,
        headers={
            "Authorization": f"Bearer {fetch_session_id()}",
            "Content-Type": "application/json"
        }
    )
    if response.status_code != 200:
        raise Exception(f"REST query failed with a status of {response.status_code}: {response.text}")
    data = response.json()
    records = data.get("records", [])
    # Recursively fetch more records if 'nextRecordsUrl' is present
    if "nextRecordsUrl" in data:
        next_records = get_rest_query_results(
            query, next_url=data['nextRecordsUrl'])
        records.extend(next_records)
    return records
				
			

Purpose:

  • Sends the SOQL query to Salesforce and retrieves the results.

Highlights:

  • Handles pagination by checking for the nextRecordsUrl in the response and recursively fetching additional records.
  • Uses the fetch_session_id function to obtain the session ID for authentication.
  • Parses the JSON response and extracts the records.

Fetching All Field Names: get_field_names

				
def get_field_names(object_name):
    """
    Retrieves all field names for the specified Salesforce object.

    :param object_name: The name of the Salesforce object to describe.
    :return: A list of field names for the object.
    :raises Exception: If the field name query fails with a non-200 status code.
    """
    response = requests.get(
        f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/sobjects/{object_name}/describe/",
        headers={
            "Authorization": f"Bearer {fetch_session_id()}"
        }
    )
    if response.status_code != 200:
        raise Exception(f"Field name query failed with a status of {response.status_code}: {response.text}")
    return get_nested_values("name", response.json()["fields"])
				
			

Purpose:

  • Retrieves all field names for a given object by calling the describe endpoint.

Why This Is Important:

  • As discussed, when performing an unbounded query (without a LIMIT), you cannot use FIELDS(ALL).
  • To retrieve all fields in such a case, you must explicitly list all field names in the SELECT clause.

Highlights:

  • Makes a GET request to the describe endpoint for the specified object.
  • Parses the response to extract field names using the get_nested_values utility function.

Fetching the Session ID: fetch_session_id

				
def fetch_session_id():
    """
    Fetches the current Salesforce session ID using stored credentials.

    :return: The Salesforce session ID as a string.
    """
    credentials = get_credentials("salesforce")
    return get_session_id(
        credentials["salesforce_username"],
        credentials["salesforce_password"],
        credentials["salesforce_security_token"]
    )
				
			

Purpose:

  • Obtains a session ID using the stored credentials, reusing the authentication function from Part 1.

Highlights:

  • Retrieves credentials using get_credentials.
  • Calls get_session_id to authenticate and obtain a session ID.

Utility Functions: helper_functions.py

The helper_functions.py module provides utility functions used by the main module.

Loading Environment Variables

				
import os
from dotenv import load_dotenv

# Load variables from .env file
load_dotenv()
				
			

Purpose:

  • Loads environment variables from a .env file, which is useful for local development without exposing sensitive information.

Retrieving Credentials: get_credentials

				
def get_credentials(integration):
    return {
        f"{integration}_password": os.environ[f"{integration.upper()}_PASSWORD"],
        f"{integration}_username": os.environ[f"{integration.upper()}_USERNAME"],
        f"{integration}_security_token": os.environ[f"{integration.upper()}_SECURITY_TOKEN"]
    }
				
			
 

Purpose:

  • Retrieves credentials from environment variables for a given integration (in this case, Salesforce).

Highlights:

  • Uses the integration name to construct the keys for environment variables.
  • Assumes that environment variables are named in the format INTEGRATION_USERNAME, INTEGRATION_PASSWORD, etc.

Extracting Nested Values: get_nested_values

				
def get_nested_values(key, data):
    return [item[key] for item in data]
				
			
  • Purpose: Extracts a list of values corresponding to a specified key from a list of dictionaries.
  • Usage: Used to extract field names from the JSON response in get_field_names.

In Action

Let’s see how to use the get_records_list function to retrieve data from Salesforce.

Example: Fetch All Accounts

				
import salesforce_rest_api as sf_api
# Retrieve all accounts with all fields
accounts = sf_api.get_records_list("Account")
print(accounts)
				
			

Explanation:

  • Since neither fields nor limit is provided, the function fetches all field names for the Account object.
  • Constructs a SOQL query that explicitly lists all fields.
  • Retrieves all records for the Account object.

Example: Fetch Contacts with Specific Fields and Limit

				
import salesforce_rest_api as sf_api

fields = ["FirstName", "LastName", "Email"]
contacts = sf_api.get_records_list("Contact", fields=fields, limit=100)
print(contacts)
				
			
 

Explanation:

  • Specifies a list of fields to retrieve.
  • Sets a limit of 100 records.
  • Constructs a SOQL query that lists those fields explicitly; FIELDS(ALL) is not needed because the fields are specified.

Example: An Unbounded Query Without FIELDS(ALL)

import salesforce_rest_api as sf_api
# Does not raise: with no fields and no limit, the field names are
# fetched from the describe endpoint and listed explicitly in the query
accounts = sf_api.get_records_list("Account", fields=None, limit=None)

Explanation:

  • Since both fields and limit are None, the function fetches all field names and includes them explicitly in the query.
  • Had the query used FIELDS(ALL) without a LIMIT instead, Salesforce would reject it, and get_rest_query_results would raise an exception on the non-200 response.

Handling Pagination

Salesforce may limit the number of records returned in a single response. If the result set is large, Salesforce provides a nextRecordsUrl in the response, which you can use to fetch the next batch of records.

Our get_rest_query_results function handles this by recursively checking for nextRecordsUrl and fetching additional records until all records are retrieved.
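
Recursion adds one stack frame per page, so a very large result set could approach Python's recursion limit. An iterative variant is sketched below under stated assumptions: collect_all_records is a hypothetical name, and fetch_page is a hypothetical callable that stands in for the authenticated HTTP request and returns the parsed JSON body, which keeps the example runnable without a Salesforce connection.

```python
def collect_all_records(fetch_page, first_path):
    """Follow nextRecordsUrl links with a loop instead of recursion."""
    records = []
    path = first_path
    while path:
        data = fetch_page(path)  # Expected to return the parsed JSON body.
        records.extend(data.get("records", []))
        path = data.get("nextRecordsUrl")  # Absent on the last page.
    return records


# Fake two-page response, used only to exercise the loop.
fake_pages = {
    "/query?q=demo": {
        "records": [{"Id": "1"}, {"Id": "2"}],
        "nextRecordsUrl": "/query/next-1",
    },
    "/query/next-1": {"records": [{"Id": "3"}]},
}

all_records = collect_all_records(fake_pages.__getitem__, "/query?q=demo")
print(len(all_records))
```

The loop stops as soon as a response has no nextRecordsUrl, which matches the stopping condition of the recursive version.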

Conclusion

In this article, we’ve built upon our authentication foundation to retrieve data from Salesforce using the REST API. We’ve covered:

  • Constructing SOQL Queries Dynamically:
    • Handling the fields parameter carefully to comply with Salesforce’s limitations.
    • Understanding why unbounded queries cannot use FIELDS(ALL) and how to work around this by explicitly listing all fields.
  • Handling Field Selection and Limitations:
    • Using the get_field_names function to retrieve all field names when needed.
    • Ensuring that our queries are efficient and compliant with Salesforce’s requirements.
  • Dealing with Salesforce’s Pagination Mechanism:
    • Implementing recursive calls to handle large datasets.
    • Ensuring that all records are retrieved without manual intervention.

With these tools, you can now extract data from Salesforce efficiently and prepare it for storage, analysis, or further processing.

What’s Next

In the next article, we’ll delve into the Salesforce Bulk API. We’ll explore how to handle large volumes of data extraction efficiently. The Bulk API presents its own challenges, particularly around its asynchronous query system, where you have to wait and check on the query status before fetching results.

Stay tuned as we continue to build our stateless, distributed extraction pipeline, enhancing it to handle larger datasets and more complex scenarios. If you have any questions or need further clarification, feel free to leave a comment below.

I am open to consulting engagements if you need help building this or any other data solution. Feel free to email me directly at jason@hyperfocusdata.io