11
11
// See the License for the specific language governing permissions and
12
12
// limitations under the License.
13
13
14
- using OrasProject . Oras . Oci ;
15
14
using System ;
15
+ using System . IO ;
16
16
using System . Threading ;
17
17
using System . Threading . Tasks ;
18
+ using OrasProject . Oras . Oci ;
19
+ using OrasProject . Oras . Registry ;
18
20
using static OrasProject . Oras . Content . Extensions ;
19
21
20
22
namespace OrasProject . Oras ;
21
23
24
+ public struct CopyOptions
25
+ {
26
+ // public int Concurrency { get; set; }
27
+
28
+ public event Action < Descriptor > OnPreCopy ;
29
+ public event Action < Descriptor > OnPostCopy ;
30
+ public event Action < Descriptor > OnCopySkipped ;
31
+ public event Action < Descriptor , string > OnMounted ;
32
+
33
+ public Func < Descriptor , string [ ] > MountFrom { get ; set ; }
34
+
35
+ internal void PreCopy ( Descriptor descriptor )
36
+ {
37
+ OnPreCopy ? . Invoke ( descriptor ) ;
38
+ }
39
+
40
+ internal void PostCopy ( Descriptor descriptor )
41
+ {
42
+ OnPostCopy ? . Invoke ( descriptor ) ;
43
+ }
44
+
45
+ internal void CopySkipped ( Descriptor descriptor )
46
+ {
47
+ OnCopySkipped ? . Invoke ( descriptor ) ;
48
+ }
49
+
50
+ internal void Mounted ( Descriptor descriptor , string sourceRepository )
51
+ {
52
+ OnMounted ? . Invoke ( descriptor , sourceRepository ) ;
53
+ }
54
+ }
22
55
public static class Extensions
23
56
{
24
57
@@ -36,38 +69,89 @@ public static class Extensions
36
69
/// <param name="cancellationToken"></param>
37
70
/// <returns></returns>
38
71
/// <exception cref="Exception"></exception>
39
- public static async Task < Descriptor > CopyAsync ( this ITarget src , string srcRef , ITarget dst , string dstRef , CancellationToken cancellationToken = default )
72
+ public static async Task < Descriptor > CopyAsync ( this ITarget src , string srcRef , ITarget dst , string dstRef , CancellationToken cancellationToken = default , CopyOptions ? copyOptions = default )
40
73
{
41
74
if ( string . IsNullOrEmpty ( dstRef ) )
42
75
{
43
76
dstRef = srcRef ;
44
77
}
45
78
var root = await src . ResolveAsync ( srcRef , cancellationToken ) . ConfigureAwait ( false ) ;
46
- await src . CopyGraphAsync ( dst , root , cancellationToken ) . ConfigureAwait ( false ) ;
79
+ await src . CopyGraphAsync ( dst , root , cancellationToken , copyOptions ) . ConfigureAwait ( false ) ;
47
80
await dst . TagAsync ( root , dstRef , cancellationToken ) . ConfigureAwait ( false ) ;
48
81
return root ;
49
82
}
50
83
51
- public static async Task CopyGraphAsync ( this ITarget src , ITarget dst , Descriptor node , CancellationToken cancellationToken )
84
+ public static async Task CopyGraphAsync ( this ITarget src , ITarget dst , Descriptor node , CancellationToken cancellationToken , CopyOptions ? copyOptions = default )
52
85
{
53
86
// check if node exists in target
54
87
if ( await dst . ExistsAsync ( node , cancellationToken ) . ConfigureAwait ( false ) )
55
88
{
89
+ copyOptions ? . CopySkipped ( node ) ;
56
90
return ;
57
91
}
58
92
59
93
// retrieve successors
60
94
var successors = await src . GetSuccessorsAsync ( node , cancellationToken ) . ConfigureAwait ( false ) ;
61
- // obtain data stream
62
- var dataStream = await src . FetchAsync ( node , cancellationToken ) . ConfigureAwait ( false ) ;
95
+
63
96
// check if the node has successors
64
- if ( successors != null )
97
+ foreach ( var childNode in successors )
98
+ {
99
+ await src . CopyGraphAsync ( dst , childNode , cancellationToken , copyOptions ) . ConfigureAwait ( false ) ;
100
+ }
101
+
102
+ var sourceRepositories = copyOptions ? . MountFrom ( node ) ?? [ ] ;
103
+ if ( dst is IMounter mounter && sourceRepositories . Length > 0 )
65
104
{
66
- foreach ( var childNode in successors )
105
+ for ( var i = 0 ; i < sourceRepositories . Length ; i ++ )
67
106
{
68
- await src . CopyGraphAsync ( dst , childNode , cancellationToken ) . ConfigureAwait ( false ) ;
107
+ var sourceRepository = sourceRepositories [ i ] ;
108
+ var mountFailed = false ;
109
+
110
+ async Task < Stream > GetContents ( CancellationToken token )
111
+ {
112
+ // the invocation of getContent indicates that mounting has failed
113
+ mountFailed = true ;
114
+
115
+ if ( i < sourceRepositories . Length - 1 )
116
+ {
117
+ // If this is not the last one, skip this source and try next one
118
+ // We want to return an error that we will test for from mounter.Mount()
119
+ throw new SkipSourceException ( ) ;
120
+ }
121
+
122
+ // this is the last iteration so we need to actually get the content and do the copy
123
+ // but first call the PreCopy function
124
+ copyOptions ? . PreCopy ( node ) ;
125
+ return await src . FetchAsync ( node , token ) . ConfigureAwait ( false ) ;
126
+ }
127
+
128
+ try
129
+ {
130
+ await mounter . MountAsync ( node , sourceRepository , GetContents , cancellationToken ) . ConfigureAwait ( false ) ;
131
+ }
132
+ catch ( SkipSourceException )
133
+ {
134
+ }
135
+
136
+ if ( ! mountFailed )
137
+ {
138
+ copyOptions ? . Mounted ( node , sourceRepository ) ;
139
+ return ;
140
+ }
69
141
}
70
142
}
71
- await dst . PushAsync ( node , dataStream , cancellationToken ) . ConfigureAwait ( false ) ;
143
+ else
144
+ {
145
+ // alternatively we just copy it
146
+ copyOptions ? . PreCopy ( node ) ;
147
+ var dataStream = await src . FetchAsync ( node , cancellationToken ) . ConfigureAwait ( false ) ;
148
+ await dst . PushAsync ( node , dataStream , cancellationToken ) . ConfigureAwait ( false ) ;
149
+ }
150
+
151
+ // we copied it
152
+ copyOptions ? . PostCopy ( node ) ;
72
153
}
154
+
155
+ private class SkipSourceException : Exception { }
73
156
}
157
+
0 commit comments