You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
5.1KB

  1. import redis
  2. import os
  3. from config import conf
  4. import uuid
  5. import time
  6. import threading
  7. # 定义全局 redis_helper
  8. redis_helper = None
  9. class RedisHelper:
  10. def __init__(self, host='localhost', port=6379, password=None ,db=0):
  11. # 初始化 Redis 连接
  12. self.client = redis.Redis(host=host, port=port,db=db,password=password)
  13. self.lock_renewal_thread = None
  14. def set_hash(self, hash_key, data, timeout=None):
  15. """添加或更新哈希,并设置有效期"""
  16. self.client.hset(hash_key, mapping=data)
  17. if timeout:
  18. # 设置有效期(单位:秒)
  19. self.client.expire(hash_key, timeout)
  20. def get_hash(self, hash_key):
  21. """获取整个哈希表数据"""
  22. result = self.client.hgetall(hash_key)
  23. # 将字节数据解码成字符串格式返回
  24. return {k.decode('utf-8'): v.decode('utf-8') for k, v in result.items()}
  25. def get_hash_field(self, hash_key, field):
  26. """获取哈希表中的单个字段值"""
  27. result = self.client.hget(hash_key, field)
  28. return result.decode('utf-8') if result else None
  29. def delete_hash(self, hash_key):
  30. """删除整个哈希表"""
  31. self.client.delete(hash_key)
  32. def delete_hash_field(self, hash_key, field):
  33. """删除哈希表中的某个字段"""
  34. self.client.hdel(hash_key, field)
  35. def update_hash_field(self, hash_key, field, value):
  36. """更新哈希表中的某个字段"""
  37. self.client.hset(hash_key, field, value)
  38. def acquire_lock(self, lock_name, timeout=60):
  39. """
  40. 尝试获取分布式锁,成功返回 True,失败返回 False
  41. :param lock_name: 锁的名称
  42. :param timeout: 锁的超时时间(秒)
  43. :return: bool
  44. """
  45. #print('获取锁')
  46. identifier = str(time.time()) # 使用时间戳作为唯一标识
  47. #if self.client.set(lock_name, identifier, nx=True, ex=timeout):
  48. if self.client.setnx(lock_name, identifier):
  49. self.client.expire(lock_name, timeout)
  50. self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
  51. self.lock_renewal_thread.start()
  52. return True
  53. return False
  54. def renew_lock(self, lock_name, identifier, timeout):
  55. """
  56. 锁的自动续期
  57. :param lock_name: 锁的名称
  58. :param identifier: 锁的唯一标识
  59. :param timeout: 锁的超时时间(秒)
  60. """
  61. while True:
  62. time.sleep(timeout / 2)
  63. if self.client.get(lock_name) == identifier.encode():
  64. self.client.expire(lock_name, timeout)
  65. else:
  66. break
  67. def release_lock(self, lock_name, identifier):
  68. """
  69. 释放分布式锁
  70. :param lock_name: 锁的名称
  71. :param identifier: 锁的唯一标识
  72. """
  73. if self.client.get(lock_name) == identifier.encode():
  74. self.client.delete(lock_name)
  75. if self.lock_renewal_thread:
  76. self.lock_renewal_thread.join()
  77. def enqueue(self, queue_name, item):
  78. """
  79. 将元素添加到队列的尾部(右侧)
  80. :param queue_name: 队列名称
  81. :param item: 要添加到队列的元素
  82. """
  83. self.client.rpush(queue_name, item)
  84. print(f"Enqueued: {item} to queue {queue_name}")
  85. def dequeue(self, queue_name):
  86. """
  87. 从队列的头部(左侧)移除并返回元素
  88. :param queue_name: 队列名称
  89. :return: 移除的元素,如果队列为空则返回 None
  90. """
  91. item = self.client.lpop(queue_name)
  92. if item:
  93. print(f"Dequeued: {item.decode('utf-8')} from queue {queue_name}")
  94. return item.decode('utf-8')
  95. print(f"Queue {queue_name} is empty")
  96. return None
  97. def get_queue_length(self, queue_name):
  98. """
  99. 获取队列的长度
  100. :param queue_name: 队列名称
  101. :return: 队列的长度
  102. """
  103. length = self.client.llen(queue_name)
  104. print(f"Queue {queue_name} length: {length}")
  105. return length
  106. def peek_queue(self, queue_name):
  107. """
  108. 查看队列的头部元素,但不移除
  109. :param queue_name: 队列名称
  110. :return: 队列的头部元素,如果队列为空则返回 None
  111. """
  112. item = self.client.lrange(queue_name, 0, 0)
  113. if item:
  114. print(f"Peeked: {item[0].decode('utf-8')} from queue {queue_name}")
  115. return item[0].decode('utf-8')
  116. print(f"Queue {queue_name} is empty")
  117. return None
  118. def clear_queue(self, queue_name):
  119. """
  120. 清空队列
  121. :param queue_name: 队列名称
  122. """
  123. self.client.delete(queue_name)
  124. print(f"Cleared queue {queue_name}")
  125. def start():
  126. global redis_helper
  127. host=conf().get("redis_host")
  128. port=conf().get("redis_port")
  129. password=conf().get("redis_password")
  130. db=conf().get("redis_db")
  131. redis_helper = RedisHelper(host=host,port=port,password=password,db=db)