This code is licensed under the MIT license. See license.txt for details.

Provides useful libraries for processing large data sets. Developed by the team at deductive.com; we find them useful in our projects.

You can view the source code here

Installation

Install using pip:

pip install newtools

The default install does not include any of the dependent libraries; runtime errors will be raised if a required library is missing.

You can install all dependencies as follows:

pip install newtools[full]

S3Location class

class newtools.S3Location(*args, **kwargs)

Bases: str

Class that parses out an S3 location from a passed string. Subclass of str so supports most string operations.

Parameters:
  • s3_str – string representation of the S3 location; accepts most common formats. Also accepts None if using the bucket and key keywords
  • bucket – ignored if s3_str is not None. Can specify only the bucket, e.g. bucket='mybucket' gives 's3://mybucket/', or use in conjunction with key
  • key – ignored if s3_str is not None. bucket must be set, e.g. bucket='mybucket', key='path/to/file'
  • ignore_double_slash – default False. If True, allows S3 locations containing '//'; these are valid S3 paths but typically result from mistaken joins

Examples:

loc1 = S3Location('s3://bucket/folder/file.txt')
loc2 = S3Location('bucket/folder')
loc3 = S3Location('http://s3*.amazonaws.com/bucket-name/')
loc4 = S3Location('https://s3*.amazonaws.com/bucket-name/')
loc5 = S3Location(bucket='bucket', key='path/to/file')
bucket

The bucket part of the S3 URL.

file

The file name part of the S3 URL.

join(*other, ignore_double_slash=False)

Joins the S3 location to a string or list of strings, similar to os.path.join.

Parameters:
  • other – nargs to join
  • ignore_double_slash – default False. Set True to allow '//' in the link, e.g. 's3://bucket/folder//path/key'
Returns: the new S3Location

Examples:

loc1 = S3Location('s3://bucket/folder/')
loc2 = loc1.join('file.txt')
loc3 = loc1.join('path', 'to', 'file')
key

The key part of the S3 URL.

path

The path part of the S3 URL (same as the key).

prefix

The prefix part of the S3 URL.

s3_url
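
For illustration, a minimal sketch of the property accessors; the values shown in the comments follow the descriptions above and are indicative rather than verified output:

loc = S3Location('s3://bucket/folder/file.txt')

loc.bucket  # 'bucket'
loc.key     # 'folder/file.txt' (path returns the same value)
loc.file    # 'file.txt'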

AthenaClient class

class newtools.db.athena.AthenaClient(region, db, max_queries=3, max_retries=3, query_terminating=True, df_handler=None, workgroup=None, logger=<Logger newtools.athena (WARNING)>)

Bases: newtools.queue.task_queue.TaskQueue

A client for AWS Athena that runs queries against Athena. Includes queuing functionality to run multiple queries and wait until they have completed.

A CREATE TABLE Athena query can be generated using the .get_ct_query() function, for example:

from newtools import AthenaClient
from newtools.db.data_types import StringType, TimestampType

ac = AthenaClient(**kwargs)
query = ac.get_ct_query(
    table_name='test_table',
    s3_location='s3://test-bucket/test_prefix',
    column_schema={
        'COL1': StringType(),
        'COL2': TimestampType(),
    },
    partition_columns={
        'COL3': StringType()
    },
    partition_projection={
        'COL3': {
            'type': 'date',
            'lookback': '2MONTHS'
        }
    }
)
add_query(sql, name=None, output_location=None, caching=0)

Adds a query to Athena. Respects the maximum number of queries specified when the module was created. Retries queries when they fail, so only use when you are sure your syntax is correct! Returns a query object.

Parameters:
  • sql – the SQL query to run
  • name – an optional name which will be logged when running this query
  • output_location – the S3 prefix where you want the results stored (required if workgroup is not specified)
  • caching – if greater than 0, the maximum age in minutes a previous query result can have for it to be reused
Returns: a unique identifier for this query

static get_column_schema(column_schema: dict) → str

Function to return a string of the column schema for CREATE TABLE Athena queries.

get_ct_query(table_name: str, s3_location: newtools.aws.s3_location.S3Location, column_schema: dict, partition_columns: Optional[dict] = None, partition_projection: Optional[dict] = None, db_name: Optional[str] = None, file_format: Optional[str] = 'PARQUET') → str

Function to generate a CREATE TABLE query for Athena.

Parameters:
  • table_name – the name for the table
  • s3_location – the s3 location where the data is stored
  • column_schema – a dictionary of column names along with data types
  • partition_columns – a dictionary of column names along with data types specifically for partition columns
  • partition_projection – a dictionary with information about the partition columns for the projection, for example:

    {
        'column_name_1': {'type': 'date'},
        'column_name_2': {'type': 'enum', 'values': ['val1', 'val2']},
        'column_name_3': {'type': 'date', 'lookback': '1MONTH'}
    }

    See here for more information on formats for specific variables: https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html

  • db_name – the database to create the table in (will default to db from __init__ if not provided)
  • file_format – one of ‘PARQUET’ or ‘CSV’
static get_partition_projection_props(column: str, metadata: dict) → dict

Function to return the TBLPROPERTIES for the partition projection for the given column.

Parameters:
  • column – the column name
  • metadata – metadata for the properties
get_query_result(query)

Returns a Pandas DataFrame containing the query result if the query has completed.

Parameters:query – the query ID returned from add_query()
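
As an illustrative sketch of the queuing flow (the region, database, bucket and SQL below are placeholders, not values from the library):

from newtools import AthenaClient

ac = AthenaClient(region='us-east-1', db='my_database')

# queue a query; the client respects max_queries and retries failed queries
query_id = ac.add_query(sql='SELECT * FROM my_table LIMIT 10',
                        name='sample query',
                        output_location='s3://my-results-bucket/athena/')

# block until all queued queries have finished
ac.wait_for_completion()

# fetch the result of the completed query as a DataFrame
df = ac.get_query_result(query_id)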

static get_rfs(file_format: str) → str

Return the ROW FORMAT SERDE property for the Athena table depending on the file format.

Parameters:file_format – either ‘PARQUET’ or ‘CSV’
static get_sql(sql_loc: str) → str

Function to return the SQL file as an un-formatted string.

Parameters:sql_loc – the location of the sql to load
get_tbl_props(file_format: str, partition_proj: Optional[dict] = None) → str

Get the TBLPROPERTIES property for the Athena table.

Parameters:
  • file_format – either ‘PARQUET’ or ‘CSV’
  • partition_proj – a dictionary with information about the partition columns for the projection
stop_and_delete_all_tasks()

Stops active tasks and removes pending tasks for a given client.

Returns: None

wait_for_completion()

Checks whether any jobs have failed; if so, triggers the deletion event for the AthenaClient, otherwise waits for any queries to complete. Automatically removes all pending queries and stops all active queries upon completion.

Cached Queries

These classes provide functionality to cache database results locally or on S3 and only re-execute the database query if the query SQL or parameters have changed.

Cache structure

Each query is cached to the path specified in cache_path with a generated filename that uniquely represents the query that has been run, for example:

s3://cache_bucket/cache_prefix/get_results_5e5920011161b3081937572d25140237_ba98cb6452cf711ae0cec5c2dadc2585.csv.gz

This is made up as follows:

  • s3://cache_bucket/cache_prefix/ - The path provided in the cache_path parameter when instantiating the class
  • get_results - The first part of the file name is the name of the query that was run, in this example get_results.sql
  • 5e5920011161b3081937572d25140237 - This is a unique hash calculated from the contents of the SQL file. When a new version of the file is created, a new hash will also be created.
  • ba98cb6452cf711ae0cec5c2dadc2585 - This is a unique hash calculated from the parameters passed to the SQL file. A new file is created for each set of parameters passed to the query.
  • .csv.gz - A fixed suffix, always set to ".csv.gz"

Caches can optionally be set to expire every n seconds, in which case the cache will appear as follows: s3://cache_bucket/cache_prefix/get_results_5e5920011161b3081937572d25140237_ba98cb6452cf711ae0cec5c2dadc2585_1616518700.csv.gz

In this case, 1616518700 is the epoch time at which the n-second window for which this cache is valid starts. For example, if you set the cache to be valid for 60 seconds then a new cache file will be created each minute.

Hash collisions

The hash is designed to be unique for each execution of the SQL file but there is about a 1 in a million chance that we’ll get the same hash for two different SQL files. If this occurs then the campaign results should look like total garbage so please double check the QA reports before sending to a customer.

Archive queries

Each query that is run can be saved to an optionally specified sql_archive_path. In this location, the full text of every executed query is saved along with a timestamp.

Params

When creating a CachedQuery class you specify a set of parameters that are used across all queries executed using the class. This can be used to store common parameters for queries such as access tokens and secrets.

Running multiple clients

Before running the query, the client attempts to lock the file. This is either done by creating a .lock file with a similar name or by creating a row in DynamoDB to lock the file. The table in DynamoDB is "newtools.dynamo.doggo.lock" in us-east-1 and contains one entry per locked file. Entries are typically deleted after the lock is released.

class newtools.db.cached_query.BaseCachedClass(cache_path, logger=<Logger newtools.cached_query (WARNING)>, expiry_seconds=0, boto_session: Optional[Type[boto3.Session]] = None)

Bases: object

static get_cache_file_format(suffix: str) → str

Return the file format for the cache path.

get_cache_path(prefix, files_to_hash, data_hash='', suffix='')

Returns the location to cache the output to

Parameters:
  • prefix – the name of the file
  • files_to_hash – the name of the file to hash in the output name
  • data_hash – an additional hash to add to the string
  • suffix – a suffix to add to the end
Returns:

the full path to cache to

validation_mode = False
class newtools.db.cached_query.BaseCachedQuery(params=None, cache_path='', sql_archive_path=None, sql_paths=None, gzip=True, dynamodb_lock=True, logger=<Logger newtools.cached_query (WARNING)>, expiry_seconds=0, use_lock: Optional[bool] = True, boto_session: Optional[Type[boto3.Session]] = None)

Bases: newtools.db.cached_query.BaseCachedClass

archive_path(file_path)

Calculates the archive path for the SQL queries

Parameters:file_path – the path of the SQL file
Returns:the location to save the archived SQL file
check_exists(output_file: str, refresh_cache: bool) → Optional[str]

Function to check if the output file already exists in S3. If so, return the file location or remove the file if opting to refresh the cache.

clear_cache(sql_file, output_prefix, params=None, replacement_dict=None)

Clears the cache of the specified SQL file

Parameters:
  • sql_file – the SQL file whose cache should be cleared
  • output_prefix – the output path to use for the cache
  • params – the parameters to use in the cache
  • replacement_dict – any replacements to be made in the query's text
Returns:

get_query_params(params: Optional[dict], ignore_list: List[str]) → dict

Get a set of query parameters, with passed arguments taking precedence. Also formats any list parameters as per __format_list().

Parameters:ignore_list – any parameters to ignore from list formatting
get_results(sql_file: str, output_prefix: str, params: Optional[dict] = None, replacement_dict: Optional[dict] = None, refresh_cache: Optional[bool] = False, ignore_list: Optional[List[str]] = None) → str

Runs the specified SQL file

Parameters:
  • sql_file – the SQL file to run
  • output_prefix – the output path to use for this query
  • params – the parameters to use in the query
  • replacement_dict – any replacements to be made in the query's text
  • refresh_cache – if set, then the cache for this query is refreshed when it is run
  • ignore_list – any parameters we do not want to __format_list on
get_sql_file(file)

Search SQL paths for the named query

Parameters:file – the SQL file to get
Returns:a full path to the SQL file
lock_and_execute(output_file: str, file_path: str, query_parameters: dict, replacement_dict: dict) → None

Function to obtain the lock on the output file path and execute the query.

maximum_age = 3600
time_out_seconds = 1800
wait_period = 30
class newtools.db.cached_query.CachedAthenaQuery(params=None, cache_path='', sql_archive_path=None, sql_paths=None, gzip=True, dynamodb_lock=True, logger=<Logger newtools.cached_query (WARNING)>, queue_queries=False, expiry_seconds=0, use_lock: Optional[bool] = True, boto_session: Optional[Type[boto3.Session]] = None)

Bases: newtools.db.cached_query.BaseCachedQuery

The CachedAthenaQuery class will execute queries against AWS Athena and cache results in csv.gz format.

It takes a single additional argument, queue_queries, which defaults to False; if set to True, queries are queued before being executed.

Two special parameters must be passed to connect to Athena:

  • aws_region - the AWS region
  • athena_db - the Athena database to use

The Athena connection is created using Boto and uses the current AWS profile or AWS access keys and tokens.

Use the class as follows:

import pandas as pd
from newtools import CachedAthenaQuery

q = CachedAthenaQuery(cache_path="s3://here-is-my-cache/",
                      sql_paths=["local/path/with/sql/file", "another/local/path"],
                      params={"aws_region": "us-east-1",
                              "athena_db": "my_db"},
                      gzip=True)


results = q.get_results(sql_file="my_query.sql",
              output_prefix="test_query"
              )

df = pd.read_csv(results, compression="gzip")
static get_query_body(query_file: str, query_parameters: dict, replacement_dict: dict) → str

Function to open the query and format with the specified parameters.

wait_for_completion()
class newtools.db.cached_query.CachedCTASQuery(params: Optional[dict] = None, cache_path: Union[str, newtools.aws.s3_location.S3Location, None] = '', sql_archive_path: Optional[str] = None, sql_paths: Union[str, List[str], None] = None, dynamodb_lock: Optional[bool] = True, use_lock: Optional[bool] = True, logger: Optional[logging.Logger] = <Logger newtools.db.cached_query (WARNING)>, expiry_seconds: Optional[int] = 0, queue_queries: Optional[bool] = False, return_meta: Optional[bool] = False, boto_session: Optional[Type[boto3.Session]] = None)

Bases: newtools.db.cached_query.CachedAthenaQuery

Class allowing functionality to run Athena queries using the CTAS operation.

static extract_ctas_parameters(params: dict, output_file: str) → dict

Function to extract the CTAS parameters from the query parameters.

Parameters:
  • params – the query parameters and generic SQL parameters
  • output_file – the external location for CTAS, of the form {cache_path}/{sql_hash}/
static format_df_to_dict(df: Type[pd.DataFrame], index_cols: Optional[List[str]] = None) → dict

Function to format a dataframe into a dictionary of values.

Parameters:
  • df – the dataframe to format
  • index_cols – the columns to set as the index
static get_cache_file_format(*args, **kwargs) → str

Return the file format for the cache path for CTAS.

get_partitions(load_path: Union[str, newtools.aws.s3_location.S3Location]) → List[Tuple[Any]]
Function to load all partitions under the load_path into a list of tuples of the form:
(‘partition_col1=partition_val1’, ‘partition_col2=partition_val2’, …, ‘{path_to_data}’)
Parameters:load_path – the directory containing the CTAS data in S3
classmethod get_query_header(database: str, table_name: str, external_location: Union[str, newtools.aws.s3_location.S3Location], output_format: Optional[str] = None, output_compression: Optional[str] = None, partition_columns: Union[str, List[str], None] = None, bucket_info: Optional[Tuple[str, int]] = None) → str

Function to calculate the CTAS header for the query to run.

Parameters:
  • database – the database to CTAS from
  • table_name – the name of the CTAS temporary table
  • external_location – where to save the CTAS contents to in S3
  • output_format – the format of the output data, which is PARQUET by default
  • output_compression – the compression of the output data https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html
  • partition_columns – the columns to partition by. You must ensure the partition columns are the last columns of the SELECT statement in the query; moreover, if multiple partition columns are specified, their ordering must match the query
  • bucket_info – a tuple containing the column to bucket on and the bucket count
get_query_metadata(database: str, table_name: str, partition_columns: Union[str, list, None] = None) → None

Function to run queries to get metadata about the results of the query from CTAS.

Parameters:
  • database – the database the table is stored in
  • table_name – the name of the CTAS table
  • partition_columns – the partition columns used in the query (if any)
get_results(sql_file: str, output_prefix: str, params: Optional[dict] = None, replacement_dict: Optional[dict] = None, refresh_cache: Optional[bool] = False, **kwargs) → Union[str, dict]

Function to run the specified query and return the output path (a directory) for the data. If return_meta is set to True, returns a dictionary containing the output path and the metadata. The query metadata is refreshed on each get_results() call.

load_dir(load_path: Union[str, newtools.aws.s3_location.S3Location], file_format: Optional[str] = None) → Type[pd.DataFrame]

Function to load in the CTAS output and return one unified dataframe.

Parameters:
  • load_path – the directory containing the CTAS data in S3
  • file_format – the format of the save files, e.g. PARQUET or CSV
load_dir_iter(load_path: Union[str, newtools.aws.s3_location.S3Location], file_format: Optional[str] = None) → Iterator[Type[pd.DataFrame]]

Function to return a generator of the output CTAS data.

Parameters:
  • load_path – the directory containing the CTAS data in S3
  • file_format – the format of the save files, e.g. PARQUET or CSV
static validate_ctas_params(parameters: dict) → None

Function to check that the required parameters for CTAS are specified in the parameters.
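
A minimal usage sketch, mirroring the CachedAthenaQuery example above; it assumes the same generic aws_region and athena_db parameters and omits any CTAS-specific parameters checked by validate_ctas_params():

from newtools.db.cached_query import CachedCTASQuery

q = CachedCTASQuery(cache_path="s3://here-is-my-cache/",
                    sql_paths=["local/path/with/sql/file"],
                    params={"aws_region": "us-east-1",
                            "athena_db": "my_db"})

# returns the path to the directory of CTAS output data
output_path = q.get_results(sql_file="my_query.sql",
                            output_prefix="test_query")

# load the CTAS output into a single DataFrame
df = q.load_dir(output_path)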

class newtools.db.cached_query.CachedPep249Query(pep_249_obj, params=None, cache_path='', sql_archive_path=None, sql_paths=None, gzip=True, dynamodb_lock=True, logger=<Logger newtools.cached_query (WARNING)>, expiry_seconds=0, boto_session: Optional[Type[boto3.Session]] = None)

Bases: newtools.db.cached_query.BaseCachedQuery

A CachedQuery class compatible with PEP249 classes.

The PEP249 query takes an additional argument containing any PEP249 compliant connection object.

Use the CachedPep249Query class with any PEP249 compliant database connector, like this:

import pandas as pd
import sqlite3
from newtools import CachedPep249Query

pep_249_obj =  sqlite3.connect("mydb.sqlite")

q = CachedPep249Query(pep_249_obj,
                      cache_path="s3://here-is-my-cache/",
                      sql_paths=["local/path/with/sql/file", "another/local/path"],
                      gzip=True)

results = q.get_results(sql_file="my_query.sql",
              output_prefix="test_query"
              )

df = pd.read_csv(results, compression="gzip")
class newtools.db.cached_query.UselessLock(*args, **kwargs)

Bases: object

Class to allow the use of a lock to be optional.

SQLClient class

class newtools.db.sql_client.SqlClient(db_connection, logging_level=10, log_query_text=False, logger=<Logger newtools.sql_client (WARNING)>)

Bases: object

A wrapper for PEP249 connection objects that provides additional logging, simple execution of queries, and optional writing of results to DataFrames or CSV.

The client runs multi-statement SQL queries from file or from strings and can return the result of the final SQL statement in either a DataFrame or as a CSV.

Archives the text of the executed queries to an optionally specified location.

Parameters:
  • db_connection – a connection object from a PEP249 compliant class

execute_query(query, parameters=None, replace=None, first_to_run=1, archive_query=None, dry_run=False)

Runs a query and ignores any output

Parameters:
  • query – the query to run, either a SQL file or a SQL query
  • parameters – a dict of parameters to substitute in the query
  • replace – a dict of items to be replaced in the SQL text
  • first_to_run – the index of the first query in a multi-command query to be executed
  • archive_query – save the query that is run to file. Default=False

execute_query_to_csv(query, csvfile, parameters=None, replace=None, append=False, first_to_run=1, archive_query=None, dry_run=False)

Runs a query and writes the output of the final statement to a CSV file.

Parameters:
  • query – the query to run, either a SQL file or a SQL query
  • csvfile – the file name to save the query results to
  • parameters – a dict of parameters to substitute in the query
  • replace – a dict of items to be replaced in the SQL text
  • first_to_run – the index of the first query in a multi-command query to be executed

execute_query_to_df(query, parameters=None, replace=None, first_to_run=1, dry_run=False, archive_query=None)

Runs a query and returns the output of the final statement in a DataFrame.

Parameters:
  • query – the query to run, either a SQL file or a SQL query
  • parameters – a dict of parameters to substitute in the query
  • replace – a dict of items to be replaced in the SQL text
  • first_to_run – the index of the first query in a multi-command query to be executed
  • archive_query – save the query that is run to file. Default=False
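
A brief usage sketch with a PEP249 connection (sqlite3 here, as in the cached query examples); the SQL and file names are placeholders:

import sqlite3
from newtools.db.sql_client import SqlClient

sql = SqlClient(sqlite3.connect("mydb.sqlite"))

# run a multi-statement SQL file and ignore any output
sql.execute_query("setup_tables.sql")

# run a query and return the result of the final statement as a DataFrame
df = sql.execute_query_to_df("SELECT * FROM my_table")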

LoadPartitions class

class newtools.aws.load_partitions.LoadPartitions(bucket, s3_client=None, request_payer='bucketowner')

Bases: object

The LoadPartitions class will recurse to a specified depth in S3 and create SQL to add all partitions to a table in Athena. This is significantly faster than using MSCK REPAIR TABLE as it only searches to the specified depth.

generate_sql(table, partitions_list, s3_path=None, athena_client=None, output_location=None, athena_s3_client=None)
Parameters:
  • table – The Athena table for which partitions are being loaded
  • partitions_list – The list of all the partitions to generate the ALTER TABLE query for
  • s3_path – The prefix where the data is stored; optional, can also be an empty string to look at the top level of the bucket
  • athena_client – The current Athena client; optional, will be used to list partitions already loaded
  • output_location – Output location of queries passed into athena_client
  • athena_s3_client – The S3 client that matches the Athena client passed in, used to access the queries it produces if in a different account to the default
Returns: A list of queries that can be run to add all partitions

get_sql(table, partition_depth=None, s3_path=None, athena_client=None, output_location=None, athena_s3_client=None)
Parameters:
  • table – The Athena table for which partitions are being loaded
  • partition_depth – The number of partition levels to look for; optional, but should be specified if known as this yields the best performance
  • s3_path – The prefix where the data is stored; optional, can also be an empty string to look at the top level of the bucket
  • athena_client – The current Athena client; optional, will be used to list partitions already loaded
  • output_location – Output location of queries passed into athena_client
  • athena_s3_client – The S3 client that matches the Athena client passed in, used to access the queries it produces if in a different account to the default
Returns:

A list of queries that can be run to add all partitions, each query is limited to 10 partitions

get_sql_from_file_names(table, file_names, s3_path=None, athena_client=None, output_location=None, athena_s3_client=None)

Adds functionality to get the load partitions sql from the output of save_partitioned (PandasDoggo). All other params are the same as in get_sql (with removal of partition_depth as we do not use list_partitions).

Parameters:file_names – Full file names (as output from PandasDoggo.save_partitioned)
list_partitions(table=None, partition_depth=None, s3_path=None)
Parameters:
  • table – The Athena table for which partitions are being loaded; either this or s3_path has to be specified
  • partition_depth – The number of partition levels to look for; optional, but should be specified if known as it has a significant performance impact
  • s3_path – The prefix where the data is stored; can be set to a blank string to look at the top level of the bucket
Returns:

A list of all the full paths of the partitions
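
For illustration, a sketch of generating and running the partition-loading queries with an AthenaClient (the bucket, database, table and prefix names are placeholders):

from newtools import AthenaClient
from newtools.aws.load_partitions import LoadPartitions

lp = LoadPartitions(bucket='my-data-bucket')
ac = AthenaClient(region='us-east-1', db='my_database')

# generate the ALTER TABLE queries (each limited to 10 partitions)
queries = lp.get_sql(table='my_table',
                     partition_depth=2,
                     s3_path='data/my_table/')

for query in queries:
    ac.add_query(query, output_location='s3://my-results-bucket/athena/')
ac.wait_for_completion()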

CSV Doggo

class newtools.doggo.csv.CSVDoggo(base_path='', detect_parameters=False, *args, **kwargs)

Bases: object

Loads and saves CSV files from pandas DataFrames. Supports GZIP, Parquet, S3, and the local file system interchangeably.

SAMPLE_SIZE = 1048576
df_to_string(df, *args, **kwargs)

Returns a formatted string from a dataframe using the specified configuration for the class

Parameters:df – the data frame to cast to string
load_df(file, *args, **kwargs)

Loads a data frame

Parameters:
  • file – the file to load
  • args – any args to use
  • kwargs – any keyword args to use
Returns:

a pandas data frame

save_df(df, file, *args, **kwargs)

Saves a data frame

Parameters:
  • df – the data frame to save
  • file – the path to save to
  • args – any args to use
  • kwargs – any keyword args to use
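
A short usage sketch, assuming file names are resolved relative to base_path and that extra keyword arguments are passed through to pandas:

from newtools.doggo.csv import CSVDoggo

csv = CSVDoggo(base_path='s3://my-bucket/my-prefix/')

df = csv.load_df('input.csv.gz')
csv.save_df(df, 'output.csv.gz', index=False)

# render the frame as a string using the class's CSV configuration
text = csv.df_to_string(df)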

File Doggo

class newtools.doggo.FileDoggo(path, mode='rb', is_s3=None, client=None, compression=None, request_payer='bucketowner')

Bases: object

He fetches the things you want. TV remotes, slippers, newspapers, files. Mostly files though.

from newtools import FileDoggo
import boto3

path = 's3 or local path'
s3_client = boto3.Session().client('s3')
with FileDoggo(path, mode='rb', client=s3_client) as f:
    f.read()

This is written to treat local paths and s3 paths the same, returning a file like object for either.

client
close()
connection

Pandas Doggo

class newtools.doggo.doggo.PandasDoggo(boto_session=None, logger=<Logger newtools.pandas_doggo (WARNING)>)

Bases: object

Is a Panda a doggo?

░░░░░░░░▄██▄░░░░░░▄▄░░
░░░░░░░▐███▀░░░░░▄███▌
░░▄▀░░▄█▀▀░░░░░░░░▀██░
░█░░░██░░░░░░░░░░░░░░░
█▌░░▐██░░▄██▌░░▄▄▄░░░▄
██░░▐██▄░▀█▀░░░▀██░░▐▌
██▄░▐███▄▄░░▄▄▄░▀▀░▄██
▐███▄██████▄░▀░▄█████▌
▐████████████▀▀██████░
░▐████▀██████░░█████░░
░░░▀▀▀░░█████▌░████▀░░
░░░░░░░░░▀▀███░▀▀▀░░░░

A class designed to simplify file IO operations to and from local or S3 files, with specific functionality for CSV and Parquet file formats.

  • read / write csv and parquet files
  • read and write both locally and to s3
  • support gzip, snappy and zip compression
  • sensible s3 handling of different profiles / credentials
from newtools import PandasDoggo

fh = PandasDoggo()

df = fh.load('filename.ext')

fh.save(df, 'path/to/new_file.ext')
load(path, file_format=None, compression=None, request_payer='bucketowner', *args, **kwargs)

Load a file into a Pandas.DataFrame from local or s3 locations.

Parameters:
  • path (str) – required. Can be S3 or local; S3 paths must be in s3:// format. Accepts S3Location
  • file_format – None. Autodetects from path, can be set to csv or parquet to explicitly force a format
  • compression – optional, ‘gzip’, ‘snappy’ or None. Autodetects from path if path ends in .gz, .gzip or contains .snappy
  • request_payer – either ‘bucketowner’ (default) or ‘requester’ - who incurs the cost of said operation
  • args – args to pass to the panda to load the file
  • kwargs – kwargs to pass to the panda to load the file eg columns=[‘subset’, ‘of’, ‘columns’]
Returns:

Pandas.DataFrame

load_csv(path, compression=None, request_payer='bucketowner', *args, **kwargs)

alias for .load(path, format=’csv’)

load_dir(load_path: Union[str, newtools.aws.s3_location.S3Location], file_format: Optional[str] = None) → Type[pandas.core.frame.DataFrame]

Function to load in the CTAS output and return one unified dataframe.

Parameters:
  • load_path – the directory containing the CTAS data in S3
  • file_format – the format of the save files, e.g. PARQUET or CSV
load_dir_iter(load_path: Union[str, newtools.aws.s3_location.S3Location], file_format: Optional[str] = None) → Iterator[Type[pandas.core.frame.DataFrame]]

Function to return a generator of the output under the given directory.

Parameters:
  • load_path – the directory containing the data in S3
  • file_format – the format of the save files, e.g. PARQUET or CSV
load_parquet(path, compression=None, request_payer='bucketowner', *args, **kwargs)

alias for .load(path, format=’parquet’)

save(df, path, file_format=None, compression=None, request_payer='bucketowner', *args, **kwargs)

Save a Pandas.DataFrame to local or s3 locations.

Parameters:
  • df (Pandas.DataFrame) – Data frame
  • path (str) – required. Can be S3 or local; S3 paths must be in s3:// format. Accepts S3Location
  • file_format – None. Autodetects from path; can be set to csv or parquet to explicitly force a format
  • compression – None. Supports gzip and snappy, autodetects from path if path ends in .gz, .gzip or contains .snappy
  • request_payer – either ‘bucketowner’ (default) or ‘requester’ - who incurs the cost of said operation
  • args – args to pass to the panda to save the file
  • kwargs – kwargs to pass to the panda to save the file eg index=None
save_csv(df, path, compression=None, request_payer='bucketowner', *args, **kwargs)

Alias for .save(df, format=’csv’)

save_parquet(df, path, compression=None, request_payer='bucketowner', *args, **kwargs)

Alias for .save(df, format=’parquet’)

save_partitioned(df, base_path, name, suffix, partition_columns, partition_string='', date_time_format='%Y-%m-%d_%H%M%S.%f', compression=None, request_payer='bucketowner', *args, **kwargs)

Save a data frame into multiple files, partitioned by the specified columns. The base path can be local file system, or S3.

Based on athena_partition from old tools. Notable differences are as follows:

  • PandasDoggo saves the index by default, so pass index=False for comparable behaviour
  • base_path – new parameter that was previously taken from the passed file_handler
  • partition_columns – replaces partition_categories
  • partition_dtypes – not supported, so please apply any changes to dtype before passing, using df.astype(dict(zip(["col1", "col7"], [int, int])))
  • columns_to_keep – not supported; please only send a slice of the data frame with the columns you want to partition or save
  • file_handler – not supported; uses the PandasDoggo class

Parameters:
  • df – The data frame to be partitioned
  • base_path – The base path to save the files to
  • name – If provided all files filename will start with this
  • suffix – The extension the file should be saved with, .csv for csv, and .parquet for parquet
  • partition_columns – The columns to partition the data on
  • partition_string – Allows formatting of folder names; will be dependent on how many partition categories there are. Defaults to creating hive-format folders and sub-folders in order of partitioning
  • date_time_format – To minimise the chance of overwrites, the saved file names contain the date and time at which this function was called; this param specifies the date time format
  • compression – None. Supports gzip and snappy, autodetects from suffix if it ends in .gz, .gzip or contains .snappy
  • request_payer – either ‘bucketowner’ (default) or ‘requester’ - who incurs the cost of said operation
  • args – args to pass to the panda to save the file
  • kwargs – kwargs to pass to the panda to save the file eg index=None
Returns:

Returns a full list of all file paths created; the base path is not included in these paths.
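
A sketch of partitioned saving using the parameters described above; the bucket and column names are placeholders, and the exact file names will include a timestamp per date_time_format:

import pandas as pd
from newtools import PandasDoggo

df = pd.DataFrame({'date': ['2021-01-01', '2021-01-02'],
                   'region': ['EU', 'US'],
                   'value': [1, 2]})

fh = PandasDoggo()

# creates hive-format folders such as date=2021-01-01/region=EU/ under base_path
files = fh.save_partitioned(df,
                            base_path='s3://my-bucket/data',
                            name='results',
                            suffix='.parquet',
                            partition_columns=['date', 'region'],
                            index=False)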

Doggo File System

class newtools.doggo.fs.DoggoFileSystem(session=None, boto3_session=None)

Bases: object

Implements common file operations using either S3 or the local file system, depending on whether the path begins with "s3://".

cp(source, destination)

Copies a file or folder, per shutil.copy() or shutil.copytree(), depending on whether source is a /folder/ or a /file.extension

Parameters:
  • source – source path, folders must be specified by trailing ‘/’
  • destination – destination path, folders must be specified by trailing ‘/’
exists(path)

Returns true if a path exists, per os.path.exists()

Parameters:path – the path to check
Returns:True if the path exists, otherwise False
glob(glob_string)

Searches for files per glob.glob(recursive=True)

Parameters:glob_string – the path to search
Returns:
is_s3(path1, path2=None)

Returns true if the passed path is on S3

Parameters:
  • path1 – the first path to check
  • path2 – the second path to check
Raises:

NotImplementedError – if only one of the two paths is on S3

Returns:

True if both are S3, False if neither are, and raises an exception for mixed types

join(path, *paths)

Joins two paths, per os.path.join().

Parameters:
  • path – the first path
  • paths – the paths to join

ls(location: str, recursive: bool = False)

Returns the list of contents of a directory. If the directory is empty or doesn't exist, returns an empty list.

If recursive=False, returns the contents of the specified directory, including folders. If recursive=True, returns all files (with full paths) below the specified location, and no directories.

mv(source, destination)

Moves a file per shutil.move(), except that it does not copy within the destination folder, i.e. /location/folderA/ moves to /destination/folderA/ rather than /destination/folderA/folderA/

Parameters:
  • source – source path, folders must be specified by trailing ‘/’
  • destination – destination path, folders must be specified by trailing ‘/’
open(path, mode, *args, **kwargs)

Opens a file, per open()

Parameters:
  • path – the path to open
  • mode – the mode to open in
  • args – any arguments for the FileDoggo class
  • kwargs – any keyword arguments for the FileDoggo class
Returns:

a file handle

rm(path, **kwargs)

Removes a file, per os.remove()

Parameters:path – the file to remove
size(path)

Returns the size of a file per os.path.getsize()

Parameters:path – the path to check
Returns:the size of the file at this path
split(path)

Splits a path into prefix and file, per os.path.split().

Parameters:path – the path to split
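
A few operations for illustration (the paths are placeholders):

from newtools.doggo.fs import DoggoFileSystem

dfs = DoggoFileSystem()

if dfs.exists('s3://my-bucket/data/file.csv'):
    dfs.cp('s3://my-bucket/data/file.csv', 's3://my-bucket/backup/file.csv')

# list all files below a prefix with their sizes
for path in dfs.ls('s3://my-bucket/data/', recursive=True):
    print(path, dfs.size(path))

with dfs.open('s3://my-bucket/data/file.csv', 'rb') as f:
    contents = f.read()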

Doggo Locks

class newtools.doggo.lock.DoggoLock(file, wait_period=30, time_out_seconds=1800, maximum_age=3600, logger=<Logger newtools.doggo.lock (WARNING)>)

Bases: newtools.doggo.lock.LockBase

Implements locking using an additional file on the file system. Works for local files and for S3.

For file systems that are only eventually consistent, use a longer wait_period to wait for consistency when multiple clients are reading at the same time.

Locks a file across multiple processes and clients using an additional file on the file system.

The lock file has the following format: “.lock-{file}.{timestamp}-{random}”

where:

  • file - is the file being locked
  • timestamp - is the timestamp the lock was requested
  • random - is a random number
acquire()
release()
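
A sketch of the acquire/release pattern; the file path is a placeholder and the keyword values simply restate the defaults shown in the signature:

from newtools.doggo.lock import DoggoLock

lock = DoggoLock('s3://my-bucket/data/file.csv',
                 wait_period=30,
                 time_out_seconds=1800,
                 maximum_age=3600)

lock.acquire()
try:
    pass  # read or write the locked file here
finally:
    lock.release()
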
class newtools.doggo.lock.DynamoDogLock(file, wait_period=0.1, time_out_seconds=1800, maximum_age=3600, logger=<Logger newtools.doggo.dynamolock (WARNING)>, region='us-east-1')

Bases: newtools.doggo.lock.LockBase

Implements a lock using a table in DynamoDB.

By default uses the region us-east-1 but this can be specified as an optional parameter when requesting the lock.

The table in DynamoDB is called newtools.dynamo.doggo.lock.

acquire()
release()

Doggo Wait

class newtools.doggo.lock.DoggoWait(wait_period, time_out_seconds)

Bases: object

A generic wait and timeout function. Raises TimeoutError() if the requested task doesn’t occur in the period.

dw = DoggoWait(wait_period=30, time_out_seconds=1800)
dw.start_timeout()
while not check_my_condition():
    dw.check_timeout()
check_timeout()

Waits, and raises an exception if the timer has timed out

start_timeout()

Starts a time out

timed_out()

Checks for a time out

Returns:true if the timer has timed out, otherwise false
wait()

Waits for the defined period

Logging

class newtools.log.log.log_to_stdout

Adds a handler to the specified logger to pipe it to stdout.

Checks the entire logging hierarchy for other instances of the same logger to prevent duplicates.

Parameters:
  • logger_name – the logger to pipe to stdout, defaults to deductive
  • level – the level of logs to pipe to stdout, defaults to INFO
Returns: the logger
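
A minimal sketch, assuming log_to_stdout is invoked with the documented logger_name and level parameters and returns the configured logger:

import logging
from newtools.log.log import log_to_stdout

logger = log_to_stdout(logger_name="deductive", level=logging.INFO)
logger.info("pipeline started")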

class newtools.log.persistent_field_logger.PersistentFieldLogger(logger, persistent_fields, sep=' ', message_key='message')

Bases: object

Wrapper for logging.Logger or equivalent; tracks the useful fields defined in the persistent_fields dictionary and includes them in every message.

Useful when processing specific files: create a logger with a persistent field for the filename, set the filename once, and all subsequent messages logged against that logger will contain the filename.

debug(*args, **kwargs)
error(*args, **kwargs)
info(*args, **kwargs)
set_field_value(field, value)

Adds or overrides a field which is persistently logged; will raise if the value is not JSON serializable.

Parameters:
  • field – name of the field
  • value – value to assign, must be JSON serializable
Returns: None

set_multiple_values(fields_and_values)

Allows multiple keys and values to be added or overridden in a PersistentFieldLogger object.

Parameters:
  • fields_and_values – dictionary with keys and values to add/override, must be JSON serializable
Returns: None

warning(*args, **kwargs)
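
A sketch of the file-processing pattern described above, assuming persistent_fields is supplied as a dictionary of initial values and that messages are passed via the message keyword (per the default message_key):

import logging
from newtools.log.persistent_field_logger import PersistentFieldLogger

logger = PersistentFieldLogger(logging.getLogger("my_pipeline"),
                               persistent_fields={"filename": ""})

# every subsequent message logged will include the filename field
logger.set_field_value("filename", "data_2021-01-01.csv")
logger.info(message="starting processing")
logger.set_multiple_values({"filename": "data_2021-01-02.csv", "rows": 100})
logger.info(message="finished processing")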

Task Queue

class newtools.queue.task_queue.TaskQueue(max_size, retry_limit=3, logger=<Logger newtools.queue (WARNING)>)

This is an abstract class that contains all required common functionality to support the implementation of queues in AWS client libraries.

Two separate queues are maintained:

  • Active queue – only tasks in this queue are allowed to run; once tasks are completed they are removed from this queue; the number of tasks in the active queue is at most max_size.
  • Pending queue – contains tasks that are awaiting execution; tasks from the pending queue are added to the active queue in FIFO fashion.