Created a new example for OAuth2 with Token Refresh #74

Open
wants to merge 8 commits into
base: main
@@ -0,0 +1,5 @@
{
    "refresh_token": "<YOUR_REFRESH_TOKEN>",
    "client_secret": "<YOUR_CLIENT_SECRET>",
    "client_id": "<YOUR_CLIENT_ID>"
}
183 changes: 183 additions & 0 deletions examples/source_examples/oauth2_with_token_refresh/connector.py
@@ -0,0 +1,183 @@
"""
HubSpot OAuth 2.0 Sample API Connector for Fivetran

This module implements a connector for syncing data from the HubSpot API.
NOTE: A HubSpot connector is already available in Fivetran and can be created directly
from the dashboard. Please do not use this example as an alternative.
It is an example of using the OAuth 2.0 refresh token grant: the access token
is refreshed from the provided refresh token.

Date: 2025-01-29
"""
import json
import requests
import time
import urllib.parse
from fivetran_connector_sdk import Connector, Logging as log, Operations as op

BASE_URL = "https://api.hubapi.com/"
AUTH_URL = BASE_URL + "oauth/v1/token?"
CONTACTS_URL = BASE_URL + "contacts/v1/lists/all/contacts/all"
COMPANY_URL = BASE_URL + "companies/v2/companies/paged"
ACCESS_TOKEN = ""
REFRESH_TIME = 0

def get_access_token(configuration: dict):
    global ACCESS_TOKEN
    global REFRESH_TIME

    param_dict = {
        "grant_type": "refresh_token",
        "client_id": configuration.get("client_id"),
        "refresh_token": configuration.get("refresh_token"),
        "client_secret": configuration.get("client_secret")
    }

    # The parameters are URL-encoded and appended to the token endpoint as a query string
    uri = AUTH_URL + urllib.parse.urlencode(param_dict)
    headers = {'content-type': 'application/x-www-form-urlencoded'}

    response = requests.post(uri, headers=headers)
Collaborator:

Let's use `response = requests.post(AUTH_URL, headers=headers, data=param_dict)` or similar to make the code more readable.

Collaborator Author:

Passing the values in `params` does not work, because the URL is not encoded as required. Changing it to `requests.post`.


    if response.status_code == 200:
        log.info("Access token obtained successfully")
        data = response.json()
        # Set ACCESS_TOKEN and the updated REFRESH_TIME.
        # REFRESH_TIME is the epoch time in seconds at which ACCESS_TOKEN expires.
        ACCESS_TOKEN = data["access_token"]
        REFRESH_TIME = int(data["expires_in"]) + time.time()
        return
    else:
        # Do not log the request URI: it contains the client secret and the refresh token
        log.severe(f"Failed to obtain access token: {response.text}")
        raise Exception("Failed to obtain access token")


def sync_contacts(configuration, cursor, curr_time, state):
    # Custom helper that flattens a raw contact into the row
    # that is sent to Fivetran
    def process_record(raw_contact):
        contact = {}
        contact["vid"] = raw_contact["vid"]
        contact["firstname"] = raw_contact["properties"]["firstname"]["value"]
        contact["company"] = raw_contact["properties"].get("company", {"value": ""})["value"]
        contact["lastmodifieddate"] = raw_contact["properties"]["lastmodifieddate"]["value"]
        contact["email"] = raw_contact["identity-profiles"][0]["identities"][0]["value"]
        return contact

    has_more = True
    params = {"count": 100}
    while has_more:
        data = get_data("contacts", params, {}, configuration)
        # Check whether more data is available and set the offset for the next request
        if data["has-more"]:
            params["vidOffset"] = data["vid-offset"]
        else:
            has_more = False
        # Send contact details to Fivetran; contacts without a first name or identities are skipped
        for contact in data["contacts"]:
            if contact["properties"].get("firstname") and contact["identity-profiles"][0].get("identities"):
                yield op.upsert("contacts", process_record(contact))


def sync_companies(configuration, cursor, curr_time, state):
    # Custom helper that flattens a raw company record
    def process_record(raw_company):
        company = {}
        company["companyId"] = raw_company["companyId"]
        company["name"] = raw_company["properties"]["name"]["value"]
        company["timestamp"] = raw_company["properties"]["name"]["timestamp"]
        return company

    has_more = True
    params = {"properties": "name", "limit": 250}
    while has_more:
        data = get_data("companies", params, {}, configuration)
        # Check whether more data is available and set the offset for the next request
        if data["has-more"]:
            params["offset"] = data["offset"]
        else:
            has_more = False
        # Send company details to Fivetran
        for company in data["companies"]:
            yield op.upsert("companies", process_record(company))

def update(configuration: dict, state: dict):
    curr_time = time.time()

    # Retrieve the cursor from the state to determine the current position in the data sync.
    # If the cursor is not present in the state, start from the beginning of time ('0001-01-01T00:00:00Z').
    cursor = state['last_updated_at'] if 'last_updated_at' in state else '0001-01-01T00:00:00Z'
    log.info(f"Starting update process. Initial state: {cursor}")

    # Yield the upsert operations produced by the individual sync functions
    yield from sync_contacts(configuration, cursor, curr_time, state)
    yield from sync_companies(configuration, cursor, curr_time, state)

    # Save the final checkpoint by updating the state with the current time
    state['last_updated_at'] = curr_time
    yield op.checkpoint(state)

    log.info(f"Completed the update process. Total duration of sync (in s): {time.time() - curr_time}")


def get_data(method, params, headers, configuration, body=None):
    # Check the expiry time recorded when the last access token was fetched.
    # If REFRESH_TIME is earlier than the current time, the access token has expired and needs a refresh.
    if REFRESH_TIME < time.time():
        get_access_token(configuration)

    headers["authorization"] = "Bearer " + ACCESS_TOKEN
    if method == "contacts":
        response = requests.get(CONTACTS_URL, params=params, headers=headers)
    elif method == "companies":
        response = requests.get(COMPANY_URL, params=params, headers=headers)
    else:
        log.severe("Failed to fetch data. Unknown method: " + method)
        raise Exception("Unknown method")

    if 200 <= response.status_code < 300:
        log.info("Fetched data for method: " + method)
        data = response.json()
        return data
    else:
        log.severe(f"Failed to fetch data for method {method}: {response.text}")
        raise Exception("Failed to fetch data")


def schema(configuration: dict):
    return [
        {
            "table": "contacts",
            "primary_key": ["vid"],
            "columns": {
                "vid": "LONG",
                "lastmodifieddate": "STRING",
                "firstname": "STRING",
                "company": "STRING",
                "email": "STRING"
            }
        },
        {
            "table": "companies",
            "primary_key": ["companyId"],
            "columns": {
                "companyId": "LONG",
                "name": "STRING",
                "timestamp": "LONG"
            }
        }
    ]

connector = Connector(update=update, schema=schema)

if __name__ == "__main__":
    # Open the configuration.json file and load its contents into a dictionary.
    with open("configuration.json", 'r') as f:
        configuration = json.load(f)
    # Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE.
    connector.debug(configuration=configuration)
54 changes: 54 additions & 0 deletions examples/source_examples/oauth2_with_token_refresh/readme.md
@@ -0,0 +1,54 @@
# OAuth2 Refresh Token HubSpot Connector Example

**Note:** There is an existing Fivetran connector for HubSpot that users can integrate directly on the dashboard [here](https://fivetran.com/docs/connectors/applications/hubspot#hubspot). This example is for reference purposes to integrate with a custom OAuth2 source that requires access token refresh.

## Prerequisites

1. **HubSpot Account:**
- If you don't have one, follow the steps [here](https://developers.hubspot.com/docs/guides/apps/public-apps/overview).

2. **Developer Account and HubSpot App:**
- Create a developer account and create a HubSpot app with scopes and redirect URL: [ref](https://developers.hubspot.com/docs/reference/api/app-management/oauth)

3. **Fetch the Code:**
- Use this curl command to get the code appended to your redirect URL:

```bash
curl --location 'https://app.hubspot.com/oauth/authorize?client_id=xxxxxx&scope=xxx&redirect_uri=xxxx'
```

4. **Fetch the Refresh Token:**
- Use the code obtained in step 3 and the following curl command to fetch the refresh token:

```bash
curl --location --request POST 'https://api.hubapi.com/oauth/v1/token?grant_type=authorization_code&client_id=xxxxx&client_secret=xxxxxx&redirect_uri=xxxx&code=xxxx' \
--header 'Content-Type: application/x-www-form-urlencoded'
```

5. **HubSpot API Collection:**
- Access the HubSpot API collection [here](https://developers.hubspot.com/docs/reference/api/crm/objects).
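
Steps 3 and 4 above can also be sketched in Python. This is a minimal sketch using only the standard library, not part of the example connector: the helper names `build_authorize_url` and `build_token_request` are hypothetical, and sending the parameters as a form-encoded body (rather than in the query string, as the curl command does) is an assumption based on the `Content-Type` header used in step 4. The sketch only builds the request and does not send it.

```python
import urllib.parse
import urllib.request

AUTHORIZE_URL = "https://app.hubspot.com/oauth/authorize"
TOKEN_URL = "https://api.hubapi.com/oauth/v1/token"


def build_authorize_url(client_id: str, scope: str, redirect_uri: str) -> str:
    """Return the URL to open in a browser to obtain the authorization code (step 3)."""
    query = urllib.parse.urlencode(
        {"client_id": client_id, "scope": scope, "redirect_uri": redirect_uri}
    )
    return f"{AUTHORIZE_URL}?{query}"


def build_token_request(client_id: str, client_secret: str,
                        redirect_uri: str, code: str) -> urllib.request.Request:
    """Build, without sending, the POST request that exchanges the code for tokens (step 4)."""
    # Assumption: parameters go in a form-encoded body instead of the query string.
    body = urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
        "code": code,
    }).encode("utf-8")
    return urllib.request.Request(
        TOKEN_URL,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```

To send the request, pass the result of `build_token_request(...)` to `urllib.request.urlopen` and read `refresh_token` from the JSON response. Because the request contains the client secret, avoid printing or logging it.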

## Debug

1. **Replace Credentials:** Once you have the refresh token, client ID, and client secret, replace the placeholders in the `configuration.json` file.
2. **Run the Main Function:** Run `connector.py` directly to trigger the debug command and run the sync on your local machine.
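
Before running the debug command, it can help to check that no placeholder is left in the configuration file. A minimal sketch, assuming the three keys and the `<YOUR_...>` placeholder format shown in this example's configuration file; `find_unset_keys` and `load_configuration` are hypothetical helpers, not part of the Fivetran SDK:

```python
import json

REQUIRED_KEYS = ("client_id", "client_secret", "refresh_token")


def find_unset_keys(configuration: dict) -> list:
    """Return the required keys that are missing, empty, or still a <...> placeholder."""
    unset = []
    for key in REQUIRED_KEYS:
        value = str(configuration.get(key) or "")
        if not value or (value.startswith("<") and value.endswith(">")):
            unset.append(key)
    return unset


def load_configuration(path: str = "configuration.json") -> dict:
    """Load the configuration file and fail early if credentials were not replaced."""
    with open(path, "r") as f:
        configuration = json.load(f)
    unset = find_unset_keys(configuration)
    if unset:
        raise ValueError(f"Replace these values in {path}: {', '.join(unset)}")
    return configuration
```

Failing before the first request gives a clearer error than a rejected token call would.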

## Deploy

1. **Fivetran API Key:**
- Get your base64 API key from the Fivetran dashboard: [https://fivetran.com/dashboard/user/api-config](https://fivetran.com/dashboard/user/api-config)

2. **Fivetran Destination:**
- Create a required destination from the Fivetran dashboard: [https://fivetran.com/dashboard/destinations](https://fivetran.com/dashboard/destinations)

3. **Deploy the Connector:**
- Use the following command in the folder containing the `connector.py` file to deploy:

```bash
fivetran deploy --api-key <FIVETRAN-API-KEY> --destination <DESTINATION-NAME> --connection <CONNECTION-NAME> --configuration configuration.json
```

4. **Monitor Sync Status:**
- Once deployed, follow the link in the terminal or search in the dashboard with the connection name to view the sync status and logs.

**Note:** This example only supports cases where the refresh token does not have a TTL, and only the access token is refreshed with the refresh token. If you occasionally need to update the refresh token, you can do it via the dashboard and in the connection setup. We will update this example with a similar approach once we support refreshing passed credentials via the connector code.
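
The access-token refresh described in this note can be sketched without module-level globals. This is a minimal sketch, not the connector's code: `TokenCache`, the `fetch` callable, the injectable `clock`, and the 60-second safety margin are all hypothetical. `fetch` stands in for a function that calls the token endpoint and returns the new access token together with its `expires_in` value in seconds.

```python
import time
from typing import Callable, Tuple


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, int]],
                 margin_seconds: int = 60,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch            # hypothetical: returns (access_token, expires_in)
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the logic can be tested without waiting
        self._token = ""
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a valid access token, fetching a new one when the cached one is near expiry."""
        if self._clock() >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + int(expires_in)
        return self._token
```

The safety margin avoids sending a request with a token that expires while the request is in flight. The refresh token itself is never replaced here, which matches the limitation stated in the note.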