全国主要流域重点断面水质自动监测周报主要包含分类、序号、采集数据日期、点位名称、周期、PH、DO、CODMn、NH3-N、本周水质、上周水质、主要污染指标,发布时间。


周报数据源:


上述数据源中数据为PDF,在ELT入库过程中,数据抽取模块需要将PDF这样的非结构化数据抽取并转换成excel/csv,以方面后续处理。

PDF周报数据的抽取难点在于PDF中表格只是打印助记符,而不像HTML这样的包含起始标签,行列等,处理起来较为麻烦。


测试了几款python库将PDF转为excel, 均有各种问题,比如pdf2docx,camelot,tabula,最后发现pdfplumber处理效果较好,可以较完整地处理合并的单元格,翻页,复杂表头等情况,附上主要处理代码:

#-#! /usr/bin/python
# -*- coding: UTF-8 -*-

from pymysql_lib import UsingMysql
import os,shutil,re,time,datetime
from os.path import exists
import pdfplumber

def check_it():

    with UsingMysql(log_time=True) as um:
        #um.exec("truncate table mda_aqi_air_hourly_fact_tmp")
        for root, dirs, files in os.walk(os.getcwd()+"\\incoming"):  #
            for file in files:
                if file.endswith(".pdf") and not exists("processed\\"+file):# and file == "2008年第28期.pdf":
                    bValid = 0
                    print ('\nProcessing: '+file)

                    pdf = pdfplumber.open("incoming\\"+file)

                    data = []
                    bHeaderFound = 0

                    for index in range(len(pdf.pages)):  
                        if index == 0:
                            summary = pdf.pages[0].extract_text()
                            #print(summary)

                        page = pdf.pages[index]
                        #print(page.extract_tables())
                        for table in page.extract_tables():
                            for row in table:
                                if ((row[0] == '序\n号' or row[0] == '序号') and bHeaderFound == 1) or row[0] == None or (not row[0].isdigit() and index != 1 and index != 0):
                                    continue

                                if row[0] == '序号' or row[0] == '序\n号':
                                    bHeaderFound = 1

                                data.append(row)

                                #print(row)

                    lastCol1 = ""
                    lastCol2 = ""

                    bDataValid = 1
                    bWithoutRiver = 0

                    if len(data[0]) == 11 and data[0][2] == "点位名称":
                        bWithoutRiver = 1
                        data[0].insert(2, "河流名称")

                    for rowId in range(len(data)):
                        if len(data[rowId]) == 11 and bWithoutRiver == 1:
                            data[rowId].insert(2, "")

                        if len(data[rowId]) != 12:
                            bDataValid = 0
                            print(data[rowId])
                            break

                        if data[rowId][1] != None:
                            data[rowId][1] = data[rowId][1].replace('\n','')

                        if data[rowId][1] != None and len(data[rowId][1]) != 0:
                            lastCol1 = data[rowId][1]

                        if data[rowId][2] != None and len(data[rowId][2]) != 0:
                            lastCol2 = data[rowId][2]

                        if data[rowId][1] == None or len(data[rowId][1]) == 0:
                            data[rowId][1] = lastCol1

                        if data[rowId][2] == None or len(data[rowId][2]) == 0:
                            data[rowId][2] = lastCol2

                        for d in data[rowId]:
                            if d == None and rowId != 0:
                                bDataValid = 0
                                print(data[rowId])
                                break    
                        
                        if bDataValid == 0:
                            break

                    pdf.close()

                    #'序\n号', '水系', '河流名称', '点位名称', '断面情况', '评价因子(单位:mg/L)', None, None, None, '水质类别', None, '主要污染指标'
                    if bDataValid == 1 and len(data) > 0 and len(data[0]) == 12 and (data[0][0] == "序\n号" or data[0][0] == "序号")   \
                         and (data[0][1] == "水系" or data[0][1] == "流域") and data[0][2] == "河流名称" and (data[0][3] == "点位名称" or data[0][3] == "断面(点位)名称") and (data[0][4] == "断面情况" or data[0][4] == "断面(点位)情况") \
                         and (data[0][5] == "评价因子(单位:mg/L)" or data[0][5] == "评价因子(单位:\nmg/L)") and data[0][6] == None and data[0][7] == None\
                         and data[0][8] == None and (data[0][9] == "水质类别" or  data[0][9] == "水质别") and data[0][10] == None \
                         and data[0][11] == "主要污染指标":

                        if loadData(summary, data, file, um) == 1:
                            bValid = 1

                    else:
                        print("unexpceted: "+file+", "+str(len(data))+", "+str(bDataValid))  
                        if len(data) > 0 : 
                            print(data[0])

                    if bValid == 1 :
                        shutil.move("incoming\\"+file, "processed\\"+file)
                        print ('Processed: '+file)

if __name__ == '__main__':
    check_it()


处理PDF周报的过程中主要遇到两个问题,

一是pdfplumber也没能处理翻页时表格不完整的问题,比如下图中第二页最后一行和第三页第一行的情况。

二是周报数据虽然是程序生成的,但还是还是有各种数据的轻微差异,如生成时间属于某年第X周等,还有表头可能不相同,需要校验数据。


那这现两个问题是怎么处理的呢?目前没有发现方便的方法使用程序处理,只能手动进行修复!

针对问题一,使用PDF编辑软件,如wps等,将线条补齐,对,就是在PDF编辑软件里画一条线补齐表格,正如前文说的PDF里的表格仅是打印符,补上一条线后pdfplumber就能正确识别,pdfplumber真是强大!

针对问题二,只能在读取数据后在程序里多校验几遍,要求表头正确,取出来的各数据通过正则进行校验,数据清洗貌似也只能这样简单粗暴地处理


OK,这两个问题解决后,基本能处理80%的数据了,剩下的20%是各种数据奇葩问题,慢慢一条一条处理就好了


最后再感慨一下python写小工具是真方便,各种类库非常多,用来做数据仓库过程中多源数据清洗抽取融合非常方便。同时,在AI处理方面比如语音,图片等开源实现非常多,用来实现各类工具事半功倍。

月萌API采集部分使用c++,因为最熟悉c++,写不需要很多上下游生态的后台程序很方便,什么都能实现,资源消耗能控制得很小。

月萌API的API网关部分使用由Nitfix开源而来的Spring Cloud微服务网关,使用Spring Cloud的第一感觉就是一个语言的生态太重要了,几行自定义代码就能实现很复杂的功能,比如鉴权、流量控制、灰度、负载均衡等等。

另外,使用PHP实现了月萌API的主站(基于ThinkPHP),可用性监控站及采集部分的控制UI实现(C++写UI太麻烦了,MFC代码写起来会比php多几倍),也使用PHP实现了一些在线的小工具,PHP写网页太方便了。

还有.Net写了一些小工具,有了Nuget后生态也很不错,很多库可以使用


好像跑题了,不过上面主要说的就是工欲善其事,必先利其器,选择合适的工具去做对应的事,他山之石,可以攻玉!


waterqweekly.png


另外附上实时水质监控数据源:

国家水质自动水质实时数据: http://106.37.208.243:8068/GJZ/Business/Publish/Main.html

抓取此实时数据可以参考:国家地表水水质自动监测实时数据发布系统——动态网页爬虫 https://blog.csdn.net/qq_20161013/article/details/100174191

当然也可以直接使用本站接口获取实时和历史数据,此类数据采集较为容易,主要难点在于需要长时间周期性地获取并保存数据