【数仓搬迁】gsql结合python交互应用的实用技巧

举报
wzhyalfa 发表于 2020/11/21 22:48:03 2020/11/21
【摘要】 在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。

在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。

即使用expect实现该能力,代码参见如下,<gsql.py>

#!/usr/bin/env python
#coding:utf-8
import pexpect
import sys
import os
import uuid
import time
import re

# similar psycopg2 to easy operate GaussDB(DWS) RDBMS.
# require install ptyprocess,pexpect

def analyze_result(str):
    strList = str.replace("\n","").split('|')
    strListOut = [line.strip() for line in strList]
    return strListOut

def printL(str):
    print(time.strftime('[%Y-%m-%d %H:%M:%S]'),str)
    
class con_gsql():
    def __init__(self,host,port="25308",db="postgres",usr="omm",passwd="omm"):
        tmp = host.split(':')
        if len(tmpList) == 5:
            self,host,port,db,usr,passwd=tmp[0],tmp[1],tmp[2],tmp[3],tmp[4]
        self.RowCount = 0
        self.Error    = ''
        self.RowDict  = {}
        self.debug    = 0
        self.cmd = '''gsql -h {host} -p {port} -d {db} -U {usr} -W {passwd} -L sql.log '''.format(
            host=host,port=port,db=db,usr=usr,passwd=passwd
        )
        self.Db = db
        self.child = pexpect.spawn(self.cmd)
        self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT])
        if self.index != 0:
            printL("connect fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd))
            raise "connect fail"
        self.child.sendline('\\timing on')
        self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=5)
        if self.index != 0:
            printL("set timing fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd))
            raise "set timing fail"
        printL('connect %s successful'%host)
        
    def submit_sql(self,sql,TMOUT=600):
        printL(sql) if self.debug else None
        self.RowCount,self.Error,self.RowDict = 0,'',{}
        def_result,output = 0,''
        if re.findall('^\s*select\s', sql, flags=re.IGNORECASE) \
           or re.findall('^\s*\\[dsz]',sql) \
           or re.findall('^\s*\\l+?\s*$',sql):
            def_result = 1
            output = os.path.join(os.path.split(
              os.path.abspath(sys.argv[0]))[0],
              'gsql_output_%s.log'%str(uuid.uuid1().hex))
            self.child.sendline('\o %s' % output)
            self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT)
        if not re.findall('\;\s*$',sql):
            sql += ';'
        self.child.sendline(sql)
        self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT)
        sql_msg = self.child.before.decode('UTF-8')
        if 'ERROR:' in sql_msg:
            self.Error = re.findall('ERROR:\s(.*)',sql_msg)[-1]
            self.Error = 'known.' if not self.Error else None
            return self.Error
        if not def_result:
            return self.Error
        while not os.path.exists(output):
            time.sleep(0.2)
        fmtime = 0
        while os.path.getmtime(output) != fmtime:
            fmtime = os.path.getmtime(output)
            time.sleep(1)
        with open(output,'r') as fh:
            all_rows = fh.readlines()
        try:
            os.remove(output)
        except:
            pass
        tmp = all_rows[2:-2]
        if not tmp:
            return self.Error
        self.RowCount = len(tmp)
        rows_list = [analyze_result(rowValue) for rowValue in tmp]
        
        for ind in range(len(rows_list)):
            self.RowDict[ind] = rows_list[ind]
        return self.Error
        
    def close(self,):
        self.child.sendline('\q')


日常利用上面<gsql.py>当前数据库操作连接,实现SQL交互操作,实现具体业务逻辑。

如希望进行一个schema下所有表执行一次analyze和count操作,

import gsql

con = gsql.con_gsql(host="xxx.xxx.xxx.xxx",port="xxx",db="xxx",usr="omm",passwd="omm")
#also same method
#con = gsql.con_gsql("xxx.xxx.xxx.xxx:xxx:xxx:omm:omm")
sql = """select n.nspname,c.relname 
from pg_class c,pg_namespace n 
where c.relnamespace=n.oid 
and n.nspname='public' 
and c.relkind='r';"""
rc = con.submit_sql(sql)
if rc:
    print("sql error: %s"%rc)
    exit(1)
print('output rows: %s'%con.RowCount)
for i in con.RowDict:
    db,tb = con.RowDict[i][0],con.RowDict[i][1]
    sql = 'analyze %s.%s;'%(db,tb)
    rc = con.submit_sql(sql)
    if rc:
        print("sql error: %s"%rc)
        exit(1)
    sql = 'select count(*) from %s.%s;'%(db,tb)
    rc = con.submit_sql(sql)
    if rc:
        print("sql error: %s"%rc)
        exit(1)
    print('Table %s.%s rowcount: %s'% (db,tb,con.RowDict[i][0])
con.close()

有了该能力之后,相应多进程或多线程并发均可轻松实践;


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。