通过自定义的InputFormat处理多行错误日志

发布于 2021-11-16 18:58:55 字数 15149 浏览 881 评论 1

我想通过自定义的InputFormat来处理多行错误日志,主要策略是读入时对其进行预处理,把多行错误日志归并为一行,然后通过hive的RegexSerDe来处理,以下是自定义类的代码:

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

import com.xh.xhdatawarehouse.hive.plugin.SeekableLineReader;


public class ErrLogRecordReader implements RecordReader<LongWritable, Text> {
	
private static final Log LOG = LogFactory.getLog(ErrLogRecordReader.class.getName());
	
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private SeekableLineReader lineReader;
	int maxLineLength;
	
	public ErrLogRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		start = inputSplit.getStart();
		end = start + inputSplit.getLength();
		final Path file = inputSplit.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);
		
		// Open file and seek to the start of the split
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		//InputStreamReader read = new InputStreamReader (new FileInputStream(f),"UTF-8");
		boolean skipFirstLine = false;
		//if (codec != null) {
		//	lineReader = new BackLineReader(codec.createInputStream(fileIn), Charset.forName("GBK"), job);
		//	end = Long.MAX_VALUE;
		//} else {
			if (start != 0) {
				skipFirstLine = true;
				--start;
				fileIn.seek(start);
			}
			lineReader = new SeekableLineReader(fileIn, Charset.forName("UTF-8"), job);
		//}
		
		if (skipFirstLine) {
			start += lineReader.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}
	
	public ErrLogRecordReader(FSDataInputStream in, long offset, long endOffset, int maxLineLength) {
		this.maxLineLength = maxLineLength;
		this.lineReader = new SeekableLineReader(in);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset;
	}
	
	public ErrLogRecordReader(FSDataInputStream in, long offset, long endOffset, Configuration job)
		throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		this.lineReader = new SeekableLineReader(in, Charset.forName("UTF-8"), job);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset; 
	}
	
	public LongWritable createKey() {
		return new LongWritable();
	}
	
	public Text createValue() {
		return new Text();
	}
	
	/**
	 * Reads the next record in the split.  All instances of t, n and rn are replaced by a space.
	 * @param key key of the record which will map to the byte offset of the record's line
	 * @param value the record in text format
	 * @return  true if a record existed, false otherwise
	 * @throws IOException
	 */
	public synchronized boolean next(LongWritable key, Text value) throws IOException {
		String reStart = "d{4}-d{2}-d{2}s[^ ]+s([WARN]|[ERROR]|[INFO])s[^ ]+s[^ ]+s-s.*";
		if (pos < end) {
			key.set(pos);
			System.out.println("key : "+key.get());
			int newSize = lineReader.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
			String sstr = value.toString();			
			
			
			if(sstr.matches(reStart)){
				pos += newSize;
				//读下一行,直到异常的结尾行
				while (pos < end) {
					int nextLineSize = lineReader.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
				
					String nextLine = value.toString();
					if (nextLineSize == 0) {
						break;
					}	
					
					if (!nextLine.matches(reStart)) {
						pos += nextLineSize;
						newSize += nextLineSize;
						sstr = sstr.replaceAll("rn", "#");
						sstr += "#"+nextLine;
						System.out.println("pos = "+pos);
					}else{
						System.out.println("break...");
						System.out.println("pos:"+pos);
						
						lineReader.seek(pos);
						
						System.out.println("position: "+getPos());
						
						//System.out.println("position: "+lineReader.getPos());*/
						break;
					}
				}
				
				value.set(sstr);
			

				if (newSize < maxLineLength)
					return true;
				
				
			}
		
			 // line too long. try again   
			LOG.info("Skipped line of size " + newSize + " at position " + (pos - newSize));
		}
		return false;
	}
	
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float)(end - start));
		}
	}
	
	public synchronized long getPos() throws IOException {
		return pos;
	}
	
	public synchronized void close() throws IOException {
		if (lineReader != null)
			lineReader.close();
	}
}

import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;


public class SeekableLineReader extends InputStreamReader{
	private static final byte DEFAULT_ESCAPE_CHARACTER = '';
	private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
	private int bufferSize = DEFAULT_BUFFER_SIZE;
	private FSDataInputStream in;
	private byte[] buffer;
	// the number of bytes in the real buffer
	private int bufferLength = 0;
	// the current position of the buffer
	private int bufferPos = 0;
	private byte escapeChar;
	
	private static final byte CR = 'r';
	private static final byte LF = 'n';
	
	/**
	 * Create a multi-line reader that reads from the given stream using the
	 * default buffer-size (64K).
	 * @param in The input stream
	 * @throws IOException
	 */
	
	
	
	public SeekableLineReader(FSDataInputStream in, byte escapeChar) {
		super(in);
		this.BackUpLineReader(in, DEFAULT_BUFFER_SIZE, escapeChar);
	}
	
	public SeekableLineReader(FSDataInputStream in) {
		super(in);
		this.BackUpLineReader(in, DEFAULT_BUFFER_SIZE, DEFAULT_ESCAPE_CHARACTER);
	}
	
	/**
	 * Create a multi-line reader that reads from the given stream using the
	 * given buffer-size.
	 * @param in The input stream
	 * @param bufferSize Size of the read buffer
	 * @throws IOException
	 */
	public void BackUpLineReader(FSDataInputStream in, int bufferSize, byte escapeChar) {
		this.escapeChar = escapeChar;
		this.in = in;
		this.bufferSize = bufferSize;
		this.buffer = new byte[this.bufferSize];
	}
	
	/**
	 * Create a multi-line reader that reads from the given stream using the
	 * <code>io.file.buffer.size</code> specified in the given
	 * <code>Configuration</code>.
	 * @param in input stream
	 * @param conf configuration
	 * @throws IOException
	 */
	public SeekableLineReader(FSDataInputStream in, Charset cs, Configuration conf) throws IOException {
		super(in, cs);
		this.BackUpLineReader(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), DEFAULT_ESCAPE_CHARACTER);
	}
	public SeekableLineReader(FSDataInputStream in, Charset cs, Configuration conf, byte escapeChar) throws IOException {
		super(in, cs);
		this.BackUpLineReader(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), escapeChar);
	}
	
	/**
	 * Close the underlying stream.
	 * @throws IOException
	 */
	public void close() throws IOException {
		in.close();
	}
	
	/**
	 * Read one line from the InputStream into the given Text. A line
	 * can be terminated by one of the following: 'n' (LF), 'r' (CR),
	 * or 'rn' (CR+LF).  Will ignore any of these characters if they
	 * are proceeded by a  (e.g. n).  EOF also terminates an otherwise
	 * unterminated line.
	 * 
	 * @param str the object to store the given line (without the newline)
	 * @param maxLineLength the maximum number of bytes to store into str; 
	 * the rest will be silently discarded.
	 * @param maxBytesToConsume the maximum number of bytes to consume in
	 * this call.  This is only a hint, because if the line crosses this
	 * threshold, we allow it to happen.  It can overshoot potentially by
	 * as much as one buffer length.
	 * 
	 * @return  the number of bytes read including the (longest) newline
	 * found
	 * 
	 * @throws IOException
	 */
	public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
		throws IOException {
		/* We're reading data from in, but the head of the stream may be
	     * already buffered in buffer, so we have several cases:
	     * 1. No newline characters are in the buffer, so we need to copy
	     *    everything and read another buffer from the stream.
	     * 2. An unambiguously terminated line is in buffer, so we just
	     *    copy to str.
	     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
	     *    in CR.  In this case we copy everything up to CR to str, but
	     *    we also need to see what follows CR: if it's LF, then we
	     *    need consume LF as well, so next call to readLine will read
	     *    from after that.
	     * We use a flag prevCharCR to signal if previous character was CR
	     * and, if it happens to be at the end of the buffer, delay
	     * consuming it until we have a chance to look at the char that
	     * follows.
	     */
		
		str.clear();
		int txtLength = 0; // tracks str.getLength() as an optimization
		int newLineLength = 0; // length of the terminating newline
		boolean prevCharCR = false; // true if prev char was r
		long bytesConsumed = 0;
		
		do {
			int startPos = bufferPos; // starting from where we left off
			if (bufferPos >= bufferLength) {
				startPos = bufferPos = 0;
				if (prevCharCR) ++bytesConsumed; // account for CR from previous read
				bufferLength = in.read(buffer);
				if (bufferLength <= 0) break; // EOF
			}
			for (; bufferPos < bufferLength; ++bufferPos) {
				boolean escaped = false;
				if (bufferPos > 0) 
					escaped = buffer[bufferPos-1] == escapeChar;
				if (buffer[bufferPos] == LF && !escaped) {
					newLineLength = prevCharCR ? 2 : 1;
					++bufferPos; // at next loop proceed from following byte
					break;
				}
				if (prevCharCR) { // CR + notLF, we are at notLF
					newLineLength = 1;
					break;
				}
				prevCharCR = (buffer[bufferPos] == CR && !escaped);
			}
			int readLength = bufferPos - startPos;
			if (prevCharCR && newLineLength == 0) 
				--readLength;
			bytesConsumed += readLength;
			int appendLength = readLength - newLineLength;
			if (appendLength > maxLineLength - txtLength) {
				appendLength = maxLineLength - txtLength;
			}
			if (appendLength > 0) {
				str.append(buffer, startPos, appendLength);
				txtLength += appendLength;
			}
		} while (newLineLength == 0 && bytesConsumed < maxBytesToConsume);
		
		if (bytesConsumed > (long)Integer.MAX_VALUE)
			throw new IOException("Too many bytes before newline: " + bytesConsumed);
		
		return (int)bytesConsumed;
	}
	
	/**
	 * Read from the InputStream into the given Text.
	 * @param str the object to store the given line
	 * @param maxLineLength the maximum number of bytes to store into str
	 * @return  the number of bytes read including newline
	 * @throws IOException if the underlying stream throws
	 */
	public int readLine(Text str, int maxLineLength) throws IOException {
		return readLine(str, maxLineLength, Integer.MAX_VALUE);
	}
	
	/**
	 * Read from the InputStream into the given Text.
	 * @param str the object to store the given line
	 * @return  the number of bytes read including newline
	 * @throws IOException if the underlying stream throws
	 */
	public int readLine(Text str) throws IOException {
		return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
	}
	
	public void seek(long pos) throws IOException{
		in.seek(pos);
	}
	
	public long getPos() throws IOException{
		return in.getPos();
	}
}

 

我的测试日志是:

2012-08-24 04:24:40,949 [INFO] [pool-1-thread-23103] [com.xh.storageserver.filter.FtpLoggingFilter] - RECEIVED: RETR xhdisk002/M00/00/E6/wKhJC1APoBcEAAAAAAAAAAAAAAA745.exe
2012-08-24 04:24:40,951 [INFO] [pool-1-thread-23106] [com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 150 File status okay; about to open data connection.
2012-08-24 04:24:46,891 [WARN] [Thread-2017118] [com.xh.storageserver.ftp.connect.FtpDataConnection] - Exception during data transfer, closing data connection socket
java.net.SocketException: Connection reset
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:96)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
	at com.xh.storageserver.ftp.connect.FtpDataConnection.transfer(FtpDataConnection.java:277)
	at com.xh.storageserver.ftp.connect.FtpDataConnection.transferToClient(FtpDataConnection.java:143)
	at com.xh.storageserver.ftp.command.RETR.fortune(RETR.java:170)
	at com.xh.storageserver.ftp.command.RETR$1.run(RETR.java:48)
2012-08-24 04:24:46,891 [ERROR] [Thread-2017118] [com.xh.storageserver.ftp.command.RETR] - Socket exception during data transfer
java.net.SocketException: Socket closed
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:99)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
	at com.xh.storageserver.ftp.connect.FtpDataConnection.transfer(FtpDataConnection.java:296)
	at com.xh.storageserver.ftp.connect.FtpDataConnection.transferToClient(FtpDataConnection.java:143)
	at com.xh.storageserver.ftp.command.RETR.fortune(RETR.java:170)
	at com.xh.storageserver.ftp.command.RETR$1.run(RETR.java:48)
2012-08-24 04:24:46,892 [INFO] [pool-1-thread-23104] [com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 426 Data connection error.
2012-08-24 04:24:46,893 [INFO] [pool-1-thread-23105] [com.xh.storageserver.filter.FtpLoggingFilter] - RECEIVED: QUIT
2012-08-24 04:24:46,894 [INFO] [pool-1-thread-23105] [com.xh.storageserver.filter.FtpLoggingFilter] - SENT: 221 Goodbye.

但是我发现seek函数并没有起作用,所以在此想向各位大牛请教解决方法,谢谢。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

恋你朝朝暮暮 2021-11-19 04:30:36

我在 SeekableLineReader 中修改了 seek 函数,增加了一行:bufferPos = (int) pos; 虽然上面的例子能正常显示了,但是遇到大文件仍然无法正确处理,只能得到既有日志记录的一半,求了解 hive 内部原理的大神提供技术支持,拜谢!顺便说一下,我特意找的文件用例 length 小于 int 的取值范围。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文