Flask 实现远程日志实时监控

前言

在自动化运维系统中,常常需要监控日志,这些日志是不断更新的。本文提供了一种实时日志监控的 Python 实现。主要实现以下功能:

  • 抓取远程机器的终端输出到服务器上。
  • 将服务器的日志更新实时显示到客户端网页上。

文中示例基于 Python 以及 Flask。

主要依赖:

分析

服务器的主要工作是通过 SSH 连接到远程机器上执行任务,在项目中使用了 Worker 来管理每个工作。Worker 在单独的线程中运行,运行结束后将结果保存至数据库并把自己从「包工头」处删除。

这样,在工人工作过程中,将产生日志。由于日志更新频率快,于是我想到了用 Redis 来做中间存储,数据结构用 Redis 的列表,一有更新就发给客户。另外,工人结束后,我还想查看日志,就必须将日志保存成静态文件。

总体逻辑是这样的:

请求网页 -> 获取日志 -> Worker 是否工作中? -> 是:到 Redis 日志队列中获取并实时更新

-> 否:从文件读取

获取远程输出

那么下面要解决的问题是如何从远程机器上获取终端输出并添加到日志队列中。在 Python 中,SSH 连接相关的库是 paramiko,于是我自然就想用下面的方法:

Python
client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect(host)
stdin, stdout, stderr = client.exec_command(command)
while stdout.channel.exit_status_ready():
    logger.log(stdout.readline())

然而运行后发现,无论是 readline 还是 read(size),都是阻塞的,要等到运行完成才能输出。这显然不符合我的要求,还好有对应的channel对象供我使用,于是代码改成:

Python
stdin, stdout, stderr = client.exec_command(command)
channel = stdout.channel
pending = err_pending = None
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
    readq, _, _ = select.select([channel], [], [], 1)
        for c in readq:
            if c.recv_ready():
                chunk = c.recv(len(c.in_buffer))
                if pending is not None:
                    chunk = pending + chunk
                lines = chunk.splitlines()
                if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                    pending = lines.pop()
                else:
                    pending = None
                [logger.log(line) for line in lines]
            if c.recv_stderr_ready():
                chunk = c.recv_stderr(len(c.in_stderr_buffer))
                if err_pending is not None:
                    chunk = err_pending + chunk
                lines = chunk.splitlines()
                if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                    err_pending = lines.pop()
                else:
                    err_pending = None
                [logger.log(line) for line in lines]

这里使用了 select 来控制 IO,另外需要说明的是循环条件:当所有输出都读取完毕时channel.closedTrue,而exit_status_ready()是当进程运行结束时就为真了,此时输出不一定都读完了。pendingchunk是用来整行读取的。logger 的 log 方法定义如下:

Python
def log(self, message, *args):
    self.log_queue.put(message % args)

其中log_queue.put将消息推入 Redis 列表。

日志实时更新

下面我们需要实现一种网页显示,当用户访问时,显示当前日志,若日志有更新,只要网页还打开,无需刷新,日志就是实时更新到网页上。另外,还需要考虑到有多个客户端连接的情况,日志应该是同步更新的。

对于一般的 HTTP 连接,客户端一次请求完毕后立即得到响应,若不重新请求就无法得到新的响应,服务器是被动的。要实现这种服务器的主动发送,大致有三种方法:AJAX, SSE 和 Websocket。

AJAX 就是客户端自动定时发请求,定时间隔事先指定,不是真正的实时。 SSE 其实是一种长连接,只能实现服务器向客户端主动发送消息。 Websocket 是服务器与客户端之间的全双工通道,需要后端的软件支持。

权衡以上三者,SSE 是能满足我的要求的代价最小的选择。它的原理是客户端建立一个事件监听器,监听指定 URL 的消息,在服务器端,这个 URL 返回的响应必须是一个流类型。只要将响应体设为一个生成器,并设置头部为mimetype='text/event-stream'就行了。在Flask上,已经有封装好的扩展Flask-SSE,直接安装使用就行了。Flask-SSE是通过 Redis 的 Pubsub 实现的消息队列。然而,只有在连接建立以后发送的数据才能收到。只要在生成网页时,将日志队列中已保存的消息显示出来,并建立事件监听接受新的日志即可。代码如下:

Python
from flask_sse import sse
app.register_blueprint(sse, url_prefix='stream')

@app.route('/job_status/<int:job_id>/log')
def job_log(job_id):
    worker = wk.get_worker(job_id)
    body = []
    if not worker:
        report_folder = utils.get_directory_path(
                            current_app.root_path,
                            current_app.config['REPORT_FOLDER'],
                            str(job_id),
                       )
        log_file = os.path.join(report_folder, 'worker.log')
        if os.path.exists(log_file):
            body = open(log_file, 'r').readlines()
    else:
        body = list(worker.logger.log_queue)
    html_body = '\n'.join('<p>%s</p>' % line for line in body)
    return render_template('job_log.html', body=html_body, job_id=job_id)

相应地,添加日志时就要同时发送消息到Pubsub:

Python
def log(self, message, *args):
    message = message % args
    self.log_queue.put(message)
    sse.publish({'message': message}, type='log')

job_log.html 内容如下:

Html
{% extends 'layout.html' %}

{% block content %}
<div class="row">
  <div class="col-md-10 col-md-offset-1">
	<div class="log-container">
	  {{ body|safe }}
	</div>
  </div>
</div>
{% endblock %}

{% block extra_script %}
<script type="text/javascript">
  var source = new EventSource("{{ url_for('sse.stream') }}");
  var div = document.getElementsByClassName('log-container')[0];
  source.addEventListener('log', function(event) {
      var data = JSON.parse(event.data);
      var newLine = document.createElement('p');
      newLine.innerHTML = data.message;
      $(newLine).appendTo($('.log-container'));
      div.scrollTop = div.scrollHeight;
  }, false);
  $(function() {div.scrollTop = div.scrollHeight;});
</script>
{% endblock %}

参考链接:

最后编辑于
知识共享协议
本作品采用CC-BY-SA许可.