Logging in Spark

While packaging a Spark Streaming project to run on YARN, I noticed that my own log4j.properties was not being read:

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j: Trying to find [log4j.xml] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@5561bfa3 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader org.apache.spark.util.MutableURLClassLoader@70ad2036.
log4j: Using URL [file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties

Yet my classpath does contain a log4j.properties file.
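A quick way to check which copies of log4j.properties a class loader can actually see is to ask it directly. The sketch below is only a diagnostic under assumptions: `ClasspathProbe` and `locate` are hypothetical names, and it uses nothing beyond the standard `ClassLoader.getResources` call.

```scala
import java.net.URL
import scala.collection.mutable.ListBuffer

object ClasspathProbe {
  // Collects every copy of a resource that the given class loader reports,
  // in the order the loader returns them.
  def locate(resource: String, loader: ClassLoader): List[URL] = {
    val found = loader.getResources(resource)
    val urls = ListBuffer.empty[URL]
    while (found.hasMoreElements) urls += found.nextElement()
    urls.toList
  }

  def main(args: Array[String]): Unit = {
    // The context class loader is the first one log4j tries in the output above.
    val loader = Thread.currentThread().getContextClassLoader
    locate("log4j.properties", loader).foreach { url =>
      println(s"log4j.properties visible at $url")
    }
  }
}
```

Running this early in the driver, and again inside a task, would show whether the file from the application jar is visible at all and which other copies compete with it.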

http://stackoverflow.com/quest… Someone here hit the same problem, and the answer says:

In 1.4.1, MutableURLClassLoader is not set before log4j initialization like it is in 1.3.1.

Here is the explanation:

While parsing arguments in SparkSubmit.scala, it uses spark.util.Utils. This object has a new static dependency on log4j, through ShutdownHookManager, that triggers its initialization before the call to setContextClassLoader(MutableURLClassLoader) from submit > doRunMain > runMain

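The answer also links to an issue, https://issues.apache.org/jira… As I read it, my file simply lost out on classpath priority. So I tried listing my own jar first under --jars in the spark-submit script, but that made no difference at all.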

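http://apache-spark-user-list…. Another thread here says that adding it to SPARK_CLASSPATH in spark-env.sh is enough. That suggested the equivalent setting in spark-submit should work as well, so I added --driver-class-path "xxx.jar". With that, the driver node picked up my configuration, but the executor nodes still did not.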

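Setting that aside for now, I then found that using a logger inside an operator function fails with a not-serializable error. It is the same old problem; see http://stackoverflow.com/quest…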

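import org.apache.log4j.Logger

// The logger lives in a standalone object, so the closure below never captures it.
object Holder extends Serializable {
   @transient lazy val log = Logger.getLogger(getClass.getName)
}

// `spark` is the SparkContext here; foreach returns Unit, so there is nothing to assign.
spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}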

This approach does work, but it loses the source information: every log line is attributed to this one object.
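One possible way to keep the source information while still going through a shared object is to look the logger up by name inside the task, so that only a String travels with the closure. This is a sketch under assumptions, not something taken from the linked answer: `Loggers` and `StreamingJob` are hypothetical names, and java.util.logging stands in for log4j so the example has no dependencies.

```scala
import java.util.logging.Logger

// Hypothetical helper: loggers are resolved by name in whichever JVM runs the code.
object Loggers {
  def named(name: String): Logger = Logger.getLogger(name)

  // Strips the trailing `$` that Scala adds to the class name of an object.
  def of(cls: Class[_]): Logger = named(cls.getName.stripSuffix("$"))
}

// Hypothetical job class; note that it is not Serializable.
class StreamingJob {
  def handler: Int => Unit = {
    // Copy the name into a local val so the lambda captures a String, not `this`.
    val loggerName = getClass.getName
    element => Loggers.named(loggerName).info(s"element $element")
  }
}
```

With this shape each log line carries the name of the class that created the closure, at the cost of one name lookup per call.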

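But why does this approach work, and what exactly is transient? http://www.cnblogs.com/lanxuez… discusses it, but it seems to cover the Java keyword rather than the Scala annotation, so it does not quite answer the question. http://stackoverflow.com/quest… raises the question too but has no answer. The discussion at http://fdahms.com/2015/10/14/s… comes closer to the point.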

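First, a variable declared as @transient lazy val is not serialized, and it does not need to be, and will not be, initialized repeatedly. So where does its value come from? My first guess was that it comes from an object that already exists: Spark itself has an IO stream open to the log file, and this trick lets us piggyback on it. The other reading is that, although the field is not serialized, we are not borrowing anything from Spark; instead, each JVM opens its own IO stream the first time the class is used. The second reading fits how Scala behaves in general (a singleton object is initialized once per JVM on first access, and a lazy val is evaluated on first use), but I have not found definitive documentation of the implementation.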
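The second reading can be checked without Spark by pushing an object through plain Java serialization. The sketch below makes some assumptions: `FakeLogger`, `InitCounter`, `Worker` and `TransientLazyDemo` are hypothetical names, and a Java serialization round trip inside one JVM stands in for shipping an object to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a logger: deliberately NOT Serializable.
class FakeLogger(val name: String)

object InitCounter {
  // Counts how often the lazy val initializer below runs in this JVM.
  val count = new AtomicInteger(0)
}

class Worker(val id: Int) extends Serializable {
  // Skipped by Java serialization and rebuilt on first access after deserialization.
  @transient lazy val log: FakeLogger = {
    InitCounter.count.incrementAndGet()
    new FakeLogger(getClass.getName)
  }
}

object TransientLazyDemo {
  // Serializes and deserializes a value with plain Java serialization.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Worker(1)
    original.log                    // first initialization, on the "driver" side
    val copy = roundTrip(original)  // works although FakeLogger is not Serializable
    copy.log                        // the initializer runs again for the copy
    println(s"initializer ran ${InitCounter.count.get()} times")
  }
}
```

The copy never receives the original logger; it builds its own on first use, which is consistent with each JVM opening its own stream rather than borrowing one from Spark.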

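If borrowing a helper object like this works, the natural next question is whether I could instead take a field that already lives in my own class and mark it @transient lazy. Unfortunately, when I tried it, the answer was no.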

So how does Spark's own logging do it? See https://github.com/apache/spar… To guard against the code changing between versions, I have copied it here:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal

import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.util.Utils

/**
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
 */
trait Logging {

  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null

  // Method to get the logger name for this object
  protected def logName = {
    // Ignore trailing $'s in the class names for Scala objects
    this.getClass.getName.stripSuffix("$")
  }

  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeLogIfNecessary(false)
      log_ = LoggerFactory.getLogger(logName)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }

  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }

  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging(isInterpreter)
        }
      }
    }
  }

  private def initializeLogging(isInterpreter: Boolean): Unit = {
    // Don't use a logger in here, as this is itself occurring during initialization of a logger
    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
    // This distinguishes the log4j 1.2 binding, currently
    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
    // org.apache.logging.slf4j.Log4jLoggerFactory
    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
    if (usingLog4j12) {
      val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
      // scalastyle:off println
      if (!log4j12Initialized) {
        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
          case None =>
            System.err.println(s"Spark was unable to load $defaultLogProps")
        }
      }

      if (isInterpreter) {
        // Use the repl's main class to define the default log level when running the shell,
        // overriding the root logger's config if they're different.
        val rootLogger = LogManager.getRootLogger()
        val replLogger = LogManager.getLogger(logName)
        val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
        if (replLevel != rootLogger.getEffectiveLevel()) {
          System.err.printf("Setting default log level to \"%s\".\n", replLevel)
          System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
            "For SparkR, use setLogLevel(newLevel).")
          rootLogger.setLevel(replLevel)
        }
      }
      // scalastyle:on println
    }
    Logging.initialized = true

    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}

private object Logging {
  @volatile private var initialized = false
  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge order to route their logs to JUL.
    val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }
}

As you can see, it uses a trait. For usage, see class SparkContext(config: SparkConf) extends Logging at https://github.com/apache/spar… which mixes the trait in with extends.

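I tried to follow the same pattern: I wrote a trait, but without the corresponding object, and it still failed with a not-serializable error. http://stackoverflow.com/quest… has an example: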

trait Loggable {

    val logger:Logger = Logging.getLogger(this)

    def checkFormat(msg:String, refs:Seq[Any]):String =
        if (refs.size > 0) msgfmtSeq(msg, refs) else msg 

    def trace(msg:String, refs:Any*) = logger trace checkFormat(msg, refs)

    def trace(t:Throwable, msg:String, refs:Any*) = logger trace (checkFormat(msg, refs), t)

    def info(msg:String, refs:Any*) = logger info checkFormat(msg, refs)

    def info(t:Throwable, msg:String, refs:Any*) = logger info (checkFormat(msg, refs), t)

    def warn(msg:String, refs:Any*) = logger warn checkFormat(msg, refs)

    def warn(t:Throwable, msg:String, refs:Any*) = logger warn (checkFormat(msg, refs), t)

    def critical(msg:String, refs:Any*) = logger error checkFormat(msg, refs)

    def critical(t:Throwable, msg:String, refs:Any*) = logger error (checkFormat(msg, refs), t)

}

/**
 * Note: implementation taken from scalax.logging API
 */
object Logging {  

    def loggerNameForClass(className: String) = {  
        if (className endsWith "$") className.substring(0, className.length - 1)  
        else className  
    }  

    def getLogger(logging: AnyRef) = LoggerFactory.getLogger(loggerNameForClass(logging.getClass.getName))  
}

At first I wrote it like this:

trait Loggable {
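  // Note: Scala's annotation is the lowercase `@transient`. The capitalized `@Transient`
  // below is not that annotation, so it does not make the field transient.
  @Transient private var log_ : Logger = null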

  protected def log: Logger = {
    if (log_ == null) {
      log_ = LoggerFactory.getLogger(this.getClass.getName)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}

It reported that it could not be serialized. I assumed that was because I had not used an object, so I changed it to this:

trait Loggable {
  private var log : Logger = Logging.getLogger(this)

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }
}

object Logging {
  def getLogger(logging: AnyRef) = {
    LoggerFactory.getLogger(logging.getClass.getName)
  }
}

It still did not work, so I simply copied Spark's implementation wholesale, and sure enough that failed too.
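One plausible explanation, which is an assumption because the classes that mixed in the trait are not shown here, is that the failure comes from the enclosing class rather than from the logger field. Calling logInfo inside a closure captures `this`, so the whole object must be Serializable even when the logger itself is transient. The sketch below uses hypothetical names (`PlainJob`, `SerializableJob`, `ClosureCheck`) and java.util.logging instead of SLF4J so that it runs without dependencies.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.logging.Logger

// Same shape as the trait above, reduced to a single log method.
trait Loggable {
  @transient private var log_ : Logger = null

  protected def log: Logger = {
    if (log_ == null) {
      log_ = Logger.getLogger(getClass.getName.stripSuffix("$"))
    }
    log_
  }

  protected def logInfo(msg: => String): Unit = log.info(msg)
}

// Hypothetical job classes: the only difference is `with Serializable`.
class PlainJob extends Loggable {
  // logInfo is an instance method, so this lambda captures `this`.
  def handler: Int => Unit = element => logInfo(s"element $element")
}

class SerializableJob extends Loggable with Serializable {
  def handler: Int => Unit = element => logInfo(s"element $element")
}

object ClosureCheck {
  // Returns true if plain Java serialization accepts the value.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(s"PlainJob closure serializable: ${canSerialize(new PlainJob().handler)}")
    println(s"SerializableJob closure serializable: ${canSerialize(new SerializableJob().handler)}")
  }
}
```

If this is indeed the cause, then copying the trait alone cannot help: the class that calls logInfo from inside an operator function would also have to be Serializable, or the call would have to go through an object as in the Holder example.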
