The simple way to lock and unlock images is to use the Azure CLI. But what if you are dealing with thousands of images and would like to secure only the active production images? For this you'd have to query all of your Kubernetes (K8s) clusters.
# Lock a single image, addressed by digest: disable write operations on it.
az acr repository update \
--name myregistry --image myrepo@sha256:123456abcdefg \
--write-enabled false
# Unlock the same image: re-enable write operations on it.
az acr repository update \
--name myregistry --image myrepo@sha256:123456abcdefg \
--write-enabled true
# Disable list operations on the whole myrepo repository.
az acr repository update \
--name myregistry --repository myrepo \
--list-enabled false
One solution is to query the Azure Monitor logs:
import argparse
import sys
import subprocess
import json
import logging
def _run_command(command):
    """Run *command* through the shell and return the completed process.

    Args:
        command: The full command line as a single string. It is executed
            with ``shell=True``, so the caller is responsible for quoting.

    Returns:
        The ``subprocess.CompletedProcess`` for the command, with ``stdout``
        and ``stderr`` captured as bytes.

    Raises:
        subprocess.CalledProcessError: If the command exits with any non-zero
            status (including a negative status from a signal). ``stderr``
            and ``output`` on the exception are decoded text, not bytes.
    """
    logging.debug("Command: {}".format(command))
    # NOTE: shell=True is kept because callers pass pre-quoted command
    # strings; never feed untrusted input into this function.
    result = subprocess.run(command, shell=True, capture_output=True)
    # The raw result is intentionally not printed: it would dump undecoded
    # bytes (and any secrets the command emits) to stdout.
    if result.returncode != 0:
        # Fail on every non-zero status, even when stderr is empty, so a
        # silent failure is never mistaken for success.
        raise subprocess.CalledProcessError(
            returncode=result.returncode,
            cmd=result.args,
            output=result.stdout.decode("utf-8", errors="replace"),
            stderr=result.stderr.decode("utf-8", errors="replace"),
        )
    if result.stdout:
        logging.debug(
            "Command Result: {}".format(
                result.stdout.decode("utf-8", errors="replace")
            )
        )
    return result
def getClusterList(azQuery, val_uuid):
    """Run a Log Analytics query through the Azure CLI and return its result.

    Args:
        azQuery: The query text. It is wrapped in double quotes on the
            command line, so it must not itself contain double quotes.
        val_uuid: Value passed to the ``-w`` option of
            ``az monitor log-analytics query``.

    Returns:
        Whatever ``json.loads`` produces from the command's standard output.

    Exits the process with status 1 if the command fails or its output
    cannot be parsed as JSON.
    """
    monitor_command = (
        'az monitor log-analytics query -w {} --analytics-query "{}"'.format(
            val_uuid, azQuery
        )
    )
    print(monitor_command)
    try:
        result = _run_command(monitor_command)
        return json.loads(result.stdout.decode("utf-8"))
    except subprocess.CalledProcessError as error:
        # Read stderr from the exception: `result` is never bound when
        # _run_command raises, so it cannot be used here.
        stderr = error.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        print("Error inside getClusterList" + (stderr or str(error)))
    except Exception as e:
        print("Error inside getClusterList query " + str(e))
    # Both failure paths end here; returning None would only break the caller.
    sys.exit(1)
def lockImage(imageList):
    """Lock every image in *imageList* against writes.

    For each record this runs ``az acr repository update --name <Repository>
    --image <Image>@<ImageID> --write-enabled false``. A command that fails
    is reported and the loop moves on to the next image.

    Args:
        imageList: Iterable of dicts with the keys ``'Repository'``,
            ``'Image'`` and ``'ImageID'`` (the monitoring query rows).
            A falsy value means there is nothing to do.

    Any other error (for example a record missing one of the keys) is
    reported and the process exits with status 1.
    """
    if not imageList:
        return
    try:
        for q in imageList:
            print("Lock images,Processing: " + q['Image'] + "@" + q['ImageID'])
            az_command = (
                'az acr repository update --name {} --image {}@{} '
                '--write-enabled false'
            ).format(q['Repository'], q['Image'], q['ImageID'])
            print(az_command)
            try:
                result = _run_command(az_command)
            except subprocess.CalledProcessError as error:
                # Use the stderr carried by the exception; no result object
                # exists for the command that just failed.
                stderr = error.stderr
                if isinstance(stderr, bytes):
                    stderr = stderr.decode("utf-8", errors="replace")
                print("Error inside lockImage " + (stderr or str(error)))
                continue
            print(result.stdout.decode("utf-8"))
    except Exception as e:
        print("Error inside lockImage " + str(e))
        sys.exit(1)
def acr_login(imagelist):
    """Run ``az acr login`` once for every distinct registry in *imagelist*.

    Args:
        imagelist: Iterable of dicts whose ``'Repository'`` value is passed
            to ``az acr login -n``.

    Returns:
        True once every login command has succeeded (also when there was
        nothing to log in to).

    Exits the process with status 1 if any login command fails.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order.
    registries = list(dict.fromkeys(q['Repository'] for q in imagelist))
    print(registries)
    try:
        for registry in registries:
            _run_command('az acr login -n {} --expose-token'.format(registry))
            # The command output is deliberately not printed: with
            # --expose-token it contains an access token, which must not end
            # up on stdout or in CI logs.
            logging.debug("az acr login succeeded for %s", registry)
    except subprocess.CalledProcessError as error:
        print("Error inside acr_login " + str(error))
        sys.exit(1)
    return True
def unLockImage(imageList):
    """Re-enable writes on the repository of every record in *imageList*.

    For each record this runs ``az acr repository update --name <Repository>
    --repository <Image> --write-enabled true``. A command that fails is
    reported and the loop moves on to the next record.

    Args:
        imageList: Iterable of dicts with the keys ``'Repository'`` and
            ``'Image'`` (the monitoring query rows). A falsy value means
            there is nothing to do.

    Any other error (for example a record missing one of the keys) is
    reported and the process exits with status 1.
    """
    if not imageList:
        return
    try:
        for q in imageList:
            print("Unlock images, Processing: " + q['Repository'])
            az_command = (
                'az acr repository update --name {} --repository {} '
                '--write-enabled true'
            ).format(q['Repository'], q['Image'])
            print(az_command)
            try:
                r = _run_command(az_command)
            except subprocess.CalledProcessError as error:
                # Use the stderr carried by the exception; no result object
                # exists for the command that just failed.
                stderr = error.stderr
                if isinstance(stderr, bytes):
                    stderr = stderr.decode("utf-8", errors="replace")
                print("Error inside unLockImage " + (stderr or str(error)))
                continue
            print(r.stdout.decode("utf-8"))
    except Exception as e:
        print("Error inside unLockImage " + str(e))
        sys.exit(1)
def azureQuery(repo, lookback="1h"):
    """Build the Log Analytics query that lists images of running containers.

    The query joins ``KubePodInventory`` (pods whose status is 'Running')
    with ``ContainerInventory`` (containers whose state is 'Running') on
    ``ContainerID`` and summarizes by Image, ImageTag, ImageID and Repository.

    Args:
        repo: Text that the ``Repository`` column must contain. It is
            inserted between single quotes unescaped, so it must not contain
            a single quote.
        lookback: Time span placed inside ``ago(...)`` to bound
            ``TimeGenerated``; defaults to ``"1h"``.

    Returns:
        The query as a single-line string.
    """
    # Adjacent literals are concatenated verbatim; spacing around each pipe
    # is part of the emitted query text and is kept exactly as is.
    query_template = (
        "KubePodInventory"
        "| where TimeGenerated > ago({lookback})"
        "| where PodStatus == 'Running'"
        "| project ClusterName,ContainerID, PodName=Name, Namespace"
        "| distinct ClusterName, ContainerID, PodName, Namespace"
        "| join (ContainerInventory "
        "| where ContainerState == 'Running'"
        "| where Repository contains '{repo}'"
        "| distinct ContainerID, Image, ImageTag, ImageID,Repository) on ContainerID "
        "| summarize by Image, ImageTag, ImageID, Repository"
    )
    return query_template.format(lookback=lookback, repo=repo)
def main(argv=None):
    """Unlock, then re-lock, the images currently running in the clusters.

    Builds the monitoring query, fetches the matching image rows, logs in to
    the registries they reference, and then runs the unlock pass followed by
    the lock pass over the same rows.

    Args:
        argv: Optional list of command-line arguments. When None, argparse
            reads ``sys.argv[1:]``. With no arguments the original
            placeholder values ('xxxx' and 'xxx') are used.
    """
    parser = argparse.ArgumentParser(
        description="Lock the container images that are running in the clusters."
    )
    parser.add_argument(
        "--repository",
        default="xxxx",
        help="text the Repository column must contain",
    )
    parser.add_argument(
        "--workspace",
        default="xxx",
        help="value passed to 'az monitor log-analytics query -w'",
    )
    args = parser.parse_args(argv)

    query = azureQuery(args.repository)
    imageList = getClusterList(query, args.workspace)
    # A truthiness test also covers None, which len() would reject.
    if imageList and acr_login(imageList):
        unLockImage(imageList)
        lockImage(imageList)


if __name__ == "__main__":
    main()