This example demonstrates how to download bid event client audit log (CAL) files, enrich them with DSP and buyer mappings, and upload hourly CSV files to Amazon S3.
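To run it you need an encoded CAL client key, AWS credentials, and a destination S3 bucket; a typical invocation (the script file name here is illustrative) looks like:
python download_bid_events.py <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>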
""" The script demonstrates how to download bid-events files from CAL. """
#!/usr/bin/env python
import hashlib
import io
import json
import sys
import tempfile
from datetime import datetime
import pandas
import requests
import s3fs
# Stores mapping DataFrame objects queried through the CAL API
mapping_cache = {}
def get_token(encoded_client_key):
""" Request access token from Keycloak using an encoded client key """
auth_url = 'https://identity.indexexchange.com/auth/realms/eventlog/protocol/openid-connect/token'
headers = {'Authorization': 'Basic ' + encoded_client_key}
data = {'grant_type': 'client_credentials'}
resp = requests.post(auth_url, headers=headers, data=data)
resp.raise_for_status()
resp_json = resp.json()
return resp_json['access_token']
def get_available_downloads(token):
""" Given an access token, retrieve list of available downloads as a JSON object """
request_url = 'https://app.indexexchange.com/api/cal/v1/downloads/bidevents'
token_header = {'Authorization': 'Bearer ' + token}
resp = requests.get(request_url, headers=token_header)
resp.raise_for_status()
resp_json = resp.json()
return resp_json['availableDownloads']
def get_mappings(token, mapping_url_suffix):
""" Given an access token, retrieve mappings for a given url_suffix as a DataFrame """
if mapping_url_suffix in mapping_cache:
return mapping_cache[mapping_url_suffix]
request_url = 'https://app.indexexchange.com/api/cal/v1/mappings/{}'.format(mapping_url_suffix)
token_header = {'Authorization': 'Bearer ' + token}
resp = requests.get(request_url, headers=token_header)
resp.raise_for_status()
resp_json = resp.json()
mapping_json = resp_json['data']
mapping_str = json.dumps(mapping_json)
# Wrap in StringIO; newer pandas versions deprecate passing a literal JSON string to read_json
mapping_df = pandas.read_json(io.StringIO(mapping_str))
mapping_cache[mapping_url_suffix] = mapping_df
return mapping_df
def get_audit_log(token, request_url, expected_md5sum):
""" Given an access token and file metadata, download file from CAL as a DataFrame """
token_header = {'Authorization': 'Bearer ' + token}
resp = requests.get(request_url, headers=token_header, timeout=50, stream=True)
resp.raise_for_status()
# Download in chunks so we don't hold huge files in memory
with tempfile.NamedTemporaryFile() as tmp_file:
md5sum = hashlib.md5()
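# Compute the checksum incrementally while writing so the file is only streamed once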
for chunk in resp.iter_content(chunk_size=32000):
if chunk:
tmp_file.write(chunk)
md5sum.update(chunk)
tmp_file.flush()
actual_md5sum = md5sum.hexdigest()
if actual_md5sum != expected_md5sum:
error_message = 'md5sum of downloaded file does not match expected value. Actual: {}, Expected: {}'
raise ValueError(error_message.format(actual_md5sum, expected_md5sum))
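# Re-open the temp file by name to parse it; this relies on POSIX semantics
# (on Windows, an open NamedTemporaryFile cannot be opened again by name)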
log_df = pandas.read_csv(tmp_file.name, compression='gzip')
return log_df
def get_path_s3(hour, partnum, bucket_name):
""" Destination file is named after the hour """
bucket_key = '{}_{}.csv'.format(hour,partnum)
path = '{}/{}'.format(bucket_name, bucket_key)
return path
def upload_df_to_s3(df, fs, path):
""" Upload a DataFrame to the S3 bucket """
with fs.open(path, mode='w') as file:
df.to_csv(file, index=False)
def truncate_hour(timestamp_str):
""" Truncates an hour by eliminating minutes and seconds """
timestamp_obj = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
return timestamp_obj.strftime('%Y-%m-%d %H:00')
def timestamp_to_hour(df):
""" Reformats the timestamp column of a DataFrame by truncating the hour """
col_ts = 'timestamp'
df[col_ts] = df[col_ts].apply(truncate_hour)
hour_df = df.rename(columns={'timestamp': 'hour'})
return hour_df
def join_mapping(base_df, token, mapping_url_suffix, col_join):
"""
Joins a base DataFrame against a mapping DataFrame using one or more columns
base_df: Base DataFrame on which mappings are applied
token: Auth token for CAL API access
mapping_url_suffix: Suffix of the mapping API URL for the desired mapping (see get_mappings() for details)
col_join: Columns that represent the join conditions against the mapping DataFrame (can be String or String[])
"""
mapping_df = get_mappings(token, mapping_url_suffix)
merged_df = base_df.merge(mapping_df, how='left', on=col_join)
return merged_df
def get_hourly_df(encoded_client_key, request_url, expected_md5sum):
"""
Restructures a raw audit log to produce predefined hourly aggregations
encoded_client_key: Encoded key with enough information to acquire an auth token for CAL API access
request_url: URL from which the audit log file is downloaded
expected_md5sum: Expected md5 checksum of the downloaded file
The resulting report can be customized:
* Extra dimensions can be modified to produce different aggregation granularities
* Additional information for certain dimensions can be added using join_mapping pipes
* Sorting can be modified to ensure that relevant aggregations are shown in priority order
Note:
* The 'hour' dimension is generated in case we want to see it in the final report
"""
token = get_token(encoded_client_key)
log_df = get_audit_log(token, request_url, expected_md5sum)
hourly_df = (log_df
.pipe(timestamp_to_hour)
.pipe(join_mapping, token=token, mapping_url_suffix='dsps', col_join='dsp_id')
.pipe(join_mapping, token=token, mapping_url_suffix='buyers', col_join=['dsp_id', 'trading_desk_id']))
return hourly_df
def main():
"""
Standard process for retrieving CAL files:
* Retrieve auth token
* Get list of available files
* Ensure available files have been processed and uploaded to the S3 bucket
"""
if len(sys.argv) != 5:
print("Usage: python {} <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>".format(sys.argv[0]))
sys.exit(1)
encoded_client_key = sys.argv[1]
aws_access_key_id = sys.argv[2]
aws_secret_access_key = sys.argv[3]
bucket_name = sys.argv[4]
token = get_token(encoded_client_key)
downloads_json = get_available_downloads(token)
print('Number of available hours to download: {}'.format(len(downloads_json)))
# Connect to S3 filesystem via access key and secret key
fs = s3fs.S3FileSystem(
key=aws_access_key_id,
secret=aws_secret_access_key,
)
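# Explicit key/secret are used here; if omitted, s3fs falls back to the standard
# AWS credential chain (environment variables, config files, instance roles)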
for download_json in downloads_json:
hour = download_json['hour']
parts = download_json['parts']
for part in parts:
request_url = part['downloadURL']
expected_md5sum = part['md5sum']
file_id = request_url.split("/")[-1]
path = get_path_s3(hour, file_id, bucket_name)
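# Skip parts that already exist in the bucket so the script can be safely re-run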
if fs.exists(path):
print('Already uploaded path: {}'.format(path))
continue
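# get_hourly_df requests a fresh token per file, presumably so long runs don't fail on token expiry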
hourly_df = get_hourly_df(encoded_client_key, request_url, expected_md5sum)
upload_df_to_s3(hourly_df, fs, path)
print('Successfully uploaded path: {}'.format(path))
if __name__ == "__main__":
main()