Download bid event log files and upload to AWS S3

This example demonstrates how to download bid event files from the Client Audit Log (CAL) and upload the processed data to an AWS S3 bucket.
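
The script requires the requests, pandas, and s3fs packages, and takes an encoded client key, AWS credentials, and an S3 bucket name as command-line arguments. Assuming it is saved as download_bid_events.py (the file name is only a placeholder), it can be invoked as:

python download_bid_events.py <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>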

""" The script demonstrates how to download bid-events files from CAL. """

#!/usr/bin/env python

import sys
import hashlib
import json
import pandas
import requests
import s3fs
import tempfile
from datetime import datetime

# Stores mapping DataFrame objects queried through the CAL API
mapping_cache = {}

def get_token(encoded_client_key):
    """ Request access token from Keycloak using an encoded client key """

    auth_url = 'https://identity.indexexchange.com/auth/realms/eventlog/protocol/openid-connect/token'
    headers = {'Authorization': 'Basic ' + encoded_client_key}
    data = {'grant_type': 'client_credentials'}
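    # POST the client_credentials grant; the response JSON carries the bearer token under 'access_token'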
    resp = requests.post(auth_url, headers=headers, data=data)
    resp.raise_for_status()

    resp_json = resp.json()
    return resp_json['access_token']

def get_available_downloads(token):
    """ Given an access token, retrieve list of available downloads as a JSON object """

    request_url = 'https://app.indexexchange.com/api/cal/v1/downloads/bidevents'
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()

    resp_json = resp.json()
    return resp_json['availableDownloads']

def get_mappings(token, mapping_url_suffix):
    """ Given an access token, retrieve mappings for a given url_suffix as a DataFrame """

    if mapping_url_suffix in mapping_cache:
        return mapping_cache[mapping_url_suffix]

    request_url = 'https://app.indexexchange.com/api/cal/v1/mappings/{}'.format(mapping_url_suffix)
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()

    resp_json = resp.json()
    mapping_json = resp_json['data']
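    # Serialize the mapping payload back to JSON text so pandas can parse it into a DataFrame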
    mapping_str = json.dumps(mapping_json)
    mapping_df = pandas.read_json(mapping_str)
    mapping_cache[mapping_url_suffix] = mapping_df
    return mapping_df

def get_audit_log(token, request_url, expected_md5sum):
    """ Given an access token and file metadata, download file from CAL as a DataFrame """

    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header, timeout=50, stream=True)
    resp.raise_for_status()

    # Download in chunks so we don't hold huge files in memory
    with tempfile.NamedTemporaryFile() as tmp_file:
        md5sum = hashlib.md5()
        for chunk in resp.iter_content(chunk_size=32000):
            if chunk:
                tmp_file.write(chunk)
                md5sum.update(chunk)
        tmp_file.flush()

        actual_md5sum = md5sum.hexdigest()
        if actual_md5sum != expected_md5sum:
            error_message = 'md5sum of downloaded file does not match expected value. Actual: {}, Expected: {}'
            raise Exception(error_message.format(actual_md5sum, expected_md5sum))

        # The downloaded file is a gzip-compressed CSV; parse it from the temp file into a DataFrame
        log_df = pandas.read_csv(tmp_file.name, compression='gzip')
        return log_df

def get_path_s3(hour, partnum, bucket_name):
    """ Destination file is named after the hour """

    bucket_key = '{}_{}.csv'.format(hour, partnum)
    path = '{}/{}'.format(bucket_name, bucket_key)
    return path

def upload_df_to_s3(df, fs, path):
    """ Upload a DataFrame to the S3 bucket """

    with fs.open(path, mode='w') as file:
        df.to_csv(file, index=False)

def truncate_hour(timestamp_str):
    """ Truncates an hour by eliminating minutes and seconds """
    timestamp_obj = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
    return timestamp_obj.strftime('%Y-%m-%d %H:00')

def timestamp_to_hour(df):
    """ Reformats the timestamp column of a DataFrame by truncating the hour """

    col_ts = 'timestamp'
    df[col_ts] = df[col_ts].apply(truncate_hour)
    hour_df = df.rename(columns={col_ts: 'hour'})
    return hour_df

def join_mapping(base_df, token, mapping_url_suffix, col_join):
    """
    Joins a base DataFrame against a mapping DataFrame using one or more columns
        base_df:                Base DataFrame on which mappings are applied
        token:                  Auth token for CAL API access
        mapping_url_suffix:     Suffix of the mapping API URL for the desired mapping (see get_mappings() for details)
        col_join:               Columns that represent the join conditions against the mapping DataFrame (can be String or String[])
    """

    mapping_df = get_mappings(token, mapping_url_suffix)
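    # Left join so audit-log rows without a matching mapping entry are still kept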
    merged_df = base_df.merge(mapping_df, how='left', on=col_join)
    return merged_df

def get_hourly_df(encoded_client_key, request_url, expected_md5sum):
    """
    Restructures a raw audit log to produce predefined hourly aggregations
        encoded_client_key:     Encoded key with enough information to acquire an auth token for CAL API access
        request_url:            Download URL of the audit log file (see get_available_downloads() for details)
        expected_md5sum:        md5 checksum that the downloaded file is expected to match

    The resulting report can be customized:
        * Extra dimensions can be modified to produce different aggregation granularities
        * Additional information for certain dimensions can be added using join_mapping pipes
        * Sorting can be modified to ensure that relevant aggregations are shown in priority order

    Note:
        * The 'hour' dimension is generated in case we want to see it in the final report
    """

    token = get_token(encoded_client_key)
    log_df = get_audit_log(token, request_url, expected_md5sum)
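    # Additional mappings could be appended to the pipe chain below; for example
    # (hypothetical mapping suffix and join column):
    #   .pipe(join_mapping, token=token, mapping_url_suffix='publishers', col_join='publisher_id')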
    hourly_df = (log_df
                 .pipe(timestamp_to_hour)
                 .pipe(join_mapping, token=token, mapping_url_suffix='dsps', col_join='dsp_id')
                 .pipe(join_mapping, token=token, mapping_url_suffix='buyers', col_join=['dsp_id', 'trading_desk_id']))
    return hourly_df

def main():
    """
    Standard process for retrieving CAL files:
        * Retrieve auth token
        * Get list of available files
        * Ensure available files have been processed and uploaded to the S3 bucket
    """
    if len(sys.argv) != 5:
        print("Usage: python {} <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>".format(sys.argv[0]))
        sys.exit(1)

    encoded_client_key = sys.argv[1]
    aws_access_key_id = sys.argv[2]
    aws_secret_access_key = sys.argv[3]
    bucket_name = sys.argv[4]

    token = get_token(encoded_client_key)
    downloads_json = get_available_downloads(token)

    print('Number of available hours to download: {}'.format(len(downloads_json)))

    # Connect to S3 filesystem via access key and secret key
    fs = s3fs.S3FileSystem(
        key=aws_access_key_id,
        secret=aws_secret_access_key,
    )

    for download_json in downloads_json:
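        # Each available download covers one hour and may be split into multiple parts,
        # each with its own download URL and md5 checksum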
        hour = download_json['hour']
        parts = download_json['parts']
        for part in parts:
            request_url = part['downloadURL']
            expected_md5sum = part['md5sum']
            file_id = request_url.split("/")[-1]
            path = get_path_s3(hour, file_id, bucket_name)
            if fs.exists(path):
                print('Already uploaded path: {}'.format(path))
                continue
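            # Download, verify, and enrich this part, then write the result to S3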
            hourly_df = get_hourly_df(encoded_client_key, request_url, expected_md5sum)

            upload_df_to_s3(hourly_df, fs, path)
            print('Successfully uploaded path: {}'.format(path))

if __name__ == "__main__":
    main()