93 changes: 93 additions & 0 deletions docs/src/advanced.md
@@ -171,4 +171,97 @@ This can be useful for performance when one expects to append many additional da
## Fallback Behaviour
By default, JLD2 will attempt to open files using the `MmapIO` backend. If that fails, it retries using `IOStream`.
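
A minimal sketch of forcing a particular backend, assuming the `iotype` keyword (check the `jldopen` docstring for your JLD2 version):

```julia
using JLD2

# Request the IOStream backend explicitly instead of the default MmapIO.
# (`iotype` is assumed here; verify against your JLD2 version.)
f = jldopen("example.jld2", "r"; iotype=IOStream)
close(f)
```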

## Virtual Datasets

Virtual datasets (VDS) allow you to create datasets that reference data from multiple source files without copying the data. This is useful for combining large distributed datasets efficiently.

### Basic Usage

Create a virtual dataset mapping entire source files:

```julia
using JLD2

# Create source files
jldsave("data1.jld2"; x = fill(1.0, 3))
jldsave("data2.jld2"; x = fill(2.0, 3))

# Create virtual dataset
jldopen("virtual.jld2", "w") do f
    mappings = [
        JLD2.VirtualMapping("./data1.jld2", "x"),
        JLD2.VirtualMapping("./data2.jld2", "x")
    ]
    JLD2.create_virtual_dataset(f, "combined", (3, 2), Float64, mappings)
end

# Read back
data = jldopen("virtual.jld2", "r") do f
f["combined"] # Returns [1.0 2.0; 1.0 2.0; 1.0 2.0]
end
```
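
Because no data is copied, changes to a source file are picked up the next time the virtual dataset is read. A small sketch reusing the files from above (the exact read-through behaviour is an assumption worth verifying):

```julia
# Overwrite one of the source files
jldsave("data2.jld2"; x = fill(5.0, 3))

# The second column is expected to reflect the new values,
# since the mappings are resolved when the virtual dataset is read.
jldopen("virtual.jld2", "r") do f
    f["combined"][:, 2]
end
```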

### Selection Methods

Virtual mappings support three ways to specify regions:

**1. Julia index ranges (recommended)**
```julia
mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    vds_indices=(1:1, 1:5)) # Place in first row, columns 1-5

mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    src_indices=(1:10, 5:15), # Take rows 1-10, cols 5-15 from source
    vds_indices=(1:10, 1:11)) # Place at rows 1-10, cols 1-11 in VDS
```

**2. Root index + shape (most intuitive)**
```julia
mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    vds_root=(2, 1),  # Start at row 2, column 1
    vds_shape=(1, 5)) # Block is 1 row × 5 columns

mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    src_root=(5, 10), src_shape=(3, 4), # Take 3×4 block from source
    vds_root=(1, 1), vds_shape=(3, 4))  # Place at top-left of VDS
```

**3. Direct HyperslabSelection (advanced)**
```julia
vds_sel = JLD2.HyperslabSelection([0x0, 0x0], [0x1, 0x1], [0x1, 0x1], [0x5, 0x1])
mapping = JLD2.VirtualMapping("./data.jld2", "measurements"; vds_selection=vds_sel)
```
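
For orientation, the four positional arguments presumably follow HDF5's hyperslab convention of start, stride, count, and block, with zero-based offsets in HDF5's dimension order. A commented version of the same call under that assumption:

```julia
# Assumed argument order: start, stride, count, block (HDF5 hyperslab convention)
vds_sel = JLD2.HyperslabSelection(
    [0x0, 0x0],  # start: zero-based offset of the selection
    [0x1, 0x1],  # stride between blocks
    [0x1, 0x1],  # count of blocks
    [0x5, 0x1],  # block size in each dimension
)
mapping = JLD2.VirtualMapping("./data.jld2", "measurements"; vds_selection=vds_sel)
```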

### Strided Selections

Select non-contiguous regions using strided ranges:

```julia
# Every other row
mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    vds_indices=(1:2:10, 1:5)) # Rows 1, 3, 5, 7, 9 in VDS
```
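
The same range syntax should work on the source side as well. A sketch, assuming `src_indices` accepts strided ranges and only requires the source and VDS selections to cover the same number of elements:

```julia
# Take every other row from the source and pack them into consecutive VDS rows
mapping = JLD2.VirtualMapping("./data.jld2", "measurements";
    src_indices=(1:2:10, 1:5),  # source rows 1, 3, 5, 7, 9
    vds_indices=(1:5, 1:5))     # rows 1-5 of the virtual dataset
```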

### Automatic Inference

Automatically infer dimensions and types from source files:

```julia
jldopen("virtual.jld2", "w") do f
    source_files = ["./data1.jld2", "./data2.jld2", "./data3.jld2"]

    # Automatically determines dimensions and element type
    JLD2.create_virtual_dataset(f, "combined", source_files, "measurements")
end
```

### Pattern-based File Names

Use `%b` for sequential file patterns:

```julia
# Expands to sub-0.jld2, sub-1.jld2, etc.
mapping = JLD2.VirtualMapping("./sub-%b.jld2", "dataset")
```
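
A rough end-to-end sketch combining a pattern mapping with `create_virtual_dataset`; the dataset name, dimensions, and element type below are illustrative assumptions, not values taken from the API:

```julia
jldopen("virtual.jld2", "w") do f
    # Hypothetical layout: each matching sub-*.jld2 file contributes one 3-element column
    mapping = JLD2.VirtualMapping("./sub-%b.jld2", "dataset")
    JLD2.create_virtual_dataset(f, "all_subjects", (3, 4), Float64, [mapping])
end
```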


4 changes: 3 additions & 1 deletion src/JLD2.jl
@@ -180,7 +180,7 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool,
                 parallel_read::Bool=false,
                 plain::Bool=false
                 ) where T<:Union{Type{IOStream},Type{MmapIO}}

    mmaparrays && @warn "mmaparrays keyword is currently ignored" maxlog = 1
    filters = Filters.normalize_filters(compress)

@@ -501,6 +501,8 @@ include("Filters.jl")
using .Filters: WrittenFilterPipeline, FilterPipeline, iscompressed
using .Filters: Shuffle, Deflate, ZstdFilter

include("virtual_datasets.jl")
include("virtual_datasets_patternbased.jl")
include("datasets.jl")
include("global_heaps.jl")
include("fractal_heaps.jl")
10 changes: 9 additions & 1 deletion src/datalayouts.jl
@@ -15,6 +15,7 @@ struct DataLayout
end

ischunked(dl::DataLayout) = dl.storage_type == LcChunked
isvirtual(dl::DataLayout) = dl.storage_type == LcVirtual
DataLayout(f::JLD2.JLDFile, msg_::Hmessage) =
    DataLayout(f, HmWrap(HmDataLayout, msg_))

@@ -44,7 +45,14 @@ function DataLayout(f::JLD2.JLDFile, msg::HmWrap{HmDataLayout})

        chunk_dimensions = Int[msg.dimensions[1:end-1]...] # drop element size as last dimension
        chunked_storage = true
        DataLayout(version, storage_type, data_length, data_offset, msg.dimensionality, 0, chunk_dimensions)
    elseif storage_type == LcVirtual
        # Virtual dataset layout
        data_length = -1 # Virtual datasets don't have a fixed data length
        heap_address = msg.data_address
        index = msg.index
        # Store the global heap address in data_offset for now
        DataLayout(version, storage_type, data_length, fileoffset(f, heap_address), 0, index, UInt64[])
    else
        throw(UnsupportedFeatureException("Unknown data layout"))
    end
55 changes: 55 additions & 0 deletions src/datasets.jl
@@ -78,12 +78,67 @@ Otherwise, `datatype_offset` points to the offset of the datatype attribute.
        v = Array{T, 1}()
        track_weakref!(f, header_offset, v)
        return v
    elseif isvirtual(layout)
        # Handle virtual dataset
        return read_virtual_data(f, dataspace, dt, layout, filters, header_offset, attributes)
    end
    seek(f.io, layout.data_offset)
    read_dataspace = (dataspace, header_offset, layout, filters)
    read_data(f, rr, read_dataspace, attributes)
end

function read_virtual_data(f::JLDFile, dataspace::ReadDataspace,
                           @nospecialize(dt::H5Datatype),
                           layout::DataLayout,
                           filters::FilterPipeline,
                           header_offset::RelOffset,
                           attributes::Union{Vector{ReadAttribute},Nothing})
    # Read virtual dataset layout from global heap
    hid = GlobalHeapID(h5offset(f, layout.data_offset), UInt32(layout.chunk_indexing_type))

    io = f.io
    # Find the global heap
    if haskey(f.global_heaps, hid.heap_offset)
        gh = f.global_heaps[hid.heap_offset]
    else
        seek(io, fileoffset(f, hid.heap_offset))
        f.global_heaps[hid.heap_offset] = gh = jlread(io, GlobalHeap)
    end

    # Seek to the heap object
    seek(io, gh.objects[hid.index] + 8) # Skip object index, ref count, reserved
    obj_size = Int(jlread(io, Length))

    # Read the virtual dataset global heap block (Version 0 format)
    version = jlread(io, UInt8)
    version != 0 && throw(UnsupportedVersionException(
        "Only virtual dataset heap block version 0 is currently supported, got version $version"))

    # Read number of entries (8 bytes for "Size of Lengths")
    num_entries = Int(jlread(io, UInt64))

    # Read each mapping
    mappings = VirtualMapping[]
    for i in 1:num_entries
        # Read source filename (null-terminated string)
        source_filename = read_bytestring(io)

        # Read source dataset name (null-terminated string)
        source_dataset = read_bytestring(io)

        # Read source selection
        src_selection = jlread(io, DataspaceSelection)

        # Read virtual selection
        vds_selection = jlread(io, DataspaceSelection)

        push!(mappings, VirtualMapping(source_filename, source_dataset, src_selection, vds_selection))
    end

    # Process the virtual dataset mappings to create the combined dataset
    return combine_virtual_mappings(f, mappings, dataspace, dt)
end

# Most types can only be scalars or arrays
@nospecializeinfer function read_data(f::JLDFile,
                                      @nospecialize(rr),
9 changes: 7 additions & 2 deletions src/dataspaces.jl
@@ -12,16 +12,21 @@ struct WriteDataspace{N,A<:Tuple}
    dataspace_type::UInt8
    size::NTuple{N,Length}
    attributes::A
    max_dimensions::NTuple{N,UInt64}
end

# Outer constructor for convenience: max_dimensions defaults to the current size
WriteDataspace(dataspace_type::UInt8, size::NTuple{N,UInt64}, attributes::A) where {N,A<:Tuple} =
    WriteDataspace{N,A}(dataspace_type, size, attributes, size)

struct ReadDataspace
    dataspace_type::UInt8
    dimensionality::UInt8
    dimensions_offset::Int64
end
ReadDataspace() = ReadDataspace(DS_SCALAR, 0, -1)

ReadDataspace(f, msg_::Union{Hmessage, Message}) =
    ReadDataspace(f, HmWrap(HmDataspace, msg_))
ReadDataspace(f, msg::HmWrap{HmDataspace}) =
    ReadDataspace(msg.dataspace_type, msg.dimensionality, fileoffset(f, msg.dim_offset))
@@ -105,4 +110,4 @@ function jlwrite(io::IO, dspace::WriteDataspace{N}) where N
    for x in dspace.size
        jlwrite(io, x::Length)
    end
end
56 changes: 31 additions & 25 deletions src/global_heaps.jl
@@ -16,33 +16,26 @@ isatend(f::JLDFile, gh::GlobalHeap) =
heap_object_length(data::AbstractArray) = length(data)
heap_object_length(::Any) = 1

function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession) where ODR
    # The type parameter ODR is needed to convince the compiler to specialize on ODR.
    psz = odr_sizeof(odr) * heap_object_length(data)
    objsz = 8 + jlsizeof(Length) + psz
    objsz += 8 - mod1(objsz, 8)
"""
    allocate_in_global_heap(f::JLDFile, objsz::Int) -> GlobalHeap

Allocate space in the global heap for an object of size `objsz`.
Returns the GlobalHeap that has space for the object.

Allocation strategy:
1. Use existing heap if object fits
2. Extend existing heap if it's at end of file
3. Create new heap otherwise
"""
function allocate_in_global_heap(f::JLDFile, objsz::Int)
    io = f.io

    # This is basically a memory allocation problem. Right now we do it
    # in a pretty naive way. We:
    #
    # 1. Put the object in the last created global heap if it fits
    # 2. Extend the last global heap if it's at the end of the file
    # 3. Create a new global heap if we can't do 1 or 2
    #
    # This is not a great approach if we're writing objects of
    # different sizes interspersed with new datasets. The torture case
    # would be a Vector{Any} of mutable objects, some of which contain
    # large (>4080 byte) strings and some of which contain small
    # strings. In that case, we'd be better off trying to put the small
    # strings into existing heaps, rather than writing new ones. This
    # should be revisited at a later date.

    # Can only fit up to typemax(UInt16) items in a single heap
    heap_filled = length(f.global_heap.objects) >= typemax(UInt16)

    if objsz + 8 + jlsizeof(Length) < f.global_heap.free && !heap_filled
        # Fits in existing global heap
        gh = f.global_heap
        return f.global_heap
    elseif isatend(f, f.global_heap) && !heap_filled
        # Global heap is at end and can be extended
        gh = f.global_heap
@@ -52,6 +45,7 @@ function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession
        seek(io, gh.offset + 8)
        jlwrite(io, gh.length)
        f.end_of_data += delta
        return gh
    else
        # Need to create a new global heap
        heapsz = max(objsz, 4096)
@@ -63,9 +57,21 @@ function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession
        f.end_of_data = position(io) + heapsz
        gh = f.global_heap = f.global_heaps[h5offset(f, offset)] =
            GlobalHeap(offset, heapsz, heapsz, Int64[])
        return gh
    end
end

function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession) where ODR
    # The type parameter ODR is needed to convince the compiler to specialize on ODR.
    psz = odr_sizeof(odr) * heap_object_length(data)
    objsz = 8 + jlsizeof(Length) + psz
    objsz += 8 - mod1(objsz, 8)
    io = f.io

    # Allocate space in global heap
    gh = allocate_in_global_heap(f, objsz)

    # Write data
    # Write object header
    index = length(gh.objects) + 1
    objoffset = gh.offset + 8 + jlsizeof(Length) + gh.length - gh.free
    seek(io, objoffset)
@@ -74,7 +80,7 @@ function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession
    jlwrite(io, UInt32(0)) # Reserved
    jlwrite(io, Length(psz)) # Object size

    # Update global heap object
    # Update global heap
    gh.free -= objsz
    push!(gh.objects, objoffset)

@@ -85,9 +91,9 @@ function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession
        jlwrite(io, Length(gh.free - 8 - jlsizeof(Length))) # Object size
    end

    # Write data
    # Write actual data
    seek(io, objoffset + 8 + jlsizeof(Length))
    write_data(io, f, data, odr, datamode(odr), wsession) # Object data
    write_data(io, f, data, odr, datamode(odr), wsession)

    GlobalHeapID(h5offset(f, gh.offset), index)
end
12 changes: 8 additions & 4 deletions src/headermessages.jl
@@ -11,8 +11,10 @@
(version == 1) && dataspace_type::@computed(DS_V1)
version == 1 && @skip(5)
dim_offset::@Offset
dimensions::NTuple{Int(dimensionality), Int64}
isset(flags,0) && max_dimension_size::NTuple{Int(dimensionality), Int64}
dimensions::NTuple{Int(dimensionality), UInt64}
if isset(flags,0)
max_dimension_size::NTuple{Int(dimensionality), UInt64} = kw.dimensions
end
end

@pseudostruct HmLinkInfo begin
@@ -148,8 +150,10 @@ end
            data_address::RelOffset
        end
        if layout_class == LcVirtual # Virtual Storage
            data_address::RelOffset
            index::UInt32
            # Virtual Dataset Storage Layout
            # Points to global heap containing virtual dataset mappings
            data_address::RelOffset # Global heap address containing VDS mappings
            index::UInt32 # Index within the global heap collection
        end
    end
end