python操作MongoDB

举报
天元浪子 发表于 2021/07/26 23:13:57 2021/07/26
【摘要】 文章目录 前言MongoDB客户端类连接管理集合管理索引管理增删改查文件操作聚合操作 前言 曾经有一段时间,大约是2014年和2015年,在所有的项目里面,但凡需要数据库的场合,我们无一例外地选择了MongoDB。在此之前,我们更多的是使用Oracle数据库。尽管Oracle已经做得非常棒了,但面对来自全球各地空间天气观测台站和世界各国卫星数据的时候...

前言

曾经有一段时间,大约是2014年和2015年,在所有的项目里面,但凡需要数据库的场合,我们无一例外地选择了MongoDB。在此之前,我们更多的是使用Oracle数据库。尽管Oracle已经做得非常棒了,但面对来自全球各地空间天气观测台站和世界各国卫星数据的时候,仍然显得捉襟见肘,疲于应付。不是Oracle不够优秀,是数据太过繁杂,仅仅依赖二维关系模型已经无法高效地处理数据了。

MongoDB 是由C++语言编写的,是一个基于分布式文件存储的开源数据库系统。MongoDB号称是最像关系型数据库的非关系型数据库,在一些简单的项目里面,我们也用它来存储一些结构化的数据。总结使用感受,我觉得MongoDB 就像一个个性鲜明的优秀青年,有能力,也有脾气,优点和缺点一样突出。要用好MongoDB,首先要弄清楚你是否真的需要NoSQL数据库。

MongoDB的优点:

  1. 无模式(太过随意,有时反倒是缺点)
  2. 支持对象存储
  3. 支持Map/reduce和聚合操作
  4. 扩展方便
  5. 可靠性高

MongoDB的缺点不多,但很要命,这就是被很多人诟病的“内存贪婪”:它会占用操作系统几乎所有的空闲内存,让其他进程活得不舒适,而我们一直对该机制缺乏了解,也没有相应的应对手段。

MongoDB客户端类

pymongo是python访问MongoDB的模块,使用该模块,我们定义了一个操作MongoDB的类PyMongoClient,包含了连接管理、集合管理、索引管理、增删改查、文件操作、聚合操作等方法。

# ---------------------------------------------------------------------------------------------------
# PyMongoClient(host='localhost', port='27017', db='test', user=None, passwd=None, loop=5, rate=8)
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.SetDatabase(db, user, passwd) 设置(修改)当前数据库
# PyMongoClient.CloseConnection() 关闭连接
# PyMongoClient.Logout() 注销用户
# PyMongoClient.GetStatus() 获取MogoDB服务器的状态
# PyMongoClient.IsMongos() 判断是否是MongoS
# PyMongoClient.GetDateTime() 获取MongoDB服务器的当前时间(需要权限支持)
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.GetCollections() 获取当前数据库的全部集合
# PyMongoClient.CreateCollection(collection) 创建集合
# PyMongoClient.DropCollection(collection) 删除集合
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.IndexInformation(collection) 获取集合的索引信息
# PyMongoClient.EnsureIndex(collection, key_or_list) 检查索引是否存在,若不存在,则创建索引
# PyMongoClient.CreateIndex(collection, key_or_list) 创建索引
# PyMongoClient.DropIndex(collection, key=None) 删除索引,key=None时删除全部索引(_id除外)
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.InsertDoc(collection, data) data为字典时,单条插入,data为列表时,批量插入
# PyMongoClient.RemoveDoc(collection, docFilter=None) 删除文档,docFilter=None时删除集合的全部文档
# PyMongoClient.UpdateDoc(collection, docFilter, data, modifier=None) 更新文档,支持使用$inc/$set/$unset等修改器
# PyMongoClient.UpsertDoc(collection, docFilter, data) 如果文档不存在,则插入文档;如果文档存在,则更新文档
# PyMongoClient.GetDoc(collection, docFilter=None, colFilter=None) 返回单个文档
# PyMongoClient.CountDoc(collection, docFilter=None) 返回集合或查询的文档总数
# PyMongoClient.GetCursor(collection, docFilter=None, colFilter=None) 返回多个文档的游标
# PyMongoClient.CountCursor(cursor) 返回游标的文档总数
# PyMongoClient.SortCursor(cursor, col_or_list, director='ASC') 游标排序,默认升序,取值ASC/DESC
# PyMongoClient.SubCursor(cursor, limit, skip=0) 游标截取
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.Aggregate(collection, pipleline) 聚合
# PyMongoClient.RunCommand(collection, cmdObj) 运行数据库命令
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.Str2ObjectId(id_str) id字符串转id对象
# PyMongoClient.ObjectId2Str(id_obj) id对象转id字符串
# PyMongoClient.GetBinaryFromFile(sourceFile) 读文件,返回二进制内容
# PyMongoClient.SaveBinaryToFile(binary, targetFile) 将二进制内容保存为文件
# ---------------------------------------------------------------------------------------------------
# PyMongoClient.PutFile(localFilePath, dbFileName=None) 将文件保存到GridFS并返回FileId
# PyMongoClient.GetFile(fileId, localFilePath) 将文件从GridFS取出,并保存到文件中
# PyMongoClient.GetFilesCursor(docFilter=None, colFilter=None) 取得文件信息游标
# PyMongoClient.DeleteFile(fileId) 删除GridFS中的文件

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

连接管理

class PyMongoClient(): def __init__(self, host='localhost', port=27017, db='test', user=None, passwd=None, loop=5, rate=8): self.loop = loop # 数据库失去连接后,尝试执行数据库操作的次数 self.rate = float(rate) # 数据库失去连接后,尝试执行数据库操作的时间间隔,首次尝试的间隔是rate的倒数,以后间隔时间增倍 try: self.conn = pymongo.MongoClient(host, int(port)) self.SetDatabase(db, user, passwd) except Exception, errMsg: raise Exception(errMsg) # --------------------------------------------------------------------------------------------------- def SetDatabase(self, db, user, passwd): # 设置(修改)当前数据库 self.db = self.conn[db] if user and passwd: if not self.db.authenticate(user, passwd): raise Exception(u'数据库权限验证失败!') def CloseConnection(self): # 关闭数据库连接 self.conn.close() def Logout(self): # 注销用户 self.db.logout() def GetStatus(self): # 获取MogoDB服务器的状态 return self.db.last_status() def IsMongos(self): # 判断是否是MongoS return self.conn.is_mongos def GetDateTime(self): # 获取MongoDB服务器的当前时间(需要权限支持,若无权限,则返回本地时间) for i in range(self.loop): try: return self.db.eval("return new Date();") except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except Exception, e: return datetime.datetime.now() raise Exception(u'重连数据库失败!')

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

集合管理

class PyMongoClient(): def GetCollections(self): # 获取当前数据库的全部集合 for i in range(self.loop): try: return self.db.collection_names() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def CreateCollection(self, collection): # 在当前数据库内创建新的集合 for i in range(self.loop): try: self.db.create_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except pymongo.errors.CollectionInvalid: return raise Exception(u'重连数据库失败!') def DropCollection(self, collection): # 删除当前数据库内名为collection的集合 for i in range(self.loop): try: self.db.drop_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!')

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

索引管理

class PyMongoClient(): def IndexInformation(self, collection): # 获取索引信息 for i in range(self.loop): try: return self.db[collection].index_information().keys() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def EnsureIndex(self, collection, key_or_list): # 检查索引是否存在,若不存在,则创建索引,若存在,返回None # list参数形如:[('start_time', pymongo.ASCENDING), ('end_time', pymongo.ASCENDING)] for i in range(self.loop): try: self.db[collection].ensure_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def CreateIndex(self, collection, key_or_list): # 创建索引(推荐使用EnsureIndex) for i in range(self.loop): try: self.db[collection].create_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def DropIndex(self, collection, key=None): # 删除索引,key=None时删除全部索引(_id除外) for i in range(self.loop): try: if key: self.db[collection].drop_index(key) else: self.db[collection].drop_indexes() return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!')

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

增删改查

class PyMongoClient(): def InsertDoc(self, collection, data): # data为字典时,单条插入,data为列表时,批量插入。批量上限20W # 单条插入时返回单个id对象,批量插入时,返回id对象列表 for i in range(self.loop): try: return self.db[collection].insert(data, manipulate=True, save=False, check_keys=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def RemoveDoc(self, collection, docFilter=None): # 删除文档,docFilter=None时删除集合collection的全部文档 for i in range(self.loop): try: return self.db[collection].remove(docFilter) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def UpdateDoc(self, collection, docFilter, data, modifier=None): # 更新文档,docFilter为更新对象的查找条件,data为更新数据,可以使用$inc/$set/$unset等修改器 for i in range(self.loop): try: if modifier: return self.db[collection].update(docFilter, {modifier:data}, multi=True) else: return self.db[collection].update(docFilter, data, multi=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def UpsertDoc(self, collection, docFilter, data): # 如果文档不存在,则插入文档;如果文档存在,则更新文档 for i in range(self.loop): try: return self.db[collection].update(docFilter, data, True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def GetDoc(self, collection, docFilter=None, colFilter=None, sort=None): # 返回单个文档 for i in range(self.loop): try: if colFilter: return self.db[collection].find_one(docFilter, colFilter, sort=sort) else: return self.db[collection].find_one(docFilter, sort=sort) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def CountDoc(self, collection, docFilter=None): # 返回集合或查询的文档总数 for i in range(self.loop): try: return self.db[collection].find(docFilter).count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def GetCursor(self, collection, docFilter=None, colFilter=None): # 返回多个文档的游标 for i in range(self.loop): try: if colFilter: return self.db[collection].find(docFilter, colFilter).batch_size(100) else: return self.db[collection].find(docFilter).batch_size(100) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def CountCursor(self, cursor): # 返回游标的文档总数 for i in range(self.loop): try: return cursor.count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def SortCursor(self, cursor, col_or_list, director='ASC'): # 游标排序,默认ASCENDING(升序),取值ASC/DESC # col_or_list,列名或者是由(列名,方向)组成的列表 if isinstance(col_or_list, list): args = [] for col in col_or_list: if col[1] == 'ASC': args.append((col[0],pymongo.ASCENDING)) else: args.append((col[0],pymongo.DESCENDING)) for i in range(self.loop): try: return cursor.sort(args) # cursor.sort([("UserName",pymongo.ASCENDING),("Email",pymongo.DESCENDING)]) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') else: if director == 'ASC': director = pymongo.ASCENDING else: director = pymongo.DESCENDING for i in range(self.loop): try: return cursor.sort(col_or_list, director) # director取值:pymongo.ASCENDING(升序)、pymongo.DESCENDING(降序) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def SubCursor(self, cursor, limit, skip=0): # 截取游标 for i in range(self.loop): try: if skip: return cursor.skip(skip).limit(limit) else: return cursor.limit(limit) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!')

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122

文件操作

class PyMongoClient(): def GetBinaryFromFile(self, sourceFile): # 读文件,返回二进制内容 # 适用于在文档中直接保存小于16M的小文件,若文件较大时,应使用GridFS try: fp = open(sourceFile,'rb') return bson.Binary(fp.read()) except: return False finally: fp.close() def SaveBinaryToFile(self, binary, targetFile): # 将二进制内容保存为文件 try: fp = open(targetFile,'wb') fp.write(binary) return True except: return False finally: fp.close() def Str2ObjectId(self, id_str): return bson.ObjectId(id_str) def ObjectId2Str(self, id_obj): return str(id_obj) def PutFile(self, localFilePath, dbFileName=None): ''' 向GridFS中上传文件,并返回文件ID @localFilePath  本地文件路径 @dbFileName 保存到GridFS中的文件名,如果为None则使用本地路径中的文件名 ''' fs = gridfs.GridFS(self.db) fp = open(localFilePath, 'rb') if dbFileName == None: dbFileName = os.path.split(localFilePath)[1] id = fs.put(fp,filename=dbFileName, chunkSize=4*1024*1024) fp.close() return id def GetFile(self, fileId, localFilePath=None): ''' 根据文件ID从GridFS中下载文件 @fileId 文件ID @localFilePath  要保存的本地文件路径 ''' if isinstance(fileId, str): fileId = self.Str2ObjectId(fileId) fs = gridfs.GridFS(self.db) if localFilePath: fp = open(localFilePath, 'wb') try: fp.write(fs.get(fileId).read()) return True except: return False finally: fp.close() else: try: return fs.get(fileId).read() except: return False def GetFilesCursor(self, docFilter=None, colFilter=None): ''' 取得GridFS中文件的游标 可以进行过滤或检索的字段名有 _id 文件ID filename 文件名 length 文件大小 md5 md5校验码 chunkSize   文件块大小 uploadDate  更新时间 ''' return self.GetCursor('fs.files', docFilter=docFilter, colFilter=colFilter) def DeleteFile(self, fileId): ''' 根据文件ID从GridFS中删除文件 @fileId 文件ID ''' fs = gridfs.GridFS(self.db) fs.delete(fileId)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

聚合操作

class PyMongoClient(): def Aggregate(self, collection, pipleline): # 聚合 # pipleline是一个由筛选、投射、分组、排序、限制、跳过等一系列构件组成管道队列 for i in range(self.loop): try: return self.db[collection].aggregate(pipleline) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!') def RunCommand(self, collection, cmdObj): # 运行数据库命令 # if cmdObj is a string, turns it into {cmdObj:1} for i in range(self.loop): try: return self.db[collection].runCommand(cmdObj) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重连数据库失败!')

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

文章来源: xufive.blog.csdn.net,作者:天元浪子,版权归原作者所有,如需转载,请联系作者。

原文链接:xufive.blog.csdn.net/article/details/84104174

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。