返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

io 1.18

发布于 2024-10-12 19:15:51 字数 9626 浏览 0 评论 0 收藏 0

定义 I/O 基本接口,及相关组合。

  • Reader , Writer , Closer , Seeker
  • ReadWriter , ReaderAt , WriterAt , ReaderFrom , WriterTo
  • ReadCloser , WriteCloser , ReadWriteCloser
  • ByteReader , ByteWriter , StringWriter
package main

import (
	"bufio"
	"log"
	"os"
)

func main() {
	f, err := os.Create("./demo.txt")
	if err != nil {
		log.Fatalln(err)
	}

	defer f.Close()
	defer f.Sync()

	w := bufio.NewWriter(f)
	defer w.Flush()

	w.WriteString("hello, world!")
}

标准库

关联密切的包和类型:

  • bufio.Reader, Writer :缓冲 I/O。
  • bytes.Reader, Buffer :字节数组。
  • strings.Reader, Builder :字符串。
  • os.File :系统文件接口。(syscall)
  • net.TCPConn, UDPConn :网络连接。
  • fmt.Fprint :格式化。

封装:

  • []byte => bytes.BufferReader
  • string => strings.BuilderReader
  • channel => io.Pipe

设备:

// os

const DevNull = "/dev/null"

var (
    Stdin  = NewFile(uintptr(syscall.Stdin), "/dev/stdin")
    Stdout = NewFile(uintptr(syscall.Stdout), "/dev/stdout")
    Stderr = NewFile(uintptr(syscall.Stderr), "/dev/stderr")
)
// io

var Discard Writer = discard{}

文件系统

引入 io/fs 抽象文件系统,解除 ioos 的直接依赖。

允许我们实现抽象只读文件系统,映射到云端、缓存或压缩包内。

                      +--> CloudFS
                      |
Open(...) --> io/fs --+--> os.DirFS
                      |
                      +--> embed.FS
  • os.DirFS
  • embed.FS
package main

import (
	"fmt"
	"io/fs"
	"log"
	"os"
)

func main() {
	var s fs.FS = os.DirFS("/dev")
	
	f, err := s.Open("urandom")
	if err != nil { log.Fatalln(err) }
	defer f.Close()

	buf := make([]byte, 10)
	n, _ := f.Read(buf)
	fmt.Printf("%#v, %d\n", buf, n)
}

其他

使用 io 辅助函数时,要注意可能有多次数据复制。

比如, io.Copy 内置一个 32 KB 交换缓冲。

从 1.16 弃用 io/ioutil ,对应功能由 ioos 提供,

example

使用说明,及注意事项。

type Reader interface {
	Read(p []byte) (n int, err error)
}

type Writer interface {
	Write(p []byte) (n int, err error)
}

type Closer interface {
	Close() error
}
  • Read : 可同时返回部分数据和 EOF 。先判断 n > 0 ,再处理 err != nil
  • Write : 不能修改 p 。部分写入时,返回已写数量和错误。
  • Close : 重复关闭,看具体实现。

定位

type Seeker interface {
	Seek(offset int64, whence int) (int64, error)
}

// whence
const (
	SeekStart   = 0 // seek relative to the origin of the file
	SeekCurrent = 1 // seek relative to the current offset
	SeekEnd     = 2 // seek relative to the end
)

Seek :偏移( offset )可以是负数。

ReaderAtWriterAt :从头( start )开始计算偏移。(不受 seek 影响,也不修改)

type ReaderAt interface {
	ReadAt(p []byte, off int64) (n int, err error)
}

type WriterAt interface {
	WriteAt(p []byte, off int64) (n int, err error)
}
func main() {
	data := []byte{0, 1, 2, 3, 4, 5, 6}
	r := bytes.NewReader(data)

	for i := 0; i < 3; i++ {
		if i == 2 {
			r.Seek(-2, io.SeekEnd)
		}

		buf := make([]byte, 1)
		n, err := r.Read(buf)
		fmt.Println("Read:", n, err, buf)
	}

    // 从头开始,不影响 Seek。
	buf := make([]byte, 1)
	n, err := r.ReadAt(buf, 1)
	fmt.Println("ReadAt:", n, err, buf)

    // 继续 for.Read 位置继续。
	b, err := r.ReadByte()
	fmt.Println("ReadByte:", b, err)
}

/*

  Read: 1 <nil> [0]
  Read: 1 <nil> [1]
  Read: 1 <nil> [5]
ReadAt: 1 <nil> [1]

ReadByte: 6 <nil>

*/

字节

type ByteReader interface {
	ReadByte() (byte, error)
}

type ByteWriter interface {
	WriteByte(c byte) error
}

type ByteScanner interface {
	ByteReader
	UnreadByte() error
}
  • ReadByte :出错时,不会消耗输入数据。
  • UnreadByte :返回上一次 ReadByte 结果,不影响输入位置。
func main() {
	data := []byte{0, 1, 2, 3, 4, 5, 6}
	r := bytes.NewReader(data)

	b, err := r.ReadByte()
	fmt.Println(b, err)

	// 须在 ReadByte 之后。
	// 位置 seek-1,不能连续调用。
	err = r.UnreadByte()
	if err != nil { fmt.Println("unread: ", err) }

	for {
		b, err := r.ReadByte()
		fmt.Println("for: ", b, err)

		if err != nil { break }
	}
}

/*

0 <nil>

for:  0 <nil>
for:  1 <nil>
for:  2 <nil>
for:  3 <nil>
for:  4 <nil>
for:  5 <nil>
for:  6 <nil>
for:  0 EOF

*/

其他

LimitReader :读取 n 字节后,以 EOF 结束。

type LimitedReader struct {
	R Reader // underlying reader
	N int64  // max bytes remaining
}

func LimitReader(r Reader, n int64) Reader
func main() {
	data := []byte{0, 1, 2, 3, 4, 5, 6}
	r := bytes.NewReader(data)
	r2 := io.LimitReader(r, 3)

	for {
		buf := make([]byte, 2)
		n, err := r2.Read(buf)
		fmt.Println(n, err, buf)

		if err == io.EOF { break }
	}
}

/*

2 <nil> [0 1]
1 <nil> [2 0]
0 EOF   [0 0]

*/

TeeReader :所有从 r 读取的数据,都会写入 w

func TeeReader(r Reader, w Writer) Reader
func main() {
	src := []byte{0, 1, 2, 3, 4, 5, 6}

	r := bytes.NewReader(src)
	w := bytes.NewBuffer(nil)
	t := io.TeeReader(r, w)

	for {
		buf := make([]byte, 2)
		_, err := t.Read(buf)
		if err == io.EOF { break }
	}

	fmt.Println(w.Bytes())
}

// [0 1 2 3 4 5 6]

SectionReader :限制读范围。

func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader
func main() {
	data := []byte{0, 1, 2, 3, 4, 5, 6}
	r := bytes.NewReader(data)
	s := io.NewSectionReader(r, 1, 5)

	for {
		buf := make([]byte, 2)
		n, err := s.Read(buf)
		fmt.Println(n, err, buf)

		if err == io.EOF { break }
	}
}

/*

2 <nil> [1 2]
2 <nil> [3 4]
1 <nil> [5 0]
0 EOF   [0 0]

*/

MultiReader :依次从多个源读。(全部完毕, EOF ;中途出错,终止)

MultiWriter :将数据写入多个目标。(任何错误都会终止)

func MultiReader(readers ...Reader) Reader
func MultiWriter(writers ...Writer) Writer
func main() {
	r1 := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6})
	r2 := bytes.NewReader([]byte{10, 11, 12, 13})

	w1 := bytes.NewBuffer(nil)
	w2 := bytes.NewBuffer(nil)

	mr := io.MultiReader(r1, r2)
	mw := io.MultiWriter(w1, w2)

	for {
		buf := make([]byte, 2)
		n, err := mr.Read(buf)

		fmt.Println(n, err, buf)
		if err != nil { break }

		_, _ = mw.Write(buf[:n])
	}

	fmt.Println(w1.Bytes())
	fmt.Println(w2.Bytes())
}

/*

2 <nil> [0 1]
2 <nil> [2 3]
2 <nil> [4 5]
1 <nil> [6 0]
2 <nil> [10 11]
2 <nil> [12 13]
0 EOF [0 0]
[0 1 2 3 4 5 6 10 11 12 13]
[0 1 2 3 4 5 6 10 11 12 13]

*/

pipe

基于内存(channel)实现的无缓冲同步管道(pipe),并发安全。

  • 同步 channel 阻塞,读写匹配。
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	r, w := io.Pipe()

	go func() {
		defer w.Close()
		fmt.Fprint(w, "hello, world!\n")
	}()

	io.Copy(os.Stdout, r)
}

类似管道,还有:

func Pipe() (r *File, w *File, err error)   // os
func Pipe() (Conn, Conn)                    // net

源码剖析

内部使用无缓冲 channel 交换数据。
可看作 channelio.Reader , Writer 封装版。

// io/pipe.go

type pipe struct {
	wrMu sync.Mutex    // 写锁
	wrCh chan []byte   // 数据通道
	rdCh chan int      // 数据读取数量通知

	once sync.Once     // 确保关闭仅执行一次
	done chan struct{} // 结束通知
}

func Pipe() (*PipeReader, *PipeWriter) {
	p := &pipe{
		wrCh: make(chan []byte),
		rdCh: make(chan int),
		done: make(chan struct{}),
	}
	return &PipeReader{p}, &PipeWriter{p}
}

基本流程

  • 管道( wrCh )传递切片,类似协议包。告知 { 指针,长度 } ,不含底层数组。
  • 接收方获取切片,据此对底层数据进行复制( copy )。
  • 缓冲可能过小,用另一管道( rdCh )告知复制量,以便其再次发送剩余数据切片。
func (p *pipe) write(b []byte) (n int, err error) {
    
    // 关闭检查,独占写锁。
	select {
	case <-p.done:
		return 0, p.writeCloseError()
	default:
		p.wrMu.Lock()
		defer p.wrMu.Unlock()
	}

	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b:    // 写数据。(切片本身,不包括底层数据)
			nw := <-p.rdCh   // 对方读取数据量。
			b = b[nw:]       // 继续写剩余数据。
			n += nw
		case <-p.done:       // 关闭,终止。
			return n, p.writeCloseError()
		}
	}
	return n, nil
}
func (p *pipe) read(b []byte) (n int, err error) {
    
    // 关闭检查。
	select {
	case <-p.done:
		return 0, p.readCloseError()
	default:
	}

	select {
	case bw := <-p.wrCh:    // 读取数据。(字节切片)
		nr := copy(b, bw)   // 依照本地缓冲大小复制数据。
		p.rdCh <- nr        // 告知本次复制数量。
		return nr, nil
	case <-p.done:          // 关闭。
		return 0, p.readCloseError()
	}
}

sync.Once 确保关闭操作仅执行一次。

func (p *pipe) closeRead(err error) error {
	p.once.Do(func() { close(p.done) })
	return nil
}

func (p *pipe) closeWrite(err error) error {
	p.once.Do(func() { close(p.done) })
	return nil
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文