上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
io 1.18
定义 I/O 基本接口,及相关组合。
Reader
,Writer
,Closer
,Seeker
ReadWriter
,ReaderAt
,WriterAt
,ReaderFrom
,WriterTo
ReadCloser
,WriteCloser
,ReadWriteCloser
ByteReader
,ByteWriter
,StringWriter
package main import ( "bufio" "log" "os" ) func main() { f, err := os.Create("./demo.txt") if err != nil { log.Fatalln(err) } defer f.Close() defer f.Sync() w := bufio.NewWriter(f) defer w.Flush() w.WriteString("hello, world!") }
标准库
关联密切的包和类型:
bufio.Reader, Writer
:缓冲 I/O。bytes.Reader, Buffer
:字节数组。strings.Reader, Builder
:字符串。os.File
:系统文件接口。(syscall)net.TCPConn, UDPConn
:网络连接。fmt.Fprint
:格式化。
封装:
[]byte
=>bytes.Buffer
,Reader
string
=>strings.Builder
,Reader
channel
=>io.Pipe
设备:
// os const DevNull = "/dev/null" var ( Stdin = NewFile(uintptr(syscall.Stdin), "/dev/stdin") Stdout = NewFile(uintptr(syscall.Stdout), "/dev/stdout") Stderr = NewFile(uintptr(syscall.Stderr), "/dev/stderr") )
// io var Discard Writer = discard{}
文件系统
引入 io/fs
抽象文件系统,解除 io
和 os
的直接依赖。
允许我们实现抽象只读文件系统,映射到云端、缓存或压缩包内。
+--> CloudFS | Open(...) --> io/fs --+--> os.DirFS | +--> embed.FS
os.DirFS
embed.FS
package main import ( "fmt" "io/fs" "log" "os" ) func main() { var s fs.FS = os.DirFS("/dev") f, err := s.Open("urandom") if err != nil { log.Fatalln(err) } defer f.Close() buf := make([]byte, 10) n, _ := f.Read(buf) fmt.Printf("%#v, %d\n", buf, n) }
其他
使用 io
辅助函数时,要注意可能有多次数据复制。
比如, io.Copy
内置一个 32 KB
交换缓冲。
从 1.16 弃用 io/ioutil
,对应功能由 io
和 os
提供,
example
使用说明,及注意事项。
type Reader interface { Read(p []byte) (n int, err error) } type Writer interface { Write(p []byte) (n int, err error) } type Closer interface { Close() error }
Read
: 可同时返回部分数据和EOF
。先判断n > 0
,再处理err != nil
。Write
: 不能修改p
。部分写入时,返回已写数量和错误。Close
: 重复关闭,看具体实现。
定位
type Seeker interface { Seek(offset int64, whence int) (int64, error) } // whence const ( SeekStart = 0 // seek relative to the origin of the file SeekCurrent = 1 // seek relative to the current offset SeekEnd = 2 // seek relative to the end )
Seek
:偏移( offset
)可以是负数。
ReaderAt
、 WriterAt
:从头( start
)开始计算偏移。(不受 seek
影响,也不修改)
type ReaderAt interface { ReadAt(p []byte, off int64) (n int, err error) } type WriterAt interface { WriteAt(p []byte, off int64) (n int, err error) }
func main() { data := []byte{0, 1, 2, 3, 4, 5, 6} r := bytes.NewReader(data) for i := 0; i < 3; i++ { if i == 2 { r.Seek(-2, io.SeekEnd) } buf := make([]byte, 1) n, err := r.Read(buf) fmt.Println("Read:", n, err, buf) } // 从头开始,不影响 Seek。 buf := make([]byte, 1) n, err := r.ReadAt(buf, 1) fmt.Println("ReadAt:", n, err, buf) // 继续 for.Read 位置继续。 b, err := r.ReadByte() fmt.Println("ReadByte:", b, err) } /* Read: 1 <nil> [0] Read: 1 <nil> [1] Read: 1 <nil> [5] ReadAt: 1 <nil> [1] ReadByte: 6 <nil> */
字节
type ByteReader interface { ReadByte() (byte, error) } type ByteWriter interface { WriteByte(c byte) error } type ByteScanner interface { ByteReader UnreadByte() error }
ReadByte
:出错时,不会消耗输入数据。UnreadByte
:返回上一次ReadByte
结果,不影响输入位置。
func main() { data := []byte{0, 1, 2, 3, 4, 5, 6} r := bytes.NewReader(data) b, err := r.ReadByte() fmt.Println(b, err) // 须在 ReadByte 之后。 // 位置 seek-1,不能连续调用。 err = r.UnreadByte() if err != nil { fmt.Println("unread: ", err) } for { b, err := r.ReadByte() fmt.Println("for: ", b, err) if err != nil { break } } } /* 0 <nil> for: 0 <nil> for: 1 <nil> for: 2 <nil> for: 3 <nil> for: 4 <nil> for: 5 <nil> for: 6 <nil> for: 0 EOF */
其他
LimitReader
:读取 n
字节后,以 EOF
结束。
type LimitedReader struct { R Reader // underlying reader N int64 // max bytes remaining } func LimitReader(r Reader, n int64) Reader
func main() { data := []byte{0, 1, 2, 3, 4, 5, 6} r := bytes.NewReader(data) r2 := io.LimitReader(r, 3) for { buf := make([]byte, 2) n, err := r2.Read(buf) fmt.Println(n, err, buf) if err == io.EOF { break } } } /* 2 <nil> [0 1] 1 <nil> [2 0] 0 EOF [0 0] */
TeeReader
:所有从 r
读取的数据,都会写入 w
。
func TeeReader(r Reader, w Writer) Reader
func main() { src := []byte{0, 1, 2, 3, 4, 5, 6} r := bytes.NewReader(src) w := bytes.NewBuffer(nil) t := io.TeeReader(r, w) for { buf := make([]byte, 2) _, err := t.Read(buf) if err == io.EOF { break } } fmt.Println(w.Bytes()) } // [0 1 2 3 4 5 6]
SectionReader
:限制读范围。
func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader
func main() { data := []byte{0, 1, 2, 3, 4, 5, 6} r := bytes.NewReader(data) s := io.NewSectionReader(r, 1, 5) for { buf := make([]byte, 2) n, err := s.Read(buf) fmt.Println(n, err, buf) if err == io.EOF { break } } } /* 2 <nil> [1 2] 2 <nil> [3 4] 1 <nil> [5 0] 0 EOF [0 0] */
MultiReader
:依次从多个源读。(全部完毕, EOF
;中途出错,终止)
MultiWriter
:将数据写入多个目标。(任何错误都会终止)
func MultiReader(readers ...Reader) Reader func MultiWriter(writers ...Writer) Writer
func main() { r1 := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6}) r2 := bytes.NewReader([]byte{10, 11, 12, 13}) w1 := bytes.NewBuffer(nil) w2 := bytes.NewBuffer(nil) mr := io.MultiReader(r1, r2) mw := io.MultiWriter(w1, w2) for { buf := make([]byte, 2) n, err := mr.Read(buf) fmt.Println(n, err, buf) if err != nil { break } _, _ = mw.Write(buf[:n]) } fmt.Println(w1.Bytes()) fmt.Println(w2.Bytes()) } /* 2 <nil> [0 1] 2 <nil> [2 3] 2 <nil> [4 5] 1 <nil> [6 0] 2 <nil> [10 11] 2 <nil> [12 13] 0 EOF [0 0] [0 1 2 3 4 5 6 10 11 12 13] [0 1 2 3 4 5 6 10 11 12 13] */
pipe
基于内存(channel)实现的无缓冲同步管道(pipe),并发安全。
- 同步
channel
阻塞,读写匹配。
package main import ( "fmt" "io" "os" ) func main() { r, w := io.Pipe() go func() { defer w.Close() fmt.Fprint(w, "hello, world!\n") }() io.Copy(os.Stdout, r) }
类似管道,还有:
func Pipe() (r *File, w *File, err error) // os func Pipe() (Conn, Conn) // net
源码剖析
内部使用无缓冲 channel
交换数据。
可看作 channel
的 io.Reader
, Writer
封装版。
// io/pipe.go type pipe struct { wrMu sync.Mutex // 写锁 wrCh chan []byte // 数据通道 rdCh chan int // 数据读取数量通知 once sync.Once // 确保关闭仅执行一次 done chan struct{} // 结束通知 } func Pipe() (*PipeReader, *PipeWriter) { p := &pipe{ wrCh: make(chan []byte), rdCh: make(chan int), done: make(chan struct{}), } return &PipeReader{p}, &PipeWriter{p} }
基本流程 :
- 管道(
wrCh
)传递切片,类似协议包。告知{ 指针,长度 }
,不含底层数组。 - 接收方获取切片,据此对底层数据进行复制(
copy
)。 - 缓冲可能过小,用另一管道(
rdCh
)告知复制量,以便其再次发送剩余数据切片。
func (p *pipe) write(b []byte) (n int, err error) { // 关闭检查,独占写锁。 select { case <-p.done: return 0, p.writeCloseError() default: p.wrMu.Lock() defer p.wrMu.Unlock() } for once := true; once || len(b) > 0; once = false { select { case p.wrCh <- b: // 写数据。(切片本身,不包括底层数据) nw := <-p.rdCh // 对方读取数据量。 b = b[nw:] // 继续写剩余数据。 n += nw case <-p.done: // 关闭,终止。 return n, p.writeCloseError() } } return n, nil }
func (p *pipe) read(b []byte) (n int, err error) { // 关闭检查。 select { case <-p.done: return 0, p.readCloseError() default: } select { case bw := <-p.wrCh: // 读取数据。(字节切片) nr := copy(b, bw) // 依照本地缓冲大小复制数据。 p.rdCh <- nr // 告知本次复制数量。 return nr, nil case <-p.done: // 关闭。 return 0, p.readCloseError() } }
用 sync.Once
确保关闭操作仅执行一次。
func (p *pipe) closeRead(err error) error { p.once.Do(func() { close(p.done) }) return nil } func (p *pipe) closeWrite(err error) error { p.once.Do(func() { close(p.done) }) return nil }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论