MongoDB保存数据的优化方法

举报
zhangslob 发表于 2018/12/20 23:23:00 2018/12/20
【摘要】 这两天频繁遇到MongoDB插入数据的问题,这里记录下。问题描述:我有多个线程在抓数据,每天数据里有含有多个文档(Document),使用Pymongo的插入方法,逐条插入。形如下def save_to_mongo(data): for i in data: db.insert_one(i)在接收到数据后直接调用该方法即可。但是运维那边反馈,数据库压力比较大,让我修改。仔...

这两天频繁遇到MongoDB插入数据的问题,这里记录下。

问题描述:我有多个线程在抓数据,每天数据里有含有多个文档(Document),使用Pymongo的插入方法,逐条插入。形如下

def save_to_mongo(data):
   for i in data:
       db.insert_one(i)

在接收到数据后直接调用该方法即可。但是运维那边反馈,数据库压力比较大,让我修改。仔细想了想,可以使用insert_many方法。

插入可迭代的文档

>>> db.test.count_documents({})
0
>>> result = db.test.insert_many([{'x': i} for i in range(2)])
>>> result.inserted_ids
[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
>>> db.test.count_documents({})
2

有几个参数需要了解

  1. documents: 可迭代文档

  2. ordered :(可选)如果“True”(默认)文档将按顺序插入服务器,按提供的顺序。 如果发生错误,则中止所有剩余插入。 如果为“False”,文档将以任意顺序插入服务器,可能并行,并且将尝试所有文档插入。

  3. bypass_document_validation: (可选)如果为“True”,则允许写入选择退出文档级别验证。 默认为“False”。

  4. session (optional): a ClientSession.

好了最简单的方法就是把所有需要保存的数据暂时存放在列表中,最后再插入。建议加上ordered=False参数,可以防止数据保存异常。

def save_mongo():
    while True:
        while len(tmp) > 100:
            try:
                c = db[collection_name]
                c.insert_many(tmp, ordered=False)
                tmp.clear()
            except pymongo.errors.BulkWriteError:
                tmp.clear()
            except Exception as e:
                logging.error('mongodb_save insert_many: {}, {}'.format(e, tmp))
        time.sleep(3)

tmp = []
for i in data:
    tmp.append(i)


t_save = threading.Thread(target=save_mongo)
t_save.setDaemon(True)
t_save.start()


新开一个线程去不停的检查,如果列表数据大于100,则批量插入,或者等待3秒。

这里捕获pymongo.errors.BulkWriteError异常,如果在insert_many时发生错误,会产生该异常。在我这里通常是插入重复数据引起的。


还有一种情况,是在多线程情况下。多个线程共享一个列表对象,肯定是需要加锁的,如果使用Lock来管理数据插入问题,需要去给列表加锁。之前还没用过锁,去看看教程。

import threading

class SharedCounter:
   '''
   A counter object that can be shared by multiple threads.
   '''
   def __init__(self, initial_value = 0):
       self._value = initial_value
       self._value_lock = threading.Lock()

   def incr(self,delta=1):
       '''
       Increment the counter with locking
       '''
       self._value_lock.acquire()
       self._value += delta
       self._value_lock.release()

   def decr(self,delta=1):
       '''
       Decrement the counter with locking
       '''
       self._value_lock.acquire()
       self._value -= delta
       self._value_lock.release()

觉得太麻烦,可以将保存数据等方法封装成一个类对象,实例化一个列表,在每个线程中实例化一个类对象即可,这样多个线程中是不会共享列表数据的。

当然也可以使用另外一种数据结构:Queue队列。Queue是线程安全的,自带锁,使用的时候,不用对队列加锁操作。可以将数据暂时存入queue,然后用列表取出来,数量大于100则插入,并清空列表。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。