
Generalize to Optimize: Enhancing Data Pipelines for Future Growth

Introduction

When designing a data pipeline—or any project, for that matter—it’s best to build it as generalized or abstracted as possible. While an exception might be made if it’s intended only for a single, specific data source or target, things rarely remain static in the long run. A well-built pipeline will likely be used by multiple existing sources and almost certainly by new ones. Although this approach may involve a bit more time during the initial design or coding phase, it’s smart to invest that time upfront rather than deal with the technical debt of inadequate planning later on. In the following sections, I will discuss common methods and benefits of generalization, potential issues that might arise, and offer some concluding thoughts and advice on implementation.

Common Methods of Generalization

A common method of generalization is to follow the DRY (Don’t Repeat Yourself) principle: move all common or repeated functions or code into separate modules or classes. For example, if you have multiple modules connecting to different data sources, each formatting incoming data into specific data types or schemas, it is beneficial to consolidate all of the formatting logic into a single module. This way, if you need to adjust how your code handles capitalization or NULL values, you only have to change one function instead of updating multiple places. Admittedly, this approach requires some planning: the formatting for one source might interfere with another, though this can often be resolved through function overloading or flag arguments. While it may initially add what feels like unnecessary planning time, it will save a significant amount of time when testing and when adding or modifying data sources.

Here is an example of a simple `formatter` module, kept purposefully simple for illustration. It handles basic transformations for several sources and includes a flag to treat `example_source` slightly differently.
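A minimal sketch, assuming a single `format_record` entry point (the function name and the exact rules are illustrative):

```python
def format_record(record, source=None):
    """Apply the shared formatting rules to one raw record (a dict of field -> value)."""
    formatted = {}
    for key, value in record.items():
        if isinstance(value, str):
            value = value.strip()
            # Flag argument: one source needs its text title-cased, the others do not
            if source == "example_source":
                value = value.title()
        if value == "":
            value = None  # normalize empty strings so downstream code sees a consistent NULL
        formatted[key] = value
    return formatted
```

Every source module calls the same function, so a change to the NULL or capitalization rules happens in exactly one place.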


Building on the previous method, it’s advisable not to name functions or variables too specifically when generalizing. There’s often a tendency to assign a name to a function or variable that fits its initial use, but this can cause problems when you later abstract or generalize your codebase or pipeline. For instance, if you begin by extracting data from a single API—say, the Salesforce Bulk API—you might name the function that retrieves data something like `salesforce_bulk_api_results`. However, as you generalize this function to handle multiple APIs, such a specific name becomes misleading and may necessitate changes in several places within the codebase. The same applies to variable names within functions. If a generalized function still uses a specific name like `salesforce_api_results_dict`, it becomes harder to read and understand when used with other APIs.
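As a quick illustration (the generalized name `get_api_results` below is just one possible choice):

```python
# Named for its first use; the name becomes misleading once other APIs are added
def salesforce_bulk_api_results(query):
    ...


# Generalized: the name and parameters stay accurate no matter which API is queried
def get_api_results(api_name, query):
    ...
```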

Another highly beneficial method of generalization is the use of interfaces. In this context, interfaces involve moving all direct access to external systems into dedicated interface modules for each system. External systems include:

  • databases
  • shared APIs (those accessed by multiple modules)
  • message brokers

For example, consider a [distributed pipeline that uses a message broker to pass jobs sequentially through the pipeline stages](https://medium.com/@jkimmerling/stateless-distributed-etl-pipelines-using-message-brokers-for-horizontal-scaling-96035f341899). 

Without interface generalization, switching the message broker (e.g. from Kafka to Redis) requires modifications in multiple files throughout the pipeline. However, if you use an interface module to abstract access to the message broker, only the code within that module, which directly connects to the broker, needs to change. The internal access functions—the “interface”—remain the same, so other modules or parts of the pipeline remain unaffected. This makes the pipeline much more adaptable. Additionally, it helps avoid cloud vendor lock-in. If access to services like S3, Pub/Sub, or Cloud Composer is routed through their own interfaces, transitioning to a different cloud provider would require minimal re-coding, making such changes smoother and more cost-effective.

An example of a caching interface, currently using Redis, is shown below.
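The sketch is illustrative: the `CacheInterface` name, its `get`/`set`/`delete` methods, and the connection defaults are assumptions rather than a prescribed design.

```python
import redis


class CacheInterface:
    """All cache access goes through this class; nothing else touches Redis directly."""

    def __init__(self, host="localhost", port=6379):
        self._client = redis.Redis(host=host, port=port, decode_responses=True)

    def get(self, key):
        return self._client.get(key)

    def set(self, key, value, ttl_seconds=None):
        self._client.set(key, value, ex=ttl_seconds)

    def delete(self, key):
        self._client.delete(key)
```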

Below is the same interface, modified to use MongoDB instead.
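Again a sketch: only the internals change, while the methods the rest of the pipeline calls stay identical.

```python
from pymongo import MongoClient


class CacheInterface:
    """Same public methods as before, now backed by MongoDB instead of Redis."""

    def __init__(self, uri="mongodb://localhost:27017", db_name="cache"):
        self._collection = MongoClient(uri)[db_name]["cache_entries"]

    def get(self, key):
        document = self._collection.find_one({"_id": key})
        return document["value"] if document else None

    def set(self, key, value, ttl_seconds=None):
        # TTL support would need an expiring index; omitted to keep the sketch short
        self._collection.replace_one({"_id": key}, {"_id": key, "value": value}, upsert=True)

    def delete(self, key):
        self._collection.delete_one({"_id": key})
```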

To the rest of the codebase, nothing has changed. The class methods used to access the “cache” stay the same, so no other re-coding needs to be done.

The final method is equally important: moving configuration values or variables outside of the code itself. Configuration can be ambiguous, but here it refers to anything that might need to be changed or adjusted, such as delay timers, database IPs, bucket names, and so on. While you’ll need to decide what fits under this umbrella—and items can be added or removed over time—the key point is to extract them from the code. Having to redeploy a pipeline every time you want to adjust a delay timer is not only frustrating but also a significant waste of time and can risk causing outages, depending on the pipeline’s uptime requirements. Here are three ways it can be achieved:

  • The simplest way to externalize configuration is to place all values in a flat file accessible by the running pipeline. This works but is clunky and requires direct access to the VM or container file system. 
  • A better approach is to use environment variables within the VM or container. While an improvement, this method isn’t ideal, especially if sensitive credentials are involved. 
  • In my experience, the optimal solution is a combination of methods. I prefer to store credentials in a secure service like AWS Secrets Manager, place any configuration required to start the pipeline in environment variables, and keep the remaining configuration values in a database. This setup allows changes without redeploying the pipeline and makes it easy to implement an API for adjusting parameters like the source tables to extract or the maximum number of workers for each extraction (a sketch of this combined approach follows the list).
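A rough sketch of that combined setup, with SQLite standing in for the configuration database (the secret name, environment variable, and table layout here are illustrative assumptions):

```python
import json
import os
import sqlite3

import boto3


def load_config():
    """Assemble runtime configuration from three tiers: secrets, env vars, and a config table."""
    # 1. Credentials come from a secrets service (AWS Secrets Manager here)
    secret = boto3.client("secretsmanager").get_secret_value(
        SecretId="pipeline/salesforce"  # illustrative secret name
    )
    credentials = json.loads(secret["SecretString"])

    # 2. Bootstrap values needed just to start the pipeline come from environment variables
    config_db_path = os.environ["CONFIG_DB_PATH"]  # illustrative variable name

    # 3. Everything else lives in a config table that can be edited without redeploying
    with sqlite3.connect(config_db_path) as conn:
        rows = conn.execute("SELECT key, value FROM pipeline_config").fetchall()
    runtime_settings = dict(rows)  # e.g. {"delay_seconds": "30", "max_workers": "8"}

    return {"credentials": credentials, **runtime_settings}
```

Because the delay timers and worker counts live in the database, they can be adjusted on the fly (or exposed through an API) without touching the deployed code.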

Potential Issues with Generalization

The issues surrounding generalization typically arise when it’s taken to an extreme. The deeper you delve into generalization, the more likely these issues are to appear.

The first problem that can arise with generalization is diminishing returns. Implementing basic interfaces and consolidating repeated code into modules or functions often yields substantial rewards for minimal effort. After completing these initial generalizations, however, it’s prudent to pause and weigh the effort against the reward. If restructuring two dissimilar functions to extract partially repeated code requires substantial effort and adds complexity, it might be better to forgo that change.

Another dilemma is reduced code readability. This often occurs when someone new to generalization starts applying it excessively. The temptation to generalize everything can lead to splitting functions into too many small parts, making them little more than wrappers around built-in functions, or creating function names so generic that they fail to convey their true purpose.

Below is an example of going too far with function splitting.
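Here the helpers do nothing but rename built-in string methods (the bodies shown are illustrative):

```python
def trim_string(value):
    # A thin wrapper that adds nothing over the built-in
    return value.strip()


def capitalize_string(value):
    # Likewise just a rename of the built-in
    return value.capitalize()


def format_a_string(value):
    return capitalize_string(trim_string(value))
```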

The creation of `trim_string` and `capitalize_string` provided no real benefit, and it would have been much better to use `.strip()` and `.capitalize()` in the `format_a_string` function.

Lastly, there’s the issue of justifying the time spent. While it might seem ideal to extract every possible benefit and make the code “perfect,” this consumes time and, consequently, money. If these efforts no longer contribute meaningfully to the bottom line, they become hard to justify. After making initial improvements to existing code or establishing basic abstractions in new code, it’s wise to evaluate the advantages of further generalization. If the cost outweighs the benefits, it’s best to move on.

Conclusion

In conclusion, the advantages of generalizing or abstracting your code when creating, designing, or maintaining a data pipeline or service are substantial. By investing a bit more time and effort upfront, you reap enormous benefits in the long run. The key to success is moderation. While generalization is highly advantageous, overdoing it can lead to excessive time spent on diminishing returns and may even introduce issues in future development. Overly abstracted code can become unreadable, confusing, or overly complex, posing challenges for new team members and ongoing maintenance.

I am open to consulting engagements if you need help building this or any other data solution. Feel free to email me directly at jason@hyperfocusdata.io

Introduction

Welcome back to the second installment of our series on building a Salesforce extraction pipeline from scratch, without relying on special libraries or SaaS services. In the first part, we covered how to authenticate with Salesforce using the SOAP API. Now that we can securely connect to Salesforce, it’s time to fetch some data!

In this article, we’ll focus on retrieving records from Salesforce using the REST API. We’ll delve into query generation, handling fields, and dealing with Salesforce’s pagination structure. A key aspect we’ll explore is how to manage field selections in SOQL queries, especially considering the limitations around unbounded queries and the use of FIELDS(ALL). By the end of this article, you’ll be able to extract data efficiently and prepare it for further processing or storage.

As before, all the code discussed here is available in the GitHub repository for this series. Each part of the series has its own branch, with each successive branch building upon the previous one.

You can find the repository here:

[GitHub Repository Link]

Understanding the Salesforce REST API

The Salesforce REST API provides a powerful, convenient, and simple Web services interface for interacting with Salesforce. It allows you to perform various operations such as querying data, updating records, and accessing metadata.

One of the key features we’ll utilize is the ability to execute SOQL (Salesforce Object Query Language) queries via the REST API. SOQL is similar to SQL but is designed specifically for Salesforce data structures.

Retrieving Records: The salesforce_rest_api.py Module

Let’s dive into the salesforce_rest_api.py module, which handles querying Salesforce data using the REST API.

Imports and Dependencies

				
```python
import requests
import os
from salesforce_authentication import get_session_id
from helper_functions import get_credentials, get_nested_values
```
				
			
  • requests: Used for making HTTP requests to the Salesforce REST API.
  • os: Used to access environment variables.
  • salesforce_authentication.get_session_id: Reuses the authentication function we created in Part 1 to obtain a session ID.
  • helper_functions.get_credentials: Retrieves stored Salesforce credentials.
  • helper_functions.get_nested_values: A utility function to extract values from nested data structures.

 

Main Function: get_records_list

				
```python
def get_records_list(object_name, fields=None, limit=None):
    """
    Retrieves a list of records based on the specified object name, fields, and limit.

    :param object_name: The name of the object to query.
    :param fields: A list of fields to include in the results. If None, all fields are retrieved.
    :param limit: The maximum number of records to retrieve. If None, no limit is applied.
    :return: A list of dictionaries representing the filtered records.
    """
    if not fields and not limit:
        fields = get_field_names(object_name)
    query = get_query(object_name, fields, limit)
    raw_results = get_rest_query_results(query)
    return [
        {k: v for k, v in record.items() if k != "attributes"}
        for record in raw_results
    ]
```
				
			

This is the primary function you’ll use to retrieve records:

Parameters:

  • object_name: The Salesforce object you want to query (e.g., Account, Contact).
  • fields: A list of specific fields to retrieve. If None, all fields are retrieved.
  • limit: The maximum number of records to return. If None, all records are retrieved.

Process:

  • Checks if both fields and limit are None. If so, it fetches all field names for the object using get_field_names.
  • Constructs a SOQL query using get_query.
  • Executes the query using get_rest_query_results.
  • Cleans up the results by removing the attributes metadata from each record.

 

Handling Fields in SOQL Queries

Before we delve into the helper functions, it’s important to understand how the fields parameter is handled and why.

The Challenge with FIELDS(ALL) and Unbounded Queries

In SOQL, you might be tempted to use SELECT FIELDS(ALL) FROM ObjectName to retrieve all fields, similar to SELECT * in SQL. However, Salesforce imposes limitations on the use of FIELDS(ALL):

  • Unbounded Queries: When you do not specify a LIMIT clause, the query is considered unbounded.
  • Restriction: Salesforce does not allow the use of FIELDS(ALL) in unbounded queries. This is to prevent performance issues that could arise from retrieving all fields for a large number of records.

Therefore, if you want to fetch all fields without specifying a limit, you cannot use FIELDS(ALL). Instead, you must explicitly list all field names in the SELECT clause.
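For instance (illustrative queries; what matters is only whether a bound is present):

```python
# Rejected by Salesforce: FIELDS(ALL) in an unbounded query
unbounded_all_fields = "SELECT FIELDS(ALL) FROM Account"

# Accepted: FIELDS(ALL) with an explicit bound
bounded_all_fields = "SELECT FIELDS(ALL) FROM Account LIMIT 100"

# Accepted: unbounded, but with every field name listed explicitly
explicit_fields = "SELECT Id, Name, Industry FROM Account"  # field list truncated for illustration
```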

Our Solution

To handle this, our get_records_list function checks if both fields and limit are None. If so, it proceeds to fetch all field names for the specified object using the get_field_names function. This list of field names is then used to construct the SOQL query.

By explicitly listing all field names, we comply with Salesforce’s requirements and avoid the limitations associated with FIELDS(ALL) in unbounded queries.

 

Helper Functions

Let’s explore the helper functions used within get_records_list.

Constructing the SOQL Query: get_query
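The query builder itself can be small. Below is a minimal sketch consistent with how it is called elsewhere in the module; the exact implementation in the repository may differ:

```python
from urllib.parse import quote


def get_query(object_name, fields=None, limit=None):
    """
    Builds the relative REST query path for the given object, fields, and limit (sketch).
    """
    # Fall back to FIELDS(ALL) only when no explicit field list was supplied
    field_clause = ", ".join(fields) if fields else "FIELDS(ALL)"
    soql = f"SELECT {field_clause} FROM {object_name}"
    if limit:
        soql += f" LIMIT {limit}"
    # get_rest_query_results appends this to .../services/data/v60.0/,
    # so the path must target the 'query' endpoint with the SOQL URL-encoded in 'q'
    return f"query/?q={quote(soql)}"
```

Notice that by the time get_records_list calls this with no field list, it has already replaced fields with the full list from get_field_names, so FIELDS(ALL) only ever appears in queries that also carry a LIMIT.

Executing the Query: get_rest_query_results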

 
 
				
```python
def get_rest_query_results(query, next_url=None):
    """
    Executes a REST API GET request to Salesforce with the given query.
    Handles pagination recursively by checking for 'nextRecordsUrl' in the response.

    :param query: The SOQL query string to execute.
    :param next_url: The nextRecordsUrl for pagination (used internally during recursion).
    :return: A list of all records retrieved from Salesforce.
    :raises Exception: If the REST query fails with a non-200 status code.
    """
    if next_url is None:
        url = f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/{query}"
    else:
        url = f"{os.environ['SALESFORCE_URL']}{next_url}"
    response = requests.get(
        url,
        headers={
            "Authorization": f"Bearer {fetch_session_id()}",
            "Content-Type": "application/json"
        }
    )
    if response.status_code != 200:
        raise Exception(f"REST query failed with a status of {response.status_code}: {response.text}")
    data = response.json()
    records = data.get("records", [])
    # Recursively fetch more records if 'nextRecordsUrl' is present
    if "nextRecordsUrl" in data:
        next_records = get_rest_query_results(
            query, next_url=data['nextRecordsUrl'])
        records.extend(next_records)
    return records
```
				
			

Purpose:

  • Sends the SOQL query to Salesforce and retrieves the results.

Highlights:

  • Handles pagination by checking for the nextRecordsUrl in the response and recursively fetching additional records.
  • Uses the fetch_session_id function to obtain the session ID for authentication.
  • Parses the JSON response and extracts the records.

Fetching All Field Names: get_field_names

				
```python
def get_field_names(object_name):
    """
    Retrieves all field names for the specified Salesforce object.

    :param object_name: The name of the Salesforce object to describe.
    :return: A list of field names for the object.
    :raises Exception: If the field name query fails with a non-200 status code.
    """
    response = requests.get(
        f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/sobjects/{object_name}/describe/",
        headers={
            "Authorization": f"Bearer {fetch_session_id()}"
        }
    )
    if response.status_code != 200:
        raise Exception(f"Field name query failed with a status of {response.status_code}: {response.text}")
    return get_nested_values("name", response.json()["fields"])
```
				
			

Purpose:

  • Retrieves all field names for a given object by calling the describe endpoint.

Why This Is Important:

  • As discussed, when performing an unbounded query (without a LIMIT), you cannot use FIELDS(ALL).
  • To retrieve all fields in such a case, you must explicitly list all field names in the SELECT clause.

Highlights:

  • Makes a GET request to the describe endpoint for the specified object.
  • Parses the response to extract field names using the get_nested_values utility function.

Fetching the Session ID: fetch_session_id

				
```python
def fetch_session_id():
    """
    Fetches the current Salesforce session ID using stored credentials.

    :return: The Salesforce session ID as a string.
    """
    credentials = get_credentials("salesforce")
    return get_session_id(
        credentials["salesforce_username"],
        credentials["salesforce_password"],
        credentials["salesforce_security_token"]
    )
```
				
			

Purpose:

  • Obtains a session ID using the stored credentials, reusing the authentication function from Part 1.

Highlights:

  • Retrieves credentials using get_credentials.
  • Calls get_session_id to authenticate and obtain a session ID.

Utility Functions: helper_functions.py

The helper_functions.py module provides utility functions used by the main module.

Loading Environment Variables

				
```python
import os

from dotenv import load_dotenv

# Load variables from .env file
load_dotenv()
```
				
			

Purpose:

  • Loads environment variables from a .env file, which is useful for local development without exposing sensitive information.

Retrieving Credentials: get_credentials

				
```python
def get_credentials(integration):
    return {
        f"{integration}_password": os.environ[f"{integration.upper()}_PASSWORD"],
        f"{integration}_username": os.environ[f"{integration.upper()}_USERNAME"],
        f"{integration}_security_token": os.environ[f"{integration.upper()}_SECURITY_TOKEN"]
    }
```
				
			
 

Purpose:

  • Retrieves credentials from environment variables for a given integration (in this case, Salesforce).

Highlights:

  • Uses the integration name to construct the keys for environment variables.
  • Assumes that environment variables are named in the format INTEGRATION_USERNAME, INTEGRATION_PASSWORD, etc., as shown in the example below.
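For example, with SALESFORCE_USERNAME, SALESFORCE_PASSWORD, and SALESFORCE_SECURITY_TOKEN set in the environment (or in the local .env file):

```python
credentials = get_credentials("salesforce")
# credentials == {
#     "salesforce_password": "...",
#     "salesforce_username": "...",
#     "salesforce_security_token": "...",
# }
```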

Extracting Nested Values: get_nested_values

				
```python
def get_nested_values(key, data):
    values = [item[key] for item in data]
    return values
```
				
			
  • Purpose: Extracts a list of values corresponding to a specified key from a list of dictionaries.
  • Usage: Used to extract field names from the JSON response in get_field_names, as in the short example below.
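For example, given a trimmed-down describe response:

```python
fields_metadata = [
    {"name": "Id", "type": "id"},
    {"name": "Name", "type": "string"},
]

get_nested_values("name", fields_metadata)  # -> ["Id", "Name"]
```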

In Action

Let’s see how to use the get_records_list function to retrieve data from Salesforce.

Example: Fetch All Accounts

				
```python
import salesforce_rest_api as sf_api

# Retrieve all accounts with all fields
accounts = sf_api.get_records_list("Account")
print(accounts)
```
				
			

Explanation:

  • Since neither fields nor limit is provided, the function fetches all field names for the Account object.
  • Constructs a SOQL query that explicitly lists all fields.
  • Retrieves all records for the Account object.

Example: Fetch Contacts with Specific Fields and Limit

				
```python
import salesforce_rest_api as sf_api

fields = ["FirstName", "LastName", "Email"]
contacts = sf_api.get_records_list("Contact", fields=fields, limit=100)
print(contacts)
```
				
			
 

Explanation:

  • Specifies a list of fields to retrieve.
  • Sets a limit of 100 records.
  • Constructs a SOQL query listing only the specified fields; FIELDS(ALL) is not needed since explicit fields were provided.

Example: Attempting an Unbounded Query with FIELDS(ALL)

				
```python
import salesforce_rest_api as sf_api

# Unbounded query: the function fetches every field name and lists them
# explicitly rather than using FIELDS(ALL)
accounts = sf_api.get_records_list("Account", fields=None, limit=None)
```
				
			

Explanation:

  • Since both fields and limit are None, the function will attempt to fetch all field names and include them explicitly in the query.
  • If the query were instead built with FIELDS(ALL) and no LIMIT, Salesforce would reject it; listing the field names explicitly is what keeps this call valid.

Handling Pagination

Salesforce may limit the number of records returned in a single response. If the result set is large, Salesforce provides a nextRecordsUrl in the response, which you can use to fetch the next batch of records.

Our get_rest_query_results function handles this by recursively checking for nextRecordsUrl and fetching additional records until all records are retrieved.

Conclusion

In this article, we’ve built upon our authentication foundation to retrieve data from Salesforce using the REST API. We’ve covered:

  • Constructing SOQL Queries Dynamically:
      • Handling the fields parameter carefully to comply with Salesforce’s limitations.
      • Understanding why unbounded queries cannot use FIELDS(ALL) and how to work around this by explicitly listing all fields.
  • Handling Field Selection and Limitations:
      • Using the get_field_names function to retrieve all field names when needed.
      • Ensuring that our queries are efficient and compliant with Salesforce’s requirements.
  • Dealing with Salesforce’s Pagination Mechanism:
      • Implementing recursive calls to handle large datasets.
      • Ensuring that all records are retrieved without manual intervention.

With these tools, you can now extract data from Salesforce efficiently and prepare it for storage, analysis, or further processing.

What’s Next

In the next article, we’ll delve into the Salesforce BULK API. We’ll explore how to handle large volumes of data extraction efficiently. The BULK API presents its own challenges, particularly around its asynchronous query system, where you have to wait and check on the query status before fetching results.

Stay tuned as we continue to build our stateless, distributed extraction pipeline, enhancing it to handle larger datasets and more complex scenarios. If you have any questions or need further clarification, feel free to leave a comment below.

I am open to consulting engagements if you need help building this or any other data solution. Feel free to email me directly at jason@hyperfocusdata.io