本文共 8212 字,大约阅读时间需要 27 分钟。
我们在命令行启动 substrate 节点,到底发生了什么呢?本文基于 substrate 源码,对其启动流程进行了简单的分析。
命令行启动 substrate,主要是解析命令行参数并配置服务。
主程序在substrate/node/main.rs
中,入口是main()
函数。其中的关键代码如下:
fn main() { ... if let Err(e) = cli::run(::std::env::args(), Exit, version) { eprintln!("Error starting the node: {}\n\n{:?}", e, e); std::process::exit(1) }}
这行代码调用的是node/cli/src/lib.rs
的run
函数。进入该run
函数,有如下代码:
pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Result<()> where I: IntoIterator<Item = T>, T: Into<std::ffi::OsString> + Clone, E: IntoExit,{ match parse_and_prepare::<CustomSubcommands, NoCustom, _>(&version, "substrate-node", args) { ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| { info!("{}", version.name); ...}
parse_and_prepare
函数(位于core/cli/src/lib.rs
中),这个函数类似于所有的区块链启动,主要是对命令行参数进行解析,并启动相关的操作。
在parse_and_prepare
函数中,会根据不同的参数,返回不同类型的ParseAndPrepare
。
pub fn parse_and_prepare<'a, CC, RP, I>( version: &'a VersionInfo, impl_name: &'static str, args: I,) -> ParseAndPrepare<'a, CC, RP>where CC: StructOpt + Clone + GetLogFilter, RP: StructOpt + Clone + AugmentClap, I: IntoIterator, <I as IntoIterator>::Item: Into<std::ffi::OsString> + Clone,{ ... let matches = CoreParams::<CC, RP>::clap() .name(version.executable_name) .author(version.author) .about(version.description) .version(&(full_version + "\n")[..]) .setting(AppSettings::GlobalVersion) .setting(AppSettings::ArgsNegateSubcommands) .setting(AppSettings::SubcommandsNegateReqs) .get_matches_from(args); let cli_args = CoreParams::<CC, RP>::from_clap(&matches); ... match cli_args { params::CoreParams::Run(params) => ParseAndPrepare::Run( ParseAndPrepareRun { params, impl_name, version } ), ...}
各种参数在core/cli/src/param.rs
中有相关的定义,部分代码如下:
pub enum CoreParams<CC, RP> { /// Run a node. Run(MergeParameters<RunCmd, RP>), /// Build a spec.json file, outputing to stdout. BuildSpec(BuildSpecCmd), /// Export blocks to a file. ExportBlocks(ExportBlocksCmd), /// Import blocks from file. ImportBlocks(ImportBlocksCmd), /// Revert chain to the previous state. Revert(RevertCmd), /// Remove the whole chain data. PurgeChain(PurgeChainCmd), /// Further custom subcommands. Custom(CC),}
而对于枚举类型ParseAndPrepare
,每一类结构体,均会实现各自的run
方法,解析参数生成配置,并根据配置运行服务。
pub enum ParseAndPrepare<'a, CC, RP> { /// Command ready to run the main client. Run(ParseAndPrepareRun<'a, RP>), /// Command ready to build chain specs. BuildSpec(ParseAndPrepareBuildSpec<'a>), /// Command ready to export the chain. ExportBlocks(ParseAndPrepareExport<'a>), /// Command ready to import the chain. ImportBlocks(ParseAndPrepareImport<'a>), /// Command ready to purge the chain. PurgeChain(ParseAndPreparePurge<'a>), /// Command ready to revert the chain. RevertChain(ParseAndPrepareRevert<'a>), /// An additional custom command passed to `parse_and_prepare`. CustomCommand(CC),}
以结构体ParseAndPrepareRun
为例,其run
函数的实现代码如下:
impl<'a, RP> ParseAndPrepareRun<'a, RP> { /// Runs the command and runs the main client. pub fn run<C, G, S, E, RS>( self, spec_factory: S, exit: E, run_service: RS, ) -> error::Result<()> where S: FnOnce(&str) -> Result<Option<ChainSpec<G>>, String>, RP: StructOpt + Clone, C: Default, G: RuntimeGenesis, E: IntoExit, RS: FnOnce(E, RunCmd, RP, Configuration<C, G>) -> Result<(), String> { let config = create_run_node_config(self.params.left.clone(), spec_factory, self.impl_name, self.version)?; run_service(exit, self.params.left, self.params.right, config).map_err(Into::into) }}
其实执行服务,具体是在run
函数的闭包函数中,代码如下:
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| { info!("{}", version.name); info!(" version {}", config.full_version()); info!(" by Parity Technologies, 2017-2019"); info!("Chain specification: {}", config.chain_spec.name()); info!("Node name: {}", config.name); info!("Roles: {:?}", config.roles); let runtime = RuntimeBuilder::new().name_prefix("main-tokio-").build() .map_err(|e| format!("{:?}", e))?; match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?, exit ), _ => run_until_exit( runtime, service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?, exit ), }.map_err(|e| format!("{:?}", e))}),
首先使用tokio
库构建一个runtime
,
然后根据节点角色配置,分别传入全节点或轻节点服务,调用run_until_exit
函数。
最后在函数中,调用tokio runtime
启动线程,将由service
构建出的`Future informant
绑定到event loop
上面定期轮询。代码如下:
fn run_until_exit<T, C, E>( mut runtime: Runtime, service: T, e: E,) -> error::Result<()> where T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static, C: substrate_service::Components, E: IntoExit,{ ... let informant = cli::informant::build(&service); runtime.executor().spawn(exit.until(informant).map(|_| ())); ... Ok(())}
代码中调用了core/cli/src/informant.rs
的build
函数,创建了一个Futrue
“线人”informant
。
基本上到这儿,相关的命令就全启动了。我们看下生成全节点或轻节点服务的具体细节。
construct_service_factory
在声明宏construct_service_factory
的定义中,有如下代码:
#[macro_export]macro_rules! construct_service_factory { ... FullService = $full_service:ty { $( $full_service_init:tt )* }, AuthoritySetup = { $( $authority_setup:tt )* }, LightService = $light_service:ty { $( $light_service_init:tt )* }, ... fn new_light( config: $crate::FactoryFullConfiguration<Self> ) -> $crate::Result<Self::LightService, $crate::Error> { ( $( $light_service_init )* ) (config) } fn new_full( config: $crate::FactoryFullConfiguration<Self> ) -> Result<Self::FullService, $crate::Error> { ( $( $full_service_init )* ) (config).and_then(|service| { ($( $authority_setup )*)(service) }) } ... }
在node/cli/src/service.rs
中,包含了service结合service中的对宏的调用,宏展开后,是执行的<Factory>::new(config)
,代码如下:
construct_service_factory! { struct Factory { ... FullService = FullComponents<Self> { |config: FactoryFullConfiguration<Self>| FullComponents::<Factory>::new(config) }, ... LightService = LightComponents<Self> { |config| <LightComponents<Factory>>::new(config) }, ...}
在core/service/src/components.rs
中定义了substrate的服务组件:FullComponents
和LightComponents
。它们new
函数的实现均调用了Service
的new
函数,代码如下:
Ok( Self { service: Service::new(config)?, })
通过该函数创建substrate service
,它会启动客户端,初始化session keys
,构建网络,交易池以及RPC
,并管理他们之间的通信,包括区块通知,交易通知等。关键代码如下:
let executor = NativeExecutor::new(config.default_heap_pages);...Components::RuntimeServices::generate_intial_session_keys( client.clone(), config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),)?;...let network_protocol = <Components::Factory>::build_network_protocol(&config)?;let transaction_pool = Arc::new( Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())?);...let events = client.import_notification_stream() .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |notification| { let number = *notification.header.number();...let events = transaction_pool.import_notification_stream() .for_each(move |_| {...Ok(Service { client, network, network_status_sinks, select_chain, transaction_pool, signal: Some(signal), to_spawn_tx, to_spawn_rx, to_poll: Vec::new(), config, exit, rpc_handlers, _rpc: rpc, _telemetry: telemetry, _offchain_workers: offchain_workers, _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), keystore,})
这个有些类似于以太坊,在启动节点时把相关的网络服务都创建好。这样最后Ok返回整个Service
。
PS:源码分析是基于master分支(substrate 2.0)。
1. 其中对命令行参数的解析,使用了第三方库structopt
,该库通过结构体来解析参数,并对clap
库进行了补充。
2. 异步编程,使用了第三方库tokio
,该库使用Reactor-Executor模式,是基于事件驱动的非阻塞I/O库。是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timers 等基础设施。
转载地址:http://zmahz.baihongyu.com/