通过自定义的InputFormat处理多行错误日志
我想通过自定义的InputFormat来处理多行错误日志,主要策略是读入时对其进行预处理,把多行错误日志归并为一行,然后通过hive的RegexSerDe来处理,以下是自定义类的代码:
import java.io.IOException; import java.nio.charset.Charset; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; import com.xh.xhdatawarehouse.hive.plugin.SeekableLineReader; public class ErrLogRecordReader implements RecordReader<LongWritable, Text> { private static final Log LOG = LogFactory.getLog(ErrLogRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private SeekableLineReader lineReader; int maxLineLength; public ErrLogRecordReader(FileSplit inputSplit, Configuration job) throws IOException { this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = inputSplit.getStart(); end = start + inputSplit.getLength(); final Path file = inputSplit.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // Open file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(file); //InputStreamReader read = new InputStreamReader (new FileInputStream(f),"UTF-8"); boolean skipFirstLine = false; //if (codec != null) { // lineReader = new BackLineReader(codec.createInputStream(fileIn), Charset.forName("GBK"), job); // end = Long.MAX_VALUE; //} else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } lineReader = new SeekableLineReader(fileIn, Charset.forName("UTF-8"), job); //} if (skipFirstLine) { start += 
lineReader.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; } public ErrLogRecordReader(FSDataInputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; this.lineReader = new SeekableLineReader(in); this.start = offset; this.pos = offset; this.end = endOffset; } public ErrLogRecordReader(FSDataInputStream in, long offset, long endOffset, Configuration job) throws IOException { this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); this.lineReader = new SeekableLineReader(in, Charset.forName("UTF-8"), job); this.start = offset; this.pos = offset; this.end = endOffset; } public LongWritable createKey() { return new LongWritable(); } public Text createValue() { return new Text(); } /** * Reads the next record in the split. All instances of t, n and rn are replaced by a space. * @param key key of the record which will map to the byte offset of the record's line * @param value the record in text format * @return true if a record existed, false otherwise * @throws IOException */ public synchronized boolean next(LongWritable key, Text value) throws IOException { String reStart = "d{4}-d{2}-d{2}s[^ ]+s([WARN]|[ERROR]|[INFO])s[^ ]+s[^ ]+s-s.*"; if (pos < end) { key.set(pos); System.out.println("key : "+key.get()); int newSize = lineReader.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); String sstr = value.toString(); if(sstr.matches(reStart)){ pos += newSize; //读下一行,直到异常的结尾行 while (pos < end) { int nextLineSize = lineReader.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); String nextLine = value.toString(); if (nextLineSize == 0) { break; } if (!nextLine.matches(reStart)) { pos += nextLineSize; newSize += nextLineSize; sstr = sstr.replaceAll("rn", "#"); sstr += "#"+nextLine; System.out.println("pos = "+pos); }else{ 
System.out.println("break..."); System.out.println("pos:"+pos); lineReader.seek(pos); System.out.println("position: "+getPos()); //System.out.println("position: "+lineReader.getPos());*/ break; } } value.set(sstr); if (newSize < maxLineLength) return true; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at position " + (pos - newSize)); } return false; } public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float)(end - start)); } } public synchronized long getPos() throws IOException { return pos; } public synchronized void close() throws IOException { if (lineReader != null) lineReader.close(); } }
import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.io.Text; public class SeekableLineReader extends InputStreamReader{ private static final byte DEFAULT_ESCAPE_CHARACTER = ''; private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; private int bufferSize = DEFAULT_BUFFER_SIZE; private FSDataInputStream in; private byte[] buffer; // the number of bytes in the real buffer private int bufferLength = 0; // the current position of the buffer private int bufferPos = 0; private byte escapeChar; private static final byte CR = 'r'; private static final byte LF = 'n'; /** * Create a multi-line reader that reads from the given stream using the * default buffer-size (64K). * @param in The input stream * @throws IOException */ public SeekableLineReader(FSDataInputStream in, byte escapeChar) { super(in); this.BackUpLineReader(in, DEFAULT_BUFFER_SIZE, escapeChar); } public SeekableLineReader(FSDataInputStream in) { super(in); this.BackUpLineReader(in, DEFAULT_BUFFER_SIZE, DEFAULT_ESCAPE_CHARACTER); } /** * Create a multi-line reader that reads from the given stream using the * given buffer-size. * @param in The input stream * @param bufferSize Size of the read buffer * @throws IOException */ public void BackUpLineReader(FSDataInputStream in, int bufferSize, byte escapeChar) { this.escapeChar = escapeChar; this.in = in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; } /** * Create a multi-line reader that reads from the given stream using the * <code>io.file.buffer.size</code> specified in the given * <code>Configuration</code>. 
* @param in input stream * @param conf configuration * @throws IOException */ public SeekableLineReader(FSDataInputStream in, Charset cs, Configuration conf) throws IOException { super(in, cs); this.BackUpLineReader(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), DEFAULT_ESCAPE_CHARACTER); } public SeekableLineReader(FSDataInputStream in, Charset cs, Configuration conf, byte escapeChar) throws IOException { super(in, cs); this.BackUpLineReader(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), escapeChar); } /** * Close the underlying stream. * @throws IOException */ public void close() throws IOException { in.close(); } /** * Read one line from the InputStream into the given Text. A line * can be terminated by one of the following: 'n' (LF), 'r' (CR), * or 'rn' (CR+LF). Will ignore any of these characters if they * are proceeded by a (e.g. n). EOF also terminates an otherwise * unterminated line. * * @param str the object to store the given line (without the newline) * @param maxLineLength the maximum number of bytes to store into str; * the rest will be silently discarded. * @param maxBytesToConsume the maximum number of bytes to consume in * this call. This is only a hint, because if the line crosses this * threshold, we allow it to happen. It can overshoot potentially by * as much as one buffer length. * * @return the number of bytes read including the (longest) newline * found * * @throws IOException */ public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { /* We're reading data from in, but the head of the stream may be * already buffered in buffer, so we have several cases: * 1. No newline characters are in the buffer, so we need to copy * everything and read another buffer from the stream. * 2. An unambiguously terminated line is in buffer, so we just * copy to str. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends * in CR. 
In this case we copy everything up to CR to str, but * we also need to see what follows CR: if it's LF, then we * need consume LF as well, so next call to readLine will read * from after that. * We use a flag prevCharCR to signal if previous character was CR * and, if it happens to be at the end of the buffer, delay * consuming it until we have a chance to look at the char that * follows. */ str.clear(); int txtLength = 0; // tracks str.getLength() as an optimization int newLineLength = 0; // length of the terminating newline boolean prevCharCR = false; // true if prev char was r long bytesConsumed = 0; do { int startPos = bufferPos; // starting from where we left off if (bufferPos >= bufferLength) { startPos = bufferPos = 0; if (prevCharCR) ++bytesConsumed; // account for CR from previous read bufferLength = in.read(buffer); if (bufferLength <= 0) break; // EOF } for (; bufferPos < bufferLength; ++bufferPos) { boolean escaped = false; if (bufferPos > 0) escaped = buffer[bufferPos-1] == escapeChar; if (buffer[bufferPos] == LF && !escaped) { newLineLength = prevCharCR ? 2 : 1; ++bufferPos; // at next loop proceed from following byte break; } if (prevCharCR) { // CR + notLF, we are at notLF newLineLength = 1; break; } prevCharCR = (buffer[bufferPos] == CR && !escaped); } int readLength = bufferPos - startPos; if (prevCharCR && newLineLength == 0) --readLength; bytesConsumed += readLength; int appendLength = readLength - newLineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(buffer, startPos, appendLength); txtLength += appendLength; } } while (newLineLength == 0 && bytesConsumed < maxBytesToConsume); if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed); return (int)bytesConsumed; } /** * Read from the InputStream into the given Text. 
* @param str the object to store the given line * @param maxLineLength the maximum number of bytes to store into str * @return the number of bytes read including newline * @throws IOException if the underlying stream throws */ public int readLine(Text str, int maxLineLength) throws IOException { return readLine(str, maxLineLength, Integer.MAX_VALUE); } /** * Read from the InputStream into the given Text. * @param str the object to store the given line * @return the number of bytes read including newline * @throws IOException if the underlying stream throws */ public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } public void seek(long pos) throws IOException{ in.seek(pos); } public long getPos() throws IOException{ return in.getPos(); } }
我的测试日志是:
2012-08-24 04:24:40,949 [INFO] [pool-1-thread-23103] [com.xh.storageserver.filter.FtpLoggingFilter] - RECEIVED: RETR xhdisk002/M00/00/E6/wKhJC1APoBcEAAAAAAAAAAAAAAA745.exe 2012-08-24 04:24:40,951 [INFO] [pool-1-thread-23106] [com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 150 File status okay; about to open data connection. 2012-08-24 04:24:46,891 [WARN] [Thread-2017118] [com.xh.storageserver.ftp.connect.FtpDataConnection] - Exception during data transfer, closing data connection socket java.net.SocketException: Connection reset at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:96) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) at com.xh.storageserver.ftp.connect.FtpDataConnection.transfer(FtpDataConnection.java:277) at com.xh.storageserver.ftp.connect.FtpDataConnection.transferToClient(FtpDataConnection.java:143) at com.xh.storageserver.ftp.command.RETR.fortune(RETR.java:170) at com.xh.storageserver.ftp.command.RETR$1.run(RETR.java:48) 2012-08-24 04:24:46,891 [ERROR] [Thread-2017118] [com.xh.storageserver.ftp.command.RETR] - Socket exception during data transfer java.net.SocketException: Socket closed at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:99) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) at com.xh.storageserver.ftp.connect.FtpDataConnection.transfer(FtpDataConnection.java:296) at com.xh.storageserver.ftp.connect.FtpDataConnection.transferToClient(FtpDataConnection.java:143) at com.xh.storageserver.ftp.command.RETR.fortune(RETR.java:170) at com.xh.storageserver.ftp.command.RETR$1.run(RETR.java:48) 2012-08-24 04:24:46,892 [INFO] [pool-1-thread-23104] 
[com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 426 Data connection error. 2012-08-24 04:24:46,893 [INFO] [pool-1-thread-23105] [com.xh.storageserver.filter.FtpLoggingFilter] - RECEIVED: QUIT 2012-08-24 04:24:46,894 [INFO] [pool-1-thread-23105] [com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 221 Goodbye.
但是我发现 seek 函数并没有起作用,所以在此想向各位大牛请教解决方法,谢谢。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我在 SeekableLineReader 中修改了 seek 函数,增加了一行:`bufferPos = (int) pos;`。虽然上面的例子能正常显示了,但是遇到大文件仍然无法正确处理,只能得到既有日志记录的一半。求了解 hive 内部原理的大神提供技术支持,拜谢!顺便说一下,我特意找的测试文件的 length 小于 int 的取值范围。