如何通过 Python 操作 Redis

举报
宇宙之一粟 发表于 2022/06/30 23:16:42 2022/06/30
【摘要】 安装redis-py模块如果 pip 下载太慢,推荐使用清华源安装命令:pip install redis -i 创建连接import redisr = redis.redis( host:"localhost", port:6379, password="", db=0) 创建连接池import redispool = redis.ConnectionPool(redis.red...

安装redis-py模块

如果 pip 下载太慢,推荐使用清华源

安装命令:

pip install redis -i

创建连接

import redis

r = redis.redis(
		host:"localhost",
		port:6379,
		password="",
		db=0
)

创建连接池

import redis
pool = redis.ConnectionPool(
redis.redis(
		host:"localhost",
		port:6379,
		password="",
		db=0,
		max_connections=20)

从连接池中获取的连接,不必关闭,垃圾回收的时候,连接会自动被归还到连接池

r = redis.Redis(
  	connection_pool=pool,
)
...

del r

创建

# redis_db.py
import redis

pool = redis.ConnectionPool(
		host="localhost",
		port=6379,
		password="",
		db=0,
		max_connection=20)
# demo_redis.py
from redis_db import pool
import redis

con = redis.Redis(
		connection_pool=pool
)

del con

操作指令

操作字符串

con.set("country", "England")
con.set("city", "London")
city = con.get("city").decode("utf-8")
print(city)
r.delete("country", "city")
r.mset({"country":"Japan", "city":"tokin"})
result = r.mget("counry", "city")
for one in result:
  	print(one.decode("utf-8"))

try-catch版:

import redistry:
try:
		pool = redis.ConnectionPool(
			host="localhost",
			port=6379,
			password="",
			db=0,
			max_connection=20
    )
except Exception as e:
  	print(e)
# demo2.py
from redis_db import pool
import redis

con = redis.Redis(
		connection_pool=pool
)

try:
  	con.delete("country", "city")
    con.mset("country": "德国", "city": "柏林")
    result = con.mget("country", "city")
    for one in result:
      	print(one.decode("utf-8"))
except Exception as e:
  	print(e)
finally 
		del con

操作列表

# demo3.py
from redis_db import pool
import redis

con = redis.Redis(connection_pool=pool)

try:
  	con.rpush("dname", "董事会", "秘书处", "财务部", "技术部")
    con.lpop("dname")
    result = con.lrange("dname", 0, -1)
    for one in result:
      	print(one.decode("utf-8"))
except Exception as e:
  	print(e)
finally:
  del con

操作集合和有序集合

# demo4.py
con.sadd("empolyee", 9001, 9002, 9003)
con.srem("employee", 9001)
result = con.smembers("employee")

con.zadd("keyword", {"马云": 0, ""})
con.zadd("keyword", {"马云": 0, "张朝阳":0, "丁磊": 0})
con.zincrby("keyword", "10", "马云")
result = con.revrange("keyword", 0, -1)

操作哈希表

# demo5.py
from redis_db import pool
import redis

con = redis.Redis(
		connection_pool=pool
)

try:
  	con.hmset("9527", {"name":"Scott", "sex":"male"})
		con.hset("9527", "city", "纽约")
		con.hdel("9527", "age")
    result = con.hexists("9527", "name")
    
    print(result)
    
    result = con.hgetall("9527")
    for one in result:
      	print(one.decode("utf-8"), result[one].decode("utf-8"))

except Exception as e:
  	print(e)
finally:
    del con

redis-py的事务函数

redis-py模块用pipline(管道)的方式向Redis服务器传递批处理处理命令和执行事务

pipline = con.pipline()  # 创建pipline对象
pipline.watch(...)  # 监视记录
pipline.multi()		# 开启事务
pipline.excute()	# pipline来增删改查,对事务进行提交
pipline.reset()  # 一定要关闭pipline,不然连接池没法回收
# demo6.py
from redis_db import pool
import redis

con = redis.Redis(
		conection_pool=pool
)

try:
  	pipline = con.pipline()
    pipline.watch("9527")
    pipline.multi()
    pipline.hset("9527", "name", "Jackylove")
    pipline.hset("9527", "age", 23)
    pipline.execute
except Exceptyion as e:
		print(e)
finally:
  if "pipline" in dir():
    	pipline.reset()
  del con
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。