While packaging a Spark Streaming project and running it on YARN, I found that my own log4j.properties was not being read. The relevant output:
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j: Trying to find [log4j.xml] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@5561bfa3 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Using URL [file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties
My classpath does contain a log4j.properties file, though.
Someone hit the same problem at http://stackoverflow.com/quest… , and the answer says:
In 1.4.1, MutableURLClassLoader is not set before log4j initialization like it is in 1.3.1.
Here is the explanation:
While parsing arguments in SparkSubmit.scala, it uses spark.util.Utils. This object has a new static dependency on log4j, through ShutdownHookManager, that triggers its initialization before the call to setContextClassLoader(MutableURLClassLoader) from submit > doRunMain > runMain
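The answer also links to an issue, https://issues.apache.org/jira… . As far as I can tell, the gist is that something else takes precedence on the classpath. So I tried putting my own jar first in the --jars list of the spark-submit script. That did not help at all.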
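Another thread (http://apache-spark-user-list….) says it works "by adding it to SPARK_CLASSPATH in spark-env.sh". I figured setting the same thing through spark-submit should also work, so I added --driver-class-path "xxx.jar". That fixed the driver, but the executors still did not pick up the file.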
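Whichever fix you try, it helps to see which log4j.properties files a class loader can actually find, and in what order. The diagnostic below is a sketch that uses only the JDK's ClassLoader.getResources. Log4jConfigLocator and findAll are hypothetical names. With the usual parent-first delegation, the first URL listed is normally the one getResource returns. That is how log4j 1.x finds its automatic configuration (compare the "Trying to find ..." lines in the log above).

```scala
import java.net.URL
import scala.collection.JavaConverters._

object Log4jConfigLocator {
  // Every copy of `resource` visible to `loader`. With parent-first delegation,
  // the first entry is normally what loader.getResource(resource) would return.
  def findAll(resource: String, loader: ClassLoader): List[URL] =
    loader.getResources(resource).asScala.toList

  def main(args: Array[String]): Unit = {
    // log4j 1.x tries the thread's context class loader first (as in the log above).
    val loader = Thread.currentThread.getContextClassLoader
    println(s"context class loader: $loader")
    for (name <- Seq("log4j.xml", "log4j.properties"); url <- findAll(name, loader))
      println(s"$name -> $url")
  }
}
```

Call this on the driver and inside a task on an executor. The output shows whether a cluster-wide file such as /etc/spark/conf.cloudera.spark_on_yarn/log4j.properties comes before the copy in the application jar. It reports only the lookup order at the moment it runs, not what log4j saw when it initialized.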
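Setting that aside for now, I also found that using a logger inside an operator function (the closure passed to an RDD transformation or action) fails with a not-serializable error. Same old problem. I found this answer: http://stackoverflow.com/quest…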
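import org.apache.log4j.Logger
object Holder extends Serializable {
  // Not serialized; created on first access in each JVM (driver or executor)
  @transient lazy val log = Logger.getLogger(getClass.getName)
}
// sc is an existing SparkContext
sc.parallelize(List(1, 2, 3)).foreach { element =>
  Holder.log.info(element)
}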
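This works, but the source information is lost. Every log line is attributed to this object, because the logger is named after Holder rather than after the class doing the logging.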
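But why does this work, and what exactly does transient do? http://www.cnblogs.com/lanxuez… discusses it, but it seems to be about the Java keyword rather than the Scala annotation, so questions remain. http://stackoverflow.com/quest… asks the same question but has no answer. The discussion at http://fdahms.com/2015/10/14/s… comes closer.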
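First, a variable declared as @transient lazy val is not serialized. Within a single JVM it is initialized only once and never needs to be re-initialized. So where does its value come from? My first understanding was that it comes from an object that already exists: Spark has already opened an IO stream to the log file, and this trick lets us piggyback on it. Another interpretation is that, although the value is not serialized, it does not borrow Spark's logger either. Instead, each JVM opens its own IO stream the first time the class is used. I have not found any clear documentation of the actual implementation.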
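The second interpretation can be checked without Spark. The sketch below assumes Scala 2.12/2.13 and uses plain Java serialization, which is what Spark uses to ship closures. FakeLogger, Task and TransientLazyDemo are made-up names. The sketch serializes an object whose @transient lazy val has already been computed, deserializes it, and counts how often the value gets built.

```scala
import java.io._

// Hypothetical stand-in for a logger: NOT Serializable, and it counts its instances.
class FakeLogger(val name: String) {
  FakeLogger.created += 1
}
object FakeLogger {
  var created = 0
}

class Task extends Serializable {
  // Skipped by Java serialization; computed on first access in whichever JVM uses it.
  @transient lazy val log: FakeLogger = new FakeLogger(getClass.getName)
}

object TransientLazyDemo {
  // Serialize and deserialize in one JVM, imitating shipping an object to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val task = new Task
    task.log                    // first initialization, on the "driver"
    val copy = roundTrip(task)  // succeeds although FakeLogger is not Serializable
    copy.log                    // built again, on the "executor" side
    copy.log                    // cached: no further initialization in this copy
    println(FakeLogger.created) // 2
  }
}
```

The two copies share nothing. The deserialized object builds its own value the first time it is touched, which is what happens in each executor JVM.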
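If borrowing an external object like this works, the obvious next question is whether I could take a field already in my class and just mark it @transient lazy. Unfortunately, the answer is no.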
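Here is a likely explanation, though it is an assumption because the original class is not shown. A closure that uses a member of a class captures `this`, so the whole enclosing instance must be serializable, whatever modifiers the field carries. Spark serializes closures with Java serialization, and the sketch imitates that. PlainLogger, Job, SerializableJob and isSerializable are made-up names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a logger that is NOT Serializable.
class PlainLogger {
  def info(msg: String): Unit = println(msg)
}

// A driver-side class that is not Serializable.
class Job {
  @transient lazy val log = new PlainLogger
  // The closure calls this.log, so it captures `this`: the whole Job instance.
  def makeTask: Int => Unit = (x: Int) => log.info(s"element $x")
}

// Same closure, but the enclosing class is Serializable: `this` is written, `log` is skipped.
class SerializableJob extends Serializable {
  @transient lazy val log = new PlainLogger
  def makeTask: Int => Unit = (x: Int) => log.info(s"element $x")
}

object ClosureCaptureDemo {
  // True if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable((new Job).makeTask))             // false
    println(isSerializable((new SerializableJob).makeTask)) // true
  }
}
```

Marking the field @transient therefore helps only when the enclosing class itself is Serializable. A top-level object such as Holder avoids the question: the closure reaches it through its static instance instead of capturing it.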
So how does Spark itself handle logging? See https://github.com/apache/spar… . Since the code may change between versions, here is a copy:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.internal
import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder
import org.apache.spark.util.Utils
/**
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
 */
trait Logging {
  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null
  // Method to get the logger name for this object
  protected def logName = {
    // Ignore trailing $'s in the class names for Scala objects
    this.getClass.getName.stripSuffix("$")
  }
  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeLogIfNecessary(false)
      log_ = LoggerFactory.getLogger(logName)
    }
    log_
  }
  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }
  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }
  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }
  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }
  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }
  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }
  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }
  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }
  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }
  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }
  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging(isInterpreter)
        }
      }
    }
  }
  private def initializeLogging(isInterpreter: Boolean): Unit = {
    // Don't use a logger in here, as this is itself occurring during initialization of a logger
    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
    // This distinguishes the log4j 1.2 binding, currently
    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
    // org.apache.logging.slf4j.Log4jLoggerFactory
    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
    if (usingLog4j12) {
      val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
      // scalastyle:off println
      if (!log4j12Initialized) {
        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
          case None =>
            System.err.println(s"Spark was unable to load $defaultLogProps")
        }
      }
      if (isInterpreter) {
        // Use the repl's main class to define the default log level when running the shell,
        // overriding the root logger's config if they're different.
        val rootLogger = LogManager.getRootLogger()
        val replLogger = LogManager.getLogger(logName)
        val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
        if (replLevel != rootLogger.getEffectiveLevel()) {
          System.err.printf("Setting default log level to \"%s\".\n", replLevel)
          System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
            "For SparkR, use setLogLevel(newLevel).")
          rootLogger.setLevel(replLevel)
        }
      }
      // scalastyle:on println
    }
    Logging.initialized = true
    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}
private object Logging {
  @volatile private var initialized = false
  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge order to route their logs to JUL.
    val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }
}
So it is a trait. For usage, the declaration class SparkContext(config: SparkConf) extends Logging in https://github.com/apache/spar… shows that it is simply mixed in with extends.
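Here is a stripped-down imitation of the trait above that shows the pattern without pulling in Spark. It is not Spark's code: java.util.logging stands in for SLF4J, and SimpleLogging, MyStage and MyObjectStage are made-up names. It keeps the ideas that matter here: a @transient var rebuilt on demand, a logger name with the trailing $ of Scala objects removed, and a by-name message parameter. Scala 2.12/2.13 is assumed.

```scala
import java.util.logging.{Level, Logger}

// Stripped-down imitation of Spark's Logging trait (not Spark's code). java.util.logging
// stands in for SLF4J so the sketch has no external dependencies.
trait SimpleLogging {
  // Transient, so it is not written out with a Serializable instance;
  // `log` recreates it on first use after deserialization.
  @transient private var log_ : Logger = null

  // Scala objects compile to classes whose names end in "$"; drop it, as Spark does.
  protected def logName: String = this.getClass.getName.stripSuffix("$")

  protected def log: Logger = {
    if (log_ == null) log_ = Logger.getLogger(logName)
    log_
  }

  // By-name parameter: the message string is only built when INFO is enabled.
  protected def logInfo(msg: => String): Unit =
    if (log.isLoggable(Level.INFO)) log.info(msg)
}

// Mixed in with extends, like `class SparkContext(config: SparkConf) extends Logging`.
class MyStage extends SimpleLogging with Serializable {
  def run(): String = {
    logInfo(s"running as $logName")
    logName
  }
}

object MyObjectStage extends SimpleLogging {
  def name: String = logName
}
```

Like Spark's trait, SimpleLogging does not extend Serializable itself. The @transient field only matters once the class mixing it in is Serializable, as MyStage is.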
I tried to copy the approach and wrote a trait, but without a corresponding object, and I still got the not-serializable error. There is an example at http://stackoverflow.com/quest… :
import org.slf4j.{Logger, LoggerFactory}
trait Loggable {
  val logger: Logger = Logging.getLogger(this)
  // String.format stands in for msgfmtSeq, which the answer does not define
  def checkFormat(msg: String, refs: Seq[Any]): String =
    if (refs.nonEmpty) msg.format(refs: _*) else msg
  def trace(msg: String, refs: Any*) = logger.trace(checkFormat(msg, refs))
  def trace(t: Throwable, msg: String, refs: Any*) = logger.trace(checkFormat(msg, refs), t)
  def info(msg: String, refs: Any*) = logger.info(checkFormat(msg, refs))
  def info(t: Throwable, msg: String, refs: Any*) = logger.info(checkFormat(msg, refs), t)
  def warn(msg: String, refs: Any*) = logger.warn(checkFormat(msg, refs))
  def warn(t: Throwable, msg: String, refs: Any*) = logger.warn(checkFormat(msg, refs), t)
  def critical(msg: String, refs: Any*) = logger.error(checkFormat(msg, refs))
  def critical(t: Throwable, msg: String, refs: Any*) = logger.error(checkFormat(msg, refs), t)
}
/**
 * Note: implementation taken from scalax.logging API
 */
object Logging {
  def loggerNameForClass(className: String) = {
    if (className endsWith "$") className.substring(0, className.length - 1)
    else className
  }
  def getLogger(logging: AnyRef) = LoggerFactory.getLogger(loggerNameForClass(logging.getClass.getName))
}
My first version looked like this:
import org.slf4j.{Logger, LoggerFactory}
trait Loggable {
  // scala.transient (lowercase): the field is skipped by Java serialization
  @transient private var log_ : Logger = null
  protected def log: Logger = {
    if (log_ == null) {
      log_ = LoggerFactory.getLogger(this.getClass.getName)
    }
    log_
  }
  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }
  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }
  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }
  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }
  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }
  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }
  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }
  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }
  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }
  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}
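It complained that the object could not be serialized. I assumed that was because I had not used an object, so I changed it to this: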
import org.slf4j.{Logger, LoggerFactory}
trait Loggable {
  // Created eagerly when the instance is constructed; not marked @transient
  private var log : Logger = Logging.getLogger(this)
  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }
  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }
  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }
  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }
  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }
  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }
  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }
  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }
  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }
  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}
object Logging {
  def getLogger(logging: AnyRef) = {
    LoggerFactory.getLogger(logging.getClass.getName)
  }
}
Still no luck. So I copied that Stack Overflow implementation wholesale, and as expected, it failed too.