Skip to content

Python

AWS

Assume a role

It's a bit painful to assume a role in Python. Use this function to simplify things

import boto3

def boto3_assume(**KW):
    assumed_role_object=boto3.client('sts').assume_role(
        RoleArn=f"arn:aws:iam::{KW['account']}:role/{KW['role']}",
        RoleSessionName=KW.get('session',KW['role'])
    )

    return boto3.Session(
        aws_access_key_id     = assumed_role_object['Credentials']['AccessKeyId'],
        aws_secret_access_key = assumed_role_object['Credentials']['SecretAccessKey'],
        aws_session_token     = assumed_role_object['Credentials']['SessionToken'],
    )

if __name__=='__main__':
    who_am_i = boto3_assume(role='MyRole',account='000000000000).sts.get_caller_identity()

Organisations

Find my master account

import boto3

def find_master_account():
    return boto3.client('organizations').describe_organization().get('Organization',{}).get('MasterAccountId')

Athena

Run an SQL query

WORK IN PROGRESS

import boto3
import time

# == function to convert the result from athena to a dictionary
def process_athena(response):
    _key_row = response["ResultSet"]["Rows"].pop(0)
    _keys = []
    for key in _key_row["Data"]:
        _keys += key.values()
    output = []
    while response["ResultSet"]["Rows"]:
        result = {_k: None for _k in _keys} 
        row = response["ResultSet"]["Rows"].pop(0)
        _values = []
        for key in row["Data"]:
            _values += key.values()

        for idx, _value in enumerate(_values):
            result[_keys[idx]] = _value
        output.append(result)

    return output

def query_athena(QueryString):
    Database = 'default'
    WorkGroup = 'default'

    athena = boto3.client('athena')
    # == make the SQL query
    query_id = athena.start_query_execution(
        QueryString           = QueryString,
        #ExecutionParameters = ExecutionParameters,
        QueryExecutionContext = { 'Database' : Database },
        WorkGroup             = WorkGroup
    )['QueryExecutionId']

    # == wait for it to complete
    while True:
        finish_state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if finish_state == "RUNNING" or finish_state == "QUEUED":
            time.sleep(1)
        else:
            break

    # == retrieve the results
    # TODO - if we get next_token, do the call again
    resultx = athena.get_query_results(QueryExecutionId=query_id, MaxResults = 1000)

    # == save the results in a dictionary
    result = process_athena(resultx)

    return result

def main():
    result = query_athena('''
        SELECT
            name,
            surname,
            phoneno
        FROM
            employee_data''')

    print(result)

S3

Read all objects in a bucket

import boto3

Bucket = "MYBUCKETNAME"
Prefix = "myPrefix"

for P in boto3.client('s3').get_paginator('list_objects').paginate(Bucket=Bucket,Prefix = Prefix):
    for k in P['Contents']:
        print(k['Key'])
        content = boto3.client('s3').get_object(Bucket=Bucket, Key=k['Key'])['Body'].read().decode('utf-8')

Write to an S3 bucket

boto3.resource('s3').Bucket(os.environ['S3BUCKET']).put_object(
    ACL         = 'bucket-owner-full-control',
    ContentType = 'application/json',
    Key         = key,
    Body        = json.dumps(newstate,indent=2,default=str)
)

Read an S3 object

state = json.loads(boto3.client('s3').get_object(Bucket=os.environ['S3BUCKET'], Key=key)['Body'].read().decode('utf-8'))

SNS

Publish a topic

boto3.client('sns',region_name = 'ap-southeast-2').publish(TopicArn=os.environ['MonitorTopicSNS'],Message = msg, Subject = subject)

Date and time

Convert epoch time to date time

import datetime

x = datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')

Convert string to a Python datetime

import datetime

def string_to_datestamp(t):
    return datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')

Get the current datestamp in a string

import datetime

def datestamp():
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

Find the first day of the week

import datetime

def firstDayoftheWeek(dte):
    FDW = dte - datetime.timedelta(days = dte.weekday())
    return FDW

Email

Send email through Outlook

import win32com.client

def send_email(to,cc,subject,body):
    outlook = win32com.client.Dispatch('outlook.application')
    mail = outlook.CreateItem(0)
    mail.To = to
    mail.Subject = subject
    mail.HTMLBody = body
    mail.cc = cc
    mail.Display()
    #mail.Send()

Files

Find a file age in seconds

import datetime
import os

filename = '/tmp/myfile.txt'
age = (datetime.datetime.today() - datetime.datetime.fromtimestamp(os.path.getmtime(filename))).seconds
print(f"age in seconds is {age}"

Find all files in a directory

import os
import fnmatch

def findFiles(path,filter = '*'):
    q = []
    for r, d, f in os.walk(path):
        for file in f:
            if fnmatch.fnmatch(file,filter):
                q.append(os.path.join(r, file).replace('\\','/')[len(path)+1:])
    return q

Find the path of my script

import os.path

file_path = os.path.dirname(os.path.realpath(__file__))

Write a file to disk or S3

import json
import boto3
import botocore

def writeFile(filename,data):
    # -- what type of data is it?
    if(type(data) != str):
        Body = json.dumps(data,default=str)
        ContentType = 'application/json'
    else:
        Body = data
        ContentType = 'application/text'

    # - is it local, or s3?
    if filename.startswith('s3://'):
        bucket = filename.split('/')[2]
        key = '/'.join(filename.split('/')[3:])

        print(f"bucket = {bucket}")
        print(f"key = {key}")
        try:
            boto3.resource('s3').Bucket(bucket).put_object(
                ACL         = 'bucket-owner-full-control',
                ContentType = ContentType,
                Key         = key,
                Body        = Body
            )
        except botocore.exceptions.ClientError as error:
            print(f"WARNING - s3.put_object - {error.response['Error']['Code']}")

    else:
        print(f"filename = {filename}")
        with open(filename,'wt') as f:
            f.write(Body)

if __name__=='__main__':
    x = { 'some' : 'data'}

    writeFile('myfile.txt',x)

    writeFile('s3://mybucket/2023/05/02/myfile.txt',x)

CSV

Write a CSV file

import csv

def writeCSV(file,data):
    os.makedirs(os.path.dirname(file),exist_ok = True)
    headers = list(data[0])
    with open(file, 'wt', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(headers)

        for x in data:
            row = []
            for h in headers:
                row.append(x.get(h))
            csvwriter.writerow(row)

if __name__ == '__main__':
    data = [
        { "header1" : "data1", "header2" : "data2" },
        { "header1" : "data3", "header2" : "data4" }
    ]
    writeCSV('output.csv',data)

Read a CSV file

import csv

def readCSV(file):
    output = []
    with open(file, 'rt',encoding='utf-8') as csvfile:      
        reader = csv.DictReader(csvfile)
        for row in reader:
            output.append(row)
    return output

if __name__ == '__main__':
    for x in readCSV('c:/temp/test.csv'):
        print(x)

Slack or Discord

Sending messages via Slack or Discord

import json
import urllib.request
import urllib.parse

def slackdiscord(webHook,message):
    if 'discord' in webHook:
        msg = { 'content' : message }
    else:
        msg = { 'text' : message }
    req = urllib.request.Request(
        webHook,
        json.dumps(msg).encode('utf-8'),
        {   
            'Content-Type': 'application/json',
            'User-Agent' : 'Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11'
        }
    )
    resp = urllib.request.urlopen(req)
    return resp.read()

Web monitoring

import urllib.request

def website_monitor(url):
    #print(f'URL : {url}')
    try:
        req = urllib.request.Request(url, method='HEAD')
        resp = urllib.request.urlopen(req, timeout=10)
        #print(resp.getcode())
        return resp.getcode() == 200
    except:
        return False
    return False

Command-line arguments

import argparse
parser = argparse.ArgumentParser(description='CloudFormation Helper')

# -- read a variable
parser.add_argument('-desc',help='Set a description for the CloudFormation file')

# -- Mandatory variable
parser.add_argument('-cf', help='Path to the CloudFormation json file', required=True)

# -- Provide multiple variables in a list
parser.add_argument('-add',help='Add a new resource to the CloudFormation file',nargs='+')

# -- Make it a toggle switch
parser.add_argument('-overwrite', help='Forces an overwrite of a resource if it already exists', action='store_true')

# -- consume the data
args = parser.parse_args()
print(args.desc)

Jinja

Write a template

import jinja2
import os

def render_jinja(template,output,**KW):
    os.makedirs(os.path.dirname(output),exist_ok = True)
    template_dir = os.path.dirname(output)
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir)).get_template(template)
    result = env.render(**KW)
    print(f"Writing {output}")
    with open(output,'wt',encoding='utf-8') as q:
        q.write(result)

Logging

import logging

try:
    import colorama
    colorama.init()
except ImportError:
    os.system('')  # Enables ANSI escape characters in terminal on Windows

class ColorFormatter(logging.Formatter):
    COLORS = {
        'DEBUG': '\033[36m',    # Cyan
        'INFO': '\033[32m',     # Green
        'WARNING': '\033[33m',  # Yellow
        'ERROR': '\033[31m',    # Red
        'CRITICAL': '\033[41m', # Red background
    }
    RESET = '\033[0m'

    def format(self, record):
        # Center the levelname in a field of width 8
        levelname = record.levelname.center(8)
        color = self.COLORS.get(record.levelname.strip(), '')
        record.levelname = f"{color}{levelname}{self.RESET}"
        return super().format(record)

handler = logging.StreamHandler()
handler.setFormatter(ColorFormatter('%(asctime)s - %(levelname)s %(message)s'))
logging.basicConfig(level=logging.INFO, handlers=[handler])