Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- `gateway`: Fixed CAR responses including blocked content. The gateway now properly filters out blocked CIDs from CAR format responses, ensuring content filtering policies are enforced across all response formats. ([ipfs/kubo#10361](https://github.com/ipfs/kubo/issues/10361))

### Security


Expand Down
241 changes: 189 additions & 52 deletions gateway/backend_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,51 +377,64 @@ func (bb *BlocksBackend) Head(ctx context.Context, path path.ImmutablePath) (Con
// https://ipld.io/specs/transport/car/carv1/#number-of-roots
var emptyRoot = []cid.Cid{cid.MustParse("bafkqaaa")}

// GetCAR returns a CAR stream for the provided immutable path.
//
// The outcome is decided before any CAR bytes are produced, so that the
// caller can map it to the correct HTTP status:
//
//   - The path is not in the /ipfs/ namespace: a plain error is returned.
//   - The path resolves: the CAR is streamed by streamCAR (HTTP 200).
//   - The path does not resolve: handleCarErrorPath classifies the failure
//     (root blocked → HTTP 410 Gone, root not found → not-found error for a
//     404, anything else → the underlying error).
//
// Once streaming has started, blocked content inside the DAG is skipped, so
// the client may receive a partial CAR.
//
// A failed resolution never yields an empty CAR with a nil error; that was
// the behaviour reported in the first issue below.
//
// This approach fixes:
//   - https://github.com/ipfs/boxo/issues/458 (empty CAR with 200 status)
//   - https://github.com/ipfs/kubo/issues/10361 (blocked CIDs in CAR output)
func (bb *BlocksBackend) GetCAR(ctx context.Context, p path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) {
	// Validate the namespace first: nothing below is meaningful for a
	// non-/ipfs/ path, and this avoids fetching any block for it.
	if p.Namespace() != path.IPFSNamespace {
		return ContentPathMetadata{}, nil, errors.New("path does not have /ipfs/ prefix")
	}

	// Resolve exactly once. The result either feeds the streaming path or is
	// handed to the error classifier together with the resolution error.
	pathMetadata, err := bb.ResolvePath(ctx, p)
	if err != nil {
		// Resolution failed: decide the status by inspecting the root block
		// and the resolution error. No CAR is generated on this path.
		return bb.handleCarErrorPath(ctx, p, params, err)
	}

	// The path is valid: stream the CAR.
	return bb.streamCAR(ctx, p, pathMetadata, params)
}

// streamCAR handles the standard case where the path is valid and accessible.
// It sets up a pipe for streaming CAR data and starts a goroutine for content generation.
//
// For a path like /ipfs/QmRootCid/sub/lastCid, the CAR output MUST contain blocks in this order:
// 1. Root block (QmRootCid)
// 2. Path traversal blocks (all blocks needed to go from QmRootCid→sub→lastCid)
// 3. Terminal element blocks (complete DAG of lastCid based on selector)
//
// This ordering is a protocol requirement that ensures clients can verify the path
// and access content in a streaming fashion without seeking backwards.
func (bb *BlocksBackend) streamCAR(ctx context.Context, p path.ImmutablePath, pathMetadata ContentPathMetadata, params CarParams) (ContentPathMetadata, io.ReadCloser, error) {
// Create a pipe for streaming CAR data
// The reader side (r) is returned immediately to the HTTP handler
// The writer side (w) is used by the goroutine to stream CAR content
r, w := io.Pipe()

// Start asynchronous CAR generation
go func() {
defer w.Close()

// Initialize CAR writer with the terminal element's CID as root
// For /ipfs/QmRoot/sub/terminal, this must be terminal's CID, not QmRoot
cw, err := storage.NewWritable(
w,
[]cid.Cid{pathMetadata.LastSegment.RootCid()},
Expand All @@ -430,44 +443,124 @@ func (bb *BlocksBackend) GetCAR(ctx context.Context, p path.ImmutablePath, param
)
if err != nil {
// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(err)
_ = w.CloseWithError(fmt.Errorf("creating CAR writer: %w", err))
return
}

blockGetter := merkledag.NewDAGService(bb.blockService).Session(ctx)

blockGetter = &nodeGetterToCarExporer{
ng: blockGetter,
// Create DAG service using the existing session from context if available
// (embedded by WrapContextForRequest), or create a new one if needed.
// blockservice.NewSession automatically handles session reuse, ensuring
// all operations within a request share the same session for better performance.
// Sessions properly enforce denylist checks via wrapped blockstore/exchange.
session := blockservice.NewSession(ctx, bb.blockService)
dagService := merkledag.WrapSession(session)

// Wrap DAG service to write blocks to CAR as they're fetched
// This ensures all accessed blocks are included in the CAR output
blockGetter := &nodeGetterToCarExporer{
ng: dagService,
cw: cw,
}

// Setup the UnixFS resolver.
f := newNodeGetterFetcherSingleUseFactory(ctx, blockGetter)
pathResolver := resolver.NewBasicResolver(f)
// Set up UnixFS resolver
factory := newNodeGetterFetcherSingleUseFactory(ctx, blockGetter)
pathResolver := resolver.NewBasicResolver(factory)

// Set up IPLD LinkSystem for DAG traversal
lsys := cidlink.DefaultLinkSystem()
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)
// Use blockGetter which writes blocks to CAR as they're fetched.
// With comprehensive nopfs wrappers (BlockService, Blockstore, Exchange),
// blocking is already handled at the service level. The blockGetter
// (nodeGetterToCarExporer) simply passes through accessible blocks and
// writes them to the CAR output.
lsys.StorageReadOpener = blockOpener(ctx, blockGetter)

// First resolve the path since we always need to.
// CRITICAL PATH RESOLUTION: This call traverses from root to terminal element,
// writing each block to the CAR in order. For /ipfs/QmRoot/sub/terminal:
// 1. Fetches QmRoot block (written to CAR via nodeGetterToCarExporer)
// 2. Fetches intermediate blocks to reach 'sub' (written to CAR)
// 3. Fetches blocks to reach 'terminal' (written to CAR)
// This ensures proper CAR block ordering per the IPFS protocol requirements.
// The first successful block fetch triggers data to be written to the pipe,
// which causes the HTTP handler to send status 200.
lastCid, remainder, err := pathResolver.ResolveToLastNode(ctx, p)
if err != nil {
// Resolution failed - close pipe with error
// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(err)
_ = w.CloseWithError(fmt.Errorf("path resolution failed: %w", err))
return
}

// The terminal block will be written to CAR automatically when accessed
// during walkGatewaySimpleSelector traversal via blockGetter

// Continue with DAG traversal for the remaining content
// TODO: support selectors passed as request param: https://github.com/ipfs/kubo/issues/8769
// TODO: this is very slow if blocks are remote due to linear traversal. Do we need deterministic traversals here?
carWriteErr := walkGatewaySimpleSelector(ctx, lastCid, nil, remainder, params, &lsys)

// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(carWriteErr)
if err := walkGatewaySimpleSelector(ctx, lastCid, nil, remainder, params, &lsys); err != nil {
// Traversal errors appear as stream errors (HTTP 200 already sent)
// io.PipeWriter.CloseWithError always returns nil.
_ = w.CloseWithError(fmt.Errorf("DAG traversal failed: %w", err))
}
}()

return pathMetadata, r, nil
}

// handleCarErrorPath classifies a failed path resolution for a CAR request.
// It never produces a CAR: the root block is fetched solely to decide which
// error the caller should receive.
//
// Outcomes, in the order they are checked:
//
//   - Root fetch fails with a blocked-content error: HTTP 410 Gone.
//   - Root fetch fails with a not-found error: that error is returned together
//     with metadata whose LastSegment is the root CID, so the gateway can
//     render a 404.
//   - Root fetch fails for any other reason: that error is returned as is.
//   - Root is fetched and resolveErr is a blocked-content error: HTTP 410 Gone.
//   - Otherwise resolveErr is returned unchanged (e.g. a broken link).
//
// resolveErr is the error returned by the failed path resolution. params is
// accepted for signature symmetry and is not read here.
func (bb *BlocksBackend) handleCarErrorPath(ctx context.Context, p path.ImmutablePath, params CarParams, resolveErr error) (ContentPathMetadata, io.ReadCloser, error) {
	root := p.RootCid()

	// Probe only the root block. The session is obtained through
	// blockservice.NewSession so the probe goes through the same block
	// service as the rest of the request.
	// NOTE(review): presumably NewSession reuses a session embedded in ctx — confirm.
	rootGetter := merkledag.WrapSession(blockservice.NewSession(ctx, bb.blockService))
	_, fetchErr := rootGetter.Get(ctx, root)

	if fetchErr == nil {
		// The root is retrievable, so the failure lies further down the path.
		if !isErrContentBlocked(resolveErr) {
			// Broken links and similar: surface the original resolution error.
			return ContentPathMetadata{}, nil, resolveErr
		}
		// Blocked content along the path is reported the same way as a
		// blocked root.
		return ContentPathMetadata{}, nil, NewErrorStatusCode(
			fmt.Errorf("path %s contains blocked content: %w", p.String(), resolveErr),
			http.StatusGone,
		)
	}

	switch {
	case isErrContentBlocked(fetchErr):
		// Root is blocked → HTTP 410 Gone.
		return ContentPathMetadata{}, nil, NewErrorStatusCode(
			fmt.Errorf("content at %s is blocked: %w", p.String(), fetchErr),
			http.StatusGone,
		)
	case isErrNotFound(fetchErr):
		// Root not found: hand back metadata pointing at the root CID along
		// with the error.
		return ContentPathMetadata{
			PathSegmentRoots: nil,
			LastSegment:      path.FromCid(root),
		}, nil, fetchErr
	default:
		// Network failures, timeouts, and anything else.
		return ContentPathMetadata{}, nil, fetchErr
	}
}

// walkGatewaySimpleSelector walks the subgraph described by the path and terminal element parameters
func walkGatewaySimpleSelector(ctx context.Context, lastCid cid.Cid, terminalBlk blocks.Block, remainder []string, params CarParams, lsys *ipld.LinkSystem) error {
lctx := ipld.LinkContext{Ctx: ctx}
Expand Down Expand Up @@ -517,6 +610,9 @@ func walkGatewaySimpleSelector(ctx context.Context, lastCid cid.Cid, terminalBlk
return err
}

// Use the regular LinkSystem for traversal.
// The blockGetter (via nodeGetterToCarExporer) already handles writing blocks to CAR
// as they're fetched during traversal, so no special handling is needed.
progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
Expand Down Expand Up @@ -784,6 +880,16 @@ func (bb *BlocksBackend) resolvePath(ctx context.Context, p path.Path) (path.Imm
return imPath, remainder, nil
}

// nodeGetterToCarExporer wraps a format.NodeGetter to write blocks to a CAR writer
// as they are fetched. This is crucial for ensuring proper CAR block ordering:
//
// - During path resolution (QmRoot→sub→terminal), each block is written immediately
// - This ensures path traversal blocks appear in the CAR before terminal DAG blocks
// - The ordering allows streaming verification and reduces client memory requirements
//
// NOTE: The same wrapper instance is used both for path resolution and as the
// NodeGetter behind the traversal's blockOpener, so every block that is fetched
// successfully through it is also written to the CAR. Blocking checks are expected
// to be enforced by the wrapped NodeGetter before a block is ever returned here.
type nodeGetterToCarExporer struct {
ng format.NodeGetter
cw storage.WritableCar
Expand All @@ -792,9 +898,12 @@ type nodeGetterToCarExporer struct {
func (n *nodeGetterToCarExporer) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
nd, err := n.ng.Get(ctx, c)
if err != nil {
// Pass through all errors - blockOpener will handle them appropriately
return nil, err
}

// Only write the block to the CAR once it has been fetched successfully.
// This runs for every Get call: during path resolution and during traversal (via blockOpener).
if err := n.trySendBlock(ctx, nd); err != nil {
return nil, err
}
Expand All @@ -820,6 +929,19 @@ func (n *nodeGetterToCarExporer) GetMany(ctx context.Context, cids []cid.Cid) <-
case outCh <- nd:
case <-ctx.Done():
}
} else {
// Handle errors from the underlying NodeGetter:
// - NotFound errors: content doesn't exist, skip silently
// - Blocked errors: content is blocked, skip silently
// - Other errors: propagate to caller
if !format.IsNotFound(nd.Err) && !isErrContentBlocked(nd.Err) {
// Only pass through non-blocked errors
select {
case outCh <- nd:
case <-ctx.Done():
}
}
// For blocked/not found errors, we simply skip - don't send anything
}
}
}()
Expand Down Expand Up @@ -909,15 +1031,30 @@ func (n *nodeGetterFetcherSingleUseFactory) blankProgress(ctx context.Context) t
}
}

// blockOpener returns a function that loads blocks during CAR traversal.
// It is used by the IPLD LinkSystem during the walkGatewaySimpleSelector traversal.
//
// When a blocked CID is encountered, it returns traversal.SkipMe{} which tells
// the traversal to skip that branch of the DAG without failing the entire operation.
// This allows generating CARs with partial content when some blocks are filtered.
func blockOpener(ctx context.Context, ng format.NodeGetter) ipld.BlockReadOpener {
return func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidLink, ok := lnk.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("invalid link type for loading: %v", lnk)
}

// Attempt to fetch the block through the NodeGetter.
// If using a blocking-aware BlockService, this returns an error for blocked CIDs.
blk, err := ng.Get(ctx, cidLink.Cid)
if err != nil {
// Check if this block is blocked (not just missing)
if isErrContentBlocked(err) {
// Return traversal.SkipMe{} to gracefully skip blocked content.
// The traversal continues with other accessible parts of the DAG.
return nil, traversal.SkipMe{}
}
// Propagate all other errors including NotFound (broken DAG)
return nil, err
}

Expand Down
26 changes: 22 additions & 4 deletions gateway/backend_car_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,33 @@ func (ps *remoteCarFetcher) Fetch(ctx context.Context, path path.ImmutablePath,
}

if resp.StatusCode != http.StatusOK {
errData, err := io.ReadAll(resp.Body)
defer resp.Body.Close() // Ensure body is closed and drained

// Limit error message reading to prevent memory exhaustion
// 512 bytes is enough for one sentence of error description
const maxErrorSize = 512
limitedReader := io.LimitReader(resp.Body, maxErrorSize)
errData, err := io.ReadAll(limitedReader)
var errMsg string
if err != nil {
err = fmt.Errorf("could not read error message: %w", err)
errMsg = fmt.Sprintf("could not read error message: %v", err)
} else {
err = fmt.Errorf("%q", string(errData))
errMsg = string(errData)
// Add ellipsis if we hit the limit
if len(errData) == maxErrorSize {
errMsg = errMsg + "..."
}
}
return fmt.Errorf("http error from car gateway: %s: %w", resp.Status, err)

// Wrap with appropriate status code for proper handling downstream
return NewErrorStatusCode(
fmt.Errorf("car gateway responded with %s: %q", resp.Status, errMsg),
resp.StatusCode,
)
}

// For successful responses, callback is responsible for reading
// and closing the body
err = cb(path, resp.Body)
if err != nil {
resp.Body.Close()
Expand Down
Loading
Loading