本文选自《交易技术前沿》总第四十五期文章(2021年6月)

赵川 / 国联证券股份有限公司 zhaoc@glsc.com.cn

金融企业部署了大量的安全设备及平台对日趋加剧的网络攻击进行安全防护,面对海量的告警数据,如何快速从中识别出真正的恶意地址对其实施拦截是安全人员急需解决的问题。国联证券基于开源组件,通过自建统一日志管理平台,对各类安全设备告警日志进行统一管理,并采用自动化方式,将告警源地址汇集清洗后,发往第三方威胁情报平台进行威胁查询,对于查询结果明确为恶意的地址,自动在边界防护设备中实施永久封禁,实现统一日志平台、威胁情报、安全设备之间的多方联动。

一、引言

近年来,网络攻击呈现攻击源多样化的趋势,来自云主机、工控设备、智能产品等具有联网功能的网络终端发起的攻击流量占比明显增多。根据国家计算机网络应急技术处理协调中心发布的《2019年中国互联网网络安全报告》显示,在DDOS攻击中参与真实地址攻击的肉鸡达到340余万个。网络安全威胁信息共享平台发布的《2020上半年公共互联网网络安全态势及威胁监测处置报告》统计数据显示,该平台收集的恶意IP地址数量约188万个,以恶意扫描服务器IP地址为主。

金融企业由于行业特殊性,天然成为网络攻击的首选目标,金融行业的重要信息系统无时无刻不面临来自互联网的攻击流量,在这些流量中,又以利用特定漏洞为目的的无差别扫描为甚。同时金融企业大多已经部署了较为全面的安全防护设备,如防火墙、Web应用防火墙、入侵检测/防御等,对攻击行为进行拦截并产生攻击告警,还有部分企业通过购买第三方机构威胁情报服务,对这些攻击流量的源地址进行威胁分析,采取进一步的安全防护策略,比如安全设备当前仅对当次攻击流量进行检测并拦截,若该地址进行下一次攻击时,安全设备仍需要再次识别,所以对这些地址进行永久封禁是比较好的选择,这在重保等特殊时期是非常有用的手段。但由于安全设备的多样性,安全设备之间告警日志无法统一管理,大多安全设备无法直接与威胁情报系统进行集成,面对海量的告警数据,如何快速从中识别出真正的恶意地址,给安全人员带来了不小的挑战。另一方面,由于金融信息系统运行实时性要求较高,为了避免变更对业务系统带来不可预估的影响,通常在夜间执行变更操作,这就压缩了运维人员的可操作时间,在识别出真正具备威胁的恶意地址后,如何在安全设备中对这些地址进行准确快速地实施封禁策略同时避免频繁的变更,也是安全运维人员需要着重考虑的问题。

国联证券基于开源组件,通过自建统一日志管理平台,对各类安全设备告警日志进行统一管理,并采用自动化方式,将告警源地址汇集清洗后,发往第三方威胁情报平台进行威胁查询,对于查询结果明确为恶意的地址,自动在边界防护设备中实施永久封禁,达到不同安全设备、威胁情报、统一日志平台三者之间多方联动效果。

二、技术架构

(一)总体设计

本技术实践总体设计框架由安全设备、统一日志平台、威胁情报源和处理引擎四部分组成,技术架构如图1所示:

(1)安全设备

安全设备在企业安全防护中起到攻击监测告警和攻击拦截作用,是安全日志的直接输出者和防护策略生效者。但在企业安全实践中,部署的安全设备往往存在品牌多样、类型繁复等问题,不同类型品牌的安全设备提供的安全功能不尽相同,产生的告警日志也无法做到格式统一,需要在统一日志平台进行结构化处理后再行入库。另外由于边界安全设备起到对企业整网入口的防护功能,所以最终经威胁情报判定为恶意的源地址,应在边界设备进行黑名单写入。

(2)统一日志平台

统一日志平台是本次技术实践的核心节点,提供了安全日志处理、存储、分析、查询等核心功能,也是安全运营、态势感知等上层安全系统依赖的基础。本例直接使用ELK+Kafka的形式搭建自有统一日志平台。ELK指的是Elastic Stack,由Beat、Logstash、Elasticsearch和Kibana四个开源组件组成,Beat用于接收安全设备的Syslog日志;Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据;Elasticsearch 是一个基于Lucene的搜索和分析引擎;Kibana可以对Elasticsearch 中存储的数据以图形和图表形式进行可视化;Kafka是一个消息队列组件,加入Kafka的目的,是为了将安全设备的原始日志通过Beat接收后先发送至Kafka消息队列,再由Logstash进行消费,避免Logstash在日志转换过程中由于处理性能问题导致原始日志积压或丢失。

(3)威胁情报源

根据Gartner对威胁情报的定义,威胁情报是某种基于证据的知识,主要内容为用于识别和检测威胁的失陷标识,市面上知名的威胁情报源有微步在线情报社区、绿盟威胁情报中心、IBM X-Force Exchange威胁情报共享平台等,在本例中,主要利用了微步威胁情报的云API接口,实现自动化的情报查询和获取。

(4)处理引擎

处理引擎的作用主要是检索日志平台中存储的结构化日志数据,将源地址进行清洗后批量在威胁情报源中查询,并将判定为恶意的地址自动写入边界防护设备要读取的黑名单列表。

(二)处理流程

整体的处理流程大致可分为安全日志转换与存储、地址清洗与情报查询、黑名单写入与策略下发三个步骤,如图2所示:

1、安全日志转换与存储

在本次技术实践中,采用标准化的Syslog协议,将安全设备的原始日志传送至统一日志平台,由日志平台对原始日志进行统一结构化处理,格式化成由Key-Value构成的JSON键值对形式保存在日志平台中,方便日后对日志的分析和查询。本例中我们在部署在网络边界的下一代防火墙(NGFW)上启用了动态地址对象读取功能,并设置一条对动态地址执行DROP的默认策略,实现自动化的地址读取和策略加载。

2、地址清洗与情报查询

处理引擎可以实现读取日志平台的结构化日志数据,提取有用的字段,本例中主要是源地址这一字段,在获取源地址后,需要进行去重,再将地址列表通过请求威胁情报API方式,查询该部分地址的威胁属性,确认是否为恶意地址。

3、黑名单写入与策略下发

在经过威胁情报源判定后,可以获得源地址的威胁属性,对于判定为恶意的源地址,由处理引擎输出至黑名单列表,该列表以Web资源方式部署在Web服务上供NGFW读取,NGFW在读取黑名单列表后,会自动将恶意IP关联至策略源地址对象,执行阻断操作。

三、具体实现

按照核心功能的不同,可以从统一日志平台和处理引擎两个模块进行技术实现,下面主要对这两个模块的具体实现细节进行分析。

(一)统一日志平台

统一日志平台在Elastic Stack技术栈的基础上,加入Kafka消息队列,实现对安全设备告警日志的统一管理,系统架构如图3所示。

1、日志采集

日志采集主要由Syslog、Beats、Kafka三部分组成,完成了从安全设备源端将告警日志采集至消息队列的过程。

(1)Syslog

Syslog广泛应用于系统日志,是日志传输的消息标准,安全设备均支持以Syslog方式将自身日志发送至第三方平台。这里提供两种收集Syslog的方式,第一种可以依赖Linux系统中的rsyslog系统,接收安全设备的日志,转存为文件保存在rsyslog服务器上,再使用Beats家族的Filebeat组件,对日志文本进行采集;第二种可以直接使用Beats的Syslog功能,监听安全设备发送过来的Syslog,两种方式的区别在于前一种多保留了一次设备的原始日志。

(2)Kafka

Kafka是一种高吞吐的分布式消息发布订阅系统,引入Kafka的目的,是为了解决下游的Logstash在进行日志转换时可能产生的日志丢失问题,若直接将Syslog发送至Logstash,因Logstash需要进行大量的正则匹配处理,当Logstash未能及时处理消息时,就可能造成日志丢失。而引入Kafka后,Logstash就可以根据自身的处理能力,顺序从Kafka消费日志数据,避免了日志丢失。

在接收Syslog后,需要通过Beats,将Syslog发送至Kafka。在Kafka的配置上,应考虑建立的Topics数,在本例中,我们根据不同品牌的产品,建立Topic,对于同一品牌的设备,无论部署位置,均发送至同一Topic,便于读取,如图4,建立了checkpoint、paloalto、waf三个topic,分别存储NGFW和WAF的告警日志,经过Beats采集后的日志,安全设备的原始Syslog会被存放在message字段中,如图5。

图4:

图5:2、日志转换

日志转换的作用是将Message中的Syslog进行结构化清洗,转换成键值对形式,再存储至Elasticsearch。这里需要依赖Logstash组件完成,Logstash先从Kafka消息队列消费日志数据,再对数据进行清洗与转换,丢弃不需要的字段,对于部分日志内容,添加自定义字段名,然后生成Elasticsearch需要的索引。表1是对WAF日志进行转换后,最终的字段形式(部分)。

表1: 图6展示了经过Logstash处理后,一串原始Syslog在Elasticsearch中存储的形式。在对Syslog进行转换时,需要在Logstash的配置文件中,按照不同的Kafka Topic,编写不同的解析策略,生成不同的Elasticsearch Index。

图6:

3、日志存储

(1)Elasticsearch

经过Logstash转换的Syslog数据,便可以Kafka Topic加日期为名称在Elasticsearch建立索引。Elasticsearch是Elastic Stack技术栈中最重要的组件,是一个高扩展的分布式全文检索引擎,可以近乎实时的存储、检索数据,同时支持方便的进行扩展。Elasticsearch属于非关系型数据库的一种,本质上存储的数据就是JSON格式的文档,这里存储的就是JSON格式的Syslog数据,Elasticsearch对JSON文档中的每一个字段都会构建一个对应的倒排索引。

(2)Kibana

Kibana则提供了一个友好的Web界面,可以搜索、查看、操作存储在Elasticsearch索引中的数据,通过使用Kibana能对处理后的数据进行可视化的展示。在将Elasticsearch Index全部汇总至Kibana Index后,便可以绘制仪表盘,对告警日志进行集中展示,如图7。

图7:

(二)处理引擎

处理引擎需要实现日志检索、情报查询和黑名单写入三方面的功能,Java或Python均能较为容易的实现上述功能的代码编写,这里采用Python语言来编写处理引擎。

另一方面,由于金融行业变更操作规范要求,一般不在日间执行变更操作,故处理引擎并不适合实时运行,因此将处理引擎设定为夜间运行是比较好的做法。处理引擎汇总前一天告警日志中的所有源地址,去重后在威胁情报源查询,再将恶意地址写入黑名单列表,最后由边界防护设备自动读取黑名单,下发拦截策略。

1、日志检索

官方提供了Python用于ElasticSearch数据检索的专用代码包,只需引入ElasticSearch包,便可方便的操作ElasticSearch存储的数据。这部分的主要代码逻辑就是从ElasticSearch中读取前一天所有安全设备Index的所有日志数据,提取源地址字段的值并进行去重,最终作为要发往威胁情源报进行查询的地址。

图8:

2、情报查询

情报查询需要连接第三方威胁情报源API,这里采用微步在线情报社区提供的在线云API。微步云API提供了IP 分析、IP 信誉、域名分析等基础功能接口,支持HTTP方式对这些接口进行调用,由于微步在线对一个地址是否为恶意自有的一套判定机制,这里直接采用微步情报的判定结果,scene/ip_reputation接口的is_malicious响应字段表示该地址是否为恶意。可以将接口返回的is_malicious(是否为恶意IP)、confidence_level(可信度)、severity(严重级别)等数据与地址列表合并形成一个Dataframe格式的数据,写入查询结果文件,如图9。

图9:

3、黑名单写入

处理引擎的黑名单写入需要先过滤出is_malicious为Ture的行,再将这些行的源地址,写入一个文本文件,这也在处理引擎实现。之后边界防火墙便可以读取写有恶意地址的文本文件,以Paloalto防火墙为例,Paloalto防火墙的外部动态列表功能,支持从一个URL读取地址列表,写入自身的地址对象,并且可以规定读取的时间和频率。为了配合处理引擎的运行时间,通常设定在处理引擎生成每日的封禁地址列表之后进行读取,随后便可创建一条默认拦截策略,源地址设置为外部动态列表对象。对于不支持类似外部动态列表功能的防火墙,也可以使用脚本方式,通过命令行将这些恶意地址批量刷入配置文件,以达到相同的功能。

四、总结

本次技术实践对传统安全运维中依赖人工执行恶意地址封禁这一典型场景进行了优化与改进,首先利用Elastic Stack开源平台对不同品牌类型的安全设备告警日志实现了统一管理,相较于使用某一厂商的特定统一日志管理系统,使用开源组件具有更好的灵活性与可维护性。在统一日志管理的基础上,本次技术实践还引入了威胁情报,可以更好地对恶意地址从各维度进行分析,最终结合威胁情报判定结果,对恶意地址实施自动化封禁,实现统一日志平台、威胁情报、安全设备之间的多方联动。

随着网络攻击的日益增多,也催生了更多种类的安全设备、安全技术或安全名词,对于金融企业来说,如何将不同的安全产品纳入统一管理,避免产生安全孤岛,真正实现多方联动,是安全建设的重点也是难点。国联证券将对此进行更多的探索,提高安全运营水平。

声明:本文来自上交所技术服务,版权归作者所有。文章内容仅代表作者独立观点,不代表安全内参立场,转载目的在于传递更多信息。如有侵权,请联系 anquanneican@163.com。