InfluxDB是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及执行分析。
特点:
轻量,部署方便,基于go 无依赖
概念:
这里使用mysql与之对比
mysql : database table row
influx: database retention measurement series point
database : 数据库名
measurement: 数据表
retention: 存储策略,用于设置数据保存的时间
series: 数据集合,tag相同的数据的集合,可以理解为折线图上的一条线,(当然折线图上可以有很多条线)
point:单条数据,由3部分组成 time,tag[属性],field[值],可以理解为折线图上的一个点。
CURD:
增:
无需建表,直接插入指定表,插入语句
insert <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [timestamp]
删:
支持删库,删表,使用存储策略删除数据,删除数据使用数据标记删除
改:
不支持修改数据
查:
select * from mytable
支持的函数,列举:
COUNT 返回唯一字段值的列表
DISTINCT 返回唯一字段值的列表
MEAN 返回字段值的算术平均值
MEDIAN 从排序的字段值列表中返回中间值
MODE 返回字段值列表中最常用的值
SPREAD 返回最小和最大字段值之间的差异
SUM 返回字段值的总和
FIRST 返回具有最早时间戳的字段值
LAST 返回具有最新时间戳的字段值
MAX 返回最大的字段值
MIN 返回最小的字段值
储存引擎:
TSM 基于LSM演变
物理上,一个measurement 由多个shard(数据分片)组成,(此处shard不是一个物理概念,可以理解为一个盒子)
一个shard由cache,wal,tsm file,compactor 组成。
cache 数据的内存缓存
wal 内存缓存的文件备份
tsm file 数据文件
compactor 合并与压缩 默认1秒执行一次
shard:
influxdb中,会按照数据的时间戳所在范围,去创建shard,这样做的目的是为了根据时间取查询时,可以快速定位到指定的物理文件,并且删除数据的时候,可以直接删除指定块文件。
//在创建db时,会指定db的存储策略,此函数根据不同的存储策略,得到每个shard存储的数据时间段。
func shardGroupDuration(d time.Duration) time.Duration {
if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
return 7 * 24 * time.Hour
} else if d >= 2*24*time.Hour { // 2 days
return 1 * 24 * time.Hour
}
return 1 * time.Hour
}
cache:
influxdb在接收到一条数据时,会将数据缓存在内存中,每一个这样的cache模块对应有一个wal物理文件,在写入缓存的同时,也会将数据落地到文件。保证掉电时不丢失数据。
cache不是无限增长的,参数maxSzie 用于控制当cache中的内存占用量超过此数值,就会将内存中的数据进行一次快照,之后清空cache,并且删除wal文件,然后将快照中的数据进行时间排序,写入到tsm文件中。对应的struct结构:
type Cache struct {
commit sync.Mutex
mu sync.RWMutex
store map[string]*entry // 存储的数据内容
size uint64 // 当前使用内存的大小
maxSize uint64 // 缓存最大值
// snapshots are the cache objects that are currently being written to tsm files
// they're kept in memory while flushing so they can be queried along with the cache.
// they are read only and should never be modified
// 快照,用于写入 tsm 文件,只读
snapshot *Cache
snapshotSize uint64
snapshotting bool
// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
snapshotAttempts int
stats *CacheStatistics
lastSnapshot time.Time
}
实际写入数据部分:
func (e *Engine) WritePoints(points []models.Point) error {
values := make(map[string][]Value, len(points))
var (
keyBuf []byte
baseLen int
seriesErr error
)
//忽略一些代码
......
e.mu.RLock()
defer e.mu.RUnlock()
// first try to write to the cache
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
//此处可以看到,可以设置WALEncbled 来关闭wal文件的写入,提高效率,但会存在掉电丢数据的风险
if e.WALEnabled {
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}
return seriesErr
}
WAL:
从上面的代码可以看出wal是cache的物理备份,作用就是数据持久化,防止掉电丢数据。由于数据是被顺序插入到wal文件的,所以这里的写入效率非常高。但是如果插入的数据的时间戳是乱序的,会导致数据被分配到不同的shard,从而会将数据插入到多个wal文件中,会有影响性能。
此外,也可以关闭wal写入,达到更高的写入能力。我们来看一下wal的写入的关键代码部分:
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
entry := &WriteWALEntry{
Values: values,
}
id, err := l.writeToLog(entry)
if err != nil {
atomic.AddInt64(&l.stats.WriteErr, 1)
return -1, err
}
atomic.AddInt64(&l.stats.WriteOK, 1)
return id, nil
}
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// limit how many concurrent encodings can be in flight. Since we can only
// write one at a time to disk, a slow disk can cause the allocations below
// to increase quickly. If we're backed up, wait until others have completed.
bytes := bytesPool.Get(entry.MarshalSize())
b, err := entry.Encode(bytes)
if err != nil {
bytesPool.Put(bytes)
return -1, err
}
encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
compressed := snappy.Encode(encBuf, b)
bytesPool.Put(bytes)
syncErr := make(chan error)
segID, err := func() (int, error) {
l.mu.Lock()
defer l.mu.Unlock()
// Make sure the log has not been closed
select {
case <-l.closing:
return -1, ErrWALClosed
default:
}
// roll the segment file if needed
if err := l.rollSegment(); err != nil {
return -1, fmt.Errorf("error rolling WAL segment: %v", err)
}
// write and sync
if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}
select {
case l.syncWaiters <- syncErr:
default:
return -1, fmt.Errorf("error syncing wal")
}
//刷数据到磁盘 调用bufio的Flush。详细见后文,落盘实现
l.scheduleSync()
// Update stats for current segment size
atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
l.lastWriteTime = time.Now().UTC()
return l.currentSegmentID, nil
}()
bytesPool.Put(encBuf)
if err != nil {
return segID, err
}
// schedule an fsync and wait for it to complete
return segID, <-syncErr
}
此处可以看到实际的文件落地方式是一个bufio类型。
l.currentSegmentWriter.Writes实现:
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
return &WALSegmentWriter{
bw: bufio.NewWriterSize(w, 16*1024),
w: w,
}
}
// Write writes entryType and the buffer containing compressed entry data.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
var buf [5]byte
buf[0] = byte(entryType)
binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))
if _, err := w.bw.Write(buf[:]); err != nil {
return err
}
if _, err := w.bw.Write(compressed); err != nil {
return err
}
w.size += len(buf) + len(compressed)
return nil
}
// Sync flushes the file systems in-memory copy of recently written data to disk,
// if w is writing to an os.File.
func (w *WALSegmentWriter) sync() error {
if err := w.bw.Flush(); err != nil {
return err
}
if f, ok := w.w.(*os.File); ok {
return f.Sync()
}
return nil
}
落盘实现:
// scheduleSync will schedule an fsync to the current wal segment and notify any
// waiting gorutines. If an fsync is already scheduled, subsequent calls will
// not schedule a new fsync and will be handle by the existing scheduled fsync.
func (l *WAL) scheduleSync() {
// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
return
}
// Fsync the wal and notify all pending waiters
go func() {
var timerCh <-chan time.Time
// time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed
// channel which will always be ready to read from.
if l.syncDelay == 0 {
// Create a RW chan and close it
timerChrw := make(chan time.Time)
close(timerChrw)
// Convert it to a read-only
timerCh = timerChrw
} else {
t := time.NewTicker(l.syncDelay)
defer t.Stop()
timerCh = t.C
}
for {
select {
case <-timerCh:
l.mu.Lock()
if len(l.syncWaiters) == 0 {
atomic.StoreUint64(&l.syncCount, 0)
l.mu.Unlock()
return
}
l.sync()
l.mu.Unlock()
case <-l.closing:
atomic.StoreUint64(&l.syncCount, 0)
return
}
}
}()
}
// sync fsyncs the current wal segments and notifies any waiters. Callers must ensure
// a write lock on the WAL is obtained before calling sync.
func (l *WAL) sync() {
err := l.currentSegmentWriter.sync()
for len(l.syncWaiters) > 0 {
errC := <-l.syncWaiters
errC <- err
}
}
TSM File:
单个tsm file,最大大小为2g,用于存放数据, 上文中对内存数据刷到磁盘就会生成一个tsm文件,然后合并进程,慢慢将这些tsm文件合并到一个大的tsm文件
Compactor:
每秒执行一次,主要是两个任务:
1.cache是否需要落地磁盘,需要就将cache落地到磁盘,生成一个tsm文件。
2.tsm文件是否需要合并, 需要的话就将多个tsm文件合并成一个大文件。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。