// port是外部请求信息的封装
Port *port;
// 初始化port
port = ConnCreate(ListenSocket[i]);
// 启动一个用来处理外部请求的服务子进程
BackendStartup(port);
static int BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
pid = fork_process();
if (pid == 0) /* child */
{
free(bn);
/* Detangle from postmaster */
InitPostmasterChild();
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
/* Perform additional initialization and collect startup packet */
BackendInitialize(port);
// 在子进程中进行初始化和运行外部请求和子进程的交互
BackendRun(port);
}
}
static void BackendRun(Port *port)
{
char *av[2];
const int ac = 1;
av[0] = "postgres";
av[1] = NULL;
// 子进程的内存初始化
MemoryContextSwitchTo(TopMemoryContext);
// 启动一个子进程
PostgresMain(ac, av, port->database_name, port->user_name);
}
// PG用于处理外部请求的子进程入口函数
void PostgresMain(int argc, char *argv[],
const char *dbname,
const char *username)
{
int firstchar;
StringInfoData input_message;
sigjmp_buf local_sigjmp_buf;
volatile bool send_ready_for_query = true;
bool idle_in_transaction_timeout_enabled = false;
bool idle_session_timeout_enabled = false;
// 子进程的初始化
InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);
// 是否配置wal sender
if (am_walsender)
InitWalSender();
for (;;)
{
DoingCommandRead = true;
// 通过TCP读取客户端传过来的数据,该数据封装了PG客户端和该子进程的协议和SQL语句
firstchar = ReadCommand(&input_message);
DoingCommandRead = false;
// 根据读取到的第一个字符来判断请求类型
switch (firstchar)
{
// 简单SQL语句的处理
case 'Q': /* simple query */
{
const char *query_string;
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
if (am_walsender)
{
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}
else
exec_simple_query(query_string);
send_ready_for_query = true;
}
break;
// SQL语句的协议解析
case 'P': /* parse */
{
const char *stmt_name;
const char *query_string;
int numParams;
Oid *paramTypes = NULL;
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
stmt_name = pq_getmsgstring(&input_message);
query_string = pq_getmsgstring(&input_message);
numParams = pq_getmsgint(&input_message, 2);
if (numParams > 0)
{
paramTypes = (Oid *) palloc(numParams * sizeof(Oid));
for (int i = 0; i < numParams; i++)
paramTypes[i] = pq_getmsgint(&input_message, 4);
}
pq_getmsgend(&input_message);
exec_parse_message(query_string, stmt_name,
paramTypes, numParams);
}
break;
case 'C': /* close */
{
int close_type;
const char *close_target;
forbidden_in_wal_sender(firstchar);
close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
switch (close_type)
{
case 'S':
if (close_target[0] != '\0')
DropPreparedStatement(close_target, false);
else
{
/* special-case the unnamed statement */
drop_unnamed_stmt();
}
break;
case 'P':
{
Portal portal;
portal = GetPortalByName(close_target);
if (PortalIsValid(portal))
PortalDrop(portal, false);
}
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid CLOSE message subtype %d",
close_type)));
break;
}
if (whereToSendOutput == DestRemote)
pq_putemptymessage('3'); /* CloseComplete */
}
break;
case 'D': /* describe */
{
int describe_type;
const char *describe_target;
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() (needed for xact) */
SetCurrentStatementStartTimestamp();
describe_type = pq_getmsgbyte(&input_message);
describe_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
switch (describe_type)
{
case 'S':
exec_describe_statement_message(describe_target);
break;
case 'P':
exec_describe_portal_message(describe_target);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid DESCRIBE message subtype %d",
describe_type)));
break;
}
}
break;
case 'H': /* flush */
pq_getmsgend(&input_message);
if (whereToSendOutput == DestRemote)
pq_flush();
break;
case 'S': /* sync */
pq_getmsgend(&input_message);
finish_xact_command();
send_ready_for_query = true;
break;
case EOF:
pgStatSessionEndCause = DISCONNECT_CLIENT_EOF;
case 'X':
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;
proc_exit(0);
}
}
}