本节,我们尝试读取写入的数据。
select id, val + 10 from t where val < 10;
我们以这条 SQL 为例。首先它是一条 Select 语句,Select 会通过 executor/table_reader.go
中的 TableReaderExecutor
执行,但是这里并不是直接返回读取到的结果,需要将 val + 10 后的结果返回给客户端,因此还需要使用到 executor/projection.go
中的 ProjectionExec
来对 Select 的结果做一次运算处理。
- 1.
executor/builder.go
,因为数据处理的顺序是先通过SelectionExec
获取数据再使用ProjectionExec
进行计算处理,所以最外层的是ProjectionExec
,内层是TableReaderExecutor
。在 build 阶段,首先会执行executorBuilder.build
中调用到executorBuilder.buildProjection
函数,ProjectionExec
一定会对下层的结果进行处理,所以有 children,这里会递归的调用executorBuilder.build
函数来 build 子 Executor。 - 2.
executor/table_reader.go
,TableReaderExecutor
的数据源是TableReaderExecutor.resultHandler
,最后会通过distsql/select_result.go
中的SelectResult
来执行。SelectResult
仅会从 TiKV 中获取所需要的数据来减少数据的传输量。具体的调用链路是TableReaderExecutor.Next
调用tableResultHandler.nextChunk
,其中通过selectResult.Next
方法(定义在SelectResult
接口中)填充 Chunk。 - 3.
executor/projection.go
,我们来看一看ProjectionExec
的ProjectionExec.parallelExecute
是怎么运行的,可以结合 lab0 的 Map-Reduce 来理解。下面所描述的流程在ProjectionExec.Next
的注释中有示意图。- 3.1 外部线程不停的调用
ProjectionExec.Next
获取处理完成的数据,在并行处理时会调用ProjectionExec.parallelExecute
。ProjectionExec.parallelExecute
函数中会从ProjectionExec.outputCh
中拿到数据并且通过Chunk.SwapColumns
将数据写入外部传入的Chunk
中。 - 3.2
fetcher
线程负责从内部的 Executor 获取读到的数据,这里是从projectionInputFetcher.inputCh
拿到projectionInput
,然后把TableReaderExecutor
中读数据通过projectionInput.chk.SetRequiredRows
写入,最后将带有数据的projectionInput
发送到input.targetWorker.inputCh
当中。从projectionInputFetcher.outputCh
读到的数据是worker
线程处理完的结果,将结果发送给ProjectionExec.outputCh
(也是projectionInputFetcher.globalOutputCh
),同时也会发送到input.targetWorker.outputCh
。 - 3.3
worker
线程会把fetcher
写入到projectionWorker.inputCh
当中的内部 Executor 结果数据取出,把projectionWorker.outputCh
的结果写入用的projectionOutput
取出,计算后写入从projectionOutput.chk
。在处理之后,只需要将projectionInput
从projectionWorker.inputGiveBackCh
(3.2 中的projectionInputFetcher.inputCh
) 还给fetcher
。
- 3.1 外部线程不停的调用
以上是读取数据的关键路径,这个调用链路中的关键函数也被移除了,你需要根据调用链路的描述进行填充。
由于数据库的初始化需要依赖 lab4a
, lab4b
, lab4c
的逻辑,所以我们需要在完成 lab4 的所有代码之后再进行测试。
运行 make lab4
,通过所有测试用例。
可以使用下面的命令测试指定的单个或多个用例。
go test {package path} -check.f ^{regex}$
# example
go test -timeout 5s ./store/tikv -check.f ^TestFailAfterPrimary$