gridfs.go

// mgo - MongoDB driver for Go
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package mgo

import (
	"crypto/md5"
	"encoding/hex"
	"errors"
	"hash"
	"io"
	"os"
	"sync"
	"time"

	"gopkg.in/mgo.v2/bson"
)

type GridFS struct {
	Files  *Collection
	Chunks *Collection
}

type gfsFileMode int

const (
	gfsClosed  gfsFileMode = 0
	gfsReading gfsFileMode = 1
	gfsWriting gfsFileMode = 2
)

type GridFile struct {
	m    sync.Mutex
	c    sync.Cond
	gfs  *GridFS
	mode gfsFileMode
	err  error

	chunk  int
	offset int64

	wpending int
	wbuf     []byte
	wsum     hash.Hash

	rbuf   []byte
	rcache *gfsCachedChunk

	doc gfsFile
}

type gfsFile struct {
	Id          interface{} "_id"
	ChunkSize   int         "chunkSize"
	UploadDate  time.Time   "uploadDate"
	Length      int64       ",minsize"
	MD5         string
	Filename    string    ",omitempty"
	ContentType string    "contentType,omitempty"
	Metadata    *bson.Raw ",omitempty"
}

type gfsChunk struct {
	Id      interface{} "_id"
	FilesId interface{} "files_id"
	N       int
	Data    []byte
}

type gfsCachedChunk struct {
	wait sync.Mutex
	n    int
	data []byte
	err  error
}

func newGridFS(db *Database, prefix string) *GridFS {
	return &GridFS{db.C(prefix + ".files"), db.C(prefix + ".chunks")}
}

func (gfs *GridFS) newFile() *GridFile {
	file := &GridFile{gfs: gfs}
	file.c.L = &file.m
	//runtime.SetFinalizer(file, finalizeFile)
	return file
}

func finalizeFile(file *GridFile) {
	file.Close()
}
// Create creates a new file with the provided name in the GridFS. If the file
// name already exists, a new version will be inserted with an up-to-date
// uploadDate that will cause it to be atomically visible to the Open and
// OpenId methods. If the file name is not important, an empty name may be
// provided and the file Id used instead.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// A simple example inserting a new file:
//
//     func check(err error) {
//         if err != nil {
//             panic(err)
//         }
//     }
//
//     file, err := db.GridFS("fs").Create("myfile.txt")
//     check(err)
//     n, err := file.Write([]byte("Hello world!"))
//     check(err)
//     err = file.Close()
//     check(err)
//     fmt.Printf("%d bytes written\n", n)
//
// The io.Writer interface is implemented by *GridFile and may be used to
// help on the file creation. For example:
//
//     file, err := db.GridFS("fs").Create("myfile.txt")
//     check(err)
//     messages, err := os.Open("/var/log/messages")
//     check(err)
//     defer messages.Close()
//     _, err = io.Copy(file, messages)
//     check(err)
//     err = file.Close()
//     check(err)
//
func (gfs *GridFS) Create(name string) (file *GridFile, err error) {
	file = gfs.newFile()
	file.mode = gfsWriting
	file.wsum = md5.New()
	file.doc = gfsFile{Id: bson.NewObjectId(), ChunkSize: 255 * 1024, Filename: name}
	return
}
// OpenId returns the file with the provided id, for reading.
// If the file isn't found, err will be set to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
//     func check(err error) {
//         if err != nil {
//             panic(err)
//         }
//     }
//
//     file, err := db.GridFS("fs").OpenId(objid)
//     check(err)
//     b := make([]byte, 8192)
//     n, err := file.Read(b)
//     check(err)
//     fmt.Println(string(b))
//     err = file.Close()
//     check(err)
//     fmt.Printf("%d bytes read\n", n)
//
// The io.Reader interface is implemented by *GridFile and may be used to
// deal with it. As an example, the following snippet will dump the whole
// file into the standard output:
//
//     file, err := db.GridFS("fs").OpenId(objid)
//     check(err)
//     _, err = io.Copy(os.Stdout, file)
//     check(err)
//     err = file.Close()
//     check(err)
//
func (gfs *GridFS) OpenId(id interface{}) (file *GridFile, err error) {
	var doc gfsFile
	err = gfs.Files.Find(bson.M{"_id": id}).One(&doc)
	if err != nil {
		return
	}
	file = gfs.newFile()
	file.mode = gfsReading
	file.doc = doc
	return
}
// Open returns the most recently uploaded file with the provided
// name, for reading. If the file isn't found, err will be set
// to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
//     file, err := db.GridFS("fs").Open("myfile.txt")
//     check(err)
//     b := make([]byte, 8192)
//     n, err := file.Read(b)
//     check(err)
//     fmt.Println(string(b))
//     err = file.Close()
//     check(err)
//     fmt.Printf("%d bytes read\n", n)
//
// The io.Reader interface is implemented by *GridFile and may be used to
// deal with it. As an example, the following snippet will dump the whole
// file into the standard output:
//
//     file, err := db.GridFS("fs").Open("myfile.txt")
//     check(err)
//     _, err = io.Copy(os.Stdout, file)
//     check(err)
//     err = file.Close()
//     check(err)
//
func (gfs *GridFS) Open(name string) (file *GridFile, err error) {
	var doc gfsFile
	err = gfs.Files.Find(bson.M{"filename": name}).Sort("-uploadDate").One(&doc)
	if err != nil {
		return
	}
	file = gfs.newFile()
	file.mode = gfsReading
	file.doc = doc
	return
}
// OpenNext opens the next file from iter for reading, sets *file to it,
// and returns true on the success case. If no more documents are available
// on iter or an error occurred, *file is set to nil and the result is false.
// Errors will be available via iter.Err().
//
// The iter parameter must be an iterator on the GridFS files collection.
// Using the GridFS.Find method is an easy way to obtain such an iterator,
// but any iterator on the collection will work.
//
// If the provided *file is non-nil, OpenNext will close it before attempting
// to iterate to the next element. This means that in a loop one only
// has to worry about closing files when breaking out of the loop early
// (break, return, or panic).
//
// For example:
//
//     gfs := db.GridFS("fs")
//     query := gfs.Find(nil).Sort("filename")
//     iter := query.Iter()
//     var f *mgo.GridFile
//     for gfs.OpenNext(iter, &f) {
//         fmt.Printf("Filename: %s\n", f.Name())
//     }
//     if err := iter.Close(); err != nil {
//         panic(err)
//     }
//
func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
	if *file != nil {
		// Ignoring the error here shouldn't be a big deal
		// as we're reading the file and the loop iteration
		// for this file is finished.
		_ = (*file).Close()
	}
	var doc gfsFile
	if !iter.Next(&doc) {
		*file = nil
		return false
	}
	f := gfs.newFile()
	f.mode = gfsReading
	f.doc = doc
	*file = f
	return true
}
// Find runs query on GridFS's files collection and returns
// the resulting Query.
//
// This logic:
//
//     gfs := db.GridFS("fs")
//     iter := gfs.Find(nil).Iter()
//
// Is equivalent to:
//
//     files := db.C("fs" + ".files")
//     iter := files.Find(nil).Iter()
//
func (gfs *GridFS) Find(query interface{}) *Query {
	return gfs.Files.Find(query)
}

// RemoveId deletes the file with the provided id from the GridFS.
func (gfs *GridFS) RemoveId(id interface{}) error {
	err := gfs.Files.Remove(bson.M{"_id": id})
	if err != nil {
		return err
	}
	_, err = gfs.Chunks.RemoveAll(bson.D{{"files_id", id}})
	return err
}

type gfsDocId struct {
	Id interface{} "_id"
}

// Remove deletes all files with the provided name from the GridFS.
func (gfs *GridFS) Remove(name string) (err error) {
	iter := gfs.Files.Find(bson.M{"filename": name}).Select(bson.M{"_id": 1}).Iter()
	var doc gfsDocId
	for iter.Next(&doc) {
		if e := gfs.RemoveId(doc.Id); e != nil {
			err = e
		}
	}
	if err == nil {
		err = iter.Close()
	}
	return err
}
func (file *GridFile) assertMode(mode gfsFileMode) {
	switch file.mode {
	case mode:
		return
	case gfsWriting:
		panic("GridFile is open for writing")
	case gfsReading:
		panic("GridFile is open for reading")
	case gfsClosed:
		panic("GridFile is closed")
	default:
		panic("internal error: missing GridFile mode")
	}
}
// SetChunkSize sets the size of saved chunks. Once the file is written to, it
// will be split into blocks of that size and each block saved into an
// independent chunk document. The default chunk size is 255 KB.
//
// It is a runtime error to call this function once the file has started
// being written to.
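//
// As a rough usage sketch (not one of the original examples; "gfs" is assumed
// to be a *mgo.GridFS and "payload" a []byte provided by the caller), the
// chunk size must be set right after Create and before the first Write:
//
//     file, err := gfs.Create("big.bin")
//     check(err)
//     file.SetChunkSize(1024 * 1024) // 1 MB chunks instead of the 255 KB default
//     _, err = file.Write(payload)
//     check(err)
//     err = file.Close()
//     check(err)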
func (file *GridFile) SetChunkSize(bytes int) {
	file.assertMode(gfsWriting)
	debugf("GridFile %p: setting chunk size to %d", file, bytes)
	file.m.Lock()
	file.doc.ChunkSize = bytes
	file.m.Unlock()
}
// Id returns the current file Id.
func (file *GridFile) Id() interface{} {
	return file.doc.Id
}

// SetId changes the current file Id.
//
// It is a runtime error to call this function once the file has started
// being written to, or when the file is not open for writing.
func (file *GridFile) SetId(id interface{}) {
	file.assertMode(gfsWriting)
	file.m.Lock()
	file.doc.Id = id
	file.m.Unlock()
}

// Name returns the optional file name. An empty string will be returned
// in case it is unset.
func (file *GridFile) Name() string {
	return file.doc.Filename
}

// SetName changes the optional file name. An empty string may be used to
// unset it.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetName(name string) {
	file.assertMode(gfsWriting)
	file.m.Lock()
	file.doc.Filename = name
	file.m.Unlock()
}

// ContentType returns the optional file content type. An empty string will be
// returned in case it is unset.
func (file *GridFile) ContentType() string {
	return file.doc.ContentType
}
// SetContentType changes the optional file content type. An empty string may
// be used to unset it.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetContentType(ctype string) {
	file.assertMode(gfsWriting)
	file.m.Lock()
	file.doc.ContentType = ctype
	file.m.Unlock()
}
// GetMeta unmarshals the optional "metadata" field associated with the
// file into the result parameter. The meaning of keys under that field
// is user-defined. For example:
//
//     result := struct{ INode int }{}
//     err = file.GetMeta(&result)
//     if err != nil {
//         panic(err)
//     }
//     fmt.Printf("inode: %d\n", result.INode)
//
func (file *GridFile) GetMeta(result interface{}) (err error) {
	file.m.Lock()
	if file.doc.Metadata != nil {
		err = bson.Unmarshal(file.doc.Metadata.Data, result)
	}
	file.m.Unlock()
	return
}
// SetMeta changes the optional "metadata" field associated with the
// file. The meaning of keys under that field is user-defined.
// For example:
//
//     file.SetMeta(bson.M{"inode": inode})
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetMeta(metadata interface{}) {
	file.assertMode(gfsWriting)
	data, err := bson.Marshal(metadata)
	file.m.Lock()
	if err != nil && file.err == nil {
		file.err = err
	} else {
		file.doc.Metadata = &bson.Raw{Data: data}
	}
	file.m.Unlock()
}
// Size returns the file size in bytes.
func (file *GridFile) Size() (bytes int64) {
	file.m.Lock()
	bytes = file.doc.Length
	file.m.Unlock()
	return
}

// MD5 returns the file MD5 as a hex-encoded string.
func (file *GridFile) MD5() (md5 string) {
	return file.doc.MD5
}

// UploadDate returns the file upload time.
func (file *GridFile) UploadDate() time.Time {
	return file.doc.UploadDate
}

// SetUploadDate changes the file upload time.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetUploadDate(t time.Time) {
	file.assertMode(gfsWriting)
	file.m.Lock()
	file.doc.UploadDate = t
	file.m.Unlock()
}
// Close flushes any pending changes in case the file is being written
// to, waits for any background operations to finish, and closes the file.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
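//
// As a rough sketch of one way to avoid losing the Close error when using
// defer (this pattern and the "saveFile" helper are assumptions, not part of
// the original examples):
//
//     func saveFile(db *mgo.Database, data []byte) (err error) {
//         file, err := db.GridFS("fs").Create("report.txt")
//         if err != nil {
//             return err
//         }
//         defer func() {
//             if cerr := file.Close(); err == nil {
//                 err = cerr // surface deferred write errors reported by Close
//             }
//         }()
//         _, err = file.Write(data)
//         return err
//     }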
func (file *GridFile) Close() (err error) {
	file.m.Lock()
	defer file.m.Unlock()
	if file.mode == gfsWriting {
		if len(file.wbuf) > 0 && file.err == nil {
			file.insertChunk(file.wbuf)
			file.wbuf = file.wbuf[0:0]
		}
		file.completeWrite()
	} else if file.mode == gfsReading && file.rcache != nil {
		file.rcache.wait.Lock()
		file.rcache = nil
	}
	file.mode = gfsClosed
	debugf("GridFile %p: closed", file)
	return file.err
}

func (file *GridFile) completeWrite() {
	for file.wpending > 0 {
		debugf("GridFile %p: waiting for %d pending chunks to complete file write", file, file.wpending)
		file.c.Wait()
	}
	if file.err == nil {
		hexsum := hex.EncodeToString(file.wsum.Sum(nil))
		if file.doc.UploadDate.IsZero() {
			file.doc.UploadDate = bson.Now()
		}
		file.doc.MD5 = hexsum
		file.err = file.gfs.Files.Insert(file.doc)
	}
	if file.err != nil {
		file.gfs.Chunks.RemoveAll(bson.D{{"files_id", file.doc.Id}})
	}
	if file.err == nil {
		index := Index{
			Key:    []string{"files_id", "n"},
			Unique: true,
		}
		file.err = file.gfs.Chunks.EnsureIndex(index)
	}
}
// Abort cancels an in-progress write, preventing the file from being
// automatically created and ensuring previously written chunks are
// removed when the file is closed.
//
// It is a runtime error to call Abort when the file was not opened
// for writing.
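//
// As a usage sketch (not one of the original examples; "src" is assumed to be
// some io.Reader), Abort is typically called when the data source fails
// midway, and Close still has to run afterwards to remove the written chunks:
//
//     file, err := db.GridFS("fs").Create("upload.bin")
//     check(err)
//     if _, err = io.Copy(file, src); err != nil {
//         file.Abort() // drop the partial upload
//     }
//     err = file.Close()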
func (file *GridFile) Abort() {
	if file.mode != gfsWriting {
		panic("file.Abort must be called on file opened for writing")
	}
	file.err = errors.New("write aborted")
}
// Write writes the provided data to the file and returns the
// number of bytes written and an error in case something
// went wrong.
//
// The file will internally cache the data so that all but the last
// chunk sent to the database have the size defined by SetChunkSize.
// This also means that errors may be deferred until a future call
// to Write or Close.
//
// The parameters and behavior of this function turn the file
// into an io.Writer.
func (file *GridFile) Write(data []byte) (n int, err error) {
	file.assertMode(gfsWriting)
	file.m.Lock()
	debugf("GridFile %p: writing %d bytes", file, len(data))
	defer file.m.Unlock()
	if file.err != nil {
		return 0, file.err
	}
	n = len(data)
	file.doc.Length += int64(n)
	chunkSize := file.doc.ChunkSize
	if len(file.wbuf)+len(data) < chunkSize {
		file.wbuf = append(file.wbuf, data...)
		return
	}
	// First, flush file.wbuf complementing with data.
	if len(file.wbuf) > 0 {
		missing := chunkSize - len(file.wbuf)
		if missing > len(data) {
			missing = len(data)
		}
		file.wbuf = append(file.wbuf, data[:missing]...)
		data = data[missing:]
		file.insertChunk(file.wbuf)
		file.wbuf = file.wbuf[0:0]
	}
	// Then, flush all chunks from data without copying.
	for len(data) > chunkSize {
		size := chunkSize
		if size > len(data) {
			size = len(data)
		}
		file.insertChunk(data[:size])
		data = data[size:]
	}
	// And append the rest for a future call.
	file.wbuf = append(file.wbuf, data...)
	return n, file.err
}
func (file *GridFile) insertChunk(data []byte) {
	n := file.chunk
	file.chunk++
	debugf("GridFile %p: adding to checksum: %q", file, string(data))
	file.wsum.Write(data)

	for file.doc.ChunkSize*file.wpending >= 1024*1024 {
		// Hold on.. we got a MB pending.
		file.c.Wait()
		if file.err != nil {
			return
		}
	}
	file.wpending++

	debugf("GridFile %p: inserting chunk %d with %d bytes", file, n, len(data))

	// We may not own the memory of data, so rather than
	// simply copying it, we'll marshal the document ahead of time.
	data, err := bson.Marshal(gfsChunk{bson.NewObjectId(), file.doc.Id, n, data})
	if err != nil {
		file.err = err
		return
	}

	go func() {
		err := file.gfs.Chunks.Insert(bson.Raw{Data: data})
		file.m.Lock()
		file.wpending--
		if err != nil && file.err == nil {
			file.err = err
		}
		file.c.Broadcast()
		file.m.Unlock()
	}()
}
// Seek sets the offset for the next Read or Write on file to
// offset, interpreted according to whence: 0 means relative to
// the origin of the file, 1 means relative to the current offset,
// and 2 means relative to the end. It returns the new offset and
// an error, if any.
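//
// As a usage sketch (not one of the original examples; the file is assumed to
// be open for reading and the offsets within bounds), the os.SEEK_* constants
// map to the whence values above:
//
//     _, err = file.Seek(100, os.SEEK_SET)  // 100 bytes from the start
//     check(err)
//     _, err = file.Seek(-10, os.SEEK_CUR)  // back 10 bytes from here
//     check(err)
//     end, err := file.Seek(0, os.SEEK_END) // end == file.Size()
//     check(err)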
func (file *GridFile) Seek(offset int64, whence int) (pos int64, err error) {
	file.m.Lock()
	debugf("GridFile %p: seeking to %d (whence=%d)", file, offset, whence)
	defer file.m.Unlock()
	switch whence {
	case os.SEEK_SET:
	case os.SEEK_CUR:
		offset += file.offset
	case os.SEEK_END:
		offset += file.doc.Length
	default:
		panic("unsupported whence value")
	}
	if offset > file.doc.Length {
		return file.offset, errors.New("seek past end of file")
	}
	if offset == file.doc.Length {
		// If we're seeking to the end of the file,
		// no need to read anything. This enables
		// a client to find the size of the file using only the
		// io.ReadSeeker interface with low overhead.
		file.offset = offset
		return file.offset, nil
	}
	chunk := int(offset / int64(file.doc.ChunkSize))
	if chunk+1 == file.chunk && offset >= file.offset {
		file.rbuf = file.rbuf[int(offset-file.offset):]
		file.offset = offset
		return file.offset, nil
	}
	file.offset = offset
	file.chunk = chunk
	file.rbuf = nil
	file.rbuf, err = file.getChunk()
	if err == nil {
		file.rbuf = file.rbuf[int(file.offset-int64(chunk)*int64(file.doc.ChunkSize)):]
	}
	return file.offset, err
}
// Read reads into b the next available data from the file and
// returns the number of bytes read and an error in case
// something went wrong. At the end of the file, n will
// be zero and err will be set to io.EOF.
//
// The parameters and behavior of this function turn the file
// into an io.Reader.
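//
// As a usage sketch (not one of the original examples; buffer size is an
// arbitrary choice and "process" is a placeholder for the caller's logic),
// reading until io.EOF looks like:
//
//     buf := make([]byte, 32*1024)
//     for {
//         n, err := file.Read(buf)
//         process(buf[:n])
//         if err == io.EOF {
//             break
//         }
//         check(err)
//     }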
func (file *GridFile) Read(b []byte) (n int, err error) {
	file.assertMode(gfsReading)
	file.m.Lock()
	debugf("GridFile %p: reading at offset %d into buffer of length %d", file, file.offset, len(b))
	defer file.m.Unlock()
	if file.offset == file.doc.Length {
		return 0, io.EOF
	}
	for err == nil {
		i := copy(b, file.rbuf)
		n += i
		file.offset += int64(i)
		file.rbuf = file.rbuf[i:]
		if i == len(b) || file.offset == file.doc.Length {
			break
		}
		b = b[i:]
		file.rbuf, err = file.getChunk()
	}
	return n, err
}
func (file *GridFile) getChunk() (data []byte, err error) {
	cache := file.rcache
	file.rcache = nil
	if cache != nil && cache.n == file.chunk {
		debugf("GridFile %p: Getting chunk %d from cache", file, file.chunk)
		cache.wait.Lock()
		data, err = cache.data, cache.err
	} else {
		debugf("GridFile %p: Fetching chunk %d", file, file.chunk)
		var doc gfsChunk
		err = file.gfs.Chunks.Find(bson.D{{"files_id", file.doc.Id}, {"n", file.chunk}}).One(&doc)
		data = doc.Data
	}
	file.chunk++
	if int64(file.chunk)*int64(file.doc.ChunkSize) < file.doc.Length {
		// Read the next one in background.
		cache = &gfsCachedChunk{n: file.chunk}
		cache.wait.Lock()
		debugf("GridFile %p: Scheduling chunk %d for background caching", file, file.chunk)
		// Clone the session to avoid having it closed in between.
		chunks := file.gfs.Chunks
		session := chunks.Database.Session.Clone()
		go func(id interface{}, n int) {
			defer session.Close()
			chunks = chunks.With(session)
			var doc gfsChunk
			cache.err = chunks.Find(bson.D{{"files_id", id}, {"n", n}}).One(&doc)
			cache.data = doc.Data
			cache.wait.Unlock()
		}(file.doc.Id, file.chunk)
		file.rcache = cache
	}
	debugf("Returning err: %#v", err)
	return
}