Hourly aggregations to AWS via Python

This example demonstrates how to aggregate client audit logs hourly and upload the aggregated results to an Amazon S3 bucket in your AWS account.
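
The script iterates over the entries returned by the CAL downloads endpoint and reads only the downloadURL, md5sum, and hour fields from each entry. For reference, a single entry (as received by the script after resp.json()) might look like the sketch below; the field values are placeholders rather than real API output.

# Hypothetical download entry, for illustration only.
# Only the fields the script reads are shown, and the values are placeholders.
example_download = {
    'downloadURL': '<URL from which the gzipped audit log is downloaded>',
    'md5sum': '<expected md5 checksum of the downloaded file>',
    'hour': '<hour the audit log covers; used to name the destination S3 object>',
}

The full script is shown below.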

#!/usr/bin/env python

''' The script demonstrates how to perform basic aggregations on files retrieved from CAL. '''
 
import sys
import hashlib
import json
import pandas
import requests
import s3fs
import tempfile
from collections import OrderedDict
from datetime import datetime
 
# Stores mapping DataFrame objects queried through the CAL API
mapping_cache = {}
 
def get_token(encoded_client_key):
    ''' Request access token from Keycloak using an encoded client key '''
 
    auth_url = 'https://identity.indexexchange.com/auth/realms/eventlog/protocol/openid-connect/token'
    headers = {'Authorization': 'Basic ' + encoded_client_key}
    data = {'grant_type': 'client_credentials'}
    resp = requests.post(auth_url, headers=headers, data=data)
    resp.raise_for_status()
 
    resp_json = resp.json()
    return resp_json['access_token']
 
def get_available_downloads(token):
    ''' Given an access token, retrieve the list of available downloads as a JSON object '''
 
    request_url = 'https://app.indexexchange.com/api/cal/v1/downloads'
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()
 
    resp_json = resp.json()
    return resp_json['availableDownloads']
 
def get_mappings(token, mapping_url_suffix):
    ''' Given an access token, retrieve mappings for a given url_suffix as a DataFrame '''
 
    if mapping_url_suffix in mapping_cache:
        return mapping_cache[mapping_url_suffix]
 
    request_url = 'https://app.indexexchange.com/api/cal/v1/mappings/{}'.format(mapping_url_suffix)
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()
 
    resp_json = resp.json()
    mapping_json = resp_json['data']
    mapping_str = json.dumps(mapping_json)
    mapping_df = pandas.read_json(mapping_str)
    mapping_cache[mapping_url_suffix] = mapping_df
    return mapping_df
 
def get_audit_log(token, download_json):
    ''' Given an access token and file metadata, download a file from CAL as a DataFrame '''
 
    request_url = download_json['downloadURL']
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header, timeout=5, stream=True)
    resp.raise_for_status()
 
    # Download in chunks so we don't hold huge files in memory
    with tempfile.NamedTemporaryFile() as tmp_file:
        expected_md5sum = download_json['md5sum']
        md5sum = hashlib.md5()
        for chunk in resp.iter_content(chunk_size=32000):
            if chunk:
                tmp_file.write(chunk)
                md5sum.update(chunk)
        tmp_file.flush()
 
        actual_md5sum = md5sum.hexdigest()
        if actual_md5sum != expected_md5sum:
            error_message = 'md5sum of downloaded file does not match expected value. Actual: {}, Expected: {}'
            raise Exception(error_message.format(actual_md5sum, expected_md5sum))
 
        log_df = pandas.read_csv(tmp_file.name, compression='gzip')
        return log_df
 
def get_path_s3(download_json, bucket_name):
    ''' Destination file is named after the hour '''
 
    hour = download_json['hour']
    bucket_key = '{}.csv'.format(hour)
    path = '{}/{}'.format(bucket_name, bucket_key)
    return path
 
def upload_df_to_s3(df, fs, path):
    ''' Upload a DataFrame to the S3 bucket '''
 
    with fs.open(path, mode='wb') as file:
        df.to_csv(file, index=False)
 
def truncate_hour(timestamp_str):
    ''' Truncates a timestamp to the hour by dropping minutes and seconds '''
 
    timestamp_obj = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
    return timestamp_obj.strftime('%Y-%m-%d %H:00')
 
def timestamp_to_hour(df):
    ''' Reformats the timestamp column of a DataFrame by truncating the hour '''
 
    col_ts = 'timestamp'
    df[col_ts] = df[col_ts].apply(truncate_hour)
    hour_df = df.rename(columns={'timestamp': 'hour'})
    return hour_df
 
def aggregate_audit_log(df, dimensions):
    ''' The measure aggregations are predefined, while the grouping dimensions are specified by the caller '''
 
    measure_aggregations = OrderedDict([
        ('event_opportunity', 'size'),
        ('net_revenue', 'sum'),
        ('gross_revenue', 'sum')
    ])
    measures = list(measure_aggregations.keys())
 
    agg_df = (df[dimensions + measures]
        .groupby(dimensions, as_index=False)
        .agg(measure_aggregations)
        .rename(columns={'event_opportunity': 'impressions'}))
    return agg_df
 
def join_mapping(base_df, token, mapping_url_suffix, col_join):
    '''
    Joins a base DataFrame against a mapping DataFrame using one or more columns
        base_df:                Base DataFrame on which mappings are applied
        token:                  Auth token for CAL API access
        mapping_url_suffix:     Suffix of the mapping API URL for the desired mapping (see get_mappings() for details)
        col_join:               Columns that represent the join conditions against the mapping DataFrame (a single column name or a list of column names)
    '''
 
    mapping_df = get_mappings(token, mapping_url_suffix)
    merged_df = base_df.merge(mapping_df, how='left', on=col_join)
    return merged_df
 
def get_hourly_df(encoded_client_key, download_json):
    '''
    Restructures a raw audit log to produce predefined hourly aggregations
        encoded_client_key:     Encoded key with enough information to acquire an auth token for CAL API access
        download_json:          JSON structure containing download file metadata
 
    The resulting report can be customized:
        * The list of dimensions can be modified to produce different aggregation granularities
        * Additional information for certain dimensions can be added using join_mapping pipes
        * Sorting can be modified to ensure that relevant aggregations are shown in priority order
 
    Note:
        * The 'hour' dimension is generated in case we want to see it in the final report
    '''
    dimensions = ['hour', 'partner_id', 'site_id', 'supply_source', 'dsp_id', 'trading_desk_id']
    sort_order = ['partner_id', 'site_id', 'impressions']
    sort_ascending = [True, True, False]
 
    token = get_token(encoded_client_key)
    log_df = get_audit_log(token, download_json)
    hourly_df = (log_df
        .pipe(timestamp_to_hour)
        .pipe(aggregate_audit_log, dimensions=dimensions)
        .sort_values(by=sort_order, ascending=sort_ascending)
        .pipe(join_mapping, token=token, mapping_url_suffix='dsps', col_join='dsp_id')
        .pipe(join_mapping, token=token, mapping_url_suffix='buyers', col_join=['dsp_id', 'trading_desk_id']))
    return hourly_df
 
def main():
    '''
    Standard process for retrieving CAL files:
        * Retrieve auth token
        * Get list of available files
        * Ensure available files have been processed and uploaded to the S3 bucket
    '''
 
    if len(sys.argv) != 5:
        print('Usage: python get_event_logs.py <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>')
        sys.exit(1)
 
    encoded_client_key = sys.argv[1]
    aws_access_key_id = sys.argv[2]
    aws_secret_access_key = sys.argv[3]
    bucket_name = sys.argv[4]
 
    token = get_token(encoded_client_key)
    downloads_json = get_available_downloads(token)
 
    print('Number of available downloads: {}'.format(len(downloads_json)))
 
    # Connect to S3 filesystem via access key and secret key
    fs = s3fs.S3FileSystem(
        key=aws_access_key_id,
        secret=aws_secret_access_key,
    )
 
    for download_json in downloads_json:
        path = get_path_s3(download_json, bucket_name)
        if fs.exists(path):
            print('Already uploaded path: {}'.format(path))
            continue
 
        hourly_df = get_hourly_df(encoded_client_key, download_json)
        upload_df_to_s3(hourly_df, fs, path)
        print('Successfully uploaded path: {}'.format(path))
 
if __name__ == "__main__":
    main()
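
The aggregation logic can also be exercised on its own, without touching CAL or S3. The sketch below assumes the script above is saved as get_event_logs.py (the file name used in its usage message) and that its dependencies are installed; it applies timestamp_to_hour and aggregate_audit_log to a small DataFrame whose column names match what the script expects, with values made up purely for illustration.

# Minimal sketch: run the hourly aggregation on a toy DataFrame.
# Assumes the script above is saved as get_event_logs.py; all values are invented.
import pandas

from get_event_logs import aggregate_audit_log, timestamp_to_hour

toy_df = pandas.DataFrame({
    'timestamp': ['2020-01-01 14:05:09', '2020-01-01 14:47:30', '2020-01-01 14:59:59'],
    'partner_id': [1, 1, 1],
    'site_id': [10, 10, 20],
    'event_opportunity': [1, 1, 1],
    'net_revenue': [0.25, 0.40, 0.10],
    'gross_revenue': [0.30, 0.50, 0.15],
})

hourly_df = (toy_df
    .pipe(timestamp_to_hour)
    .pipe(aggregate_audit_log, dimensions=['hour', 'partner_id', 'site_id']))

# One row per (hour, partner_id, site_id): impressions is the row count,
# while net_revenue and gross_revenue are summed.
print(hourly_df)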