HYPERFOCUSData

Building a Salesforce Extraction Pipeline from Scratch: Part 1 – Authentication with the SOAP API

Introduction

Hello! Welcome to my new series of articles focused on building a Salesforce extraction pipeline from scratch, without relying on any special libraries or SaaS services. In this series, I will use Python to demonstrate the pipeline, but I aim to present it in a generalized way that should allow you to port it over to any other language.

In this first part of the series, we will dive into authentication using the SOAP API method, which will enable connections via both the REST API and the BULK API. By the end of the series, you will have a stateless, distributed extraction pipeline that can handle both Salesforce APIs and could easily be extended to other data sources as well. This will all be done from scratch, without using any Salesforce-specific libraries.

I chose to use SOAP because, although it’s harder to find good documentation on it, it is actually easier to get a demo up and running. You do not need to have an app already set up in Salesforce, nor do you need to use the CLI to create an OAuth token. If you’re planning to deploy this in production and security is a concern (which it probably should be), then setting up OAuth would be ideal. But for learning, testing, and demo purposes, SOAP authentication is sufficient.

I will maintain a GitHub repository for this series, and I will put each part in its own branch, with each successive branch expanding upon the previous part’s codebase.

You can find the repository here:

GitHub Repository Link

Authentication Implementation

In this section, I will walk through the salesforce_authentication.py code step by step.

Imports and Requirements

The first section of the code includes the necessary imports:

import requests 
import os
from xml.sax.saxutils import escape
import xml.etree.ElementTree as ET
  • requests: Used to make HTTP requests to the Salesforce API.
  • os: Used to access environment variables, such as the Salesforce URL.
  • xml.sax.saxutils.escape: Used to escape special characters in XML, ensuring that the XML request is well-formed.
  • xml.etree.ElementTree: Used to parse XML responses from Salesforce.

Understanding Salesforce Security Tokens

Before we proceed, it’s important to understand what the security token is and how to obtain it. Salesforce requires a security token in addition to your password when logging in from an untrusted network. The security token is a unique key tied to your user account.

To obtain your security token:

  1. Log in to your Salesforce account.
  2. Click on your avatar or name in the top-right corner and select Settings.
  3. In the left-hand sidebar, navigate to My Personal Information > Reset My Security Token.
  4. Click Reset Security Token. Salesforce will send a new security token to your registered email address.

Make sure to keep your security token secure, as it provides access to your Salesforce data.

Main Authentication Function

The next section is the main public interface for the module, the get_session_id function:

def get_session_id(username, password, security_token):
    """
    Authenticates with Salesforce and retrieves a session ID.

    :param username: The Salesforce username.
    :param password: The Salesforce password.
    :param security_token: The Salesforce security token.
    :return: The Salesforce session ID.
    :rtype: str
    :raises Exception: If the authentication request fails or the session ID is not found.
    """
    response = requests.post(
        get_url(), 
        data=get_body(username, password, security_token), 
        headers=get_headers()
        )
    if response.status_code != 200:
        raise Exception(f"Authentication failed with status code {response.status_code}: {response.text}")
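    session_id = extract_session_id(response.text)
    # extract_session_id returns None when the element is missing, so fail loudly here
    if session_id is None:
        raise Exception("Session ID not found in the authentication response.")
    return session_id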

This function encapsulates the entire login process: constructing the request body, sending it, and handling the response.

  • URL Construction: get_url() retrieves the Salesforce login URL for the SOAP API.
  • Body Construction: get_body() builds the XML-formatted request that Salesforce expects, including your username, password, and security token.
  • Header Setup: get_headers() defines the necessary headers to inform Salesforce that this is a SOAP request.
  • Session ID Extraction: The session ID is extracted from the SOAP response using extract_session_id().

Helper Functions

The rest of the file consists of helper functions:

def get_url():
    url = f"{os.environ['SALESFORCE_URL']}/services/Soap/u/60.0"
    return url

def get_body(username, password, security_token):
    """
    Constructs the XML body for the Salesforce SOAP login request.

    :param username: The Salesforce username.
    :param password: The Salesforce password.
    :param security_token: The Salesforce security token.
    :return: The XML-formatted body for the login request.
    """
    # Escape every value that is interpolated into the XML
    formatted_username = escape_chars(username)
    formatted_password = escape_chars(password)
    formatted_security_token = escape_chars(security_token)
    return f"""<?xml version="1.0" encoding="utf-8" ?>
    <env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
      <env:Body>
        <n1:login xmlns:n1="urn:partner.soap.sforce.com">
          <n1:username>{formatted_username}</n1:username>
          <n1:password>{formatted_password}{formatted_security_token}</n1:password>
        </n1:login>
      </env:Body>
    </env:Envelope>"""

def escape_chars(string):
    """
    Escapes special XML characters in a string to ensure valid XML formatting.

    :param string: The string to escape.
    :return: The escaped string.
    """
    extra_entities = {"'": "&apos;", '"': "&quot;"}
    escaped_string = escape(string, extra_entities)
    return escaped_string

def get_headers():
    headers = {
        'SOAPAction': 'login',
        # charset is a parameter of Content-Type, not a separate header
        'Content-Type': 'text/xml; charset=UTF-8'
    }
    return headers

def extract_session_id(xml_str):
    """
    Parses the Salesforce SOAP response XML to extract the session ID.

    :param xml_str: The XML response as a string.
    :return: The extracted session ID, or None if not found.
    :raises ET.ParseError: If the XML parsing fails.
    """
    namespaces = {
        'soapenv': 'http://schemas.xmlsoap.org/soap/envelope/',
        'ns': 'urn:partner.soap.sforce.com'
    }

    root = ET.fromstring(xml_str)
    session_id_elem = root.find(
        './/soapenv:Body/ns:loginResponse/ns:result/ns:sessionId', namespaces)

    if session_id_elem is not None:
        return session_id_elem.text
    else:
        print("sessionId not found in the response.")
        return None
  • get_url: Fetches the base Salesforce URL from an environment variable (SALESFORCE_URL) and appends the path needed to call the SOAP authentication API (/services/Soap/u/60.0). The version number (60.0) corresponds to the API version you are targeting.
  • get_body: Constructs the XML body required by the SOAP API using the provided username, password, and security_token. It escapes special XML characters in the credentials to prevent formatting errors, and appends the security token directly to the password.
  • escape_chars: Escapes special XML characters in a string to ensure valid XML formatting. This is important because certain characters (like <, >, &, ', ") can break the XML structure if not properly escaped.
  • get_headers: Returns the necessary headers for the authentication request. Specifically, it sets the SOAPAction to login and specifies the content type as text/xml with UTF-8 charset.
  • extract_session_id: Parses the SOAP response XML to extract the session ID. The namespaces dictionary maps the prefixes used in the XPath expression (soapenv and ns) to the namespace URIs that appear in the response, allowing ElementTree to locate the sessionId element.
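
To make the escaping and the namespace lookup concrete, here is a small self-contained sketch. The sample response is hand-written for illustration: the session ID and server URL are placeholders rather than real Salesforce output, and only the elements the lookup needs are included.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape

# Same extra entities as escape_chars; escape() already handles &, < and > itself.
EXTRA_ENTITIES = {"'": "&apos;", '"': "&quot;"}

raw_password = 'p@ss<word>&"quotes"'
escaped_password = escape(raw_password, EXTRA_ENTITIES)
print(escaped_password)

# Hand-written stand-in for a login response; every value is a placeholder.
SAMPLE_RESPONSE = """<soapenv:Envelope
    xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns="urn:partner.soap.sforce.com">
  <soapenv:Body>
    <loginResponse>
      <result>
        <serverUrl>https://example.invalid/services/Soap/u/60.0</serverUrl>
        <sessionId>PLACEHOLDER_SESSION_ID</sessionId>
      </result>
    </loginResponse>
  </soapenv:Body>
</soapenv:Envelope>"""

# The prefixes are local to this dictionary; only the URIs must match the response.
namespaces = {
    "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
    "ns": "urn:partner.soap.sforce.com",
}

root = ET.fromstring(SAMPLE_RESPONSE)
session_id_elem = root.find(
    ".//soapenv:Body/ns:loginResponse/ns:result/ns:sessionId", namespaces
)
session_id = session_id_elem.text if session_id_elem is not None else None
print(session_id)
```

Note that the sample uses a default namespace (no ns prefix in the XML itself) and the lookup still succeeds, because ElementTree matches on the namespace URI rather than on the prefix.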

In Action

Once you have created the salesforce_authentication.py file (or cloned it from the repository), you can test the authentication by running the following code:

import salesforce_authentication as SA

username = 'your_username_here'
password = 'your_password_here'
security_token = 'your_security_token_here'

session_id = SA.get_session_id(username, password, security_token)
print(session_id)

Remember to replace 'your_username_here', 'your_password_here', and 'your_security_token_here' with your actual Salesforce credentials.

Alternatively, you might want to store your credentials securely, such as in environment variables or a configuration file that is not checked into version control.

Example Using Environment Variables

Here’s how you could modify the code to use environment variables:

import os
import salesforce_authentication as SA

username = os.environ.get('SALESFORCE_USERNAME')
password = os.environ.get('SALESFORCE_PASSWORD')
security_token = os.environ.get('SALESFORCE_SECURITY_TOKEN')

session_id = SA.get_session_id(username, password, security_token)
print(session_id)

This approach helps keep your credentials out of your codebase.
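
Because os.environ.get returns None for a missing variable, a typo in a variable name would only surface later as a confusing login failure. One possible guard is sketched below. The helper name load_salesforce_env is hypothetical (it is not part of the repository); it checks up front that everything is set, including the SALESFORCE_URL that get_url() reads.

```python
import os

REQUIRED_VARS = (
    "SALESFORCE_URL",
    "SALESFORCE_USERNAME",
    "SALESFORCE_PASSWORD",
    "SALESFORCE_SECURITY_TOKEN",
)


def load_salesforce_env(environ=os.environ):
    """Return the required settings as a dict, or raise naming every missing one."""
    # Treat unset and empty variables the same way
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}
```

Passing the mapping as a parameter keeps the helper easy to exercise with a plain dictionary instead of the real process environment.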

Fetching Data Using the Session ID

Once you have the session ID, you can use it to make authenticated requests to Salesforce’s REST API. Below is a modified sneak peek at next week’s focus: grabbing records from Salesforce using SOQL and the REST API.

def get_rest_query_results(query):
    """
    Executes a REST API GET request to Salesforce with the given query.

    :param query: The SOQL query string to execute.
    :return: The JSON response from the Salesforce API.
    :raises Exception: If the REST query fails with a non-200 status code.
    """
    response = requests.get(
        f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/{query}",
        headers={
            "Authorization": f"Bearer {fetch_session_id()}"
        }
    )
    if response.status_code != 200:
        raise Exception(f"Rest query failed with a status of {response.status_code}: {response.text}")
    return response.json()

def fetch_session_id():
    """
    Fetches the current Salesforce session ID using stored credentials.

    :return: The Salesforce session ID as a string.
    """
    credentials = get_credentials("salesforce")
    return get_session_id(
        credentials["salesforce_username"],
        credentials["salesforce_password"],
        credentials["salesforce_security_token"]
    )

In this snippet:

  • get_rest_query_results: Executes a REST API GET request to Salesforce with the given SOQL query.
    • It constructs the request URL by appending the query to the base Salesforce URL.
    • It sets the Authorization header using the session ID obtained from fetch_session_id().
    • It checks for a successful response and returns the JSON data.
  • fetch_session_id: Fetches the current Salesforce session ID using stored credentials.
    • get_credentials("salesforce"): A function that retrieves your Salesforce credentials from a secure location (e.g., environment variables, a configuration file, or a secrets manager).
    • It then calls get_session_id with the retrieved credentials.

Conclusion

Authenticating with Salesforce using the SOAP API is relatively straightforward once you understand the process, but there are gaps in the documentation that can make it a bit frustrating to figure out. I hope that what I’ve shown here is helpful to you, regardless of the programming language you’re using. My goal was to demonstrate the underlying mechanisms needed for authentication, rather than just showing “This is how you connect to Salesforce using Python.”

If I left anything unclear, or if you have any comments or questions, please leave them below.

What’s Next

In the next article, I plan to show how to fetch records from Salesforce using the REST API. This will include query generation, dealing with how to add fields to queries (since bounded and unbounded queries have different limitations), and handling Salesforce’s pagination structure.

In the article after that, I will delve into the Salesforce BULK API, which presents its own challenges, mainly surrounding the asynchronous query system, where you have to wait and check on the query status before you can fetch results.

Introduction

Welcome back to the second installment of our series on building a Salesforce extraction pipeline from scratch, without relying on special libraries or SaaS services. In the first part, we covered how to authenticate with Salesforce using the SOAP API. Now that we can securely connect to Salesforce, it’s time to fetch some data!

In this article, we’ll focus on retrieving records from Salesforce using the REST API. We’ll delve into query generation, handling fields, and dealing with Salesforce’s pagination structure. A key aspect we’ll explore is how to manage field selections in SOQL queries, especially considering the limitations around unbounded queries and the use of FIELDS(ALL). By the end of this article, you’ll be able to extract data efficiently and prepare it for further processing or storage.

As before, all the code discussed here is available in the GitHub repository for this series. Each part of the series has its own branch, with each successive branch building upon the previous one.

You can find the repository here:

[GitHub Repository Link]

Understanding the Salesforce REST API

The Salesforce REST API provides a powerful, convenient, and simple Web services interface for interacting with Salesforce. It allows you to perform various operations such as querying data, updating records, and accessing metadata.

One of the key features we’ll utilize is the ability to execute SOQL (Salesforce Object Query Language) queries via the REST API. SOQL is similar to SQL but is designed specifically for Salesforce data structures.

Retrieving Records: The salesforce_rest_api.py Module

Let’s dive into the salesforce_rest_api.py module, which handles querying Salesforce data using the REST API.

Imports and Dependencies

import requests
import os
from salesforce_authentication import get_session_id
from helper_functions import get_credentials, get_nested_values
  • requests: Used for making HTTP requests to the Salesforce REST API.
  • os: Used to access environment variables.
  • salesforce_authentication.get_session_id: Reuses the authentication function we created in Part 1 to obtain a session ID.
  • helper_functions.get_credentials: Retrieves stored Salesforce credentials.
  • helper_functions.get_nested_values: A utility function to extract values from nested data structures.

Main Function: get_records_list

def get_records_list(object_name, fields=None, limit=None):
    """
    Retrieves a list of records based on the specified object name, fields, and limit.

    :param object_name: The name of the object to query.
    :param fields: A list of fields to include in the results. If None, all fields are retrieved.
    :param limit: The maximum number of records to retrieve. If None, no limit is applied.
    :return: A list of dictionaries representing the filtered records.
    """
    if not fields and not limit:
        fields = get_field_names(object_name)
    query = get_query(object_name, fields, limit)
    raw_results = get_rest_query_results(query)
    return [
        {k: v for k, v in record.items() if k != "attributes"}
        for record in raw_results
    ]

This is the primary function you’ll use to retrieve records:

Parameters:

  • object_name: The Salesforce object you want to query (e.g., Account, Contact).
  • fields: A list of specific fields to retrieve. If None, all fields are retrieved.
  • limit: The maximum number of records to return. If None, all records are retrieved.

Process:

  • Checks if both fields and limit are None. If so, it fetches all field names for the object using get_field_names.
  • Constructs a SOQL query using get_query.
  • Executes the query using get_rest_query_results.
  • Cleans up the results by removing the attributes metadata from each record.
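
As an illustration of the clean-up step, each record returned by a REST query carries an attributes entry describing the record's type and URL. The record below is hand-written for illustration, with made-up values:

```python
# Hand-written stand-in for the list that get_rest_query_results returns
raw_results = [
    {
        "attributes": {
            "type": "Account",
            "url": "/services/data/v60.0/sobjects/Account/001000000000001AAA",
        },
        "Id": "001000000000001AAA",
        "Name": "Example Corp",
    }
]

# Same comprehension as in get_records_list: drop the metadata, keep the fields
cleaned = [
    {k: v for k, v in record.items() if k != "attributes"}
    for record in raw_results
]
print(cleaned)
```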

Handling Fields in SOQL Queries

Before we delve into the helper functions, it’s important to understand how the fields parameter is handled and why.

The Challenge with FIELDS(ALL) and Unbounded Queries

In SOQL, you might be tempted to use SELECT FIELDS(ALL) FROM ObjectName to retrieve all fields, similar to SELECT * in SQL. However, Salesforce imposes limitations on the use of FIELDS(ALL):

  • Unbounded Queries: When you do not specify a LIMIT clause, the query is considered unbounded.
  • Restriction: Salesforce does not allow the use of FIELDS(ALL) in unbounded queries. This is to prevent performance issues that could arise from retrieving all fields for a large number of records.

Therefore, if you want to fetch all fields without specifying a limit, you cannot use FIELDS(ALL). Instead, you must explicitly list all field names in the SELECT clause.

Our Solution

To handle this, our get_records_list function checks if both fields and limit are None. If so, it proceeds to fetch all field names for the specified object using the get_field_names function. This list of field names is then used to construct the SOQL query.

By explicitly listing all field names, we comply with Salesforce’s requirements and avoid the limitations associated with FIELDS(ALL) in unbounded queries.

Helper Functions

Let’s explore the helper functions used within get_records_list.

Constructing the SOQL Query: get_query
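
What follows is a minimal sketch of what get_query might look like, not the repository's implementation. It assumes the function returns the path segment that get_rest_query_results appends after /services/data/v60.0/ (that is, query?q= followed by the URL-encoded SOQL), and that a missing field list falls back to FIELDS(ALL), which Salesforce accepts only together with a LIMIT of at most 200.

```python
from urllib.parse import quote


def get_query(object_name, fields=None, limit=None):
    """Build the 'query?q=...' path segment for a SOQL query (illustrative sketch)."""
    # Explicit field list when given; otherwise FIELDS(ALL), which needs a LIMIT
    select_clause = ", ".join(fields) if fields else "FIELDS(ALL)"
    soql = f"SELECT {select_clause} FROM {object_name}"
    if limit is not None:
        soql += f" LIMIT {int(limit)}"
    # URL-encode spaces, commas and parentheses so the query is safe in a URL
    return f"query?q={quote(soql)}"


print(get_query("Contact", ["FirstName", "LastName", "Email"], 100))
print(get_query("Account", limit=200))
```

Under these assumptions, get_records_list never sends FIELDS(ALL) without a LIMIT, because it replaces a missing field list with explicit field names whenever no limit is given.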

Executing the Query: get_rest_query_results

def get_rest_query_results(query, next_url=None):
    """
    Executes a REST API GET request to Salesforce with the given query.
    Handles pagination recursively by checking for 'nextRecordsUrl' in the response.

    :param query: The SOQL query string to execute.
    :param next_url: The nextRecordsUrl for pagination (used internally during recursion).
    :return: A list of all records retrieved from Salesforce.
    :raises Exception: If the REST query fails with a non-200 status code.
    """
    if next_url is None:
        url = f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/{query}"
    else:
        url = f"{os.environ['SALESFORCE_URL']}{next_url}"
    response = requests.get(
        url,
        headers={
            "Authorization": f"Bearer {fetch_session_id()}",
            "Content-Type": "application/json"
        }
    )
    if response.status_code != 200:
        raise Exception(f"REST query failed with a status of {response.status_code}: {response.text}")
    data = response.json()
    records = data.get("records", [])
    # Recursively fetch more records if 'nextRecordsUrl' is present
    if "nextRecordsUrl" in data:
        next_records = get_rest_query_results(
            query, next_url=data['nextRecordsUrl'])
        records.extend(next_records)
    return records

Purpose:

  • Sends the SOQL query to Salesforce and retrieves the results.

Highlights:

  • Handles pagination by checking for the nextRecordsUrl in the response and recursively fetching additional records.
  • Uses the fetch_session_id function to obtain the session ID for authentication.
  • Parses the JSON response and extracts the records.

Fetching All Field Names: get_field_names

def get_field_names(object_name):
    """
    Retrieves all field names for the specified Salesforce object.

    :param object_name: The name of the Salesforce object to describe.
    :return: A list of field names for the object.
    :raises Exception: If the field name query fails with a non-200 status code.
    """
    response = requests.get(
        f"{os.environ['SALESFORCE_URL']}/services/data/v60.0/sobjects/{object_name}/describe/",
        headers={
            "Authorization": f"Bearer {fetch_session_id()}"
        }
    )
    if response.status_code != 200:
        raise Exception(f"Field name query failed with a status of {response.status_code}: {response.text}")
    return get_nested_values("name", response.json()["fields"])

Purpose:

  • Retrieves all field names for a given object by calling the describe endpoint.

Why This Is Important:

  • As discussed, when performing an unbounded query (without a LIMIT), you cannot use FIELDS(ALL).
  • To retrieve all fields in such a case, you must explicitly list all field names in the SELECT clause.

Highlights:

  • Makes a GET request to the describe endpoint for the specified object.
  • Parses the response to extract field names using the get_nested_values utility function.

Fetching the Session ID: fetch_session_id

def fetch_session_id():
    """
    Fetches the current Salesforce session ID using stored credentials.

    :return: The Salesforce session ID as a string.
    """
    credentials = get_credentials("salesforce")
    return get_session_id(
        credentials["salesforce_username"],
        credentials["salesforce_password"],
        credentials["salesforce_security_token"]
    )

Purpose:

  • Obtains a session ID using the stored credentials, reusing the authentication function from Part 1.

Highlights:

  • Retrieves credentials using get_credentials.
  • Calls get_session_id to authenticate and obtain a session ID.

Utility Functions: helper_functions.py

The helper_functions.py module provides utility functions used by the main module.

Loading Environment Variables

import os
from dotenv import load_dotenv

# Load variables from .env file
load_dotenv()

Purpose:

  • Loads environment variables from a .env file, which is useful for local development without exposing sensitive information.

Retrieving Credentials: get_credentials

def get_credentials(integration):
    return {
        f"{integration}_password": os.environ[f"{integration.upper()}_PASSWORD"],
        f"{integration}_username": os.environ[f"{integration.upper()}_USERNAME"],
        f"{integration}_security_token": os.environ[f"{integration.upper()}_SECURITY_TOKEN"]
    }

Purpose:

  • Retrieves credentials from environment variables for a given integration (in this case, Salesforce).

Highlights:

  • Uses the integration name to construct the keys for environment variables.
  • Assumes that environment variables are named in the format INTEGRATION_USERNAME, INTEGRATION_PASSWORD, etc.
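
To illustrate the naming convention, the sketch below repeats get_credentials so that it runs on its own and sets placeholder values in-process. In real use, the values would come from your shell or a .env file rather than from code.

```python
import os


def get_credentials(integration):
    return {
        f"{integration}_password": os.environ[f"{integration.upper()}_PASSWORD"],
        f"{integration}_username": os.environ[f"{integration.upper()}_USERNAME"],
        f"{integration}_security_token": os.environ[f"{integration.upper()}_SECURITY_TOKEN"],
    }


# Placeholder values, set here purely for the demonstration
os.environ["SALESFORCE_USERNAME"] = "user@example.com"
os.environ["SALESFORCE_PASSWORD"] = "not-a-real-password"
os.environ["SALESFORCE_SECURITY_TOKEN"] = "not-a-real-token"

credentials = get_credentials("salesforce")
# Keys are lower-case and prefixed with the integration name
print(sorted(credentials))
```

Because the function indexes os.environ directly, a missing variable raises a KeyError naming that variable, which makes configuration mistakes easy to spot.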

Extracting Nested Values: get_nested_values

def get_nested_values(key, data):
    values = [item[key] for item in data]
    return values
  • Purpose: Extracts a list of values corresponding to a specified key from a list of dictionaries.
  • Usage: Used to extract field names from the JSON response in get_field_names.
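
For a sense of the input it works on, here is the function applied to a heavily trimmed, hand-written stand-in for a describe response. A real response contains many more keys per field; only name and type are shown here.

```python
def get_nested_values(key, data):
    values = [item[key] for item in data]
    return values


# Hand-written, trimmed stand-in for the JSON returned by the describe endpoint
describe_response = {
    "name": "Account",
    "fields": [
        {"name": "Id", "type": "id"},
        {"name": "Name", "type": "string"},
        {"name": "CreatedDate", "type": "datetime"},
    ],
}

field_names = get_nested_values("name", describe_response["fields"])
print(field_names)
```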

In Action

Let’s see how to use the get_records_list function to retrieve data from Salesforce.

Example: Fetch All Accounts

import salesforce_rest_api as sf_api

# Retrieve all accounts with all fields
accounts = sf_api.get_records_list("Account")
print(accounts)

Explanation:

  • Since neither fields nor limit is provided, the function fetches all field names for the Account object.
  • Constructs a SOQL query that explicitly lists all fields.
  • Retrieves all records for the Account object.

Example: Fetch Contacts with Specific Fields and Limit

import salesforce_rest_api as sf_api

fields = ["FirstName", "LastName", "Email"]
contacts = sf_api.get_records_list("Contact", fields=fields, limit=100)
print(contacts)

Explanation:

  • Specifies a list of fields to retrieve.
  • Sets a limit of 100 records.
  • Constructs a SOQL query from the specified fields; FIELDS(ALL) is not needed here.

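Example: Attempting an Unbounded Query with FIELDS(ALL)

import salesforce_rest_api as sf_api

# No exception here: with fields and limit both None, get_records_list
# lists every field explicitly instead of sending FIELDS(ALL)
accounts = sf_api.get_records_list("Account", fields=None, limit=None)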

Explanation:

  • Since both fields and limit are None, the function fetches all field names and includes them explicitly in the query, so this call behaves like the first example.
  • Sending FIELDS(ALL) without a LIMIT directly to Salesforce would instead be rejected with an error, because of the restriction on unbounded queries described earlier.

Handling Pagination

Salesforce may limit the number of records returned in a single response. If the result set is large, Salesforce provides a nextRecordsUrl in the response, which you can use to fetch the next batch of records.

Our get_rest_query_results function handles this by recursively checking for nextRecordsUrl and fetching additional records until all records are retrieved.
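
The recursive version logs in again for every page (through fetch_session_id) and adds one stack frame per page. As an alternative sketch, not the repository's implementation, the same nextRecordsUrl loop can be written iteratively. Here fetch_page is a hypothetical callable that takes a URL path and returns the decoded JSON body, which keeps the example runnable without a Salesforce org; the page contents and the nextRecordsUrl value are made up.

```python
def collect_all_records(first_path, fetch_page):
    """Follow nextRecordsUrl links until the final page, returning every record."""
    records = []
    path = first_path
    while path is not None:
        data = fetch_page(path)
        records.extend(data.get("records", []))
        # The final page has no 'nextRecordsUrl', which ends the loop
        path = data.get("nextRecordsUrl")
    return records


# Fake two-page result set standing in for Salesforce responses
FIRST_PATH = "/services/data/v60.0/query?q=SELECT%20Id%20FROM%20Account"
FAKE_PAGES = {
    FIRST_PATH: {
        "done": False,
        "nextRecordsUrl": "/services/data/v60.0/query/FAKE_CURSOR-2",
        "records": [{"Id": "001A"}, {"Id": "001B"}],
    },
    "/services/data/v60.0/query/FAKE_CURSOR-2": {
        "done": True,
        "records": [{"Id": "001C"}],
    },
}

all_records = collect_all_records(FIRST_PATH, FAKE_PAGES.__getitem__)
print(len(all_records))
```

In a real pipeline, fetch_page would wrap requests.get with the base URL and an Authorization header, ideally reusing one session ID for the whole result set.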

Conclusion

In this article, we’ve built upon our authentication foundation to retrieve data from Salesforce using the REST API. We’ve covered:

  • Constructing SOQL Queries Dynamically:
    • Handling the fields parameter carefully to comply with Salesforce’s limitations.
    • Understanding why unbounded queries cannot use FIELDS(ALL) and how to work around this by explicitly listing all fields.
  • Handling Fields Selection and Limitations:
    • Using the get_field_names function to retrieve all field names when needed.
    • Ensuring that our queries are efficient and compliant with Salesforce’s requirements.
  • Dealing with Salesforce’s Pagination Mechanism:
    • Implementing recursive calls to handle large datasets.
    • Ensuring that all records are retrieved without manual intervention.

With these tools, you can now extract data from Salesforce efficiently and prepare it for storage, analysis, or further processing.

What’s Next

In the next article, we’ll delve into the Salesforce BULK API. We’ll explore how to handle large volumes of data extraction efficiently. The BULK API presents its own challenges, particularly around its asynchronous query system, where you have to wait and check on the query status before fetching results.

Stay tuned as we continue to build our stateless, distributed extraction pipeline, enhancing it to handle larger datasets and more complex scenarios. If you have any questions or need further clarification, feel free to leave a comment below.

I am open to consulting engagements if you need help building this or any other data solution. Feel free to email me directly at jason@hyperfocusdata.io