Skip to content

Commit baa44ca

Browse files
committed
Add Collection.NewIter method.
This is a refactoring of the first batch + cursor logic that is used in the Pipe and Repair methods, so it may be used both internally in other places and also externally when people depend on functionality not yet implemented in the driver.
1 parent 513c45d commit baa44ca

File tree

1 file changed

+75
-57
lines changed

1 file changed

+75
-57
lines changed

session.go

Lines changed: 75 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1855,13 +1855,6 @@ func (c *Collection) Repair() *Iter {
18551855
cloned := session.Clone()
18561856
cloned.SetMode(Strong, false)
18571857
defer cloned.Close()
1858-
c = c.With(cloned)
1859-
1860-
iter := &Iter{
1861-
session: session,
1862-
timeout: -1,
1863-
}
1864-
iter.gotReply.L = &iter.m
18651858

18661859
var result struct {
18671860
Cursor struct {
@@ -1874,28 +1867,10 @@ func (c *Collection) Repair() *Iter {
18741867
RepairCursor: c.Name,
18751868
Cursor: &repairCmdCursor{batchSize},
18761869
}
1877-
iter.err = c.Database.Run(cmd, &result)
1878-
if iter.err != nil {
1879-
return iter
1880-
}
1881-
docs := result.Cursor.FirstBatch
1882-
for i := range docs {
1883-
iter.docData.Push(docs[i].Data)
1884-
}
1885-
if result.Cursor.Id != 0 {
1886-
socket, err := cloned.acquireSocket(true)
1887-
if err != nil {
1888-
// Cloned session is in strong mode, and the query
1889-
// above succeeded. Should have a reserved socket.
1890-
panic("internal error: " + err.Error())
1891-
}
1892-
iter.server = socket.Server()
1893-
socket.Release()
1894-
iter.op.cursorId = result.Cursor.Id
1895-
iter.op.collection = c.FullName
1896-
iter.op.replyFunc = iter.replyFunc()
1897-
}
1898-
return iter
1870+
1871+
clonedc := c.With(cloned)
1872+
err := clonedc.Database.Run(cmd, &result)
1873+
return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
18991874
}
19001875

19011876
// FindId is a convenience helper equivalent to:
@@ -1957,7 +1932,6 @@ func (c *Collection) Pipe(pipeline interface{}) *Pipe {
19571932
// Iter executes the pipeline and returns an iterator capable of going
19581933
// over all the generated results.
19591934
func (p *Pipe) Iter() *Iter {
1960-
19611935
// Clone session and set it to strong mode so that the server
19621936
// used for the query may be safely obtained afterwards, if
19631937
// necessary for iteration when a cursor is received.
@@ -1966,12 +1940,6 @@ func (p *Pipe) Iter() *Iter {
19661940
defer cloned.Close()
19671941
c := p.collection.With(cloned)
19681942

1969-
iter := &Iter{
1970-
session: p.session,
1971-
timeout: -1,
1972-
}
1973-
iter.gotReply.L = &iter.m
1974-
19751943
var result struct {
19761944
// 2.4, no cursors.
19771945
Result []bson.Raw
@@ -1989,34 +1957,84 @@ func (p *Pipe) Iter() *Iter {
19891957
AllowDisk: p.allowDisk,
19901958
Cursor: &pipeCmdCursor{p.batchSize},
19911959
}
1992-
iter.err = c.Database.Run(cmd, &result)
1993-
if e, ok := iter.err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
1960+
err := c.Database.Run(cmd, &result)
1961+
if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
19941962
cmd.Cursor = nil
19951963
cmd.AllowDisk = false
1996-
iter.err = c.Database.Run(cmd, &result)
1964+
err = c.Database.Run(cmd, &result)
1965+
}
1966+
firstBatch := result.Result
1967+
if firstBatch == nil {
1968+
firstBatch = result.Cursor.FirstBatch
1969+
}
1970+
return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)
1971+
}
1972+
1973+
// NewIter returns a newly created iterator with the provided parameters.
1974+
// Using this method is not recommended unless the desired functionality
1975+
// is not yet exposed via a more convenient interface (Find, Pipe, etc).
1976+
//
1977+
// The optional session parameter associates the lifetime of the returned
1978+
// iterator to an arbitrary session. If nil, the iterator will be bound to
1979+
// c's session.
1980+
//
1981+
// Documents in firstBatch will be individually provided by the returned
1982+
// iterator before documents from cursorId are made available. If cursorId
1983+
// is zero, only the documents in firstBatch are provided.
1984+
//
1985+
// If err is not nil, the iterator's Err method will report it after
1986+
// exhausting documents in firstBatch.
1987+
//
1988+
// NewIter must be called right after the cursor id is obtained, and must not
1989+
// be called on a collection in Eventual mode, because the cursor id is
1990+
// associated with the specific server that returned it. The session parameter
1991+
// may be in any mode or state, though.
1992+
//
1993+
func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
1994+
var server *mongoServer
1995+
csession := c.Database.Session
1996+
csession.m.RLock()
1997+
socket := csession.masterSocket
1998+
if socket == nil {
1999+
socket = csession.slaveSocket
19972000
}
1998-
if iter.err != nil {
1999-
return iter
2001+
if socket != nil {
2002+
server = socket.Server()
20002003
}
2001-
docs := result.Result
2002-
if docs == nil {
2003-
docs = result.Cursor.FirstBatch
2004+
csession.m.RUnlock()
2005+
2006+
if server == nil {
2007+
if csession.Mode() == Eventual {
2008+
panic("Collection.NewIter called in Eventual mode")
2009+
}
2010+
panic("Collection.NewIter called on a fresh session with no associated server")
20042011
}
2005-
for i := range docs {
2006-
iter.docData.Push(docs[i].Data)
2012+
2013+
if session == nil {
2014+
session = csession
20072015
}
2008-
if result.Cursor.Id != 0 {
2009-
socket, err := cloned.acquireSocket(true)
2010-
if err != nil {
2011-
// Cloned session is in strong mode, and the query
2012-
// above succeeded. Should have a reserved socket.
2013-
panic("internal error: " + err.Error())
2016+
2017+
iter := &Iter{
2018+
session: session,
2019+
server: server,
2020+
timeout: -1,
2021+
err: err,
2022+
}
2023+
iter.gotReply.L = &iter.m
2024+
for _, doc := range firstBatch {
2025+
iter.docData.Push(doc.Data)
2026+
}
2027+
if cursorId != 0 {
2028+
socket, err := c.Database.Session.acquireSocket(true)
2029+
if err == nil {
2030+
iter.server = socket.Server()
2031+
socket.Release()
2032+
iter.op.cursorId = cursorId
2033+
iter.op.collection = c.FullName
2034+
iter.op.replyFunc = iter.replyFunc()
2035+
} else if iter.err == nil {
2036+
iter.err = err
20142037
}
2015-
iter.server = socket.Server()
2016-
socket.Release()
2017-
iter.op.cursorId = result.Cursor.Id
2018-
iter.op.collection = c.FullName
2019-
iter.op.replyFunc = iter.replyFunc()
20202038
}
20212039
return iter
20222040
}

0 commit comments

Comments
 (0)