spark 2.1.1
Our system needs to monitor the execution progress of Spark on YARN jobs, but we found that after a job is submitted the reported progress sits at 10% the whole time; only when the job succeeds or fails does it suddenly jump to 100%. Curious behavior.
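To make the symptom concrete: monitoring here means polling the ResourceManager for the application's ApplicationReport and reading getProgress. A minimal sketch of such a monitor (the application id string and the 3-second polling interval are placeholders; ConverterUtils.toApplicationId is the Hadoop 2.x helper for parsing an application id):

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils

object YarnProgressMonitor {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    // Placeholder id; in practice it comes from the spark-submit output.
    val appId = ConverterUtils.toApplicationId("application_1500000000000_0001")
    var done = false
    while (!done) {
      val report = yarnClient.getApplicationReport(appId)
      // getProgress returns a float in [0, 1]; for Spark applications it
      // stays at 0.1 until the application terminates.
      println(s"state=${report.getYarnApplicationState} progress=${report.getProgress * 100}%")
      done = report.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED
      Thread.sleep(3000)
    }
    yarnClient.stop()
  }
}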
Let's walk through how a Spark on YARN job is submitted:
When a job is submitted in yarn cluster mode, spark-submit replaces the main class with the YARN Client:
childMainClass = "org.apache.spark.deploy.yarn.Client"
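The rewrite happens in SparkSubmit.prepareSubmitEnvironment. A simplified paraphrase of the Spark 2.1.x branch (not verbatim; the point is that the user's own main class is carried along as an argument instead of being launched directly):

// Simplified paraphrase of SparkSubmit.prepareSubmitEnvironment (Spark 2.1.x)
if (isYarnCluster) {
  // The class actually launched is the YARN Client; the user's main class
  // is handed to it via --class and run later inside the cluster.
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  childArgs += ("--class", args.mainClass)
  // ...
}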
The spark-submit process itself is covered in detail in a separate post.
Now let's look at what Client does:
org.apache.spark.deploy.yarn.Client
def main(argStrings: Array[String]) {
  ...
  val sparkConf = new SparkConf
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  sparkConf.remove("spark.jars")
  sparkConf.remove("spark.files")
  val args = new ClientArguments(argStrings)
  new Client(args, sparkConf).run()
  ...
}

def run(): Unit = {
  this.appId = submitApplication()
  ...
}

def submitApplication(): ApplicationId = {
  ...
  val containerContext = createContainerLaunchContext(newAppResponse)
  ...
}

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
  : ContainerLaunchContext = {
  ...
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
The call chain here is Client.main -> run -> submitApplication -> createContainerLaunchContext, which then sets amClass. Both choices ultimately lead to ApplicationMaster, because ExecutorLauncher internally just delegates to ApplicationMaster, as shown below:
org.apache.spark.deploy.yarn.ExecutorLauncher
object ExecutorLauncher {

  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }

}
Now on to ApplicationMaster:
org.apache.spark.deploy.yarn.ApplicationMaster
def main(args: Array[String]): Unit = {
  ...
  SparkHadoopUtil.get.runAsSparkUser { () =>
    master = new ApplicationMaster(amArgs, new YarnRMClient)
    System.exit(master.run())
  }
  ...
}

final def run(): Int = {
  ...
  if (isClusterMode) {
    runDriver(securityMgr)
  } else {
    runExecutorLauncher(securityMgr)
  }
  ...
}

private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: String,
    securityMgr: SecurityManager) = {
  ...
  allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf, uiAddress,
    historyAddress, securityMgr, localResources)
  allocator.allocateResources()
  reporterThread = launchReporterThread()
  ...
}

private def launchReporterThread(): Thread = {
  // The number of failures in a row until Reporter thread give up
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  val t = new Thread {
    override def run() {
      var failureCount = 0
      while (!finished) {
        try {
          if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
              s"Max number of executor failures ($maxNumExecutorFailures) reached")
          } else {
            logDebug("Sending progress")
            allocator.allocateResources()
          }
          ...
The call chain here is ApplicationMaster.main -> run; run calls runDriver in cluster mode or runExecutorLauncher in client mode, and both paths eventually reach registerAM. registerAM calls YarnAllocator.allocateResources once, then launchReporterThread starts a reporter thread that keeps calling YarnAllocator.allocateResources in a loop. Now let's look at YarnAllocator:
org.apache.spark.deploy.yarn.YarnAllocator
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)
So the progress reported to the ResourceManager is hardcoded to 0.1, i.e. 10%. Every heartbeat sends this same constant, which is why YARN shows 10% for the entire lifetime of a Spark application; it only jumps to 100% once the application finishes (successfully or not) and YARN marks it complete. Trying to monitor the real progress of a Spark on YARN job through YARN's progress field is therefore futile.
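For completeness: the float that YarnAllocator passes is the standard progress argument of Hadoop's AMRMClient heartbeat; the ResourceManager stores it and exposes it via ApplicationReport.getProgress, which is exactly what the monitor sketched at the top reads. A minimal sketch of how a hand-written ApplicationMaster would report real progress through the same channel (the 0.42f value is hypothetical, just to show where the number comes from; registration and container handling are elided):

import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

val amClient: AMRMClient[ContainerRequest] = AMRMClient.createAMRMClient()
// ... init(), start(), registerApplicationMaster() elided ...

// Each heartbeat carries the progress the RM will report for the app.
val realProgress = 0.42f  // hypothetical: e.g. finishedTasks.toFloat / totalTasks
val response = amClient.allocate(realProgress)

// Spark's YarnAllocator always passes 0.1f here, so YARN always shows 10%.

If you need real progress for a Spark job, it has to come from Spark itself (for example the driver's REST API or a SparkListener), not from YARN's progress field.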