Skip to content

Python

AWS

Athena

Run an SQL query

WORK IN PROGRESS

import boto3
import time

# == function to convert the result from athena to a dictionary
def process_athena(response):
    _key_row = response["ResultSet"]["Rows"].pop(0)
    _keys = []
    for key in _key_row["Data"]:
        _keys += key.values()
    output = []
    while response["ResultSet"]["Rows"]:
        result = {_k: None for _k in _keys} 
        row = response["ResultSet"]["Rows"].pop(0)
        _values = []
        for key in row["Data"]:
            _values += key.values()

        for idx, _value in enumerate(_values):
            result[_keys[idx]] = _value
        output.append(result)

    return output

def query_athena(QueryString):
    Database = 'default'
    WorkGroup = 'default'

    athena = boto3.client('athena')
    # == make the SQL query
    query_id = athena.start_query_execution(
        QueryString           = QueryString,
        #ExecutionParameters = ExecutionParameters,
        QueryExecutionContext = { 'Database' : Database },
        WorkGroup             = WorkGroup
    )['QueryExecutionId']

    # == wait for it to complete
    while True:
        finish_state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if finish_state == "RUNNING" or finish_state == "QUEUED":
            time.sleep(1)
        else:
            break

    # == retrieve the results
    # TODO - if we get next_token, do the call again
    resultx = athena.get_query_results(QueryExecutionId=query_id, MaxResults = 1000)

    # == save the results in a dictionary
    result = process_athena(resultx)

    return result

def main():
    result = query_athena('''
        SELECT
            name,
            surname,
            phoneno
        FROM
            employee_data''')

    print(result)

S3

Read all objects in a bucket

import boto3

Bucket = "MYBUCKETNAME"
Prefix = "myPrefix"

for P in boto3.client('s3').get_paginator('list_objects').paginate(Bucket=Bucket,Prefix = Prefix):
    for k in P['Contents']:
        print(k['Key'])
        content = boto3.client('s3').get_object(Bucket=Bucket, Key=k['Key'])['Body'].read().decode('utf-8')

Write to an S3 bucket

boto3.resource('s3').Bucket(os.environ['S3BUCKET']).put_object(
    ACL         = 'bucket-owner-full-control',
    ContentType = 'application/json',
    Key         = key,
    Body        = json.dumps(newstate,indent=2,default=str)
)

Read an S3 object

state = json.loads(boto3.client('s3').get_object(Bucket=os.environ['S3BUCKET'], Key=key)['Body'].read().decode('utf-8'))

SNS

Publish a topic

boto3.client('sns',region_name = 'ap-southeast-2').publish(TopicArn=os.environ['MonitorTopicSNS'],Message = msg, Subject = subject)

Date and time

Convert epoch time to date time

import datetime

x = datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')

Convert string to a Python datetime

import datetime

def string_to_datestamp(t):
    return datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')

Get the current datestamp in a string

import datetime

def datestamp():
    return datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')

Find the first day of the week

import datetime

def firstDayoftheWeek(dte):
    FDW = dte - datetime.timedelta(days = dte.weekday())
    return FDW

Email

Send email through Outlook

import win32com.client

def send_email(to,cc,subject,body):
    outlook = win32com.client.Dispatch('outlook.application')
    mail = outlook.CreateItem(0)
    mail.To = to
    mail.Subject = subject
    mail.HTMLBody = body
    mail.cc = cc
    mail.Display()
    #mail.Send()

Files

Find a file age in seconds

import datetime
import os

filename = '/tmp/myfile.txt'
age = (datetime.datetime.today() - datetime.datetime.fromtimestamp(os.path.getmtime(filename))).seconds
print(f"age in seconds is {age}"

Find all files in a directory

import os
import fnmatch

def findFiles(path,filter = '*'):
    q = []
    for r, d, f in os.walk(path):
        for file in f:
            if fnmatch.fnmatch(file,filter):
                q.append(os.path.join(r, file).replace('\\','/')[len(path)+1:])
    return q

Find the path of my script

import os.path

file_path = os.path.dirname(os.path.realpath(__file__))

Write a file to disk or S3

import json
import boto3
import botocore

def writeFile(filename,data):
    # -- what type of data is it?
    if(type(data) != str):
        Body = json.dumps(data,default=str)
        ContentType = 'application/json'
    else:
        Body = data
        ContentType = 'application/text'

    # - is it local, or s3?
    if filename.startswith('s3://'):
        bucket = filename.split('/')[2]
        key = '/'.join(filename.split('/')[3:])

        print(f"bucket = {bucket}")
        print(f"key = {key}")
        try:
            boto3.resource('s3').Bucket(bucket).put_object(
                ACL         = 'bucket-owner-full-control',
                ContentType = ContentType,
                Key         = key,
                Body        = Body
            )
        except botocore.exceptions.ClientError as error:
            print(f"WARNING - s3.put_object - {error.response['Error']['Code']}")

    else:
        print(f"filename = {filename}")
        with open(filename,'wt') as f:
            f.write(Body)

if __name__=='__main__':
    x = { 'some' : 'data'}

    writeFile('myfile.txt',x)

    writeFile('s3://mybucket/2023/05/02/myfile.txt',x)

CSV

Write a CSV file

import csv

def writeCSV(file,data):
    os.makedirs(os.path.dirname(file),exist_ok = True)
    headers = list(data[0])
    with open(file, 'wt', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(headers)

        for x in data:
            row = []
            for h in headers:
                row.append(x.get(h))
            csvwriter.writerow(row)

if __name__ == '__main__':
    data = [
        { "header1" : "data1", "header2" : "data2" },
        { "header1" : "data3", "header2" : "data4" }
    ]
    writeCSV('output.csv',data)

Read a CSV file

import csv,codecs

def readCSV(file):
    output = []
    with open(file, 'rb') as csvfile:       
        # = read the file line by line
        reader = csv.DictReader(codecs.iterdecode(csvfile, 'UTF-8'))
        for row in reader:
            output.append(row)
    return output

if __name__ == '__main__':
    for x in readCSV('c:/temp/test.csv'):
        print(x)

Slack or Discord

Sending messages via Slack or Discord

import json
import urllib.request
import urllib.parse

def slackdiscord(webHook,message):
    if 'discord' in webHook:
        msg = { 'content' : message }
    else:
        msg = { 'text' : message }
    req = urllib.request.Request(
        webHook,
        json.dumps(msg).encode('utf-8'),
        {   
            'Content-Type': 'application/json',
            'User-Agent' : 'Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11'
        }
    )
    resp = urllib.request.urlopen(req)
    return resp.read()

Web monitoring

import urllib.request

def website_monitor(url):
    #print(f'URL : {url}')
    try:
        req = urllib.request.Request(url, method='HEAD')
        resp = urllib.request.urlopen(req, timeout=10)
        #print(resp.getcode())
        return resp.getcode() == 200
    except:
        return False
    return False

Excel

Write an Excel file from a dictionary

import xlsxwriter

def dict2xls(L,F,fmt = {}):
    print(f"Writing {F}")
    workbook = xlsxwriter.Workbook(F)

    COLOURS = {
        'darkblue'  : { 'bold': True,  'font_color' : 'white', 'bg_color' : '#44546A', 'align' : 'center' },
        'green'     : { 'bold': True,  'font_color' : 'white', 'bg_color' : '#00B050', 'align' : 'center' },
        'red'       : { 'bold': True,  'font_color' : 'white', 'bg_color' : '#C00000', 'align' : 'center' },
        'gray'      : { 'bold': False, 'font_color' : 'black', 'bg_color' : '#C0C0C0', 'align' : 'center' }
    }

    heading = workbook.add_format(COLOURS['darkblue'])

    for sheet in L:
        worksheet = workbook.add_worksheet(sheet)

        row = 0
        col = 0
        for H in L[sheet][0]:
            worksheet.write(row,col,H,heading)
            col += 1

        for a in L[sheet]:
            row += 1
            col = 0
            for H in L[sheet][0]:
                # -- is there formatting involved?
                cellformat = None
                if H in fmt:
                    F = fmt[H].get(a[H])
                    if F != None:
                        cellformat = workbook.add_format(COLOURS[F])

                worksheet.write(row,col,a[H],cellformat)
                col += 1

        worksheet.autofilter(0,0,row,col-1)
        worksheet.freeze_panes(1, 1)

    workbook.close()

if __name__ == '__main__':

    data = { 'Sheet1' : [
        {
            'Heading 1' : 'Data 1',
            'Heading 2' : 'Data 2'
        },
        {
            'Heading 1' : 'Data 3',
            'Heading 2' : 'Data 4'
        },
    ]}
    dict2xls(data, 'c:/temp/myexcelfile.xlsx')

Reporting

crosstab

import json

def add(matrix,x,y,z):
    # -- index X
    if not 'x' in matrix:
        matrix['x'] = []
    if not x in matrix['x']:
        matrix['x'].append(x)

    # -- index Y
    if not 'y' in matrix:
        matrix['y'] = []
    if not y in matrix['y']:
        matrix['y'].append(y)

    # -- index z
    if not 'z' in matrix:
        matrix['z'] = {}

    if not x in matrix['z']:
        matrix['z'][x] = {}

    matrix['z'][x][y] = z

def render_markdown(matrix):

    matrix['x'].sort()
    matrix['y'].sort()

    out = "||**" + "**|**".join(matrix['x']) + "**|\n"
    out += "|--" * (len(matrix['x']) + 1) + "|\n"
    for y in matrix['y']:
        out += f"|**{y}**|"
        for x in matrix['x']:
            out += f"{matrix['z'][x].get(y,'')}|"
        out += "\n"

    return out

def render_html(matrix):

    matrix['x'].sort()
    matrix['y'].sort()

    out = '<table>\n'
    out += "<tr><th>&nbsp;</th><th>" + "</th><th>".join(matrix['x']) + "</th></tr>\n"

    for y in matrix['y']:
        out += f"<tr><th>{y}</th>"
        for x in matrix['x']:
            out += f"<td>{matrix['z'][x].get(y,'')}</td>"
        out += "</tr>\n"

    out += '</table>\n'
    return out

def main():

    x = {}

    add(x,'X1','Y1','hello')
    add(x,'X2','Y1','there')

    add(x,'X1','Y2','Is')
    add(x,'X2','Y2','this')
    add(x,'X3','Y2','working')

    add(x,'X4','Y3','Howdy')

    # render markdown
    with open('output.md','wt') as F:
        F.write('# The main thing\n\n')
        F.write(render_markdown(x))

    # render HTML
    with open('output.html','wt') as F:
        F.write('<html>\n')
        F.write('<head><style>body { font-family:verdana;} th { background-color: #0047AB; color: #FFFFFF } td { background-color: ##B2FFFF; } table,th,td { border: 1px solid gray; border-collapse: collapse; padding: 5px; } </style></head>')
        F.write('<h1>The HTML thing</h1>\n\n')
        F.write(render_html(x))
        F.write('</html>')

main()

Command-line arguments

import argparse
parser = argparse.ArgumentParser(description='CloudFormation Helper')

# -- read a variable
parser.add_argument('-desc',help='Set a description for the CloudFormation file')

# -- Mandatory variable
parser.add_argument('-cf', help='Path to the CloudFormation json file', required=True)

# -- Provide multiple variables in a list
parser.add_argument('-add',help='Add a new resource to the CloudFormation file',nargs='+')

# -- Make it a toggle switch
parser.add_argument('-overwrite', help='Forces an overwrite of a resource if it already exists', action='store_true')

# -- consume the data
args = parser.parse_args()
print(args.desc)