Mapreduce任務實現郵件監控

jopen 9年前發布 | 2K 次閱讀 PHP MapReduce

    這里主要使用Java自帶郵件類實現Mapreduce任務的監控,如果 Mapreduce任務報錯則發送報錯郵件。Mapreduce的報錯信息通過hdfs中的日志獲取,里面的報錯日志是json格式,這里先將json轉 換成xml格式然后再發送到郵件。具體代碼如下

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;

import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage;

import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.xml.XMLSerializer;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job;

public class Email {

private static final String USERNAME = "123456@qq.com";//發送郵件的用戶名
private static final String PASSWORD = "123456789";//發送郵件的用戶名對應的密碼
private static final String EMAIL_HOST = "smtp.qq.com";//郵件服務器host

public static void main(String args[]) {
    try {
        sendEmail("測試郵件", "測試郵件內容!", "test@qq.com");
        System.out.println("email ok !");
    } catch (MessagingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

/**
 * @category 發送郵件方法,該方法實現發送Mapreduce任務報錯信息,具體的報錯信息通過hdfs的報錯日志獲取
 * @param to 目標郵箱(可以多個郵箱,用,號隔開)
 * @param job 通過mapreduce的job獲取jobID
 * @param time 通過時間戳訪問錯誤日志路徑
 * @throws Exception
 */
public static void sendErrMail(String to, Job job, String time)
        throws Exception {
    String subject = job.getJobName();
    String message = getErr(job, time);
    LoginMail lm = new LoginMail(USERNAME, PASSWORD);
    // 創建session
    Properties props = new Properties();
    props.put("mail.smtp.auth", "true");
    props.put("mail.smtp.host", EMAIL_HOST);
    Session session = Session.getDefaultInstance(props, lm);

    // 創建 message
    Message msg = new MimeMessage(session);

    // 設置發送源地址
    msg.setFrom(new InternetAddress(USERNAME));

    // 多用戶分解
    StringTokenizer st = new StringTokenizer(to, ",");
    String[] recipients = new String[st.countTokens()];
    int rc = 0;
    while (st.hasMoreTokens())
        recipients[rc++] = st.nextToken();
    InternetAddress[] addressTo = new InternetAddress[recipients.length];
    for (int i = 0; i < recipients.length; i++) {
        addressTo[i] = new InternetAddress(recipients[i]);
    }
    msg.setRecipients(Message.RecipientType.TO, addressTo);

    // 設置郵件主題并發送郵件
    msg.setSubject(subject);
    msg.setContent(message, "text/html;charset=utf-8");
    Transport.send(msg);
}

/**
 * @category 自定義主題內容發送,這里的郵件內容不一定是Mapreduce的,可以任意填寫
 * @param subject 主題
 * @param body 內容
 * @param to 目標郵箱
 * @throws MessagingException
 */
public static void sendEmail(String subject, String body, String to)
        throws MessagingException {
    LoginMail lm = new LoginMail(USERNAME, PASSWORD);
    // 創建session
    Properties props = new Properties();
    props.put("mail.smtp.auth", "true");
    props.put("mail.smtp.host", EMAIL_HOST);
    Session session = Session.getDefaultInstance(props, lm);

    // 創建 message
    Message msg = new MimeMessage(session);

    // 設置發送源地址
    msg.setFrom(new InternetAddress(USERNAME));

    // 多用戶分解
    StringTokenizer st = new StringTokenizer(to, ",");
    String[] recipients = new String[st.countTokens()];
    int rc = 0;
    while (st.hasMoreTokens())
        recipients[rc++] = st.nextToken();
    InternetAddress[] addressTo = new InternetAddress[recipients.length];
    for (int i = 0; i < recipients.length; i++) {
        addressTo[i] = new InternetAddress(recipients[i]);
    }
    msg.setRecipients(Message.RecipientType.TO, addressTo);

    // 設置郵件主題并發送郵件
    msg.setSubject(subject);
    msg.setContent(body, "text/html;charset=utf-8");
    Transport.send(msg);

}

/**
 * @category 獲取日志文件
 * @param job
 * @param time
 * @return FSDataInputStream
 * @throws IOException
 */
public static FSDataInputStream getFile(Job job, String time)
        throws IOException {
    String year = time.substring(0, 4);
    String month = time.substring(4, 6);
    String day = time.substring(6, 8);
    String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/"
            + year + "/" + month + "/" + day + "/000000";
    FileSystem fs = FileSystem.get(URI.create(dst), new Configuration());
    FileStatus[] status = fs.listStatus(new Path(dst));
    FSDataInputStream in = null;
    for (int i = 0; i < status.length; i++) {
        if (status[i].getPath().getName()
                .contains(job.getJobID().toString())
                && status[i].getPath().getName().endsWith("jhist")) {
            in = new FSDataInputStream(fs.open(status[i].getPath()));
        }
    }
    return in;
}

/**
 * @category 解析文件類容為xml
 * @param job
 * @param time
 * @return xml
 * @throws IOException
 * @throws InterruptedException
 */
public static String getErr(Job job, String time) throws IOException,
        InterruptedException {
    FSDataInputStream in = getFile(job, time);
    Thread t1 = new Thread();
    while (in == null) {
        t1.sleep(20000);//由于hdfs每個job的日志不是實時生成,所以需要每隔20秒檢查一次hdfs該job日志是否已生成
        t1.join();
        in = getFile(job, time);
    }
    BufferedReader br = new BufferedReader(new InputStreamReader(in));

    String line = "";
    JSONObject jo;
    JSONArray jsa = new JSONArray();
    String xml = "";
    XMLSerializer xmlSerializer = new XMLSerializer();
    while ((line = br.readLine()) != null) {
        if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) {
            jo = JSONObject.fromObject(line);
            jsa.add(jo);
        }
    }
    xml = xmlSerializer.write(jsa);
    in.close();
    br.close();
    return xml;

}

/**
 * @category 獲取try-catch中的異常內容
 * @param e Exception
 * @return 異常內容
 */
public static String getException(Exception e) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrintStream pout = new PrintStream(out);
    e.printStackTrace(pout);
    String ret = new String(out.toByteArray());
    pout.close();
    try {
        out.close();
    } catch (Exception ex) {
    }
    return ret;
}

}

class LoginMail extends Authenticator {

private String username;
private String password;

public String getUsername() {
    return username;
}

public void setUsername(String username) {
    this.username = username;
}

public String getPassword() {
    return password;
}

public void setPassword(String password) {
    this.password = password;
}

@Override
protected PasswordAuthentication getPasswordAuthentication() {
    return new PasswordAuthentication(username, password);
}

public LoginMail(String username, String password) {
    this.username = username;
    this.password = password;
}

}</pre> 來自:http://my.oschina.net/mkh/blog/493885

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!