Counting unique IDs within a rolling time range means counting the number of distinct identifiers seen inside a moving time window. This is a common data-analysis requirement, especially when working with real-time data streams or time-series data.
from collections import defaultdict
import time

class RollingUniqueCounter:
    """In-memory counter of distinct IDs seen within the last window_seconds."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.id_timestamps = defaultdict(list)

    def add_id(self, id, timestamp=None):
        timestamp = timestamp if timestamp is not None else time.time()
        self.id_timestamps[id].append(timestamp)
        self._cleanup(timestamp)

    def count_unique(self, current_time=None):
        current_time = current_time if current_time is not None else time.time()
        self._cleanup(current_time)
        return len(self.id_timestamps)

    def _cleanup(self, current_time):
        cutoff = current_time - self.window_seconds
        to_delete = []
        for id, timestamps in self.id_timestamps.items():
            # Keep only the records that fall inside the time window
            new_timestamps = [t for t in timestamps if t >= cutoff]
            if new_timestamps:
                self.id_timestamps[id] = new_timestamps
            else:
                to_delete.append(id)
        for id in to_delete:
            del self.id_timestamps[id]
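A minimal usage sketch of this in-memory counter; the IDs and the 60-second window below are illustrative:

# Quick sanity check of the in-memory counter (values are illustrative)
counter = RollingUniqueCounter(window_seconds=60)
counter.add_id("user_1")
counter.add_id("user_2")
counter.add_id("user_1")          # duplicate within the window, counted once
print(counter.count_unique())     # -> 2

# Records older than the window no longer contribute to the count
counter.add_id("user_3", timestamp=time.time() - 120)
print(counter.count_unique())     # still 2: user_3's record is already expired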
import redis
import time

class RedisRollingUniqueCounter:
    """Redis-backed counter: one sorted set per ID, scored by timestamp."""

    def __init__(self, redis_client, key_prefix, window_seconds):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.window_seconds = window_seconds

    def add_id(self, id):
        current_time = time.time()
        key = f"{self.key_prefix}:{id}"
        self.redis.zadd(key, {str(current_time): current_time})
        # Trim entries that have fallen out of the window, then refresh the TTL
        self.redis.zremrangebyscore(key, 0, current_time - self.window_seconds)
        self.redis.expire(key, self.window_seconds)

    def count_unique(self):
        current_time = time.time()
        cutoff = current_time - self.window_seconds
        # Iterate over all keys with SCAN to avoid blocking the server
        unique_count = 0
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=f"{self.key_prefix}:*")
            for key in keys:
                # Check whether this ID has at least one record inside the window
                if self.redis.zcount(key, cutoff, current_time) > 0:
                    unique_count += 1
            if cursor == 0:
                break
        return unique_count
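A usage sketch for the Redis-backed counter, assuming a Redis server reachable on localhost:6379; the connection settings and key prefix are placeholders:

# Placeholder connection settings for a local Redis server
client = redis.Redis(host="localhost", port=6379, db=0)
counter = RedisRollingUniqueCounter(client, key_prefix="active_users", window_seconds=300)

counter.add_id("user_1")
counter.add_id("user_2")
counter.add_id("user_1")          # same ID reuses the same sorted set
print(counter.count_unique())     # -> 2

Note that count_unique SCANs every key under the prefix, so its cost grows with the number of distinct IDs; for very high cardinalities an approximate structure such as HyperLogLog (below) is usually cheaper.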
# Example assuming InfluxDB (1.x) is used
import time
from influxdb import InfluxDBClient

class TSDBRollingUniqueCounter:
    """Time-series-database-backed counter using InfluxDB."""

    def __init__(self, db_client, measurement, window_seconds):
        self.db = db_client
        self.measurement = measurement
        self.window_seconds = window_seconds

    def add_id(self, id, timestamp=None):
        timestamp = timestamp if timestamp is not None else int(time.time() * 1e9)  # nanosecond timestamp
        data = [{
            "measurement": self.measurement,
            "tags": {"id": id},
            "time": timestamp,
            # InfluxQL's DISTINCT() only works on fields, not tags,
            # so also store the ID as a field value for counting
            "fields": {"id_value": id, "value": 1}
        }]
        self.db.write_points(data)

    def count_unique(self):
        query = f"""
        SELECT COUNT(DISTINCT("id_value")) FROM "{self.measurement}"
        WHERE time > now() - {self.window_seconds}s
        """
        result = self.db.query(query)
        points = list(result.get_points())
        return points[0]['count'] if points else 0
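A usage sketch, assuming an InfluxDB 1.x server and an existing database; host, port, database name, and measurement are placeholders:

# Placeholder connection settings for an InfluxDB 1.x server
client = InfluxDBClient(host="localhost", port=8086, database="metrics")
counter = TSDBRollingUniqueCounter(client, measurement="page_views", window_seconds=3600)

counter.add_id("user_1")
counter.add_id("user_2")
counter.add_id("user_1")
print(counter.count_unique())     # expected: 2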
Cause: too much historical data is retained and expired records are never cleaned up, so memory usage keeps growing.
Solution: use an approximate data structure such as HyperLogLog and reset it periodically, as in the implementation below:
import time
from hyperloglog import HyperLogLog

class HLLRollingCounter:
    """Approximate unique counter; trades exactness for constant memory."""

    def __init__(self, window_seconds, error_rate=0.01):
        self.window_seconds = window_seconds
        self.error_rate = error_rate  # the hyperloglog package takes a relative error rate, not a register precision
        self.hll = HyperLogLog(error_rate)
        self.last_cleanup = time.time()

    def add_id(self, id):
        current_time = time.time()
        self.hll.add(str(id))
        # Periodically reset the counter so old IDs age out
        if current_time - self.last_cleanup >= self.window_seconds:
            self.hll = HyperLogLog(self.error_rate)
            self.last_cleanup = current_time

    def count_unique(self):
        return len(self.hll)
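A usage sketch with illustrative parameters. Because the whole HyperLogLog is discarded every window_seconds, this behaves more like a tumbling window than a strictly rolling one, and the count it returns is an estimate:

# Illustrative parameters; the result is approximate by design
counter = HLLRollingCounter(window_seconds=600, error_rate=0.01)
for user_id in ["u1", "u2", "u3", "u1"]:
    counter.add_id(user_id)
print(counter.count_unique())     # roughly 3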
Cause: data-synchronization delays or network partitions in a distributed environment lead to inconsistent counts across nodes.
Solution: have every node read and write a single shared, centralized store (for example Redis) so there is one source of truth.
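One possible sketch of such a centralized store, assuming a single Redis instance shared by all nodes: keep one sorted set whose members are the IDs themselves and whose scores are last-seen timestamps. The class name and key below are illustrative and not part of the implementations above:

import time
import redis

class CentralRollingUniqueCounter:
    """Sketch: all nodes write one shared sorted set (member = ID, score = last-seen time)."""

    def __init__(self, redis_client, key, window_seconds):
        self.redis = redis_client
        self.key = key                      # e.g. "rolling:unique_ids" (illustrative)
        self.window_seconds = window_seconds

    def add_id(self, id):
        # ZADD keeps a single member per ID and simply refreshes its score
        self.redis.zadd(self.key, {str(id): time.time()})

    def count_unique(self):
        cutoff = time.time() - self.window_seconds
        # Drop members whose last activity fell outside the window, then count the rest
        self.redis.zremrangebyscore(self.key, 0, cutoff)
        return self.redis.zcard(self.key)

Because every node writes the same key, all nodes observe the same count, and storing one member per ID keeps the set from growing with repeated activity from the same ID.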
Cause: multiple threads or processes update the counter at the same time, causing race conditions.
Solution: protect shared state with a lock and serialize updates through a queue, as in the implementation below:
import threading
import time
from collections import defaultdict
from queue import Queue

class ThreadSafeRollingCounter:
    """Rolling unique counter that is safe for concurrent producers."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.id_timestamps = defaultdict(list)
        self.lock = threading.Lock()
        self.queue = Queue()
        self.worker_thread = threading.Thread(target=self._process_queue)
        self.worker_thread.daemon = True
        self.worker_thread.start()

    def add_id(self, id):
        # Producers only enqueue; a single worker thread applies the updates
        self.queue.put((id, time.time()))

    def _process_queue(self):
        while True:
            id, timestamp = self.queue.get()
            with self.lock:
                self.id_timestamps[id].append(timestamp)
                self._cleanup(timestamp)

    def count_unique(self):
        with self.lock:
            self._cleanup(time.time())
            return len(self.id_timestamps)

    def _cleanup(self, current_time):
        # Same logic as RollingUniqueCounter._cleanup above
        cutoff = current_time - self.window_seconds
        for id in list(self.id_timestamps):
            kept = [t for t in self.id_timestamps[id] if t >= cutoff]
            if kept:
                self.id_timestamps[id] = kept
            else:
                del self.id_timestamps[id]
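A usage sketch with several producer threads; the thread count, the IDs, and the short sleep that lets the background worker drain the queue are illustrative:

# Illustrative concurrent usage of the thread-safe counter
counter = ThreadSafeRollingCounter(window_seconds=60)

def producer(ids):
    for id in ids:
        counter.add_id(id)

threads = [threading.Thread(target=producer, args=(["a", "b", "c"],)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

time.sleep(0.1)                   # allow the background worker to process the queue
print(counter.count_unique())     # -> 3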
Counting unique IDs over a rolling time window is an involved but very useful technique; choosing the implementation that fits your specific scenario can significantly improve both system performance and accuracy.