Wiredtiger
支持的存储模型Wiredtiger
目前支持btree
和lsm
的存储模型,也是唯一一个支持2种存储模型的kv磁盘存储引擎Wiredtiger
数据插入实现概览Wiredtiger
引擎的数据插入的的整体概览,从数据库连接初始化->会话初始化->游标初始化->调用游标的内置函数进行数据插入操作Wiredtiger
数据插入实现分析wiredtiger_open
函数wiredtiger_open
函数是整个wt
引擎的初始化函数,定义了WT_CONNECTION
的虚函数操作表,wt
引擎参数初始化、内部线程
初始化等工作,这里我们着重关心db中WT_CONNECTION
默认值。// 通过传入WT_CONNECTION **类型参数初始化WT_CONNECTION,设置WT_CONNECTION操作函数表
int wiredtiger_open(...)
{
// WT_CONNECTION
static const WT_CONNECTION stdc = {__conn_close, __conn_debug_info, __conn_reconfigure,
__conn_get_home, __conn_configure_method, __conn_is_new, __conn_open_session,
__conn_query_timestamp, __conn_set_timestamp, __conn_rollback_to_stable,
__conn_load_extension, __conn_add_data_source, __conn_add_collator, __conn_add_compressor,
__conn_add_encryptor, __conn_add_extractor, __conn_set_file_system, __conn_add_storage_source,
__conn_get_storage_source, __conn_get_extension_api};
// WT_CONNECTION具体实现的结构
WT_CONNECTION_IMPL *conn;
WT_RET(__wt_calloc_one(NULL, &conn));
conn->iface = stdc;
// WT_CONNECTION的初始化设置
*connectionp = &conn->iface;
/*********其他初始化******/
}
open_session
函数conn->open_session
函数实际调用的是WT_CONNECTION->open_session
对应的__conn_open_session
函数.这个函数核心初始化session比如设置session的函数表、会话的事务初始化等工作// WT_CONNECTION->open_session的具体实现
// 参数中包含WT_SESSION **wt_sessionp,初始化session和设置session的函数操作表
static int __conn_open_session(...)
{
WT_SESSION_IMPL *session_ret;
WT_ERR(__wt_open_session(conn, event_handler, config, true, &session_ret))
{
/* Acquire a session. */
WT_RET(__open_session(conn, event_handler, config, &session))
{
static const WT_SESSION
stds = {NULL, NULL, __session_close, __session_reconfigure, __session_flush_tier,
__wt_session_strerror, __session_open_cursor, __session_alter, __session_create,
__wt_session_compact, __session_drop, __session_join, __session_log_flush,
__session_log_printf, __session_rename, __session_reset, __session_salvage,
__session_truncate, __session_upgrade, __session_verify, __session_begin_transaction,
__session_commit_transaction, __session_prepare_transaction, __session_rollback_transaction,
__session_query_timestamp, __session_timestamp_transaction,
__session_timestamp_transaction_uint, __session_checkpoint, __session_reset_snapshot,
__session_transaction_pinned_range, __session_get_rollback_reason, __wt_session_breakpoint},
stds_readonly = {NULL, NULL, __session_close, __session_reconfigure, __session_flush_tier,
__wt_session_strerror, __session_open_cursor, __session_alter_readonly,
__session_create_readonly, __wt_session_compact_readonly, __session_drop_readonly,
__session_join, __session_log_flush_readonly, __session_log_printf_readonly,
__session_rename_readonly, __session_reset, __session_salvage_readonly,
__session_truncate_readonly, __session_upgrade_readonly, __session_verify,
__session_begin_transaction, __session_commit_transaction,
__session_prepare_transaction_readonly, __session_rollback_transaction,
__session_query_timestamp, __session_timestamp_transaction,
__session_timestamp_transaction_uint, __session_checkpoint_readonly,
__session_reset_snapshot, __session_transaction_pinned_range, __session_get_rollback_reason,
__wt_session_breakpoint};
}
}
session_ret->iface = F_ISSET(conn, WT_CONN_READONLY) ? stds_readonly : stds;
session_ret->iface.connection = &conn->iface;
*wt_sessionp = &session_ret->iface;
/*******其他初始化******/
}
create
函数create
函数是session
中的create
函数指针,实际对应的函数是__session_create
,这个函数的核心职责是检查创建的数据表是否在系统中存在->构建新表的元数据信息->在数据库中创建表的数据文件->表对应的元数据信息插入到系统表WiredTiger.wt
中,经历这个几个过程整个的schema(表)
创建成功。table
开头的uri
是针对元数据而言,file
来头的uri对应的是物理磁盘
上的表的结构static int
__session_create(...const char *uri...)
{
__wt_session_create(session, uri, config)
{
__wt_schema_create(session, uri, config)
{
// 这里的uri=table:access
__schema_create(int_session, uri, config)
{
__create_table(session, uri, exclusive, config)
{
// 检查创建表的meta数据是否存在
__wt_metadata_search(session, uri, &tablecfg)
// 构造数据表的meta信息
__wt_import_repair(session, filename, &filecfg)
// 创建表的元数据插入到数据库元数据系统表中(file:WiredTiger.wt),元数据中的key=uri表的uri,value=tablecfg配置信息
__wt_metadata_insert(session, uri, tablecfg)
// 这里对应的uri=colgroup:access
__create_colgroup(session, cgname, exclusive, cgcfg)
{
__wt_schema_create(uri...)
{
// __schema_create函数里调用__create_file
// uri=file:access.wt
__create_file(uri...)
{
__wt_metadata_insert(...)
{
// 插入file:access.wt的元数据
__curfile_insert(...)
}
}
}
}
}
}
}
}
}
open_cursor
函数open_cursor
是WT_SESSION->open_cursor
函数指针的实现,其实际的实现是__session_open_cursor
函数,根据不同的uri关键字来负责初始化游标和设置游标中的操作函数。static int __session_open_cursor(...)
{
__session_open_cursor_int()
{
switch (uri[0]) {
// 初始化btree游标
case `f`:
__wt_curfile_open(session, uri, owner, cfg, cursorp);
/***其他的case****/
}
}
}
// uri = file:access.wt,设置btree插入的函数操作表
int __wt_curfile_open(...uri)
{
__curfile_create(...)
{
WT_CURSOR_STATIC_INIT(iface, __wt_cursor_get_key, /* get-key */
__wt_cursor_get_value, /* get-value */
__wt_cursor_set_key, /* set-key */
__wt_cursor_set_value, /* set-value */
__curfile_compare, /* compare */
__curfile_equals, /* equals */
__curfile_next, /* next */
__curfile_prev, /* prev */
__curfile_reset, /* reset */
__curfile_search, /* search */
__curfile_search_near, /* search-near */
__curfile_insert, /* insert */
__wt_cursor_modify_value_format_notsup, /* modify */
__curfile_update, /* update */
__curfile_remove, /* remove */
__curfile_reserve, /* reserve */
__wt_cursor_reconfigure, /* reconfigure */
__wt_cursor_largest_key, /* largest_key */
__wt_cursor_bound, /* bound */
__curfile_cache, /* cache */
__curfile_reopen, /* reopen */
__wt_cursor_checkpoint_id, /* checkpoint ID */
__curfile_close);
// cursor的初始化
WT_RET(__wt_calloc(session, 1, csize, &cbt));
cursor = &cbt->iface;
*cursor = iface;
cursor->session = (WT_SESSION *)session;
cursor->internal_uri = btree->dhandle->name;
cursor->key_format = btree->key_format;
cursor->value_format = btree->value_format;
// 对应的btree初始化
__wt_btcur_open(cbt);
}
}
insert
函数insert
函数是cursor
的函数指针,实际的调用的函数是__curfile_insert
函数,把数据插入到btree中,最终的数据落盘还是依赖于checkpoint
线程来刷盘// WT_CURSOR->insert具体实现
static int __curfile_insert(WT_CURSOR *cursor)
{
WT_ERR(__wt_btcur_insert(cbt));
}
Wiredtiger
数据插入的示例代码#include <wiredtiger_ext.h>
#include <wiredtiger.h>
#include <unistd.h>
#include <assert.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
static void insert_kv(const char *home,char *counter_str)
{
/*! [access example connection] */
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_SESSION *session;
const char *key, *value;
int ret;
mkdir(home,0755);
/* Open a connection to the database, creating it if necessary. */
assert(wiredtiger_open(home, NULL, "create", &conn)!=-1);
/* Open a session handle for the database. */
assert(conn->open_session(conn, NULL, NULL, &session)!=-1);
/*! [access example connection] */
/*! [access example table create] */
assert(session->create(session, "table:access", "key_format=S,value_format=S")!=-1);
/*! [access example table create] */
/*! [access example cursor open] */
assert(session->open_cursor(session, "table:access", NULL, NULL, &cursor)!=-1);
/*! [access example cursor open] */
uint64_t i = 0;
char *endptr=NULL;
uint64_t count= strtol(counter_str, &endptr, 10);
fprintf(stdout,"pid=%ld\n",getpid());
sleep(15);
while (true) {
if (i >= count) {
break;
}
char k_buf[1024] = {'\0'};
char v_buf[1024] = {'\0'};
snprintf((char *)&k_buf, 1024, "key-%ld", i);
snprintf((char *)&v_buf, 1024, "value-%ld", i);
/*! [access example cursor insert] */
cursor->set_key(cursor,(char *)&k_buf); /* Insert a record. */
cursor->set_value(cursor, (char *)&v_buf);
assert(cursor->insert(cursor)!=-1);
usleep(100);
i++;
}
assert(conn->close(conn, NULL)!=-1); /* Close all handles. */
/*! [access example close] */
}
int
main(int argc, char *argv[])
{
insert_kv(argv[1],argv[2]);
return 0;
}