用AI撰写高质量网络小说:技术实操与效率提升指南

核心内容摘要

JavaScript脚本引擎:提升跨平台自动化效率的7个实战技巧
通义千问3-Reranker-0.6B案例分享:电商搜索优化实战

2026|写论文被坑到怀疑人生,唯一救星竟然是它

ä»�零开始学 Python自动化 / è¿�ç»´å¼€å�‘å®�æˆ˜æ ¸å¿ƒåº“ 3 大å®�战场景在è¿�维工作中é‡�å¤�çš„æœ�务器巡检ã€�批é‡�部署ã€�日志分æ��等任务ä¸�仅耗时耗力还容易出ç�°äººä¸ºå¤±è¯¯ã€‚而 Python 凭借丰富的è¿�ç»´æ ¸å¿ƒåº“èƒ½è½»æ�¾å®�ç�°è¿™äº›ä»»åŠ¡çš„è‡ªåŠ¨åŒ–å¤§å¹…æ��å�‡è¿�维效ç�‡ã€�é™�ä½�æ“�作é£�险。本文将ä»�è¿�ç»´å¼€å�‘çš„æ ¸å¿ƒåº“å…¥æ‰‹è¯¦ç»†è®²è§£paramikoSSH 远程æ“�作ã€�fabric批é‡�è¿�ç»´ã€�ansible自动化é…�ç½®ã€�psutil系统监æ�§çš„使用å†�通过 3 个è�½åœ°å®�战场景æœ�务器批é‡�巡检ã€�自动化部署ã€�日志分æ��å·¥å…·å¸®ä½ ä»�零基础æ�Œæ�¡ Python 自动化 / è¿�ç»´å¼€å�‘å®�ç�°è¿�维工作的 “æ��è´¨å¢�效â€�。一ã€�è¿�ç»´å¼€å�‘æ ¸å¿ƒåº“å¼€ç®±å�³ç”¨çš„è¿�维工具Python è¿�维生æ€�æˆ�ç†Ÿä»¥ä¸‹æ ¸å¿ƒåº“è¦†ç›–äº†è¿œç¨‹æ“�作ã€�批é‡�管ç�†ã€�系统监æ�§ç­‰æ ¸å¿ƒåœºæ™¯æ˜¯è¿�维自动化的基石。

1 paramikoSSH 远程æ“�ä½œæ ¸å¿ƒåº“paramiko是 Python å®�ç�°çš„ SSH2 å��议库支æŒ� SSH 远程登录ã€�执行命令ã€�ä¸Šä¼ ä¸‹è½½æ–‡ä»¶æ— éœ€ä¾�赖系统 SSH 客户端是å®�ç�°å�•å�° / 多å�°æœ�务器远程æ“�作的基础。å‰�ç½®æ�¡ä»¶å®‰è£… paramikopip install paramikoæ ¸å¿ƒç”¨æ³•åœºæ™¯ 1SSH 远程执行命令import paramiko def ssh_execute_command(host, port, username, password, command): SSH远程登录并执行命令 :param host: æœ�务器IP :param port: SSH端å�£ :param username: 登录用户å�� :param password: 登录密ç � :param command: è¦�执行的系统命令 :return: 命令执行结æ�œ #

创建SSH客户端对象 ssh_client paramiko.SSHClient() #

è‡ªåŠ¨æ·»åŠ æœªçŸ¥ä¸»æœºå¯†é’¥ç”Ÿäº§ç�¯å¢ƒå»ºè®®æ‰‹åЍé…�ç½® ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: #

���务器 ssh_client.connect( hostnamehost, portport, usernameusername, passwordpassword, timeout10 ) #

执行命令stdin: 输入, stdout: 输出, stderr: 错误 stdin, stdout, stderr ssh_client.exec_command(command) #

��执行结� stdout_result stdout.read().decode(utf-8, errorsignore) stderr_result stderr.read().decode(utf-8, errorsignore) return { status: success, stdout: stdout_result, stderr: stderr_result } except Exception as e: return { status: failed, error: str(e) } finally: #

关闭�� ssh_client.close() # 调用示例 result ssh_execute_command( host

192.

168.

100, port22, usernameroot, passwordyour_server_password, commanddf -h # 查看ç£�盘使用情况 ) # 打å�°ç»“æ�œ if result[status] success: print(命令执行æˆ�功输出结æ�œ) print(result[stdout]) if result[stderr]: print(命令执行警告/错误) print(result[stderr]) else: print(f命令执行失败{result[error]})场景 2SFTP ä¸Šä¼ / 下载文件import paramiko def sftp_upload_file(host, port, username, password, local_file, remote_file): SFTPä¸Šä¼ æœ¬åœ°æ–‡ä»¶åˆ°è¿œç¨‹æœ�务器 transport paramiko.Transport((host, port)) try: # è¿�æ�¥SFTP transport.connect(usernameusername, passwordpassword) sftp paramiko.SFTPClient.from_transport(transport) # ä¸Šä¼ æ–‡ä»¶ sftp.put(local_file, remote_file) print(f文件 {local_file} å·²æˆ�åŠŸä¸Šä¼ åˆ° {remote_file}) return True except Exception as e: print(fæ–‡ä»¶ä¸Šä¼ å¤±è´¥{str(e)}) return False finally: transport.close() def sftp_download_file(host, port, username, password, remote_file, local_file): SFTPä»�远程æœ�务器下载文件到本地 transport paramiko.Transport((host, port)) try: transport.connect(usernameusername, passwordpassword) sftp paramiko.SFTPClient.from_transport(transport) # 下载文件 sftp.get(remote_file, local_file) print(f文件 {remote_file} å·²æˆ�功下载到 {local_file}) return True except Exception as e: print(f文件下载失败{str(e)}) return False finally: transport.close() # 调用示例 sftp_upload_file( host

192.

168.

100, port22, usernameroot, passwordyour_server_password, local_filelocal_test.txt, remote_file/root/remote_test.txt ) sftp_download_file( host

192.

168.

100, port22, usernameroot, passwordyour_server_password, remote_file/root/remote_test.txt, local_filedownloaded_test.txt )

2 fabric批é‡�è¿�维自动化工具fabric是基äº�paramikoå°�装的高级批é‡�è¿�维库支æŒ�批é‡�执行命令ã€�批é‡�ä¸Šä¼ ä¸‹è½½æ–‡ä»¶ã€�任务编æ�’语法更简æ´�是批é‡�管ç�†æœ�务器的首选工具当å‰�主æµ�版本为Fabric 3兼容 Python3。å‰�ç½®æ�¡ä»¶å®‰è£… fabricpip install fabric3æ ¸å¿ƒç”¨æ³•æ‰¹é‡�执行è¿�维任务from fabric import Connection from invoke import Responder #

定义�务器列表���置文件读� servers [ {host:

192.

168.

100, port: 22, user: root, password: your_server_password}, {host:

192.

168.

101, port: 22, user: root, password: your_server_password} ] #

定义批�执行命令的函数 def batch_execute_command(servers, command): 批�在多��务器上执行命令 for server in servers: print(f 开始处��务器 {server[host]} ) try: # 创建�� conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # 执行命令支�sudo��若需� sudo_responder Responder( patternr\[sudo\] password for .*:, responsef{server[password]}\n ) # 执行普通命令 result conn.run(command, hideFalse, warnTrue) if result.ok: print(f�务器 {server[host]} 命令执行�功输出\n{result.stdout}) else: print(f�务器 {server[host]} 命令执行失败错误\n{result.stderr}) except Exception as e: print(f�务器 {server[host]} ��/执行失败{str(e)}) finally: print(f 结�处��务器 {server[host]} \n) #

定义批é‡�ä¸Šä¼ æ–‡ä»¶çš„å‡½æ•° def batch_upload_file(servers, local_file, remote_file): 批é‡�ä¸Šä¼ æ–‡ä»¶åˆ°å¤šå�°æœ�务器 for server in servers: print(f å¼€å§‹ä¸Šä¼ æ–‡ä»¶åˆ°æœ�务器 {server[host]} ) try: conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # ä¸Šä¼ æ–‡ä»¶ conn.put(local_file, remote_file) print(f文件已æˆ�åŠŸä¸Šä¼ åˆ°æœ�务器 {server[host]} çš„ {remote_file}) except Exception as e: print(fæœ�务器 {server[host]} æ–‡ä»¶ä¸Šä¼ å¤±è´¥{str(e)}) finally: print(f 结æ�Ÿä¸Šä¼ æœ�务器 {server[host]} \n) #

调用批é‡�任务 if __name__ __main__: # 批é‡�执行查看内存命令 batch_execute_command(servers, free -h) # 批é‡�ä¸Šä¼ é…�置文件 batch_upload_file(servers, app.conf, /etc/app.conf)

3 Ansible自动化é…�ç½®ä¸�è¿�ç»´Python 脚本集æˆ�Ansible 是一款强大的 IT 自动化工具基äº� SSH å®�ç�°æ— 代ç�†æ‰¹é‡�è¿�维支æŒ�é…�置管ç�†ã€�应用部署ã€�任务编æ�’å…¶æ ¸å¿ƒåŠŸèƒ½å�¯é€šè¿‡ Python 脚本调用å®�ç�°æ›´ç�µæ´»çš„自动化æµ�程。å‰�ç½®æ�¡ä»¶å®‰è£… Ansible# Ubuntu/Debian apt install ansible # CentOS/RHEL yum install ansible # 或通过pip安装 pip install ansibleæ ¸å¿ƒç”¨æ³•Python 脚本调用 Ansibleimport ansible.runner import ansible.playbook import ansible.inventory #

定义Ansible�置 inventory ansible.inventory.Inventory([

192.

168.

100,

192.

168.

101]) private_key_file /root/.ssh/id_rsa # �密登录密钥�� remote_user root #

执行å�•个命令runneræ–¹å¼� def ansible_execute_command(hosts, command): 用Ansible批é‡�执行命令 runner ansible.runner.Runner( pattern*, module_namecommand, module_argscommand, inventoryinventory, remote_userremote_user, private_key_fileprivate_key_file ) # 执行并è�·å�–结æ�œ result runner.run() if result: for host, host_result in result[contacted].items(): print(f æœ�务器 {host} ) if stdout in host_result: print(f执行æˆ�功\n{host_result[stdout]}) else: print(f执行失败\n{host_result[msg]}) # 处ç�†æœªè¿�æ�¥çš„æœ�务器 for host in result[dark]: print(fæœ�务器 {host} æ— æ³•è¿�æ�¥) #

执行Ansible Playbook更��的任务编� def ansible_run_playbook(playbook_path): 用Ansible执行Playbook pb ansible.playbook.PlayBook( playbookplaybook_path, inventoryinventory, remote_userremote_user, private_key_fileprivate_key_file ) # 执行Playbook result pb.run() print(fPlaybook执行完�结�{result}) #

调用示例 if __name__ __main__: # 批�执行�盘查看命令 ansible_execute_command([

192.

168.

100,

192.

168.

101], df -h) # 执行Playbook需��编写deploy.yml # ansible_run_playbook(deploy.yml)补充简易 Ansible Playbook 示例deploy.yml- hosts: all remote_user: root tasks: - name: 安装nginx yum: name: nginx state: present - name: �动nginx并设置开机自� service: name: nginx state: started enabled: yes

4 psutil系统监æ�§ä¸�ä¿¡æ�¯é‡‡é›†psutil是 Python 跨平å�°ç³»ç»Ÿç›‘æ�§åº“支æŒ�è�·å�– CPUã€�内存ã€�ç£�盘ã€�网络ã€�进程等系统信æ�¯æ— 需调用系统命令返å›�结æ�„化数æ�®æ˜¯æœ�务器巡检ã€�监æ�§å·¥å…·å¼€å�‘çš„æ ¸å¿ƒåº“ã€‚å‰�ç½®æ�¡ä»¶å®‰è£… psutilpip install psutilæ ¸å¿ƒç”¨æ³•è�·å�–ç³»ç»Ÿæ ¸å¿ƒä¿¡æ�¯import psutil import datetime def get_system_info(): 采集æœ�åŠ¡å™¨æ ¸å¿ƒç³»ç»Ÿä¿¡æ�¯ :return: 系统信æ�¯å­—å…¸ system_info {} #

基本系统信� system_info[boot_time] datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime(%Y-%m-%d %H:%M:%S) #

CPU信� cpu_count psutil.cpu_count(logicalTrue) # 逻辑CPU数 cpu_percent psutil.cpu_percent(interval1, percpuTrue) # �个CPU使用�间隔1秒 system_info[cpu] { logical_count: cpu_count, total_percent: psutil.cpu_percent(interval

, per_cpu_percent: cpu_percent } #

内存信� mem psutil.virtual_memory() system_info[memory] { total_gb: round(mem.total / (1024**

,

, # 总内存GB used_gb: round(mem.used / (1024**

,

, # 已使用内存GB free_gb: round(mem.free / (1024**

,

, # 空闲内存GB used_percent: mem.percent # 内存使用� } #

�盘信�仅��挂载的本地�盘 disk_info {} for partition in psutil.disk_partitions(allFalse): if partition.fstype: disk_usage psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] { total_gb: round(disk_usage.total / (1024**

,

, used_gb: round(disk_usage.used / (1024**

,

, free_gb: round(disk_usage.free / (1024**

,

, used_percent: disk_usage.percent } system_info[disk] disk_info #

网络信���总收��� net_io psutil.net_io_counters() system_info[network] { sent_gb: round(net_io.bytes_sent / (1024**

,

, recv_gb: round(net_io.bytes_recv / (1024**

,

} return system_info # 调用示例打å�°ç³»ç»Ÿä¿¡æ�¯ if __name__ __main__: sys_info get_system_info() print( æœ�务器系统信æ�¯ ) print(f开机时间{sys_info[boot_time]}) print(fCPU使用ç�‡{sys_info[cpu][total_percent]}%) print(f内存使用ç�‡{sys_info[memory][used_percent]}%) print(fç£�盘使用ç�‡/{sys_info[disk][/][used_percent]}%)二ã€�è¿�ç»´å®�战场景 1æœ�务器批é‡�巡检脚本场景需求批é‡�巡检多å�°æœ�务器的 CPUã€�内存ã€�ç£�盘使用ç�‡è®¾ç½®é˜ˆå€¼å‘Šè­¦å¦‚ CPU 使用ç�‡ 80%ã€�内存 90%ã€�ç£�盘 95%将巡检结æ�œè¾“出到日志文件并生æˆ�简易报告。完整å®�ç�°ä»£ç �import psutil import datetime import logging from fabric import Connection #

�置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(server_inspection.log, encodingutf-

, logging.StreamHandler() ] ) logger logging.getLogger(__name__) #

�置项 SERVERS [ {host:

192.

168.

100, port: 22, user: root, password: your_server_password}, {host:

192.

168.

101, port: 22, user: root, password: your_server_password} ] # 告警阈值 THRESHOLDS { cpu: 80, memory: 90, disk: 95 } # 巡检报告�存路径 REPORT_PATH server_inspection_report.txt #

æœ¬åœ°å·¡æ£€å‡½æ•°è‹¥è„šæœ¬åœ¨ç›®æ ‡æœ�务器è¿�行 def local_inspection(): 本地æœ�务器巡检 try: sys_info get_system_info() return {status: success, data: sys_info} except Exception as e: logger.error(f本地巡检失败{str(e)}) return {status: failed, error: str(e)} #

远程巡检函数通过fabric远程调用psutiléœ€ç›®æ ‡æœ�务器已安装psutil def remote_inspection(server): 远程æœ�务器巡检 logger.info(f开始巡检远程æœ�务器{server[host]}) try: # è¿�æ�¥è¿œç¨‹æœ�务器 conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # 远程执行Python脚本采集系统信æ�¯ remote_script import psutil import datetime def get_system_info(): system_info {} system_info[boot_time] datetime.datetime.fromtimestamp(psutil.boot_time()).strftime(%Y-%m-%d %H:%M:%S) system_info[cpu] {total_percent: psutil.cpu_percent(interval

} mem psutil.virtual_memory() system_info[memory] {used_percent: mem.percent} disk_info {} for p in psutil.disk_partitions(allFalse): if p.fstype: du psutil.disk_usage(p.mountpoint) disk_info[p.mountpoint] {used_percent: du.percent} system_info[disk] disk_info return system_info print(get_system_info()) # 执行远程脚本并��结� result conn.run(fpython3 -c {remote_script}, hideTrue, warnTrue) if not result.ok: logger.error(f�务器 {server[host]} 执行脚本失败{result.stderr}) return {status: failed, host: server[host], error: result.stderr} # 解�结�简化处��际�使用json�列化 import ast sys_info ast.literal_eval(result.stdout) logger.info(f�务器 {server[host]} 巡检�功) return {status: success, host: server[host], data: sys_info} except Exception as e: logger.error(f�务器 {server[host]} 巡检异常{str(e)}) return {status: failed, host: server[host], error: str(e)} #

生�巡检报告 def generate_report(inspection_results): 生�巡检报告 now datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S) report [f# �务器批�巡检报告 {now}, *50, ] for result in inspection_results: if result[status] success: host result[host] data result[data] report.append(f## �务器{host}) report.append(f 开机时间{data[boot_time]}) report.append(f CPU使用�{data[cpu][total_percent]}% {[告警] if data[cpu][total_percent] THRESHOLDS[cpu] else }) report.append(f 内存使用�{data[memory][used_percent]}% {[告警] if data[memory][used_percent] THRESHOLDS[memory] else }) report.append( �盘使用�) for mount_point, disk_data in data[disk].items(): alarm [告警] if disk_data[used_percent] THRESHOLDS[disk] else report.append(f {mount_point}{disk_data[used_percent]}% {alarm}) else: report.append(f## �务器{result[host]} [巡检失败]) report.append(f 错误信�{result[error]}) report.append() # 写入报告文件 with open(REPORT_PATH, w, encodingutf-

as f: f.write(\n.join(report)) logger.info(f巡检报告已生��存路径{REPORT_PATH}) #

æ ¸å¿ƒç³»ç»Ÿä¿¡æ�¯é‡‡é›†ä¸�å‰�文一致简化版 def get_system_info(): system_info {} system_info[boot_time] datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime(%Y-%m-%d %H:%M:%S) system_info[cpu] {total_percent: psutil.cpu_percent(interval

} mem psutil.virtual_memory() system_info[memory] {used_percent: mem.percent} disk_info {} for partition in psutil.disk_partitions(allFalse): if partition.fstype: disk_usage psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] {used_percent: disk_usage.percent} system_info[disk] disk_info return system_info #

主函数批é‡�巡检 def main(): logger.info( 开始æœ�务器批é‡�巡检 ) inspection_results [] # é��å�†æ‰€æœ‰æœ�务器进行巡检 for server in SERVERS: result remote_inspection(server) inspection_results.append(result) # 生æˆ�巡检报告 generate_report(inspection_results) logger.info( æœ�务器批é‡�巡检结æ�Ÿ ) if __name__ __main__: main()使用说æ˜�修改SERVERSé…�置中的æœ�务器 IPã€�è´¦å�·ã€�密ç �ç›®æ ‡æœ�务器需安装 Python3 å’Œpsutilå�¯é€šè¿‡æ‰¹é‡�命令æ��å‰�安装pip3 install psutilè¿�行脚本å��生æˆ�server_inspection.log巡检日志和server_inspection_report.txtå·¡æ£€æŠ¥å‘Šè¶…è¿‡é˜ˆå€¼çš„æŒ‡æ ‡ä¼šæ ‡æ³¨[告警]å�¯å��续扩展邮件 / 钉钉告警功能。三ã€�è¿�ç»´å®�战场景 2自动化部署脚本场景需求å®�ç�° Python/Web é¡¹ç›®çš„è‡ªåŠ¨åŒ–éƒ¨ç½²æ ¸å¿ƒæµ�程「ä»� Git 拉å�–最新代ç �ã€�→「安装 / æ›´æ–°ä¾�èµ–ã€�→「项目打包å�¯é€‰ã€�→「å�œæ­¢æ—§æœ�务ã€�→「å�¯åŠ¨æ–°æœ�务ã€�→「验è¯�æœ�务å�¯ç”¨æ€§ã€�。完整å®�ç�°ä»£ç �import logging from fabric import Connection from datetime import datetime #

�置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(deploy_automation.log, encodingutf-

, logging.StreamHandler() ] ) logger logging.getLogger(__name__) #

部署�置 DEPLOY_CONFIG { server: { host:

192.

168.

100, port: 22, user: root, password: your_server_password }, project: { name: my_python_project, git_repo: https://github.com/your-name/your-project.git, deploy_path: /opt/projects, venv_path: /opt/venv, # 虚拟�境路径 requirements_file: requirements.txt, service_name: my_project.service # systemd�务� } } #

定义å�•个部署步骤 def git_pull_code(conn, project_config): ä»�Git拉å�–最新代ç � project_path f{project_config[deploy_path]}/{project_config[name]} logger.info(f开始拉å�–代ç �项目路径{project_path}) # 检查项目目录是å�¦å­˜åœ¨ä¸�存在则克隆 result conn.run(ftest -d {project_path}, warnTrue) if result.failed: logger.info(项目目录ä¸�存在开始克隆仓库) conn.run(fgit clone {project_config[git_repo]} {project_path}, hideFalse) else: logger.info(项目目录已存在拉å�–最新代ç �) conn.run(fcd {project_path} git pull, hideFalse) logger.info(代ç �拉å�–完æˆ�) def install_dependencies(conn, project_config): 安装/更新项目ä¾�èµ– project_path f{project_config[deploy_path]}/{project_config[name]} venv_pip f{project_config[venv_path]}/bin/pip requirements_path f{project_path}/{project_config[requirements_file]} logger.info(开始安装/更新项目ä¾�èµ–) result conn.run( f{venv_pip} install -r {requirements_path} --upgrade, hideFalse, warnTrue ) if result.ok: logger.info(ä¾�赖安装/更新完æˆ�) else: logger.error(fä¾�赖安装失败{result.stderr}) raise Exception(ä¾�赖安装步骤失败) def stop_old_service(conn, project_config): å�œæ­¢æ—§æœ�务 logger.info(开始å�œæ­¢æ—§æœ�务) result conn.run( fsystemctl stop {project_config[service_name]}, warnTrue, hideFalse ) if result.ok or Unit * does not exist in result.stderr: logger.info(æ—§æœ�务已å�œæ­¢æˆ–æœ�务未存在) else: logger.error(få�œæ­¢æ—§æœ�务失败{result.stderr}) raise Exception(å�œæ­¢æ—§æœ�务步骤失败) def start_new_service(conn, project_config): å�¯åŠ¨æ–°æœ�务 logger.info(开始å�¯åŠ¨æ–°æœ�务) # é‡�æ–°åŠ è½½systemdé…�置若æœ�务文件有修改 conn.run(systemctl daemon-reload, hideFalse) # å�¯åЍæœ�务并设置开机自å�¯ result conn.run( fsystemctl start {project_config[service_name]} systemctl enable {project_config[service_name]}, hideFalse, warnTrue ) if result.ok: logger.info(æ–°æœ�务å�¯åЍæˆ�功并设置开机自å�¯) else: logger.error(få�¯åŠ¨æ–°æœ�务失败{result.stderr}) raise Exception(å�¯åŠ¨æ–°æœ�务步骤失败) def verify_service(conn, project_config): 验è¯�æœ�务å�¯ç”¨æ€§ç¤ºä¾‹æ£€æŸ¥æœ�务状æ€�访问æ�¥å�£ logger.info(开始验è¯�æœ�务å�¯ç”¨æ€§) #

检查systemd�务状� service_result conn.run( fsystemctl is-active {project_config[service_name]}, hideTrue, warnTrue ) if service_result.stdout.strip() ! active: logger.error(f�务未正常�行状�{service_result.stdout}) raise Exception(�务验�失败�务未激活) #

�选访问项目��验�示例curl访问本地端� api_result conn.run( fcurl -s -w % http://

127.

0.

1:8000/health -o /dev/null, hideTrue, warnTrue ) if api_result.stdout.strip() 200: logger.info(æœ�务æ�¥å�£éªŒè¯�æˆ�功返å›�200 OK) else: logger.warning(fæœ�务æ�¥å�£éªŒè¯�è¿”å›�é��200状æ€�ç �{api_result.stdout}) #

主部署æµ�程 def automated_deploy(): 自动化部署主æµ�程 logger.info( 开始项目自动化部署 ) server_config DEPLOY_CONFIG[server] project_config DEPLOY_CONFIG[project] # 创建æœ�务器è¿�æ�¥ try: conn Connection( hostserver_config[host], portserver_config[port], userserver_config[user], connect_kwargs{password: server_config[password]} ) # 执行部署步骤按顺åº�ç¼–æ�’ git_pull_code(conn, project_config) install_dependencies(conn, project_config) stop_old_service(conn, project_config) start_new_service(conn, project_config) verify_service(conn, project_config) logger.info( 项目自动化部署全部完æˆ� ) return True except Exception as e: logger.error(f 自动化部署失败{str(e)} ) return False if __name__ __main__: automated_deploy()补充说æ˜�项目需使用systemd管ç�†æœ�务创建my_project.service放在/etc/systemd/system/ç›®æ ‡æœ�务器需安装 Gitã€�Python3ã€�虚拟ç�¯å¢ƒå�¯æ��å‰�批é‡�é…�置若为 Java 项目å�¯ä¿®æ”¹æ­¥éª¤ä¸ºã€Œæ‰“包mvn packageã€�→「替æ�¢ JAR 包ã€�→「é‡�å�¯æœ�务ã€�å�¯æ‰©å±•å›�滚功能部署å‰�备份旧代ç � / 旧包部署失败时æ�¢å¤�。示例 systemd æœ�务文件my_project.service[Unit] DescriptionMy Python Project Service Afternetwork.target [Service] Userroot WorkingDirectory/opt/projects/my_python_project ExecStart/opt/venv/bin/python3 app.py Restarton-failure RestartSec5 [Install] WantedBymulti-user.targetå››ã€�è¿�ç»´å®�战场景 3日志分æ��工具场景需求开å�‘一款通用日志分æ��工具支æŒ�「按关键è¯�过滤日志ã€�→「统计指定报错信æ�¯å‡ºç�°æ¬¡æ•°ã€�→「按时间范围筛选日志ã€�→「生æˆ�分æ��报告ã€�适用äº� Nginxã€�Pythonã€�Java ç­‰å�„类日志文件。完整å®�ç�°ä»£ç �import logging import re from datetime import datetime from collections import Counter #

�置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[logging.StreamHandler()] ) logger logging.getLogger(__name__) #

日志分æ��æ ¸å¿ƒåŠŸèƒ½ class LogAnalyzer: def __init__(self, log_file_path): self.log_file_path log_file_path self.all_log_lines [] # 存储所有日志行 self.load_log_file() def load_log_file(self): åŠ è½½æ—¥å¿—æ–‡ä»¶åˆ°å†…å­˜å¤§æ–‡ä»¶å�¯æ”¹ä¸ºé€�行处ç�† logger.info(få¼€å§‹åŠ è½½æ—¥å¿—æ–‡ä»¶{self.log_file_path}) try: with open(self.log_file_path, r, encodingutf-8, errorsignore) as f: self.all_log_lines [line.strip() for line in f if line.strip()] logger.info(fæ—¥å¿—æ–‡ä»¶åŠ è½½å®Œæˆ�å…± {len(self.all_log_lines)} 行有效日志) except FileNotFoundError: logger.error(f日志文件ä¸�存在{self.log_file_path}) raise except Exception as e: logger.error(fåŠ è½½æ—¥å¿—æ–‡ä»¶å¤±è´¥{str(e)}) raise def filter_by_keyword(self, keywords, exclude_keywordsNone): 按关键è¯�过滤日志 :param keywords: 包å�«çš„关键è¯�列表任æ„�匹é…� :param exclude_keywords: æ�’除的关键è¯�列表任æ„�匹é…� :return: 过滤å��的日志列表 if exclude_keywords is None: exclude_keywords [] filtered_lines [] for line in self.all_log_lines: # 检查是å�¦åŒ…å�«ä»»æ„�关键è¯� include_flag any(keyword.lower() in line.lower() for keyword in keywords) # 检查是å�¦æ�’除任æ„�关键è¯� exclude_flag any(exclude_keyword.lower() in line.lower() for exclude_keyword in exclude_keywords) if include_flag and not exclude_flag: filtered_lines.append(line) logger.info(f按关键è¯�过滤完æˆ�å…± {len(filtered_lines)} 行匹é…�日志) return filtered_lines def count_error_messages(self, error_patterns): 统计报错信æ�¯å‡ºç�°æ¬¡æ•° :param error_patterns: 报错正则表达å¼�列表 :return: 报错统计结æ�œCounter error_messages [] for line in self.all_log_lines: for pattern in error_patterns: match re.search(pattern, line, re.IGNORECASE) if match: error_msg match.group(

if match.group(

else line[:100] # 截å�–å‰�100字符 error_messages.append(error_msg) break error_counter Counter(error_messages) logger.info(f报错信æ�¯ç»Ÿè®¡å®Œæˆ�共识别 {len(error_counter)} ç§�ä¸�å�ŒæŠ¥é”™) return error_counter def filter_by_time_range(self, start_time, end_time, time_patternr\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): 按时间范围筛选日志 :param start_time: å¼€å§‹æ—¶é—´å­—ç¬¦ä¸²æ ¼å¼�匹é…�time_pattern :param end_time: 结æ�Ÿæ—¶é—´å­—ç¬¦ä¸²æ ¼å¼�匹é…�time_pattern :param time_pattern: æ—¥å¿—ä¸­çš„æ—¶é—´æ ¼å¼�正则 :return: 过滤å��的日志列表 # è§£æ��开始/结æ�Ÿæ—¶é—´ try: start_dt datetime.strptime(start_time, %Y-%m-%d %H:%M:%S) end_dt datetime.strptime(end_time, %Y-%m-%d %H:%M:%S) except ValueError: logger.error(æ—¶é—´æ ¼å¼�错误需符å�ˆYYYY-MM-DD HH:MM:SS) raise filtered_lines [] for line in self.all_log_lines: # æ��å�–日志中的时间 time_match re.search(time_pattern, line) if not time_match: continue try: log_dt datetime.strptime(time_match.group(

, %Y-%m-%d %H:%M:%S) if start_dt log_dt end_dt: filtered_lines.append(line) except ValueError: continue logger.info(f按时间范围过滤完�共 {len(filtered_lines)} 行匹�日志) return filtered_lines def generate_analysis_report(self, report_path, keywordsNone, error_patternsNone, time_rangeNone): 生�日志分�报告 :param report_path: 报告�存路径 :param keywords: 过滤关键� :param error_patterns: 报错正则 :param time_range: 时间范围(start_time, end_time) if keywords is None: keywords [] if error_patterns is None: error_patterns [rerror|exception|fail|fatal, r500 Internal Server Error, r404 Not Found] if time_range is None: time_range (, ) logger.info(f开始生�日志分�报告{report_path}) report [ f# 日志分�报告, f生�时间{datetime.now().strftime(%Y-%m-%d %H:%M:%S)}, f日志文件{self.log_file_path}, f日志总行数{len(self.all_log_lines)}, *60, ] #

关键�过滤结�若指定关键� if keywords: keyword_lines self.filter_by_keyword(keywords) report.append(f## 一�关键�过滤结�关键�{,.join(keywords)}) report.append(f匹�行数{len(keyword_lines)}) report.append(�10�匹�日志) for i, line in enumerate(keyword_lines[:10]): report.append(f {i1}. {line}) report.append() #

报错信�统计 error_counter self.count_error_messages(error_patterns) report.append(f## 二�报错信�统计) report.append(f报错类�数{len(error_counter)}) report.append(f报错总次数{sum(error_counter.values())}) report.append(报错Top10��) for i, (error_msg, count) in enumerate(error_counter.most_common(

,

: report.append(f {i}. 「{error_msg}�{count} 次) report.append() #

时间范围过滤结�若指定时间范围 if all(time_range): time_lines self.filter_by_time_range(time_range[0], time_range[1]) report.append(f## 三�时间范围过滤结�{time_range[0]} - {time_range[1]}) report.append(f匹�行数{len(time_lines)}) report.append() #

总结 report.append(f## 四�分�总结) if sum(error_counter.values()) 0: report.append(未检测到�显报错信�日志整体正常。) else: report.append(f检测到 {sum(error_counter.values())} 次报错需�点关注Top3报错类�。) # 写入报告文件 with open(report_path, w, encodingutf-

as f: f.write(\n.join(report)) logger.info(f日志分�报告生�完��存路径{report_path}) #

主函数使用日志分�工具 def main(): # �置项 LOG_FILE /var/log/nginx/access.log # 日志文件路径 REPORT_FILE log_analysis_report.txt # �始化日志分�器 try: analyzer LogAnalyzer(LOG_FILE) # 生�分�报告�自定义关键��报错正则�时间范围 analyzer.generate_analysis_report( report_pathREPORT_FILE, keywords[404, 500], time_range(

00:00:00,

23:59:

) except Exception as e: logger.error(f日志分æ��失败{str(e)}) if __name__ __main__: main()使用说æ˜�修改LOG_FILEä¸ºç›®æ ‡æ—¥å¿—æ–‡ä»¶è·¯å¾„Nginxã€�Pythonã€�Java 日志å�‡å�¯æ”¯æŒ�自定义keywords过滤关键è¯�ã€�error_patterns报错正则ã€�time_range时间范围大日志文件å�¯ä¼˜åŒ–为「é€�行处ç�†ã€�é�¿å…�内存溢出生æˆ�çš„log_analysis_report.txt包å�«è¯¦ç»†åˆ†æ��结æ�œå�¯ç›´æ�¥ç”¨äº�è¿�ç»´æ�’查。五ã€�总结本文系统讲解了 Python 自动化 / è¿�ç»´å¼€å�‘çš„æ ¸å¿ƒåº“å’Œ 3 个è�½åœ°å®�æˆ˜åœºæ™¯æ ¸å¿ƒè¦�ç‚¹æ€»ç»“å¦‚ä¸‹æ ¸å¿ƒè¿�维库paramikoå®�ç�°åŸºç¡€ SSH/SFTP æ“�作fabric简化批é‡�è¿�ç»´ansible适å�ˆå¤�æ�‚é…�置编æ�’psutilå®�ç�°ç³»ç»Ÿä¿¡æ�¯é‡‡é›†å››è€…覆盖è¿�ç»´æ ¸å¿ƒéœ€æ±‚æ‰¹é‡�巡检脚本基äº�fabricpsutilå®�ç�°å¤šæœ�åŠ¡å™¨æŒ‡æ ‡é‡‡é›†è®¾ç½®é˜ˆå€¼å‘Šè­¦ç”Ÿæˆ�结æ�„化报告解决é‡�å¤�巡检痛点自动化部署脚本按「拉å�–代ç �→安装ä¾�赖→å�¯å�œæœ�务→验è¯�å�¯ç”¨æ€§ã€�ç¼–æ�’任务支æŒ�扩展å›�滚ã€�多ç�¯å¢ƒéƒ¨ç½²æ��å�‡éƒ¨ç½²æ•ˆç�‡å’Œä¸€è‡´æ€§æ—¥å¿—分æ��工具基äº�re正则和Counter统计支æŒ�关键è¯� / 时间过滤ã€�报错统计生æˆ�å�¯è§†åŒ–报告辅助快速æ�’查问题。

中国大片又大又好看的ppt模板免费-中国大片又大又好看的ppt模板免费应用

百度百家号客服电话人工服务

123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123