本文介绍了一种监控源站的文件变更,用来控制 CDN 自动执行缓存刷新和文件预热任务的解决方案。该解决方案有以下特征:
为便于描述,本文以火山引擎 CDN 服务为例。其他云服务商的 CDN 服务(例如,阿里云 CDN、华为云 CDN 等)也是支持的。
获取火山引擎账号的 API 访问密钥。具体步骤,请参见 Access Key(密钥)管理。
注意
函数服务使用该账号的 API 访问密钥调用多云CDN的提交刷新/预热任务接口。因此,如果账号是 IAM 用户账号,您需要确保 IAM 用户账号具有调用多云CDN相关接口的权限。
登录火山引擎函数服务控制台。
在左侧导航栏,单击 函数列表。
在 函数列表 页面,单击 创建函数。
完成 创建函数 配置向导。
选择函数模板。
完成以下参数配置,然后单击 下一步:函数配置。
函数配置。
在 函数代码 区域,将 index.py 的内容替换为以下代码,然后单击 确定。
在使用以下代码前,您必须替换部分参数的值,具体包括:
<ak>
和 <sk>
替换成您的火山引擎账号的 API 访问密钥。<domain[N]>
:替换成您的加速域名。如果您有多个加速域名,需要设置多个 updateURL。
示例:
假设您有两个加速域名,您需要分别设置 updateURL1、updateURL2,将其中的<domain1>
和<domain2>
替换成加速域名。
示例:
假设您设置了 updateURL1、updateURL2、updateURL3,则需要将 Urls 设置成updateURL1+"\n"+updateURL2+"\n"+updateURL3
。
#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import hashlib import hmac import json from urllib.parse import quote import requests Service = "mcdn" Version = "2022-03-01" Region = "cn-north-1" Host = "open.volcengineapi.com" AK = "<ak>" SK = "<sk>" def norm_query(params): query = "" for key in sorted(params.keys()): if type(params[key]) == list: for k in params[key]: query = ( query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" ) else: query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&") query = query[:-1] return query.replace("+", "%20") # 第一步:准备辅助函数。 # sha256 非对称加密 def hmac_sha256(key: bytes, content: str): return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() # sha256 hash算法 def hash_sha256(content: str): return hashlib.sha256(content.encode("utf-8")).hexdigest() # 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。 def request(method, query, header, ak, sk, action, body): # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表 # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。 # 初始化身份证明结构体 credential = { "access_key_id": AK, "secret_access_key": SK, "service": Service, "region": Region, } # 初始化签名结构体 request_param = { "body": json.dumps(body), "host": Host, "path": "/", "method": method, "content_type": "application/json", "date": datetime.datetime.utcnow(), "query": {"Action": action, "Version": Version, **query}, } # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。 # 初始化签名结果的结构体 x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ") short_x_date = x_date[:8] x_content_sha256 = hash_sha256(request_param["body"]) sign_result = { "Host": request_param["host"], "X-Content-Sha256": x_content_sha256, "X-Date": x_date, "Content-Type": request_param["content_type"], } # 第五步:计算 Signature 签名。 signed_headers_str = ";".join( ["content-type", "host", "x-content-sha256", "x-date"] ) canonical_request_str = "\n".join( [request_param["method"], request_param["path"], norm_query(request_param["query"]), "\n".join( [ "content-type:" + request_param["content_type"], "host:" + request_param["host"], "x-content-sha256:" + x_content_sha256, "x-date:" + x_date, ] ), "", signed_headers_str, x_content_sha256, ] ) hashed_canonical_request = hash_sha256(canonical_request_str) credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date) k_region = hmac_sha256(k_date, credential["region"]) k_service = hmac_sha256(k_region, credential["service"]) k_signing = hmac_sha256(k_service, "request") signature = hmac_sha256(k_signing, string_to_sign).hex() sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( credential["access_key_id"] + "/" + credential_scope, signed_headers_str, signature, ) header = {**header, **sign_result} # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。 r = requests.post("https://{}{}".format(request_param["host"], request_param["path"]), headers=header, params=request_param["query"], data=request_param["body"], ) return r.json() def handler(event, context): print(f"received new request, event content: {event}") print(json.dumps(event, sort_keys=True, indent=4)) uri = event["data"]["events"][0]["tos"]['object']['key'] updateURL1 = "https://domain1/" + uri updateURL2 = "https://domain2/" + uri refresh_request_body = { "Urls": updateURL1+"\n"+updateURL2, "Type": "file", } refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body) print(refresh_response_body) preload_response_body = { "ResponseMetadata": { "RequestID": "", "Action": "SubmitPreloadTask", "Version": "2022-03-01", "Service": "mcdn", "Region": "cn-north-1" }, "Result": { "TaskId": "" } } if "Error" not in refresh_response_body["ResponseMetadata"]: print("刷新已成功,执行预热步骤") preload_request_body = { "Urls": updateURL1+"\n"+updateURL2, } preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body) print(preload_response_body) result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "预热失败" }) } if "Error" not in preload_response_body["ResponseMetadata"]: print("预热已成功") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': preload_response_body }) } print(result) return result
发布函数。
创建函数触发器。
为方便验证,本教程以选择
tos:ObjectCreated:*
事件为例。tos:ObjectCreated:*
事件表示当 Bucket 发生文件增加事件时,函数将被触发。
完成以上操作后,您可以参照结果验证,验证配置是否已生效。
在 TOS Bucket 中上传一个测试文件,然后前往多云CDN控制台查看刷新和预热任务的操作记录。
登录多云CDN控制台。
配置正确的情况下,您可以在列表中找到相关的刷新和预热任务。
您可以将任务记录展开,查看多云CDN向云服务商提交的刷新和预热请求。您单击对应的 TaskID 后,页面将跳转到对应任务的执行状态页。该页面展示了由云服务商返回的任务执行状态。更多信息,请参见查看执行状态。
在部署自动刷新预热方案后,如果您的 TOS 桶文件发生较大的突然变化,将可能触发大量的刷新和预热请求。这有可能导致您的请求数量超过多云CDN接口的 QPS 限制,或云服务商平台的刷新/预热任务限额。
一旦您的请求超过了相关的限制,您可以在多云CDN控制台的 刷新预热 页面查询到对应错误信息。
对于请求超限的问题,您可以采用以下方法进行缓解:在函数代码中增加随机 sleep 的逻辑,使函数随机暂停运行一段时间。
以下是一段包含 sleep 逻辑的示例代码,供您参考。您可以根据实际情况修改其中的 sleep
相关配置,然后使用这段代码替换函数代码 index.py
的内容。
说明
如果使用该方法仍无法解决您的问题,请提交工单联系技术支持。
#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import time import random import hashlib import hmac import json from urllib.parse import quote import requests Service = "mcdn" Version = "2022-03-01" Region = "cn-north-1" Host = "open.volcengineapi.com" AK = "AK" SK = "SK" cdnURLs = ['https://example.com/'] Preload = False def norm_query(params): query = "" for key in sorted(params.keys()): if type(params[key]) == list: for k in params[key]: query = ( query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" ) else: query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&") query = query[:-1] return query.replace("+", "%20") # 第一步:准备辅助函数。 # sha256 非对称加密 def hmac_sha256(key: bytes, content: str): return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() # sha256 hash算法 def hash_sha256(content: str): return hashlib.sha256(content.encode("utf-8")).hexdigest() # 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。 def request(method, query, header, ak, sk, action, body): # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表 # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。 # 初始化身份证明结构体 credential = { "access_key_id": ak, "secret_access_key": sk, "service": Service, "region": Region, } # 初始化签名结构体 request_param = { "body": json.dumps(body), "host": Host, "path": "/", "method": method, "content_type": "application/json", "date": datetime.datetime.utcnow(), "query": {"Action": action, "Version": Version, **query}, } # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。 # 初始化签名结果的结构体 x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ") short_x_date = x_date[:8] x_content_sha256 = hash_sha256(request_param["body"]) sign_result = { "Host": request_param["host"], "X-Content-Sha256": x_content_sha256, "X-Date": x_date, "Content-Type": request_param["content_type"], } # 第五步:计算 Signature 签名。 signed_headers_str = ";".join( ["content-type", "host", "x-content-sha256", "x-date"] ) canonical_request_str = "\n".join( [request_param["method"], request_param["path"], norm_query(request_param["query"]), "\n".join( [ "content-type:" + request_param["content_type"], "host:" + request_param["host"], "x-content-sha256:" + x_content_sha256, "x-date:" + x_date, ] ), "", signed_headers_str, x_content_sha256, ] ) hashed_canonical_request = hash_sha256(canonical_request_str) credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date) k_region = hmac_sha256(k_date, credential["region"]) k_service = hmac_sha256(k_region, credential["service"]) k_signing = hmac_sha256(k_service, "request") signature = hmac_sha256(k_signing, string_to_sign).hex() sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( credential["access_key_id"] + "/" + credential_scope, signed_headers_str, signature, ) header = {**header, **sign_result} # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。 r = requests.post("http://{}{}".format(request_param["host"], request_param["path"]), headers=header, params=request_param["query"], data=request_param["body"], ) return r.json() def handler(event, context): print(f"received new request, event content: {event}") print(json.dumps(event, sort_keys=True, indent=4)) uri = event["data"]["events"][0]["tos"]['object']['key'] action = event["data"]["events"][0]["eventName"].split(':')[2] if action in ['Delete', 'DeleteMarkerCreated']: global Preload Preload = False sleep_max_time = 30 updateURL = "" #判断尾部为index.html还有尾部为'/'的uri #尾部为index.html的文件需要额外加上一个刷新cdnurl+index.html之前的部分 #尾部为'/'的文件需要排除,不需要刷新目录(实际上是文件刷新)预热目录会失败 if uri.endswith('/'): print ("{}{} dont need refresh and preload!".format(cdnURLs[0],uri)) refresh_response_body = "{}{} dont need refresh and preload!".format(cdnURLs[0],uri) result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': refresh_response_body }) } return result for cdnURL in cdnURLs: if updateURL == "": updateURL = updateURL + cdnURL + uri else: updateURL = updateURL + "\n" + cdnURL + uri if uri == 'index.html': updateURL = updateURL + "\n" + cdnURL sleep_max_time = 3 elif uri.endswith('/index.html'): updateURL = updateURL + "\n" + cdnURL + uri.rsplit('/', 1)[0] + '/' sleep_max_time = 3 refresh_request_body = { "Urls": updateURL, "Type": "file", } preload_request_body = { "Urls": updateURL, } #结果模板,默认失败 result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "刷新失败" }) } #打印一下刷新的urls print (refresh_request_body['Urls']) time.sleep(random.SystemRandom().randint(2, sleep_max_time)) #失败重试5次,中间随机sleep for i in range(5): refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body) print(refresh_response_body) if "Error" not in refresh_response_body["ResponseMetadata"]: print("刷新已成功!") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': refresh_response_body }) } break else: time.sleep(random.SystemRandom().randint(2, 5)) print ('start refresh retry {}...'.format(i+1)) #不进行预热直接返回刷新结果 if not Preload: print(result) return result else: print("进行预热!") #预热处理 result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "预热失败" }) } for i in range(5): preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body) print(preload_response_body) if "Error" not in preload_response_body["ResponseMetadata"]: print("预热已成功") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': preload_response_body }) } print(result) if result['statusCode'] == 200: return result else: time.sleep(random.SystemRandom().randint(2, 5)) print ('start preload retry {}...'.format(i+1))