【数仓搬迁】gsql结合python交互应用的实用技巧
【摘要】 在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。
在常规python程序使用gsql交互数据时,每次都需要重新连接数据库,然后再执行查询SQL,特别是Select,查询结果还想继续和数据库交互时,利用以下方式实现。
即使用expect实现该能力,代码参见如下,<gsql.py>
#!/usr/bin/env python
#coding:utf-8
import pexpect
import sys
import os
import uuid
import time
import re
# similar psycopg2 to easy operate GaussDB(DWS) RDBMS.
# require install ptyprocess,pexpect
def analyze_result(str):
strList = str.replace("\n","").split('|')
strListOut = [line.strip() for line in strList]
return strListOut
def printL(str):
print(time.strftime('[%Y-%m-%d %H:%M:%S]'),str)
class con_gsql():
def __init__(self,host,port="25308",db="postgres",usr="omm",passwd="omm"):
tmp = host.split(':')
if len(tmpList) == 5:
self,host,port,db,usr,passwd=tmp[0],tmp[1],tmp[2],tmp[3],tmp[4]
self.RowCount = 0
self.Error = ''
self.RowDict = {}
self.debug = 0
self.cmd = '''gsql -h {host} -p {port} -d {db} -U {usr} -W {passwd} -L sql.log '''.format(
host=host,port=port,db=db,usr=usr,passwd=passwd
)
self.Db = db
self.child = pexpect.spawn(self.cmd)
self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT])
if self.index != 0:
printL("connect fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd))
raise "connect fail"
self.child.sendline('\\timing on')
self.index = self.child.expect([db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=5)
if self.index != 0:
printL("set timing fail:"+re.sub('(\-W\s+)[^\s]+','{1} ***',self.cmd))
raise "set timing fail"
printL('connect %s successful'%host)
def submit_sql(self,sql,TMOUT=600):
printL(sql) if self.debug else None
self.RowCount,self.Error,self.RowDict = 0,'',{}
def_result,output = 0,''
if re.findall('^\s*select\s', sql, flags=re.IGNORECASE) \
or re.findall('^\s*\\[dsz]',sql) \
or re.findall('^\s*\\l+?\s*$',sql):
def_result = 1
output = os.path.join(os.path.split(
os.path.abspath(sys.argv[0]))[0],
'gsql_output_%s.log'%str(uuid.uuid1().hex))
self.child.sendline('\o %s' % output)
self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT)
if not re.findall('\;\s*$',sql):
sql += ';'
self.child.sendline(sql)
self.child.expect([self.Db+'=.',pexpect.EOF,pexpect.TIMEOUT],timeout=TMOUT)
sql_msg = self.child.before.decode('UTF-8')
if 'ERROR:' in sql_msg:
self.Error = re.findall('ERROR:\s(.*)',sql_msg)[-1]
self.Error = 'known.' if not self.Error else None
return self.Error
if not def_result:
return self.Error
while not os.path.exists(output):
time.sleep(0.2)
fmtime = 0
while os.path.getmtime(output) != fmtime:
fmtime = os.path.getmtime(output)
time.sleep(1)
with open(output,'r') as fh:
all_rows = fh.readlines()
try:
os.remove(output)
except:
pass
tmp = all_rows[2:-2]
if not tmp:
return self.Error
self.RowCount = len(tmp)
rows_list = [analyze_result(rowValue) for rowValue in tmp]
for ind in range(len(rows_list)):
self.RowDict[ind] = rows_list[ind]
return self.Error
def close(self,):
self.child.sendline('\q')
日常利用上面<gsql.py>当前数据库操作连接,实现SQL交互操作,实现具体业务逻辑。
如希望进行一个schema下所有表执行一次analyze和count操作,
import gsql
con = gsql.con_gsql(host="xxx.xxx.xxx.xxx",port="xxx",db="xxx",usr="omm",passwd="omm")
#also same method
#con = gsql.con_gsql("xxx.xxx.xxx.xxx:xxx:xxx:omm:omm")
sql = """select n.nspname,c.relname
from pg_class c,pg_namespace n
where c.relnamespace=n.oid
and n.nspname='public'
and c.relkind='r';"""
rc = con.submit_sql(sql)
if rc:
print("sql error: %s"%rc)
exit(1)
print('output rows: %s'%con.RowCount)
for i in con.RowDict:
db,tb = con.RowDict[i][0],con.RowDict[i][1]
sql = 'analyze %s.%s;'%(db,tb)
rc = con.submit_sql(sql)
if rc:
print("sql error: %s"%rc)
exit(1)
sql = 'select count(*) from %s.%s;'%(db,tb)
rc = con.submit_sql(sql)
if rc:
print("sql error: %s"%rc)
exit(1)
print('Table %s.%s rowcount: %s'% (db,tb,con.RowDict[i][0])
con.close()
有了该能力之后,相应多进程或多线程并发均可轻松实践;
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
yd_2384685842023/08/13 12:41:161楼编辑删除举报