范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

纯干货!python在运维中的应用(一)批量sshsftp

  日常工作中需要大量、频繁地使用ssh到服务器查看、拉取相关的信息或者对服务器进行变更。目前公司大量使用的shell,但是随着逻辑的复杂化、脚本管理的精细化,shell已经不满足日常需求,于是我尝试整合工作中的需求,制作适合的工具。 由于管理制度的缺陷,我以工作流程为核心思考适合自己的运维方式,提升工作效率,把时间留给更有价值的事情。 完整代码在最后,请大家参考。 环境:
  生产:4000+物理服务器,近 3000 台虚拟机。
  开发环境:python3.6、redhat7.9,除了paramiko为第三方模块需要自己安装,其他的直接import即可。 主要应用方向:配置变更: 例如服务器上线时需要批量校正系统分区容量、及挂载数据盘。 配置信息查询过滤 :例如过滤防火墙规则、过滤网卡配置。 存活检测 :设置定时任务,定时轮询服务器的 ssh 状态是否正常。 文件传输 :多个 ip 同时传输目录/文件。 基本原则:
  批量执行操作是一把双刃剑。批量执行操作可以提升工作效率,但是随之而来的风险不可忽略。
  风险案例如下:
  挂载很多数据盘,通常先格式化硬盘,再挂载数据盘,最后再写入将开机挂载信息写入/etc/fstab文件。在批量lsblk检查硬盘信息的时候发现有的系统盘在/sda有的在/sdm,如果不事先检查机器相关配置是否一致直接按照工作经验去执行批量操作,会很容易造成个人难以承受的灾难。
  在执行批量操作时按照惯例:格式化硬盘->挂载->开机挂载的顺序去执行,假设有的机器因为某些故障导致格式化硬盘没法正确执行。在处理这类问题的时候通常会先提取出失败的ip,并再按照惯例执行操作。运维人员会很容易忽略开机挂载的信息已经写过了,导致复写(这都是血和泪的教训)。
  所以,为了避免故障,提升工作效率,我认为应当建立团队在工作上的共识,应当遵守以下原则: 批量操作前应当准备回退方案。 批量操作前作前先确定检查目标服务器相关的配置的一致性。 批量操作时应当把重复执行会影响系统的操作和不影响的分开执行。 批量操作后应当再次验证操作结果是否符合预期。
  当然,代码的规范也应当重视起来,不仅是为了便于审计,同时也需要便于溯源。我认为应当注意以下几点: 关键方法一定要记录传入的参数以及执行后的结果。 为了避免方法返回值不符合预期,该抛异常的地方一定要抛。 优化代码,删去不必要的逻辑分支和尽量不要写重复的代码,使代码看起来整洁。 程序的执行情况、结果一定要保留日志。 技术难点
  1、ssh no existing session,sftp超时时间设置:
  在代码无错的情况下大量ip出现No existing session,排查后定位在代码的写法上,下面是一个正确的示例。由于最开始没考虑到ssh连接的几种情况导致了重写好几遍。另外sftp的实例貌似不能直接设置连接超时时间,所以我采用了先建立ssh连接再打开sftp的方法。 import paramiko  username = "root" port = 22 pkey = paramiko.RSAKey.from_private_key_file("/root/.ssh/id_rsa")  # 导入公钥 timeout=10   def ssh_client( ip, user=None, passwd=None, auth="id_rsa"):     client = paramiko.SSHClient()  # 实例化ssh     client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # 配置ssh互信     if auth == "id_rsa":  # 公钥认证         client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)     elif auth == "noAuth":  # 用户名密码认证         if user is not None and passwd is not None:             client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)         else:             raise ValueError("传入的用户名密码不能为空")     else:         raise NameError("不存在此%s认证方式" % auth)     return client   def sftp_client(ip, user=None, passwd=None, auth="id_rsa"):     ssh = ssh_client(ip, user, passwd, auth)     sftp = ssh.open_sftp()     return sftp
  2、sftp中的get()和put()方法仅能传文件,不支持直接传目录:
  不能直接传目录,那换个思路,遍历路径中的目录和文件,先创建目录再传文件就能达到一样的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以获取远程路径中的文件、目录信息。那么我们可以写一个递归来遍历远程路径中的所有文件和目录(传入一个列表是为了接收递归返回的值)。 def check_remote_folders(sftp, remote_path, remote_folders: list):         # 递归遍历远程路径中的目录,并添加到remote_folders中         for f in sftp.listdir_attr(remote_path):             # 检查路径状态是否为目录             if stat.S_ISDIR(f.st_mode):                 # 递归调用自己                 check_remote_folders(sftp, remote_path + "/" + f.filename, remote_folders)                 #  添加遍历到的目录信息到列表中                 remote_folders.append(remote_path + "/" + f.filename)
  python自带的os模块中的os.walk()方法可以遍历到本地路径中的目录和文件。   local_files_path = []         local_directory = []         for root, dirs, files in os.walk(local_path):             local_directory.append(root)             for file in files:if root[-1] != "/":                     local_files_path.append(root + "/" + file)                 elif root[-1] == "/":                     local_files_path.append(root + file)
  3、多线程多个ip使用sftp.get()方法时无法并发。
  改成多进程即可。  def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth="id_rsa"):         pool = multiprocessing.Pool(5)for i in ip:             pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))         pool.close()         pool.join()
  4、多个ip需要执行相同命令或不同的命令。
  由于是日常使用的场景不会很复杂,所以借鉴了ansible的playbook,读取提前准备好的配置文件即可,然后再整合到之前定义的ssh函数中。 # 配置文件大概是这样 192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l 192.168.0.101:ip a | grep team | wc -l 192.168.0.102:route -n ...
  from concurrent.futures import ThreadPoolExecutor, as_completed import time, json   def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type="one", task_name="default", auth="id_rsa"):     pool = ThreadPoolExecutor(self.workers)     task = []     if cmd_type == "one":         task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]     elif cmd_type == "many":         if isinstance(ip, list):             for i in ip:                 separator = ""if ":" in i:                     separator = ":"elif "," in i:                     separator = ","if separator != "":                     data = i.split(separator)                     task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))                 else:                     return "请检查ip和命令间的分隔符"else:             return "ip的类型为%s, 请传入一个正确的类型" % type(ip)     else:         return "cmd_type不存在%s值, 请传入一个正确的参数" % cmd_type     self.logger.debug("检查变量task:%s" % task)     results = {}     for future in as_completed(task):         res = future.result().split(":")         results[res[1]] = {res[0]: res[2]}         if "success" in future.result():             print("33[32;1m%s33[0m" % future.result().replace("success:", ""))         elif "failed" in future.result():             print("33[31;1m%s33[0m" % future.result().replace("failed:", ""))     pool.shutdown()     json_results = {         "task_name": task_name,         "task_sn": self.task_sn,         "start_time": self.now_time,         "cost_time": "%.2fs" % (time.perf_counter() - self.s),         "results": results     }     self.logger.info("json_results:%s" % json_results)     with open(self.log_path + "task_%s_%s.log" % (task_name, self.task_sn), "a") as f:         f.write(json.dumps(json_results))     return json_results
  同时,我们还衍生出一个需求,既然都要读取配置,那同样也可以提前把ip地址准备在文件里。正好也能读取我们返回的执行程序的结果。 import os import json   def get_info(self, path):     self.logger.debug("接收参数path:%s".encode("utf8") % path.encode("utf8"))     if os.path.exists(path):         info_list = [i.replace(" ", "") for i in open(path, "r", encoding="utf8").readlines()]         return info_list     else:         self.logger.warning("%s不存在,请传入一个正确的目录" % path)         raise ValueError("%s不存在,请传入一个正确的目录" % path)   def log_analysis(filename):if os.path.exists(filename):         try:             data = json.load(open(filename, "r", encoding="utf8"))             return data         except Exception as e:             print("%s无法解析该类型文件" % filename + " " + str(e))             raise TypeError("%s无法解析该类型文件" % filename + " " + str(e))     else:         raise ValueError("该%s文件路径不存在,请传入一个正确的文件路径" % filename)   def show_log(self, filename, mode=None):     data: dict = self.log_analysis(filename)     if isinstance(data, dict):         for key, value in data["results"].items():             if "success" in value.keys():                 if mode == "success":                     print(key)                 elif mode is None:                     print("%s:%s" % (key, value["success"].replace("r ", "")))             elif "failed" in value.keys():                 if mode == "failed":                     print(key)                 elif mode is None:                     print("%s:%s" % (key, value["failed"].replace("r ", ""))) 完整代码展示:from concurrent.futures import ThreadPoolExecutor, as_completed import multiprocessing import os import re import time import stat import json import random import logging import asyncio import argparse import paramiko   class TaskManager:def __init__(self, timeout=10, workers=15, system="linux"):         self.username = "root"         self.port = 22         self.datetime = time.strftime("%Y-%m-%d", time.localtime())         self.timeout = timeout         self.workers = workers         self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())         self.s = time.perf_counter()         self.task_sn = self.sn_random_generator()          if system == "linux":             self.pkey = paramiko.RSAKey.from_private_key_file("/root/.ssh/id_rsa")             self.log_path = "/tmp/TaskManager/log/"             self.log_debug_path = "/tmp/TaskManager/debug/"elif system == "windows":             self.pkey = paramiko.RSAKey.from_private_key_file(r"C:Users01.sshid_rsa")             self.log_path = r"D:	mpTaskManagerlog"             self.log_debug_path = r"D:	mpTaskManagerdebug"          if os.path.exists(self.log_path) is False:             os.makedirs(self.log_path, exist_ok=True)         if os.path.exists(self.log_debug_path) is False:             os.makedirs(self.log_debug_path, exist_ok=True)          self.logger = logging.getLogger(__name__)         self.logger.setLevel(level=logging.DEBUG)         self.handler = logging.FileHandler(self.log_debug_path + "%s_%s.log" % (self.datetime, self.task_sn))         self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")         self.handler.setFormatter(self.formatter)         self.logger.addHandler(self.handler)         self.logger.info("初始化完成".encode(encoding="utf8"))      def ssh_client(self, ip, user=None, passwd=None, auth="id_rsa"):         client = paramiko.SSHClient()         client.set_missing_host_key_policy(paramiko.AutoAddPolicy())         if auth == "id_rsa":             self.logger.info("正在 SSH 连接%s".encode("utf8") % str(ip).encode("utf8"))             client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)         elif auth == "noAuth":             if user is not None and passwd is not None:                 client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)                 #  allow_agent=False, look_for_keys=False# No existing session 解决办法 else:raise ValueError("传入的用户名密码不能为空")else:raise NameError("不存在此%s 认证方式" % auth)return client      def ssh_exec(self, ip, cmd, user=None, passwd=None, auth="id_rsa"):try:             ssh = self.ssh_client(ip, user, passwd, auth)             stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)             self.logger.debug("%s:stdin 输入:%s" % (ip, stdin))             self.logger.debug("%s:stderr 错误:%s" % (ip, stderr))             self.logger.debug("%s:stdout 输出:%s" % (ip, stdout))             result = stdout.read().decode("utf-8")             ssh.close()             return "success:" + ip + ":" + str(result)         except Exception as e:             return "failed:" + ip + ":" + str(e)      def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type="one", task_name="default", auth="id_rsa"):         self.logger.debug("接收参数 ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s" %                           (ip, cmd, user, passwd, cmd_type, task_name))         print("33[35;1m-------------------Task is set up! Time:%s ----------------------33[0m" % self.now_time)         pool = ThreadPoolExecutor(self.workers)         task = []         if cmd_type == "one":             task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]         elif cmd_type == "many":             if isinstance(ip, list):                 for i in ip:                     separator = ""if ":" in i:                         separator = ":"elif "," in i:                         separator = ","                     self.logger.debug("检查变量 separator:%s" % separator)                     if separator != "":                         data = i.split(separator)                         task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))                     else:                         self.logger.warning("separator:%s 不符合要求" % separator)                         return "请检查 ip 和命令间的分隔符"else:                 self.logger.warning("ip 的类型为%s, 请传入一个正确的类型" % type(ip))                 return "ip 的类型为%s, 请传入一个正确的类型" % type(ip)         else:             self.logger.warning("cmd_type 不存在%s 值, 请传入一个正确的参数" % cmd_type)             return "cmd_type 不存在%s 值, 请传入一个正确的参数" % cmd_type         self.logger.debug("检查变量 task:%s" % task)         results = {}         for future in as_completed(task):             res = future.result().split(":")             results[res[1]] = {res[0]: res[2]}             if "success" in future.result():                 print("33[32;1m%s33[0m" % future.result().replace("success:", ""))             elif "failed" in future.result():                 print("33[31;1m%s33[0m" % future.result().replace("failed:", ""))         pool.shutdown()         print("33[35;1m---------------Task is finished! Cost:%.2fs Task_sn:%s-------------------33[0m"               % ((time.perf_counter() - self.s), self.task_sn))         json_results = {             "task_name": task_name,             "task_sn": self.task_sn,             "start_time": self.now_time,             "cost_time": "%.2fs" % (time.perf_counter() - self.s),             "results": results         }         self.logger.info("json_results:%s" % json_results)         with open(self.log_path + "task_%s_%s.log" % (task_name, self.task_sn), "a") as f:             f.write(json.dumps(json_results))         return json_results      def sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth="id_rsa"):# 路径类型:目录、文件、不存在(抛异常)# 本地路径类型检查 if os.path.exists(local_path):if os.path.isdir(local_path):                 local_path_type = "directory"elif os.path.isfile(local_path):                 local_path_type = "file"else:                 raise NotADirectoryError("本地路径%s 无效" % local_path)         else:             raise NotADirectoryError("本地路径%s 无效" % local_path)          # 启动 ssh 连接并打开 sftp 服务(直接启动 sftp 连接无法设置超时时间)         ssh = self.ssh_client(ip, user, passwd, auth)         sftp = ssh.open_sftp()          # 远程路径类型类型 try:if remote_path[0] == "/" and remote_path[-1] != "/":                 remote_path_type = "file"elif stat.S_ISDIR(sftp.stat(remote_path).st_mode):  # 检查远程路径是否为目录                 remote_path_type = "directory"else:                 raise NotADirectoryError("请传入一个正确的远程路径:%s" % remote_path)         except Exception as e:             raise ValueError(ip, e)          # 传目录 if local_path_type == "directory" and remote_path_type == "directory":# 若本地路径为多级目录,则保留末级路径。             directory_split = [i for i in local_path.split("/") if i != ""]              # 检查远程路径是否存在该本地末级目录 try:if stat.S_ISDIR(sftp.stat(remote_path + directory_split[-1] + "/").st_mode):raise NotADirectoryError("%s 远程路径已存在此目录:%s" % (ip, remote_path + directory_split[-1] + "/"))except:pass              # 多余的目录信息             redundant_directory = ""if len(directory_split) > 1:                 redundant_directory = "/" + "/".join(directory_split[:-1]) + "/"  # 过滤上级目录名称              # 遍历本地路径下所有的文件路径, 目录。             local_files_path = []             local_directory = []             for root, dirs, files in os.walk(local_path):                 local_directory.append(root)                 for file in files:                     if root[-1] != "/":                         local_files_path.append(root + "/" + file)                     elif root[-1] == "/":                         local_files_path.append(root + file)              # 在远程路径上创建对应的目录 for directory in local_directory:# 目录切割使用 replace 方法不是很严谨,镜像目录会出问题,留在这里抛个异常,后续再改。try:                     sftp.mkdir(remote_path + directory.replace(redundant_directory, ""))                 except Exception as e:                     raise ValueError(ip, e, remote_path + directory.replace(redundant_directory, ""))              # 上传文件 for local_file in local_files_path:                 sftp.put(local_file, remote_path + local_file.replace(redundant_directory, ""))                 print("%s put: %s" % (ip, remote_path + local_file.replace(redundant_directory, "")))          # 传文件 elif local_path_type == "file":if remote_path_type == "directory":                 file = local_path.split("/")[-1]                 sftp.put(local_path, remote_path + file)                 print("%s put: %s" % (ip, remote_path + file))             elif remote_path_type == "file":  # 远程路径为文件格式                 sftp.put(local_path, remote_path)                 print("%s put: %s" % (ip, remote_path))         else:             raise NotADirectoryError("请检查传入的本地路径:%s 和远程路径:%s 是否正确" % (local_path, remote_path))          sftp.close()         ssh.close()      def sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth="id_rsa"):         client = self.ssh_client(ip, user, passwd, auth)         sftp = client.open_sftp()          # 检查本地路径是否存在,再检查是否为目录 if os.path.exists(local_path):if os.path.isdir(local_path):                 local_path_type = "directory"else:                 raise ValueError("本地路径:%s 错误,请传入一个正确的目录路径" % local_path)         else:             raise NotADirectoryError("本地路径:%s 不存在,请传入一个正确的本地路径" % local_path)          # 检查远程路径是否为文件或者目录 try:if stat.S_ISDIR(sftp.stat(remote_path).st_mode):                 remote_path_type = "directory"else:                 remote_path_type = "file"except:             raise NotADirectoryError("远程路径:%s 不存在" % remote_path)          # 如果远程路径和本地路径都是目录 if remote_path_type == "directory" and local_path_type == "directory":             folders = []  # 提前创建一个列表,用来接收递归遍历出的目录路径。#  递归遍历远程路径中的所有目录             self.check_remote_folders(sftp=sftp, remote_path=remote_path, remote_folders=folders)             folders.append(remote_path)  # 别忘了一级目录              # 遍历远程路径中的所有目录 for folder in folders:                 local_download_path = local_path + "/%s/" % ip + folder + "/"# 在本地新建文件夹                 os.makedirs(local_download_path, exist_ok=True)                 files = sftp.listdir_attr(folder)  # 获取远程目录内的文件信息# 遍历文件路径 for file in files:if stat.S_ISDIR(file.st_mode) is False:                         file_path = file.filename                         print("%s get:%s" % (ip, local_download_path + "/" + file_path))                         sftp.get(folder + "/" + file_path, local_download_path + "/" + file_path)         # 如果远程路径为文件,本地路径为目录 elif remote_path_type == "file" and local_path_type == "directory":             remote_file = remote_path.split("/")[-1]             local_download_path = local_path + "%s/" % ip             os.makedirs(local_download_path, exist_ok=True)             print("%s get: %s" % (ip, local_download_path + remote_file))             sftp.get(remote_path, local_download_path + remote_file)      def batch_sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth="id_rsa"):         pool = multiprocessing.Pool(5)         for i in ip:             pool.apply_async(self.sftp_get, (i, remote_path, local_path, user, passwd, auth,))         pool.close()         pool.join()      def batch_sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth="id_rsa"):         pool = ThreadPoolExecutor(max_workers=10)         task = [pool.submit(self.sftp_put, i, local_path, remote_path, user, passwd, auth) for i in ip]         for future in as_completed(task):             future.result()      def check_remote_folders(self, sftp, remote_path, remote_folders: list):# 递归遍历远程路径中的目录,并添加到 remote_folders 中 for f in sftp.listdir_attr(remote_path):# 检查路径状态是否为目录 if stat.S_ISDIR(f.st_mode):# 递归调用自己                 self.check_remote_folders(sftp, remote_path + "/" + f.filename, remote_folders)                 #  添加遍历到的目录信息到列表中                 remote_folders.append(remote_path + "/" + f.filename)      def get_info(self, path):         self.logger.debug("接收参数 path:%s".encode("utf8") % path.encode("utf8"))         if os.path.exists(path):             info_list = [i.replace(" ", "") for i in open(path, "r", encoding="utf8").readlines()]             return info_list         else:             self.logger.warning("%s 不存在,请传入一个正确的目录" % path)             raise ValueError("%s 不存在,请传入一个正确的目录" % path)      @staticmethoddef sn_random_generator():         lower_letters = random.sample([chr(i + ord("a")) for i in range(26)], 7)         num = random.sample([str(i) for i in range(10)], 5)         passwd = lower_letters + num         random.shuffle(passwd)         return "".join(passwd)      @staticmethoddef log_analysis(filename):if os.path.exists(filename):try:                 data = json.load(open(filename, "r", encoding="utf8"))                 return data             except Exception as e:                 print("%s 无法解析该类型文件" % filename + " " + str(e))                 raise TypeError("%s 无法解析该类型文件" % filename + " " + str(e))         else:             raise ValueError("该%s 文件路径不存在,请传入一个正确的文件路径" % filename)      def show_log(self, filename, mode=None):         data: dict = self.log_analysis(filename)         if isinstance(data, dict):             for key, value in data["results"].items():                 if "success" in value.keys():                     if mode == "success":                         print(key)                     elif mode is None:                         print("%s:%s" % (key, value["success"].replace("r ", "")))                 elif "failed" in value.keys():                     if mode == "failed":                         print(key)                     elif mode is None:                         print("%s:%s" % (key, value["failed"].replace("r ", "")))      @staticmethodasync def ping(ip):         pro = await asyncio.create_subprocess_exec("ping", "-c", "5", "-w", "5", ip, stdout=asyncio.subprocess.PIPE)         stdout = await pro.communicate()         print(stdout[0].decode("utf8"))      async def run(self, ip):         task = [self.ping(i) for i in ip]         await asyncio.wait(task)      def start(self, ip):         loop = asyncio.get_event_loop()         start_time = time.time()         loop.run_until_complete(self.run(ip))         print("总共耗时: %.2f" % (time.time() - start_time))      @staticmethoddef ping_analysis(filename):         data = [i.replace(" ", "") for i in open(filename, "r", encoding="utf8").readlines()]         ip_list = []         for i in data:             if "64 bytes" in i:                 ip_data = re.search(r"[0-9]+.[0-9]+.[0-9]+.[0-9]+", i)                 ip_list.append(ip_data.group())         if len(ip_list) > 0:             ip = list(set(ip_list))             ip.sort()             for j in ip:                 print(j)             return ip         else:             print("所有 ip 均不通,或者传入的文件有误")             return "所有 ip 均不通,或者传入的文件有误"   if __name__ == "__main__":     taskManager = TaskManager()     parser = argparse.ArgumentParser(description="TaskManager v1.3.9 build by hrj")     parser.add_argument("path", help="文件路径")     parser.add_argument("-a", action="store_true", help="多个 ip 执行不同的命令")     parser.add_argument("-b", action="store", help="多个 ip 执行相同命令", metavar="[shell 命令]")     parser.add_argument("-l", action="store_true", help="解析执行命令生成的日志")     parser.add_argument("-ls", action="store_true", help="获取执行成功的 ip")     parser.add_argument("-lf", action="store_true", help="获取执行失败的 ip")     parser.add_argument("-p", action="store_true", help="异步 ping 多个 ip")     parser.add_argument("-q", action="store_true", help="解析异步 ping 的能通的 ip")     parser.add_argument("-put", nargs=2, help="多个 ip 批量 sftp 上传相同的文件", metavar=("[本地路径]", "[远程路径]"))     parser.add_argument("-get", nargs=2, help="多个 ip 批量 sftp 下载相同的文件", metavar=("[远程路径]", "[本地路径]"))     parser.add_argument("-auth", nargs=2, help="ssh/sftp 用户名、密码认证", metavar=("[用户名]", "[密码]"))      args = parser.parse_args()     taskManager.logger.debug("接受参数:%s".encode("utf8") % str(vars(args)).encode("utf8"))      # ssh     username = None     password = None     auth_type = "id_rsa"      file_info = taskManager.get_info(args.path)      # ssh 认证 if args.auth:         username = args.auth[0]         password = args.auth[1]         auth_type = "noAuth"      if args.a:         taskManager.batch_ssh(ip=file_info, cmd_type="many", user=username, passwd=password, auth=auth_type)     elif args.b:         taskManager.batch_ssh(ip=file_info, cmd=args.b, user=username, passwd=password, auth=auth_type)      # sftpelif args.put:         taskManager.batch_sftp_put(             ip=file_info, local_path=args.put[0], remote_path=args.put[1],             user=username, passwd=password, auth=auth_type)     elif args.get:         # sftp 多进程并发会出现代码瞬间执行完毕,不报错的情况,待解决。for ip in file_info:             taskManager.sftp_get(                 ip=ip, remote_path=args.get[0], local_path=args.get[1], user=username, passwd=password,                 auth=auth_type             )      # 日志 elif args.l:         taskManager.show_log(args.path)     elif args.ls:         taskManager.show_log(args.path, mode="success")     elif args.lf:         taskManager.show_log(args.path, mode="failed")      # 异步 pingelif args.p:         taskManager.start(file_info)     elif args.q:         taskManager.ping_analysis(args.path)  使用示例:
  参数说明: path为IP地址文件路径时,使用-a , -b , -get , -put, -p。-auth仅支持配合前4个参数一起使用。 path为日志文件(/tmp/TaskManager/log/task_defalut_xxxxxx.log)路径时,使用-l, -ls, -lf。 当使用-p时需要重定向到文件才能用-q解析文件得到ping通的ip。 批量执行命令时,命令行末尾会有一个Task_sn:xxxxxxxxx这个是日志文件的名称不重复的部分,便于去找日志(感觉有点蹩脚,如果有好的办法请告诉我)。
  日常刷命令:
  密码认证:
  公钥认证:
  获取日志:
  可以配合 grep,awk 等命令精准过滤。
  总结
  个人认为 Python 在初中级运维工作中的性质更像是工具,以提升工作效率、减少管理成本为主。可以从当前繁琐的工作中解脱出来,去探索更有价值的事情。python 本质上并不会减少故障的产生,所以在不同的阶段合理利用自身掌握的知识解决当前最重要的痛点,千万不要本末倒置。

华为MateX全新折叠专利曝光,大屏又想来抢我钱包了根据LetsGoDigital的一份报告,华为在今年早些时候向WIPO(世界知识产权局)申请了专利。该专利主要内容为一款带有滚动显示屏的手机,该专利显示手机搭载有磁吸组件,可在拉出直播利器阿斯盾AW651电脑摄像头前言(此图片来源于网络)短视频和直播平台的流行,让现在几乎人人都能成为一名网红无论你是什么样的身份,游戏打得出色的大学生养竹鼠的草根网会抓泥鳅的小姑娘只要你的视频内容够新够奇够野够4999元起,惠普推出星13Air轻薄本AMDR75800U2。5K屏幕IT之家8月9日消息惠普电脑再上新,现已推出两款星13Air机型,搭载AMDR55600U和R75800U的版本首发价分别为4999元和5499元。IT之家了解到,该机采用了13。四路投屏120Hz高刷屏,TCL巨幕智屏带来影院般震撼体验受消费升级面板涨价和产品均价提升等多重因素影响,当下大部分的年轻人越来越倾向于选购大屏高端电视。为紧跟消费趋势,国内外各大家电企业更是纷纷入局大屏电视市场,不断推出平价大屏液晶电视VivoiQOO8Pro颜值出炉,搭载微云台?全系采用120W充电VivoiQOO8系列的海报已经出炉,从渲染图看,这次iQOO8Pro将搭载微云台,拍照防抖上基本没有什么缺憾,屏幕也是采用一块6。78英寸的三星OLED曲面居中打孔屏,E5发光材提前抢跑!三星GalaxyBuds2已被摆上亚马逊货架,外观长这样在不久前,三星官方正式官宣,将于8月11日召开秋季新品发布会。根据网上爆料信息可以获悉,在本次发布会上,三星将为我们带来包括ZFold3ZFlip3两款新款折叠屏手机新一代智能手表家用投影仪推荐一下哪款比较好?做了些功课,最后选择它投影机市场这几年间屡创新高,投影仪从企业办公用具走入家电行业,甚至可随身携带,与你一同旅行而这都要归功于越来越火热的智能微投也可以称之为家用投影仪。家用投影仪推荐一下哪款比较好?对有多少人用着5G手机,却把5G网络关掉或是依然使用4G套餐5g手机也已降到了千元机的范畴内,5G手机出货量大增,目前总出货量的8成都是5G手机,并且势头仍在延续。但是有多少人用着5G手机,却把5G网络关掉或是依然使用4G套餐。为什么会造成易感人群睡眠成奢侈品?TCL卧室新风空调吹来健康空气成全民刚需空调,英文名AirConditioner,空气调节器的简称,顾名思义用来调节空气。但在过去很长一段时间内,空调更多只是一个温度调节器,对于空气湿度洁净度含氧量等调节作用几乎微乎其微华为MatePad2021款详细测评120Hz高刷全面屏配合全家桶,真的爽特意选了一个128内存,使用起来无压力,主要给家里小朋友上网课用,屏幕尺寸够大,清晰度,刷新率都不错,而且最主要的一点是有儿童乐园,可以保护眼睛,纠正坐姿和看平板的距离,真是非常贴王思聪百万元配电脑详细配置曝光性能全球仅排第四最近王思聪装了台电脑,不过严格来说这个配置应该是台服务器,废话不多说,先上配置列表机箱DellEMCPowerEdgeR7525机架式服务器CPUAMDEPYC7763内存SKHy
巨头流量花园封锁下,品牌如何打造私域流量池?巨头流量花园有他巨头强势的优势,小家品牌流量自有小家的灵活优势,名家大宛占有声势显赫为主宰,他不屑委身小微答理家常,他的流量源大都居于讲坛,而讲坛下面的后勤有着为数不少的各个行业,曝vivoX70Pro将首次升级50W无线充电今日,微博大V数码闲聊站爆料,vivo将有一款新机配备4500mAh电池55W有线快充和50W无线快充。结合vivo目前的产品更新进度推测,该机应该属于X70系列。此前,博主熊猫很消费降级6块9的宿舍神器,无极调光3色温触控酷毙灯晒单简评Tony哥的Buy家日常车站6块9的精装高配版宿舍酷毙灯到手,对比以往34块钱的廉价入门产品贵了近一倍,功能表现强了几何?闲话少说,老规矩看图说话包装盒还算正规,比之前3块5酷毙灯iPhone13下月就来,还有这些重磅新品今年的八月已经过去了一半,对于科技数码爱好者们来说,又到数着日子等iPhone发布会的时候了。在公认的科技春晚这一天,库克会现身ApplePark,与诸位高管一同揭晓苹果打磨的最新重拾社交那一刻,美团就停战了自2020年拼多多上线主打真实评价的产品拼小圈后,各大电商也纷纷上线了小圈类产品。拼多多以外,淘宝目前上线了淘友圈,京东上线了京友圈近日,美团也开始内测能够建立社交关系的外卖分享功喜讯五笑智能科技同屏器获颁HUAWEIHiLink生态产品技术认证喜讯8月13日,由东莞市五笑智能(联力胜科技)自主研制的同屏器NC81216XS经全球著名通讯设备商华为技术有限公司认证,产品符合HUAWEIHiLink技术规范和要求,获颁华为公李彦宏发布百度Apollo汽车机器人不设方向盘无需人类驾驶在2021百度世界大会上,百度CEO李彦宏正式发布了一款名叫Apollo的汽车机器人。根据李彦宏的介绍,这款汽车机器人内部没有设置方向盘踏板这种需要人为控制的操作设施。这款机器人主用过小米平板5后,我只想说在座各位都是渣渣8月10日晚上,被米粉称之为咕咕派的小米平板5,终于正式发布。而且从当晚机哥的文章留言来看,机友们对那场发布会最感兴趣的产品,也是它。其实说到安卓平板,小米算得上的比较早入局的那一iPhone13也可能会出现缺货,这次台积电拖后腿,污染惹的祸?华为P50系列发布以后,遗憾缺失5G功能,现在开售的华为P50Pro麒麟芯片版本依旧缺货需要抢购。迫于无奈,不少小伙伴把目光放在了9月份即将发布的iPhone13新机身上。但就在近方太让厨房凉了,技术矩阵香了2021年8月12日,方太年度发布会又爆了。很多业界人士几乎习惯了,在每年的年中等候方太这样一场发布会,期待这个高端厨电的引领者,在这一年的年度会上将带来怎样震惊业界的新物种。已经华为显示器S24正式开售低蓝光无频闪售899元起CNMO新闻对于需要使用电脑进行工作的人来说,一块好的电脑显示器可是非常重要,它的显示效果可能就关乎着大家是否能够舒适工作。8月18日,华为显示器S24正式开售,售899元起,是一