【数仓搬迁】gsql结合python交互应用的实用技巧
【摘要】 在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。
在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。
即使用expect实现该能力,代码参见如下,<gsql.py>
#!/usr/bin/env python #coding:utf-8 import pexpect import sys import os import uuid import time import re # similar psycopg2 to easy operate GaussDB(DWS) RDBMS. # require install ptyprocess,pexpect def analyze_result(str): strList = str.replace("\n","").split('|') strListOut = [line.strip() for line in strList] return strListOut def printL(str): print(time.strftime('[%Y-%m-%d %H:%M:%S]'),str) class con_gsql(): def __init__(self,host,port="25308",db="postgres",usr="omm",passwd="omm"): tmp = host.split(':') if len(tmpList) == 5: self,host,port,db,usr,passwd=tmp[0],tmp[1],tmp[2],tmp[3],tmp[4] self.RowCount = 0 self.Error = '' self.RowDict = {} self.debug = 0 self.cmd = '''gsql -h {host} -p {port} -d {db} -U {usr} -W {passwd} -L sql.log '''.format( host=host,port=port,db=db,usr=usr,passwd=passwd ) self.Db = db self.child = pexpect.spawn(self.cmd) self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT]) if self.index != 0: printL("connect fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd)) raise "connect fail" self.child.sendline('\\timing on') self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=5) if self.index != 0: printL("set timing fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd)) raise "set timing fail" printL('connect %s successful'%host) def submit_sql(self,sql,TMOUT=600): printL(sql) if self.debug else None self.RowCount,self.Error,self.RowDict = 0,'',{} def_result,output = 0,'' if re.findall('^\s*select\s', sql, flags=re.IGNORECASE) \ or re.findall('^\s*\\[dsz]',sql) \ or re.findall('^\s*\\l+?\s*$',sql): def_result = 1 output = os.path.join(os.path.split( os.path.abspath(sys.argv[0]))[0], 'gsql_output_%s.log'%str(uuid.uuid1().hex)) self.child.sendline('\o %s' % output) self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT) if not re.findall('\;\s*$',sql): sql += ';' self.child.sendline(sql) self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT) sql_msg = self.child.before.decode('UTF-8') if 'ERROR:' in sql_msg: self.Error = re.findall('ERROR:\s(.*)',sql_msg)[-1] self.Error = 'known.' if not self.Error else None return self.Error if not def_result: return self.Error while not os.path.exists(output): time.sleep(0.2) fmtime = 0 while os.path.getmtime(output) != fmtime: fmtime = os.path.getmtime(output) time.sleep(1) with open(output,'r') as fh: all_rows = fh.readlines() try: os.remove(output) except: pass tmp = all_rows[2:-2] if not tmp: return self.Error self.RowCount = len(tmp) rows_list = [analyze_result(rowValue) for rowValue in tmp] for ind in range(len(rows_list)): self.RowDict[ind] = rows_list[ind] return self.Error def close(self,): self.child.sendline('\q')
日常利用上面<gsql.py>当前数据库操作连接,实现SQL交互操作,实现具体业务逻辑。
如希望进行一个schema下所有表执行一次analyze和count操作,
import gsql con = gsql.con_gsql(host="xxx.xxx.xxx.xxx",port="xxx",db="xxx",usr="omm",passwd="omm") #also same method #con = gsql.con_gsql("xxx.xxx.xxx.xxx:xxx:xxx:omm:omm") sql = """select n.nspname,c.relname from pg_class c,pg_namespace n where c.relnamespace=n.oid and n.nspname='public' and c.relkind='r';""" rc = con.submit_sql(sql) if rc: print("sql error: %s"%rc) exit(1) print('output rows: %s'%con.RowCount) for i in con.RowDict: db,tb = con.RowDict[i][0],con.RowDict[i][1] sql = 'analyze %s.%s;'%(db,tb) rc = con.submit_sql(sql) if rc: print("sql error: %s"%rc) exit(1) sql = 'select count(*) from %s.%s;'%(db,tb) rc = con.submit_sql(sql) if rc: print("sql error: %s"%rc) exit(1) print('Table %s.%s rowcount: %s'% (db,tb,con.RowDict[i][0]) con.close()
有了该能力之后,相应多进程或多线程并发均可轻松实践;
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)