Azure lock and unlock images

Feb. 1, 2024

The simple way to lock and unlock images, can be done using azure CLI.But what if you dealing with thousands of images and would like to secure only active production images. For this you’d have to query against all your K8’s clusters.

Lock an Image

az acr repository update \
    --name myregistry --image myrepo@sha256:123456abcdefg \
    --write-enabled false

Unlock an Image

az acr repository update \
    --name myregistry --image myrepo@sha256:123456abcdefg \
    --write-enabled true

Lock a repository

az acr repository update \
    --name myregistry --repository myrepo \ 
    --list-enabled false

One solution is to query the azure monitoring logs

Sample code I implemented

import argparse
import sys
import subprocess
import json
import logging

# command processes return shell result
def _run_command(command):
    logging.debug("Command: {}".format(command))
    result = subprocess.run(command, shell=True, capture_output=True)
    print(result)
    if  result.returncode > 0 and result.stderr:
        raise subprocess.CalledProcessError(
                returncode = result.returncode,
                cmd = result.args,
                stderr = result.stderr.decode('utf-8')
                )
    if result.stdout:
        logging.debug("Command Result: {}".format(result.stdout.decode('utf-8')))
    return result

def getClusterList(azQuery,val_uuid):
    try:
        monitor_command="az monitor log-analytics query -w {} --analytics-query ".format(val_uuid)
        monitor_command+= '"' +azQuery+'"'
        print(monitor_command)
        result=_run_command(monitor_command)
        result_stdout =  result.stdout.decode("utf-8")
        return json.loads(result_stdout)
    except subprocess.CalledProcessError as error:
        print("Error inside getClusterList"+ result.stderr.decode('utf-8'))
    except Exception as e:
        print("Error inside getClusterList query "+ str(e))
        sys.exit(1)

# locks the images, takes monitoring data as input
def lockImage(imageList):
    try:
        if imageList:
            result=""
            for q in imageList:
                try:
                    print("Lock images,Processing: "+ q['Image']+"@"+q['ImageID'])
                    az_command='az acr repository update --name {} --image {}@{} --write-enabled false'.format(q['Repository'],q['Image'],q['ImageID'])
                    print(az_command)
                    result= _run_command(az_command)
                    result_stdout=result.stdout.decode("utf-8")
                    print(result_stdout)
                except subprocess.CalledProcessError as error:
                    print("Error inside getClusterList"+ result.stderr.decode('utf-8'))
                    continue
    except subprocess.CalledProcessError as error:
        print("Error inside lockImage "+ error)
    except Exception as e:
        print("Error inside lockImage "+ str(e))
        sys.exit(1)

def acr_login(imagelist):
    try:
        acrname=[]
        for q in imagelist:
            if q['Repository'] not in acrname:
                acrname.append(q['Repository'])
        print(acrname)
        if acrname:
            for a in acrname:
                az_command= 'az acr login -n {} --expose-token'.format(a)
                val=_run_command(az_command)
                print(val.stdout.decode("utf-8"))
        return True
    except subprocess.CalledProcessError as error:
        print("Error inside acr_login "+ str(error))
        sys.exit(1)

# unlocks the images,takes monitoring data as input
def unLockImage(imageList):
    try:
        if imageList:
            r=""
            for q in imageList:
                try:
                    print("Unlock images, Processing: "+ q['Repository'])
                    az_command='az acr repository update --name {} --repository {} --write-enabled true'.format(q['Repository'],q['Image'])
                    print(az_command)
                    r= _run_command(az_command)
                    r_stdout=r.stdout.decode("utf-8")
                    print(r_stdout)
                except subprocess.CalledProcessError as error:
                    print("Error inside getClusterList"+ r.stderr.decode('utf-8'))
                    continue
    except subprocess.CalledProcessError as error:
        print("Error inside unLockImage "+ error)
        sys.exit(1)
    except Exception as e:
        print("Error inside unLockImage "+ str(e))
        sys.exit(1)

def azureQuery(repo):
    azureQuery="KubePodInventory| where TimeGenerated > ago(1h)| where PodStatus == 'Running'| project ClusterName,ContainerID, PodName=Name, Namespace| distinct  ClusterName, ContainerID, PodName, Namespace| join (ContainerInventory | where ContainerState == 'Running'| where  Repository contains '{}'| distinct  ContainerID, Image, ImageTag, ImageID,Repository) on ContainerID | summarize by Image, ImageTag, ImageID, Repository".format(repo)
    return azureQuery

def main():
    query=azureQuery('xxxx')
    imageList=getClusterList(query,'xxx')
    if len(imageList) > 0 and acr_login(imageList):
        unLockImage(imageList)
        lockImage(imageList)

if __name__ == "__main__":
    main()