When I packaged a Spark Streaming project and ran it on YARN, I found that my own log4j.properties was not being read:
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j: Trying to find [log4j.xml] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@5561bfa3 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Using URL [file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties
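Yet my classpath really does contain a log4j.properties file; as the log shows, log4j picked up the cluster's /etc/spark/conf.cloudera.spark_on_yarn/log4j.properties instead.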
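To check which file each classloader would hand to log4j, a small diagnostic can ask the context and system classloaders for the resource directly. This is a sketch, assuming you can run it inside the driver (or inside a task on an executor) of the affected job; the name ClassLoaderProbe is hypothetical. The log above shows log4j 1.x asking the context classloader first, so whatever that loader returns is the file that wins.

```scala
import java.net.URL

// Diagnostic sketch (hypothetical helper): report where each classloader finds a resource.
object ClassLoaderProbe {
  /** Looks up `resource` through `loader`; a null loader or a missing resource yields None. */
  def find(resource: String, loader: ClassLoader): Option[URL] =
    Option(loader).flatMap(l => Option(l.getResource(resource)))

  /** Context classloader first (log4j 1.x tries it first, as in the log above), then the system one. */
  def report(resource: String): Seq[(String, Option[URL])] = Seq(
    "context" -> find(resource, Thread.currentThread.getContextClassLoader),
    "system" -> find(resource, ClassLoader.getSystemClassLoader)
  )
}
```

Printing ClassLoaderProbe.report("log4j.properties") on the driver and inside a task shows whether the cluster copy under /etc/spark shadows the one packaged in your jar on each side.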
Someone ran into the same problem at http://stackoverflow.com/quest… , and the answer says:
In 1.4.1, MutableURLClassLoader is not set before log4j initialization like it is in 1.3.1.
Here is the explanation:
While parsing arguments in SparkSubmit.scala, it uses spark.util.Utils. This object has a new static dependency on log4j, through ShutdownHookManager, that triggers its initialization before the call to setContextClassLoader(MutableURLClassLoader) from submit > doRunMain > runMain
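The ordering problem in the quote can be reproduced without Spark: a one-shot lookup that reads the context classloader on first access keeps whatever it found, even if the context classloader is swapped afterwards. In the sketch below, OneShotConfig and InitOrderDemo are hypothetical stand-ins (for log4j's automatic configuration and for SparkSubmit's runMain), and a temporary directory plays the role of the user jar.

```scala
import java.net.{URL, URLClassLoader}
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical stand-in for log4j's one-time configuration lookup: the resource is resolved
// on first access through the context classloader current at that moment, then never recomputed.
final class OneShotConfig(resource: String) {
  lazy val configUrl: Option[URL] =
    Option(Thread.currentThread.getContextClassLoader).flatMap(l => Option(l.getResource(resource)))
}

object InitOrderDemo {
  val Resource = "demo-user-log4j.properties" // hypothetical file name

  /** What OneShotConfig resolves, depending on whether it is touched before the user loader is installed. */
  def run(touchBeforeSwap: Boolean): Option[URL] = {
    // Simulate the user jar: a directory containing the properties file.
    val dir = Files.createTempDirectory("user-jar")
    Files.write(dir.resolve(Resource), "log4j.rootLogger=INFO".getBytes(StandardCharsets.UTF_8))
    val userLoader = new URLClassLoader(Array(dir.toUri.toURL), ClassLoader.getSystemClassLoader)

    val config = new OneShotConfig(Resource)
    val thread = Thread.currentThread
    val previous = thread.getContextClassLoader
    try {
      // 1.4.1-style ordering: something triggers the lookup before the user loader is set.
      if (touchBeforeSwap) { val _ = config.configUrl }
      // Analogue of setContextClassLoader(MutableURLClassLoader) in SparkSubmit.
      thread.setContextClassLoader(userLoader)
      config.configUrl
    } finally {
      thread.setContextClassLoader(previous)
    }
  }
}
```

With touchBeforeSwap = true (the ordering described in the quote) the lookup returns None even though the user loader can see the file; with false (the earlier ordering) it finds it.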
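The answer also points to an issue, https://issues.apache.org/jira… . The gist is that the classpath priority has been taken over, so I tried listing my own jar first in spark-submit's --jars option. That turned out to be completely useless.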
Another thread, http://apache-spark-user-list…. , says it works if you add the jar to SPARK_CLASSPATH in spark-env.sh, so I figured setting it through spark-submit should work as well. I added --driver-class-path "xxx.jar": that fixed the driver, but the executors still did not read my file.
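Setting that aside for now, I then found that using a logger inside an operator function fails because the logger cannot be serialized. It's the same old problem; an answer at http://stackoverflow.com/quest… suggests the following: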
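import org.apache.log4j.Logger

object Holder extends Serializable {
  // Not serialized; created on first access in each JVM that uses it
  @transient lazy val log = Logger.getLogger(getClass.getName)
}

// `sc` is the SparkContext (called `spark` in the original answer); foreach returns Unit
sc.parallelize(List(1, 2, 3)).foreach { element =>
  Holder.log.info(element)
}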
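This does work, but it loses the source information: every log line is attributed to this object (the logger is named after Holder) rather than to the class that actually logged it.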
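But why does this work, and what exactly does transient do? There is a discussion at http://www.cnblogs.com/lanxuez… , but it seems to be about the Java keyword rather than the Scala @transient annotation, so it doesn't fully answer the question. http://stackoverflow.com/quest… asks the same thing but has no answer. The discussion at http://fdahms.com/2015/10/14/s… comes closer.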
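First, a variable declared as @transient lazy val is not serialized, and it neither needs to be nor will be initialized repeatedly. So where does its value come from? My understanding is that it comes from an object that already exists: Spark itself has opened an IO stream to the log file, and this pattern lets us piggyback on it. Another interpretation is that, although the field is not serialized, it is not borrowing anything from Spark either; instead, in each JVM, the first time the class is used, it opens its own IO stream. I have not found clear documentation of how this actually works.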
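As far as I can tell, the second interpretation is closer to how Java serialization treats such a field: a @transient field is skipped when the object is written, and a lazy val is computed on first access, so each deserialized copy (in practice, each executor JVM) builds its own logger, which then writes through that JVM's own log4j configuration. For a top-level Scala object like Holder, the closure does not serialize the object at all; it is accessed statically and initialized separately in every JVM. The sketch below shows the field behaviour with plain Java serialization. It assumes java.util.logging.Logger as a stand-in for the log4j logger (it is not Serializable either), and Worker, LoggerInits and SerDe are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.Logger

// Counts how many loggers have been created, so re-initialization is visible.
object LoggerInits {
  val count = new AtomicInteger(0)
}

final class Worker extends Serializable {
  // Skipped by serialization; recomputed on first access in whichever copy uses it.
  @transient lazy val log: Logger = {
    LoggerInits.count.incrementAndGet()
    Logger.getLogger(getClass.getName)
  }

  def process(x: Int): Int = {
    log.fine(s"processing $x")
    x * 2
  }
}

object SerDe {
  /** Writes `value` with Java serialization and reads it back, like shipping it to an executor. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling process twice on the original creates one logger; the deserialized copy creates a second one on its first call, which mirrors an executor building its own logger.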
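If this trick of leaning on a helper object works, the natural next question is whether I can just take the field in my own class and mark it @transient lazy. Unfortunately, the answer is no.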
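One possible reason, offered as a hypothesis rather than a diagnosis: when an operator function uses a field or method of the enclosing class, the closure captures `this`, so the whole instance has to be serializable. @transient only exempts the logger field; if the class itself does not extend Serializable, Spark still fails with "Task not serializable". The sketch below isolates that difference with plain Java serialization; PlainJob, SerializableJob and SerCheck are hypothetical names, and java.util.logging.Logger stands in for the real logger.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.logging.Logger

// Same @transient lazy val field, but the enclosing class does NOT extend Serializable.
final class PlainJob {
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
  def double(x: Int): Int = x * 2
}

// Identical, except that the class itself is Serializable.
final class SerializableJob extends Serializable {
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
  def double(x: Int): Int = x * 2
}

object SerCheck {
  /** None if `value` serializes; otherwise the class name reported by NotSerializableException. */
  def check(value: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }
}
```

SerCheck.check(new SerializableJob) returns None, while SerCheck.check(new PlainJob) reports PlainJob as the offending class, even though the logger field is transient in both.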
So how does Spark itself handle logging? The implementation is at https://github.com/apache/spar… ; in case the code changes between versions, here is a copy:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal

import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.util.Utils

/**
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
 */
trait Logging {

  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null

  // Method to get the logger name for this object
  protected def logName = {
    // Ignore trailing $'s in the class names for Scala objects
    this.getClass.getName.stripSuffix("$")
  }

  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeLogIfNecessary(false)
      log_ = LoggerFactory.getLogger(logName)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }

  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }

  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging(isInterpreter)
        }
      }
    }
  }

  private def initializeLogging(isInterpreter: Boolean): Unit = {
    // Don't use a logger in here, as this is itself occurring during initialization of a logger
    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
    // This distinguishes the log4j 1.2 binding, currently
    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
    // org.apache.logging.slf4j.Log4jLoggerFactory
    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
    if (usingLog4j12) {
      val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
      // scalastyle:off println
      if (!log4j12Initialized) {
        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
          case None =>
            System.err.println(s"Spark was unable to load $defaultLogProps")
        }
      }

      if (isInterpreter) {
        // Use the repl's main class to define the default log level when running the shell,
        // overriding the root logger's config if they're different.
        val rootLogger = LogManager.getRootLogger()
        val replLogger = LogManager.getLogger(logName)
        val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
        if (replLevel != rootLogger.getEffectiveLevel()) {
          System.err.printf("Setting default log level to \"%s\".\n", replLevel)
          System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
            "For SparkR, use setLogLevel(newLevel).")
          rootLogger.setLevel(replLevel)
        }
      }
      // scalastyle:on println
    }
    Logging.initialized = true

    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}

private object Logging {
  @volatile private var initialized = false
  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge order to route their logs to JUL.
    val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }
}
As you can see, it is a trait. As for usage, SparkContext at https://github.com/apache/spar… is declared as class SparkContext(config: SparkConf) extends Logging, so classes simply extend it.
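The two essential ideas in that trait are a transient, lazily created logger field and by-name `msg: => String` parameters, so the message string is only built when the level is enabled. The sketch below reproduces just those two ideas. It assumes java.util.logging instead of SLF4J so it runs with the JDK alone, and SketchLogging and Job are hypothetical names, not Spark APIs.

```scala
import java.util.logging.{Level, Logger}

// Minimal sketch of the pattern in Spark's Logging trait, using java.util.logging instead of SLF4J.
trait SketchLogging {
  // Transient: not serialized; recreated on first use after deserialization.
  @transient private var log_ : Logger = null

  // Same trick as Spark: drop the trailing '$' that Scala adds to object class names.
  protected def logName: String = getClass.getName.stripSuffix("$")

  protected def log: Logger = {
    if (log_ == null) log_ = Logger.getLogger(logName)
    log_
  }

  // By-name `msg`: the string is only built when the level is enabled.
  protected def logInfo(msg: => String): Unit =
    if (log.isLoggable(Level.INFO)) log.info(msg)

  protected def logDebug(msg: => String): Unit =
    if (log.isLoggable(Level.FINE)) log.fine(msg)
}

// Mixed in the same way as `class SparkContext(config: SparkConf) extends Logging`.
final class Job extends Serializable with SketchLogging {
  var messagesBuilt = 0 // counts how often a debug message was actually constructed

  def run(x: Int): Int = {
    logDebug { messagesBuilt += 1; s"debug: x = $x" }
    x + 1
  }
}
```

With the logger at INFO (the JDK default), run never evaluates the debug message, so messagesBuilt stays at 0. Note that the class mixing in the trait still extends Serializable itself; the transient field only keeps the logger out of the serialized form.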
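Following the same pattern, I wrote a trait, but without a corresponding object, and it still failed with a serialization error. Then I found an example at http://stackoverflow.com/quest… :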
import org.slf4j.{Logger, LoggerFactory}

trait Loggable {
  val logger: Logger = Logging.getLogger(this)

  // Note: msgfmtSeq is not defined in the snippet as posted; it is presumably a
  // message-formatting helper from the answer's original codebase.
  def checkFormat(msg: String, refs: Seq[Any]): String =
    if (refs.size > 0) msgfmtSeq(msg, refs) else msg

  def trace(msg: String, refs: Any*) = logger trace checkFormat(msg, refs)
  def trace(t: Throwable, msg: String, refs: Any*) = logger trace (checkFormat(msg, refs), t)
  def info(msg: String, refs: Any*) = logger info checkFormat(msg, refs)
  def info(t: Throwable, msg: String, refs: Any*) = logger info (checkFormat(msg, refs), t)
  def warn(msg: String, refs: Any*) = logger warn checkFormat(msg, refs)
  def warn(t: Throwable, msg: String, refs: Any*) = logger warn (checkFormat(msg, refs), t)
  def critical(msg: String, refs: Any*) = logger error checkFormat(msg, refs)
  def critical(t: Throwable, msg: String, refs: Any*) = logger error (checkFormat(msg, refs), t)
}

/**
 * Note: implementation taken from scalax.logging API
 */
object Logging {
  def loggerNameForClass(className: String) = {
    if (className endsWith "$") className.substring(0, className.length - 1)
    else className
  }

  def getLogger(logging: AnyRef) =
    LoggerFactory.getLogger(loggerNameForClass(logging.getClass.getName))
}
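Both loggerNameForClass here and Spark's logName strip a trailing `$` because the Scala compiler names the class of `object Foo` as `Foo$`; without the stripping, a logger created from an object would carry that stray character. The sketch below makes the raw names visible; NameDemo, Pipeline and Step are hypothetical names.

```scala
object NameDemo {
  // Same logic as loggerNameForClass in the answer above (Spark uses stripSuffix("$")).
  def loggerNameForClass(className: String): String =
    if (className.endsWith("$")) className.substring(0, className.length - 1) else className

  object Pipeline // hypothetical object that would own a logger
  class Step      // hypothetical class, for comparison

  def pipelineRawName: String = Pipeline.getClass.getName // ends with "Pipeline$"
  def stepRawName: String = classOf[Step].getName         // no trailing '$'
}
```

Only the trailing `$` is removed; the `$` separating a nested name from its enclosing object (as in `NameDemo$Pipeline`) is left alone, and plain class names pass through unchanged.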
My first attempt looked like this:
import org.slf4j.{Logger, LoggerFactory}

trait Loggable {
  // Note: Scala's serialization annotation is the lowercase @transient (scala.transient).
  // The capitalized @Transient below is a different annotation (it needs an import not shown
  // here), so it does not exclude log_ from Java serialization.
  @Transient private var log_ : Logger = null

  protected def log: Logger = {
    if (log_ == null) {
      log_ = LoggerFactory.getLogger(this.getClass.getName)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}
It failed with a serialization error. I assumed that was because I hadn't used an object, so I changed it to this:
import org.slf4j.{Logger, LoggerFactory}

trait Loggable {
  private var log: Logger = Logging.getLogger(this)

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}

object Logging {
  def getLogger(logging: AnyRef) = {
    LoggerFactory.getLogger(logging.getClass.getName)
  }
}
Still no good. So I simply copied that implementation verbatim, and sure enough, it failed as well.