Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

153 rindas
5.1KB

  1. import redis
  2. import os
  3. from config import conf
  4. import uuid
  5. import time
  6. import threading
  7. # 定义全局 redis_helper
  8. redis_helper = None
  9. class RedisHelper:
  10. def __init__(self, host='localhost', port=6379, password=None ,db=0):
  11. # 初始化 Redis 连接
  12. self.client = redis.Redis(host=host, port=port,db=db,password=password)
  13. self.lock_renewal_thread = None
  14. def set_hash(self, hash_key, data, timeout=None):
  15. """添加或更新哈希,并设置有效期"""
  16. self.client.hset(hash_key, mapping=data)
  17. if timeout:
  18. # 设置有效期(单位:秒)
  19. self.client.expire(hash_key, timeout)
  20. def get_hash(self, hash_key):
  21. """获取整个哈希表数据"""
  22. result = self.client.hgetall(hash_key)
  23. # 将字节数据解码成字符串格式返回
  24. return {k.decode('utf-8'): v.decode('utf-8') for k, v in result.items()}
  25. def get_hash_field(self, hash_key, field):
  26. """获取哈希表中的单个字段值"""
  27. result = self.client.hget(hash_key, field)
  28. return result.decode('utf-8') if result else None
  29. def delete_hash(self, hash_key):
  30. """删除整个哈希表"""
  31. self.client.delete(hash_key)
  32. def delete_hash_field(self, hash_key, field):
  33. """删除哈希表中的某个字段"""
  34. self.client.hdel(hash_key, field)
  35. def update_hash_field(self, hash_key, field, value):
  36. """更新哈希表中的某个字段"""
  37. self.client.hset(hash_key, field, value)
  38. def acquire_lock(self, lock_name, timeout=60):
  39. """
  40. 尝试获取分布式锁,成功返回 True,失败返回 False
  41. :param lock_name: 锁的名称
  42. :param timeout: 锁的超时时间(秒)
  43. :return: bool
  44. """
  45. #print('获取锁')
  46. identifier = str(time.time()) # 使用时间戳作为唯一标识
  47. #if self.client.set(lock_name, identifier, nx=True, ex=timeout):
  48. if self.client.setnx(lock_name, identifier):
  49. self.client.expire(lock_name, timeout)
  50. self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
  51. self.lock_renewal_thread.start()
  52. return True
  53. return False
  54. def renew_lock(self, lock_name, identifier, timeout):
  55. """
  56. 锁的自动续期
  57. :param lock_name: 锁的名称
  58. :param identifier: 锁的唯一标识
  59. :param timeout: 锁的超时时间(秒)
  60. """
  61. while True:
  62. time.sleep(timeout / 2)
  63. if self.client.get(lock_name) == identifier.encode():
  64. self.client.expire(lock_name, timeout)
  65. else:
  66. break
  67. def release_lock(self, lock_name, identifier):
  68. """
  69. 释放分布式锁
  70. :param lock_name: 锁的名称
  71. :param identifier: 锁的唯一标识
  72. """
  73. if self.client.get(lock_name) == identifier.encode():
  74. self.client.delete(lock_name)
  75. if self.lock_renewal_thread:
  76. self.lock_renewal_thread.join()
  77. def enqueue(self, queue_name, item):
  78. """
  79. 将元素添加到队列的尾部(右侧)
  80. :param queue_name: 队列名称
  81. :param item: 要添加到队列的元素
  82. """
  83. self.client.rpush(queue_name, item)
  84. print(f"Enqueued: {item} to queue {queue_name}")
  85. def dequeue(self, queue_name):
  86. """
  87. 从队列的头部(左侧)移除并返回元素
  88. :param queue_name: 队列名称
  89. :return: 移除的元素,如果队列为空则返回 None
  90. """
  91. item = self.client.lpop(queue_name)
  92. if item:
  93. print(f"Dequeued: {item.decode('utf-8')} from queue {queue_name}")
  94. return item.decode('utf-8')
  95. print(f"Queue {queue_name} is empty")
  96. return None
  97. def get_queue_length(self, queue_name):
  98. """
  99. 获取队列的长度
  100. :param queue_name: 队列名称
  101. :return: 队列的长度
  102. """
  103. length = self.client.llen(queue_name)
  104. print(f"Queue {queue_name} length: {length}")
  105. return length
  106. def peek_queue(self, queue_name):
  107. """
  108. 查看队列的头部元素,但不移除
  109. :param queue_name: 队列名称
  110. :return: 队列的头部元素,如果队列为空则返回 None
  111. """
  112. item = self.client.lrange(queue_name, 0, 0)
  113. if item:
  114. print(f"Peeked: {item[0].decode('utf-8')} from queue {queue_name}")
  115. return item[0].decode('utf-8')
  116. print(f"Queue {queue_name} is empty")
  117. return None
  118. def clear_queue(self, queue_name):
  119. """
  120. 清空队列
  121. :param queue_name: 队列名称
  122. """
  123. self.client.delete(queue_name)
  124. print(f"Cleared queue {queue_name}")
  125. def start():
  126. global redis_helper
  127. host=conf().get("redis_host")
  128. port=conf().get("redis_port")
  129. password=conf().get("redis_password")
  130. db=conf().get("redis_db")
  131. redis_helper = RedisHelper(host=host,port=port,password=password,db=db)