diff --git a/README.md b/README.md index 53ce454..f8d5529 100644 --- a/README.md +++ b/README.md @@ -17,65 +17,65 @@ This extension is developed and maintained by the VidarDB team. Feel free to rep We test this foreign data wrapper on Ubuntu Server 18.04 using PostgreSQL-11 together with RocksDB-6.2.4 (built with GCC-7.4.0). - Install PostgreSQL and the dev library which is required by extensions: - + ```sh # add the repository sudo tee /etc/apt/sources.list.d/pgdg.list << END deb http://apt.postgresql.org/pub/repos/apt/ bionic-pgdg main END - + # get the signing key and import it wget https://www.postgresql.org/media/keys/ACCC4CF8.asc sudo apt-key add ACCC4CF8.asc - + # fetch the metadata from the new repo sudo apt-get update - + # install postgresql and the dev library sudo apt-get install postgresql-11 sudo apt-get install postgresql-server-dev-11 ``` - Install [RocksDB](https://github.com/facebook/rocksdb) from source code: - + ```sh git clone -b v6.2.4 https://github.com/facebook/rocksdb.git - + cd rocksdb - + sudo DEBUG_LEVEL=0 make shared_lib install-shared sudo sh -c "echo /usr/local/lib >> /etc/ld.so.conf" - + sudo ldconfig ``` - Build this foreign data wrapper: - + ```sh git clone git@github.com:postgrespro/lsm.git - + cd lsm - + make - + sudo make install ``` - Before using this foreign data wrapper, we need to add it to `shared_preload_libraries` in the `postgresql.conf`: - + ```sh echo "shared_preload_libraries = 'lsm'" >> postgresql.conf ``` - + and restart PostgreSQL: - + ```sh sudo service postgresql restart ``` - When uninstall this extension, first issue the following commands, and then delete the data by locating PostgreSQL data folder via `show data_directory;` in PostgreSQL terminal. - + ```sh cd PostgresForeignDataWrapper @@ -98,7 +98,6 @@ This extension does not have any parameter. 
After creating the extension and cor A simple example is as follows (*you can run '`sudo -u postgres psql -U postgres`' to connect the local postgresql server*): - ``` CREATE DATABASE example; \c example @@ -124,17 +123,15 @@ A simple example is as follows (*you can run '`sudo -u postgres psql -U postgres DROP SERVER lsm_server; DROP EXTENSION lsm_fdw; - + \c postgres DROP DATABASE example; - -``` +``` # Testing We have tested certain typical SQL statements and will add more test cases later. The test scripts are in the test/sql folder which are recommended to be placed in a non-root directory. The corresponding results can be found in the test/expected folder. You can run the tests in the following way: - ```sh sudo service postgresql restart @@ -147,16 +144,15 @@ We have tested certain typical SQL statements and will add more test cases later sudo -u postgres psql -U postgres -d lsmtest -a -f test/sql/clear.sql ``` -# Debug +# Debug If you want to debug the source code, you may need to start PostgreSQL in the debug mode: - ```sh sudo service postgresql stop sudo -u postgres /usr/lib/postgresql/11/bin/postgres -d 0 -D /var/lib/postgresql/11/main -c config_file=/etc/postgresql/11/main/postgresql.conf -``` +``` # Docker diff --git a/lsm--0.1.sql b/lsm--0.1.sql deleted file mode 100644 index 34bedd0..0000000 --- a/lsm--0.1.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE FUNCTION lsm_fdw_handler() -RETURNS fdw_handler -AS 'MODULE_PATHNAME' -LANGUAGE C STRICT; - -CREATE FOREIGN DATA WRAPPER lsm_fdw - HANDLER lsm_fdw_handler; - diff --git a/lsm.conf b/lsm.conf deleted file mode 100644 index c575d2f..0000000 --- a/lsm.conf +++ /dev/null @@ -1 +0,0 @@ -shared_preload_libraries = 'lsm' diff --git a/lsm.control b/lsm.control deleted file mode 100644 index aacc15e..0000000 --- a/lsm.control +++ /dev/null @@ -1,5 +0,0 @@ -# LSM FDW -comment = 'RocksDB Foreign Data Wrapper' -default_version = '0.1' -module_pathname = '$libdir/lsm' -relocatable = true diff --git a/lsm_api.h 
b/lsm_api.h index 3c77e1a..e5ab3ad 100644 --- a/lsm_api.h +++ b/lsm_api.h @@ -2,26 +2,27 @@ * This file represents API between client working with Postgres API and C++ server working with RocksDB API. * To avoid collision between C/C++ headers they are sharing just this one header file. */ -#ifndef __LSM_API_H__ -#define __LSM_API_H__ +#ifndef __LSM_API_H__ // 如果没有定义这个宏 +#define __LSM_API_H__ // 就定义下面的宏 #include #include #include #ifdef __cplusplus -extern "C" { +extern "C" { //加上extern "C"后,会指示编译器这部分代码按C语言的进行编译,而不是C++的。 #endif /* * Maximal size of record which can be transfered through client-server protocol and read batch size as well */ -#define LSM_MAX_RECORD_SIZE (64*1024) +// 定义client 和 server之间传送的数据的最大数据量 +#define LSM_MAX_RECORD_SIZE (64*1024) //64KB /* * Name of the directory in $PGDATA */ -#define LSM_FDW_NAME "lsm" +#define LSM_FDW_NAME "lsm" //定义fdw的名称为lsm extern int LsmQueueSize; extern bool LsmSync; @@ -42,6 +43,7 @@ typedef enum LsmOpLookup } LsmOperation; +// 定义client和server之间的各种操作 extern void LsmError(char const* message); extern size_t LsmShmemSize(int maxClients); extern void LsmInitialize(void* ctl, int maxClients); diff --git a/lsm_client.cpp b/lsm_client.cpp index 05ffea2..dedb14c 100644 --- a/lsm_client.cpp +++ b/lsm_client.cpp @@ -57,15 +57,17 @@ LsmDelete(LsmQueueId qid, LsmRelationId rid, char *key, size_t keyLen) return true; } + +// 将传入的数据以LsmMessage进行传输 bool LsmInsert(LsmQueueId qid, LsmRelationId rid, char *key, size_t keyLen, char *val, size_t valLen) { LsmMessage msg; LsmQueue* queue = queues[qid]; msg.hdr.op = LsmOpInsert; - msg.hdr.rid = rid; - msg.hdr.keySize = keyLen; - msg.hdr.valueSize = valLen; + msg.hdr.rid = rid; //外部表oid + msg.hdr.keySize = keyLen; //key的长度 + msg.hdr.valueSize = valLen; //value的长度 msg.key = key; msg.value = val; queue->put(msg); @@ -77,6 +79,8 @@ LsmInsert(LsmQueueId qid, LsmRelationId rid, char *key, size_t keyLen, char *val return true; } + + uint64_t LsmCount(LsmQueueId qid, LsmRelationId rid) { diff 
--git a/lsm_db.h b/lsm_db.h index 898bda7..48aaefc 100644 --- a/lsm_db.h +++ b/lsm_db.h @@ -8,8 +8,13 @@ using namespace rocksdb; +/** + * 对Rocksdb的封装 + * + */ + /* - * Wrapper for RocksSB + * Wrapper for RocksDB */ struct LsmConnection { @@ -44,6 +49,7 @@ struct LsmMessageHeader /* * Protocol message */ +// 用于包装传输的数据 struct LsmMessage { LsmMessageHeader hdr; @@ -54,8 +60,10 @@ struct LsmMessage /* * Queue for tranferring data between backend and LSM worker thread. */ +// 用于将数据传送到rocksdb中 struct LsmQueue { + // ring buffer 环状缓冲区 volatile int getPos; // get position in ring buffer (updated only by consumer) volatile int putPos; // put position in ring buffer (updated only by producer) volatile int respSize; // response size @@ -84,11 +92,13 @@ struct LsmCursor struct LsmServer; + +// 主要封装了对lsm的操作,也就是lsmServer中的一个工作线程 struct LsmWorker { std::map cursors; - LsmServer* server; - LsmQueue* queue; + LsmServer* server; // 一个server有很多的worker + LsmQueue* queue; //一个worker对应一个queue pthread_t thread; LsmWorker(LsmServer* s, LsmQueue* q) : server(s), queue(q) {} @@ -150,6 +160,8 @@ class CriticalSection } }; + +// 此结构体包含了很多对rocksdb的操作,非常重要,是一个对于rocksdb封装的最大对象,包含了很多的LsmWorker struct LsmServer { LsmWorker** workers; diff --git a/lsm_fdw.c b/lsm_fdw.c index fccd04d..82ad48f 100644 --- a/lsm_fdw.c +++ b/lsm_fdw.c @@ -34,6 +34,16 @@ PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(lsm_fdw_handler); + +/* +root是规划器的关于该查询的全局信息 +baserel是规划器的关于该表的信息 +foreigntableid是外部表在pg_class中的 OID (foreigntableid可以从规划器的数据结构中获得,但是为了减少工作量,这里直接显式地将它传递给函数)。 +*/ + +// 这些hook函数的参数都是系统定义好的 +// 获取外部表格的size +// baserel 是planner中关于外部表格的信息 static void GetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, @@ -63,9 +73,12 @@ GetForeignRelSize(PlannerInfo *root, * we should open & close db multiple times. 
*/ /* TODO: better estimation */ + // baserel is the planner's information about this table baserel->rows = LsmCount(MyBackendId, foreignTableId); } + +// 创建一个扫描外部表的访问路径 static void GetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, @@ -92,6 +105,7 @@ GetForeignPaths(PlannerInfo *root, Cost totalCost = startupCost + baserel->rows; /* Create a ForeignPath node and add it as only possible path */ + // https://doxygen.postgresql.org/pathnode_8c.html#a20b2c8a564bb57ed4187825dec56f707 add_path(baserel, (Path *) create_foreignscan_path(root, baserel, @@ -105,13 +119,15 @@ GetForeignPaths(PlannerInfo *root, NIL)); /* no fdw_private data */ } + +// 创建一个ForeignScan 计划的节点,从选择的外部access path中创建 static ForeignScan* GetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, ForeignPath *bestPath, List *targetList, - List *scanClauses, + List *scanClauses, // Plan *outerPlan) { /* @@ -136,6 +152,7 @@ GetForeignPlan(PlannerInfo *root, * handled elsewhere). */ + // clause 从句 scanClauses = extract_actual_clauses(scanClauses, false); /* Create the ForeignScan node */ @@ -231,8 +248,11 @@ GetKeyBasedQual(ForeignScanState *scanState, return; } + +// 开始执行外部表格的扫描 static void -BeginForeignScan(ForeignScanState *scanState, int executorFlags) +BeginForeignScan(ForeignScanState *scanState, + int executorFlags) { static LsmCursorId operationId = 0; /* a SQL might cause multiple scans */ @@ -391,6 +411,7 @@ GetNextFromBatch(Oid relationId, return found; } +// 从外部表格数据中,返回一行数据从外部表的slot中 static TupleTableSlot* IterateForeignScan(ForeignScanState *scanState) { @@ -556,6 +577,8 @@ AddForeignUpdateTargets(Query *parsetree, parsetree->targetList = lappend(parsetree->targetList, entry); } + + static List* PlanForeignModify(PlannerInfo *root, ModifyTable *plan, @@ -587,6 +610,7 @@ PlanForeignModify(PlannerInfo *root, return NULL; } +// 开始执行一个外部表的修改,比如CRUD static void BeginForeignModify(ModifyTableState *modifyTableState, ResultRelInfo *resultRelInfo, @@ -639,6 +663,8 @@ 
BeginForeignModify(ModifyTableState *modifyTableState, resultRelInfo->ri_FdwState = (void *) writeState; } + +// 将slot序列化为key和val进行插入操作 static void SerializeTuple(StringInfo key, StringInfo val, @@ -652,25 +678,50 @@ SerializeTuple(StringInfo key, Datum datum = tupleSlot->tts_values[index]; if (tupleSlot->tts_isnull[index]) { if (index == 0) { + // 元组的第一个属性为空 ereport(ERROR, (errmsg("LSM: first column cannot be null!"))); } SerializeNullAttribute(tupleDescriptor, index, val); } else { - SerializeAttribute(tupleDescriptor, - index, - datum, - index == 0 ? key : val); + // 序列化非空的字段 + SerializeAttribute(tupleDescriptor, //元组 + index, // 当前属性的下标 + datum, // 当前属性所对应的值 + index == 0 ? key : val); //key,val初始化为空 } } } + +// 插入一个tuple到外部表中 +// slot : 槽 +// resultRelInfo 用于描述外部表 +// slot +// planSlot static TupleTableSlot* ExecForeignInsert(EState *executorState, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { + + /** + * + * 执行器机制被用于四种基本SQL查询类型:SELECT、INSERT、 UPDATE以及DELETE。对于SELECT, + * 顶层执行器代码只需要发送查询计划树返回的每个行给客户端。 + * 对于INSERT,每一个被返回的行被插入到INSERT中指定的目标表中。 + * 这通过一个被称为ModifyTable的特殊顶层计划节点完成 + * (一个简单的INSERT ... VALUES命令会创建一个由一个Result节点组成的简单计划树, + * 该节点只计算一个结果行,在它之上的ModifyTable节点会执行插入。但是INSERT ... SELECT可以用到执行器机制的全部功能)。 + * 对于UPDATE,规划器会安排每一个计算行包含所有被更新的列值加上原始目标行的TID(元组ID或行ID), + * 这些数据也会被输入到一个ModifyTable节点, + * 该节点将利用这些信息创建一个新的被更新行并标记旧行为被删除。 + * 对于DELETE,唯一被计划返回的列是TID,ModifyTable节点简单地使用TID访问每一个目标行并将其标记为被删除。 + * ModifyTable以及CRUD操作的底层原理:https://www.cnblogs.com/flying-tiger/p/8418293.html + */ + + /* * Insert one tuple into the foreign table. executorState is global * execution state for the query. 
resultRelInfo is the ResultRelInfo struct @@ -728,6 +779,7 @@ ExecForeignInsert(EState *executorState, initStringInfo(&val); SerializeTuple(&key, &val, slot); + // 调用lsm_client中的接口进行插入 if (!LsmInsert(MyBackendId, foreignTableId, key.data, key.len, val.data, val.len)) elog(ERROR, "LSM: Failed to insert tuple"); @@ -741,6 +793,8 @@ ExecForeignInsert(EState *executorState, return slot; } + +// 执行外部数据更新 static TupleTableSlot* ExecForeignUpdate(EState *executorState, ResultRelInfo *resultRelInfo, @@ -753,7 +807,7 @@ ExecForeignUpdate(EState *executorState, * the target foreign table. slot contains the new data for the tuple; it * will match the rowtype definition of the foreign table. planSlot contains * the tuple that was generated by the ModifyTable plan node's subplan; it - * differs from slot in possibly containing additional "junk" columns. In + * differs from slot in possibly containing additional "junk" columns. In // 重要:in possibly containing additional "junk" columns * particular, any junk columns that were requested by * AddForeignUpdateTargets will be available from this slot. 
* @@ -804,8 +858,9 @@ ExecForeignUpdate(EState *executorState, initStringInfo(&key); initStringInfo(&val); + // 将slot序列化为key 和 val SerializeTuple(&key, &val, slot); - + // 将获取到的key和val进行insert操作 LsmInsert(MyBackendId, foreignTableId, key.data, key.len, val.data, val.len); #if PG_VERSION_NUM>=130000 @@ -975,8 +1030,13 @@ AnalyzeForeignTable(Relation relation, return false; } +// 文档:https://www.postgresql.org/docs/9.6/fdwhandler.html +// 这是整个fdw程序的入口 Datum lsm_fdw_handler(PG_FUNCTION_ARGS) { + //FDW 处理函数返回一个 palloc 的FdwRoutine结构,其中包含指向下面描述的回调函数的指针。 + //FdwRoutine结构类型在src/include/foreign/fdwapi.h中声明 + //https://doxygen.postgresql.org/fdwapi_8h_source.html#l00204 FdwRoutine *routine = makeNode(FdwRoutine); ereport(DEBUG1, (errmsg("LSM: entering function %s", __func__))); @@ -991,6 +1051,10 @@ Datum lsm_fdw_handler(PG_FUNCTION_ARGS) */ /* these are required */ + // http://www.postgres.cn/docs/12/fdw-callbacks.html + /* + 在对一个扫描外部表的查询进行规划的开头将调用该函数 + */ routine->GetForeignRelSize = GetForeignRelSize; routine->GetForeignPaths = GetForeignPaths; routine->GetForeignPlan = GetForeignPlan; @@ -999,6 +1063,7 @@ Datum lsm_fdw_handler(PG_FUNCTION_ARGS) routine->ReScanForeignScan = ReScanForeignScan; routine->EndForeignScan = EndForeignScan; + // remainder-余下的,余下的函数是可选的,如果不需要的话,可以为NULL /* remainder are optional - use NULL if not required */ /* support for insert / update / delete */ routine->AddForeignUpdateTargets = AddForeignUpdateTargets; diff --git a/lsm_posix.h b/lsm_posix.h index 3999e09..5d219a1 100644 --- a/lsm_posix.h +++ b/lsm_posix.h @@ -10,6 +10,9 @@ extern "C" { #endif + +// 此文件定义了大量关于线程的操作 + #define PthreadCreate(t,attr,start,arg) PthreadCreate_(t,attr,start,arg,__func__) #define PthreadJoin(t,exitcode) PthreadJoin_(t,exitcode,__func__) #define SemInit(sem,shared,value) SemInit_(sem,shared,value,__func__) diff --git a/lsm_server.cpp b/lsm_server.cpp index 808c615..b81e4cf 100644 --- a/lsm_server.cpp +++ b/lsm_server.cpp @@ -2,7 +2,7 @@ // Worker's part of LSM queue 
// #include "lsm_api.h" -#include "lsm_db.h" +#include "lsm_db.h" //包含lsm_storage.cpp中的一些实现的函数,也就是对rocksdb进行操作的函数,例如crud操作 static LsmServer* server;; @@ -14,6 +14,7 @@ bool LsmUpsert; /* * Enqueue message */ +// 传入的参数为LsmMessage,将LsmMessage中的数据存入到数据库中 void LsmQueue::put(LsmMessage const& msg) { int size = sizeof(LsmMessageHeader) + msg.hdr.keySize + msg.hdr.valueSize; @@ -60,6 +61,7 @@ void LsmQueue::put(LsmMessage const& msg) // Copy key if (tail <= msg.hdr.keySize) { + // C 库函数 void *memcpy(void *str1, const void *str2, size_t n) 从存储区 str2 复制 n 个字节到存储区 str1。 memcpy(&req[putPos], msg.key, tail); memcpy(&req[0], msg.key + tail, msg.hdr.keySize - tail); putPos = msg.hdr.keySize - tail; @@ -193,6 +195,7 @@ void LsmWorker::insert(LsmMessage const& msg) { LsmConnection& con(open(msg)); + // 通过调用con插入key,value数据,resp为插入之后的返回值 queue->resp[0] = (char)con.insert(msg.key, msg.hdr.keySize, msg.value, msg.hdr.valueSize); if (LsmSync) SemPost(&queue->ready); @@ -260,12 +263,16 @@ LsmWorker::fetch(LsmMessage const& msg) SemPost(&queue->ready); } + + +// 主循环,进行rocksdb的主操作 /* * Worker main loop */ void LsmWorker::run() { + // 一直处于监听情况下 while (true) { LsmMessage msg; @@ -300,6 +307,9 @@ LsmWorker::run() } } + + +// 开启一个LsmWorker,调用其主函数 void LsmWorker::start() { @@ -320,6 +330,8 @@ LsmWorker::wait() PthreadJoin(thread, &status); } + +// 主函数 void* LsmWorker::main(void* arg) { @@ -349,6 +361,8 @@ LsmStopWorkers(void) server->stop(); } + +// 封装了对LsmWorker的操作,也就是当外部数据来的时候,将插入数据等操作用一个LsmWorker来操作 LsmServer::LsmServer(size_t maxClients) : nWorkers(maxClients) { workers = new LsmWorker*[nWorkers]; @@ -395,6 +409,9 @@ LsmServer::stop() } } + + +// 返回rocksdb的con LsmConnection& LsmServer::open(LsmMessage const& msg) { diff --git a/lsm_storage.cpp b/lsm_storage.cpp index 07300ca..694bd37 100644 --- a/lsm_storage.cpp +++ b/lsm_storage.cpp @@ -3,6 +3,8 @@ // #include "lsm_db.h" +// 此函数是LsmConnection 中的成员函数, +// 而LsmConnection中有一个非常重要的成员变量:DB* db; void LsmConnection::open(char const* path) { 
diff --git a/lsm_util.c b/lsm_util.c index 3d02c32..f4fe2ba 100644 --- a/lsm_util.c +++ b/lsm_util.c @@ -95,11 +95,17 @@ DecodeVarintLength(char* start, char* limit, uint64* len) * Checks if the given foreign server belongs to kv_fdw. If it * does, the function returns true. Otherwise, it returns false. */ +//http://www.postgres.cn/docs/9.4/fdw-helpers.html static bool LsmServer(ForeignServer *server) { char *fdwName = GetForeignDataWrapper(server->fdwid)->fdwname; return strncmp(fdwName, LSM_FDW_NAME "_fdw", NAMEDATALEN) == 0; } + + + + +// 判断给定的表是否属于外部表 /* * Checks if the given table name belongs to a foreign Lsm table. * If it does, the function returns true. Otherwise, it returns false. @@ -224,10 +230,12 @@ void SerializeNullAttribute(TupleDesc tupleDescriptor, buffer->len += headerLen; } -void SerializeAttribute(TupleDesc tupleDescriptor, - Index index, - Datum datum, - StringInfo buffer) { + +// 序列化属性 +void SerializeAttribute(TupleDesc tupleDescriptor, //元组 + Index index, // 当前属性的下标 + Datum datum, // 当前属性的值 + StringInfo buffer) { //以当前这一行的第一个属性的值作为key,其他的属性为val Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, index); bool byValue = attributeForm->attbyval; int typeLength = attributeForm->attlen; @@ -255,6 +263,7 @@ void SerializeAttribute(TupleDesc tupleDescriptor, if (typeLength > 0) { if (byValue) { + // 存储指定的值到指定地址中 store_att_byval(current, datum, typeLength); } else { memcpy(current, DatumGetPointer(datum), typeLength);