近期文章:推荐最好的fomepay虚拟信用卡,解决ChatGPT Plus Telegram会员无法支付的问题等。
对于生产项目,4w个压缩包的跨项目迁移需要下载后删除压缩包中以html、txt、url结尾的文件,上传到oss。不过在下载之前,需要对文件列表中的地址进行裁剪,最后与域名进行拼接。使用shell命令zip -d去下载并删除压缩包中的文件。这里注意,上传的时候需要使用文件列表中的地址来上传,但是上传到oss的时候需要去掉第一个/。使用lstrip(‘/’)删除,上传时不会报错电报TG。
我是自学的python,水平有限。总感觉自己写的不太理想,但是想要的功能是可以实现的,以后会慢慢完善的。
Python3异步并发下载脚本
导入aiohttp
导入异步
导入操作系统
导入压缩文件
导入子流程
进口2
导入日志记录
#指定文件列表文件
file_list=’zip.txt’
#指定数据保存目录
DATA_DIR=’/数据/’
BASE_URL=’下载域名’
#指定最大并发数
最大并发下载数=8
#上传成功后是否删除本地文件
DELETE_LOCAL_FILE_AFTER_UPLOAD=True #False
#几秒后重试以及最大重试次数
重试延迟=5
最大重试次数=3
#指定下载超时时间
超时=300
#配置下载成功和失败记录文件
SUCCESS_FILE=os.path.join(DATA_DIR, ‘success_log.txt’)
FAILURE_FILE=os.path.join(DATA_DIR, ‘failure_log.txt’)
# 配置oss
OSS_ACCESS_KEY_ID=”
OSS_ACCESS_KEY_SECRET=”
OSS_ENDPOINT=”
OSS_BUCKET_NAME=”
#配置日志文件
logging.basicConfig(filename=’download_upload.log’, level=logging.INFO)
异步def extract_content_from_url(url):
#urldo 切割
parts=url.split(‘/’, 4)
first_part=’/’ + ‘/’.join(parts[1:4])
内容=部分[4]
返回first_part,内容
异步def download_from_ku(url, 会话, 信号量, success_file, failure_file, retries=MAX_RETRIES):
第一部分,内容=等待extract_content_from_url(url)
#构建完整的下载地址
full_url=BASE_URL + 内容
data_dir=DATA_DIR + os.path.dirname(内容)
os.makedirs(data_dir,exist_ok=True)
local_filename=os.path.join(data_dir, os.path.basename(内容))
如果os.path.exists(local_filename):
logging.info(f’文件已存在: {local_filename}’)
返回本地文件名
重试时0:
尝试:
与信号量异步,session.get(full_url, timeout=TIMEOUT) 作为response:
以open(local_filename, ‘wb’) 作为file:
块=等待响应.content.read(8192)
如果不是chunk:
文件.write(块)
print(f’下载成功: {first_part}/{content}’)
logging.info(f’下载成功: {first_part}/{content}’)
打开(SUCCESS_FILE,’a’)作为success_log:
success_log.write(f’下载成功: {first_part}/{content}\n’)
返回本地文件名Telegram会员
除了asyncio.TimeoutError:
logging.warning(f’下载超时,跳过下载{first_part}/{content}’)
打开(FAILURE_FILE,’a’)作为failure_log:
failure_log.write(f’下载超时: {first_part}/{content}\n’)
返回无
除了异常e:
print(f’下载错误{内容}: {e}’)
logging.error(f’下载错误{content}: {e}’)
打开(FAILURE_FILE,’a’)作为failure_log:
failure_log.write(f’下载错误{内容}: {e}\n’)
重试-=1
如果重试==0:
logging.error(f’达到最大重试次数并放弃下载{content}’)
否则:
logging.info(f’重试下载{content},剩余次数: {retries}’)
等待asyncio.sleep(RETRY_DELAY)
异步def remove_files_from_zip(zip_file, *files_to_remove):
#删除压缩包中指定类型的文件,调用zip -d
data_dir=DATA_DIR + os.path.dirname(zip_file)
zip_file_path=os.path.join(data_dir, zip_file)
命令=[‘zip’, ‘-d’, zip_file_path] + list(files_to_remove)
尝试:
子进程.运行(命令)
logging.info(f’已从: {zip_file_path} 中删除文件’)
除了subprocess.CalledProcessError 为e:
logging.error(f’从{zip_file_path}: {e} 删除文件时出错’)
异步def upload_to_oss(local_path, oss_key, 重试=MAX_RETRIES):
# 上传文件到OSS
auth=oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
桶=oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
重试时0:
尝试:
以open(local_path, ‘rb’) 作为file:
Bucket.put_object(oss_key, 文件)
oss_url=f’https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}’
print(f’上传成功: {oss_url}’)
logging.info(f’上传成功: {oss_url}’)
如果DELETE_LOCAL_FILE_AFTER_UPLOAD:
os.remove(本地路径)
除了oss2.exceptions.OssError 为e:
print(f’上传失败: {e}’)
logging.error(f’上传失败: {e}’)
重试-=1
如果重试==0:
logging.error(f’已达到最大重试次数,放弃上传{local_path}’)
否则:
logging.info(f’重试上传{local_path},剩余次数: {retries}’)
等待asyncio.sleep(RETRY_DELAY)
异步def download_remove_upload_task(url, 会话, 信号量):
第一部分,内容=等待extract_content_from_url(url)
如果不满足:
local_filename=等待download_from_ku(url, 会话, 信号量, SUCCESS_FILE, FAILURE_FILE)
如果本地文件名:
等待remove_files_from_zip(local_filename, ‘*.html’, ‘*.txt’, ‘*.url’)
第一部分,内容=等待extract_content_from_url(url)
oss_key=(first_part.lstrip(‘/’) + ‘/’ + os.path.dirname(内容)+ ‘/’ + os.path.basename(local_filename)
)
等待upload_to_oss(local_filename, oss_key)
除了异常e:
print(f’处理{url} 时出现错误: {e}’)
logging.error(f’处理{url} 时出现错误: {e}’)
异步def main():
使用open(file_list, ‘r’) 作为file:
信号量=asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
连接器=aiohttp.TCPConnector(限制=MAX_CONCURRENT_DOWNLOADS)
与aiohttp.ClientSession(connector=connector) 异步作为session:
#创建电报TG下载任务列表
任务=[
download_remove_upload_task(url.strip().strip(”’), 会话, 信号量)
对于文件中的url
]
#同时执行下载和删除上传任务
等待asyncio.gather(*tasks)
如果__name__==’__main__’:
asyncio.get_event_loop().run_until_complete(main())
pip3安装依赖aiohttp asyncio oss2子进程
pip3 安装aiohttp
pip3安装异步
pip3安装oss2
pip3 安装子进程
python3脚本内容简单备注
正确的下载地址是去掉第四个/前面的。使用split 剪切路径的第一部分和第二部分并将其保存到变量中。
异步def extract_content_from_url(url):
#urldo 切割
parts=url.split(‘/’, 4)
first_part=’/’ + ‘/’.join(parts[1:4])
内容=部分[4]
返回first_part,内容
文件列表
下载日志构建下载删除上传任务
最后在main函数中使用asyncio.gather(*tasks)启动任务,扔到后台运行一整夜。有些大文件下载超时,如果失败,再次运行就结束了。
python官方中文文档