Uploading large gzipped data files to HDFS

Posted on 2024-11-16 19:42:58

I have a use case where I want to upload big gzipped text data files (~ 60 GB) on HDFS.

My code below takes about 2 hours to upload these files in chunks of 500 MB. The following is pseudocode. I was checking whether somebody could help me reduce this time:

    // Needs java.io.*, java.util.zip.GZIPInputStream and
    // org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
    int fileFetchBuffer = 500000000;   // chunk size in bytes (~500 MB)
    System.out.println("file fetch buffer is: " + fileFetchBuffer);
    long offset = 0;                   // long: the uncompressed total exceeds Integer.MAX_VALUE
    int bytesRead = -1;
    int outFileNum = 0;

    try {
        InputStream fileStream = new FileInputStream(file);
        InputStream stream;
        if (fileName.endsWith(".gz")) {
            stream = new GZIPInputStream(fileStream);

            String[] fileN = fileName.split("\\.");
            System.out.println("fil 0 : " + fileN[0]);
            System.out.println("fil 1 : " + fileN[1]);

            byte[] buffer = new byte[fileFetchBuffer];
            FileSystem fs = FileSystem.get(conf);

            while (true) {
                int charsLeft = fileFetchBuffer;
                logger.info("charsLeft outside while: " + charsLeft);

                FSDataOutputStream dos = null;
                while (charsLeft != 0) {
                    bytesRead = stream.read(buffer, 0, charsLeft);
                    if (bytesRead < 0) {
                        break;  // end of input; dos is closed once, below
                    }
                    offset = offset + bytesRead;
                    charsLeft = charsLeft - bytesRead;
                    logger.info("offset in record: " + offset);
                    logger.info("charsLeft: " + charsLeft);
                    logger.info("bytesRead in record: " + bytesRead);

                    if (dos == null) {
                        // Create the output file lazily so end of input never leaves an empty file
                        String outFileStr = Utils.getOutputFileName(
                                stagingDir,
                                fileN[0],
                                outFileNum);
                        Path outFile = new Path(outFileStr);
                        if (fs.exists(outFile)) {
                            fs.delete(outFile, false);
                        }
                        dos = fs.create(outFile);
                    }

                    dos.write(buffer, 0, bytesRead);
                }

                if (dos != null) {
                    logger.info("done writing: " + outFileNum);
                    dos.close();  // close() also flushes
                }

                if (bytesRead < 0) {
                    break;
                }

                outFileNum++;
            }  // end of while

            stream.close();
        } else {
            // Assume uncompressed file
            stream = fileStream;
        }

    } catch (FileNotFoundException e) {
        logger.error("File not found: " + e);
    }
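
A possible restructuring, as a sketch only: decompress through a small reusable buffer and roll over to a new output stream every `chunkSize` bytes. `GzipChunker` and `ChunkOpener` are hypothetical names, not part of the code above or of Hadoop; on HDFS the opener would presumably return `fs.create(new Path(...))`. Whether this shortens the two hours depends on where the time actually goes (decompressing a single gzip stream is sequential, and the network write to HDFS may dominate), so it is worth measuring first.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

class GzipChunker {

    /** Hypothetical hook: opens the output stream for chunk number n. */
    interface ChunkOpener {
        OutputStream open(int chunkNumber) throws IOException;
    }

    /**
     * Decompresses gzipIn and writes the plain bytes as consecutive chunks of
     * at most chunkSize bytes each. Returns the number of chunks written.
     */
    static int split(InputStream gzipIn, long chunkSize, ChunkOpener opener)
            throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[64 * 1024]; // small read buffer, reused for every read
        int chunks = 0;
        long leftInChunk = 0;
        OutputStream out = null;
        try (InputStream in = new GZIPInputStream(gzipIn, 64 * 1024)) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                int pos = 0;
                while (pos < n) {
                    if (out == null) {
                        // Open lazily so no empty trailing chunk is created
                        out = opener.open(chunks++);
                        leftInChunk = chunkSize;
                    }
                    int len = (int) Math.min(n - pos, leftInChunk);
                    out.write(buffer, pos, len);
                    pos += len;
                    leftInChunk -= len;
                    if (leftInChunk == 0) { // chunk is full: close it and roll over
                        out.close();
                        out = null;
                    }
                }
            }
        } finally {
            if (out != null) {
                out.close();
            }
        }
        return chunks;
    }
}
```

The read buffer here is 64 KB rather than 500 MB: the chunk size only decides when to switch output files, so the buffer does not need to hold a whole chunk.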


Comments (2)

软糖 2024-11-23 19:42:58

You should consider using the Commons IO package from Apache.

It has a method

IOUtils.copy( InputStream, OutputStream )

that would tremendously reduce time needed to copy your files.
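
For context, a stream-to-stream copy of this kind comes down to a read/write loop over a fixed-size buffer. The sketch below uses only the JDK so that it stays self-contained; `StreamCopy` and its method names are illustrative and are not the Commons IO API. It assumes Java 9 or later for `InputStream.transferTo`. A copy loop like this does not remove the cost of gzip decompression itself, so any speed-up should be measured rather than assumed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class StreamCopy {

    /** Copies everything from in to out through a fixed-size buffer; returns the bytes copied. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0; // long, so files above 2 GB are counted correctly
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    /** Same result using the JDK built-in (Java 9+). */
    static long copyBuiltIn(InputStream in, OutputStream out) throws IOException {
        return in.transferTo(out);
    }
}
```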

雪若未夕 2024-11-23 19:42:58

I tried with buffered input stream and saw no real difference.
I suppose a file channel implementation could be even more efficient. Tell me if it's not fast enough.

package toto;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class Slicer {

    private static final int BUFFER_SIZE = 50000;

    public static void main(String[] args) {

        try 
        {
            slice( args[ 0 ], args[ 1 ], Long.parseLong( args[2]) );
        }//try
        catch (IOException e) 
        {
            e.printStackTrace();
        }//catch
        catch( Exception ex )
        {
            ex.printStackTrace();
            System.out.println( "Usage :  toto.Slicer <big file> <chunk name radix > <chunks size>" );
        }//catch
    }//met

    /**
     * Slices a huge file into chunks.
     * @param inputFileName the big file to slice.
     * @param outputFileRadix the base name of slices generated by the slicer. All slices will then be numbered outputFileRadix0,outputFileRadix1,outputFileRadix2...
     * @param chunkSize the size of chunks in bytes
     * @return the number of slices.
     */
    public static int slice( String inputFileName, String outputFileRadix, long chunkSize ) throws IOException
    {
        //I would add some code to pretty print the output file names,
        //i.e. put leading zeros before chunkNumber in the output file name
        //so that they all have the same number of chars:
        //use java.io.File for that, estimate the number of chunks, take the power of 10 to get the number of leading 0s

        //just to get some stats
        long timeStart = System.currentTimeMillis();
        long timeStartSlice = timeStart;
        long timeEnd = 0;

        //io streams and chunk counter
        int chunkNumber = 0;
        FileInputStream fis = null;
        FileOutputStream fos = null;

        try 
        {
            //open files
            fis = new FileInputStream( inputFileName );
            fos = new FileOutputStream( outputFileRadix + chunkNumber );

            //declare state variables
            boolean finished = false;
            byte[] buffer = new byte[ BUFFER_SIZE ];
            int bytesRead = 0;
            long bytesInChunk = 0;


            while( !finished )
            {
                //System.out.println( "bytes to read " +(int)Math.min( BUFFER_SIZE, chunkSize - bytesInChunk ) );
                bytesRead = fis.read( buffer,0, (int)Math.min( BUFFER_SIZE, chunkSize - bytesInChunk ) );

                if( bytesRead == -1 )
                    finished = true;
                else
                {
                    fos.write( buffer, 0, bytesRead );
                    bytesInChunk += bytesRead;
                    if( bytesInChunk == chunkSize )
                    {
                        if( fos != null )
                        {
                            fos.close();
                            timeEnd = System.currentTimeMillis();
                            System.out.println( "Chunk "+chunkNumber + " has been generated in "+ (timeEnd - timeStartSlice) +" ms");
                            chunkNumber ++;
                            bytesInChunk = 0;
                            timeStartSlice = timeEnd;
                            System.out.println( "Creating slice number " + chunkNumber );
                            fos = new FileOutputStream( outputFileRadix + chunkNumber );
                        }//if
                    }//if
                }//else
            }//while
        }
        catch (Exception e) 
        {
            System.out.println( "A problem occurred during slicing : " );
            e.printStackTrace();
        }//catch
        finally 
        {
            //whatever happens close all files
            System.out.println( "Closing all files.");
            if( fis != null )
                fis.close();
            if( fos != null )
                fos.close();
        }//fin

        timeEnd = System.currentTimeMillis();
        System.out.println( "Total slicing time : " + (timeEnd - timeStart) +" ms" );
        System.out.println( "Total number of slices "+ (chunkNumber +1) );

        return chunkNumber+1;
    }//met
}//class
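
The zero-padding mentioned in the comments at the top of `slice` could look roughly like this. It is a sketch under the assumption that the input size is known up front (true for a plain local file, not for the uncompressed size of a gzip stream); `ChunkNames` and its methods are hypothetical helpers, not part of the class above.

```java
class ChunkNames {

    /** Number of digits needed for the largest chunk index of a file of the given size. */
    static int indexWidth(long fileSize, long chunkSize) {
        // Ceiling division gives the chunk count; an empty file still counts as one chunk
        long chunks = Math.max(1, (fileSize + chunkSize - 1) / chunkSize);
        return String.valueOf(chunks - 1).length();
    }

    /** Builds a name such as "part03" from radix "part", chunk number 3 and width 2. */
    static String name(String radix, int chunkNumber, int width) {
        return String.format("%s%0" + width + "d", radix, chunkNumber);
    }
}
```

With names padded this way, a plain lexicographic listing of the output directory returns the chunks in order.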

Greetings,
Stéphane
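
The file channel variant suggested in this answer might be sketched as follows. `ChannelSlicer` is a hypothetical name, and the sketch carries the same limits as `Slicer`: it slices the raw bytes of a local file into local files, so it neither decompresses gzip nor writes to HDFS. Whether it is actually faster than the stream version depends on the platform and would need measuring.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

class ChannelSlicer {

    /** Slices input into files named outputRadix0, outputRadix1, ...; returns the slice count. */
    static int slice(Path input, String outputRadix, long chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunkNumber = 0;
        try (FileChannel in = FileChannel.open(input, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            while (position < size) {
                long end = Math.min(position + chunkSize, size);
                Path outPath = Paths.get(outputRadix + chunkNumber);
                try (FileChannel out = FileChannel.open(outPath,
                        StandardOpenOption.CREATE,
                        StandardOpenOption.WRITE,
                        StandardOpenOption.TRUNCATE_EXISTING)) {
                    // transferTo may move fewer bytes than requested, so loop until the chunk is done
                    while (position < end) {
                        position += in.transferTo(position, end - position, out);
                    }
                }
                chunkNumber++;
            }
        }
        return chunkNumber;
    }
}
```

Unlike `Slicer.slice`, this version opens an output file only when there are bytes left to write, so an input whose size is an exact multiple of `chunkSize` does not produce an empty trailing slice.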
