|
6 | 6 | import java.sql.Statement; |
7 | 7 | import java.util.Collection; |
8 | 8 | import java.util.List; |
| 9 | +import java.util.Map; |
| 10 | +import java.util.concurrent.atomic.AtomicInteger; |
9 | 11 | import java.util.stream.Collectors; |
| 12 | +import java.util.stream.Stream; |
10 | 13 | import javax.annotation.Nonnull; |
11 | 14 | import javax.annotation.Nullable; |
12 | 15 | import javax.persistence.EntityManager; |
@@ -73,13 +76,26 @@ protected String generateTempIdTable( @Nonnull final List<ChannelAddress> addres |
73 | 76 | { |
74 | 77 | return |
75 | 78 | "DECLARE @Ids TABLE ( Id INTEGER NOT NULL );\n" + |
76 | | - "INSERT INTO @Ids VALUES " + |
77 | | - addresses.stream() |
78 | | - .map( a -> "(" + a.getSubChannelId() + ")" ) |
79 | | - .collect( Collectors.joining( "," ) ) + |
| 79 | + chunked( addresses.stream().map( ChannelAddress::getSubChannelId ), 900 ) |
| 80 | + .map( ids -> |
| 81 | + "INSERT INTO @Ids VALUES " + |
| 82 | + ids.stream().map( id -> "(" + id + ")" ).collect( Collectors.joining( "," ) ) ). |
| 83 | + collect( Collectors.joining( "\n" ) ) + |
80 | 84 | "\n"; |
81 | 85 | } |
82 | 86 |
|
| 87 | + @SuppressWarnings( "SameParameterValue" ) |
| 88 | + private static <T> Stream<List<T>> chunked( @Nonnull final Stream<T> stream, final int chunkSize ) |
| 89 | + { |
| 90 | + final AtomicInteger index = new AtomicInteger( 0 ); |
| 91 | + |
| 92 | + return stream |
| 93 | + .collect( Collectors.groupingBy( x -> index.getAndIncrement() / chunkSize ) ) |
| 94 | + .entrySet().stream() |
| 95 | + .sorted( Map.Entry.comparingByKey() ) |
| 96 | + .map( Map.Entry::getValue ); |
| 97 | + } |
| 98 | + |
83 | 99 | protected void bulkLinkFromSourceGraphToTargetGraph( @Nonnull final ReplicantSession session, |
84 | 100 | @Nullable final Object filter, |
85 | 101 | @Nonnull final ChangeSet changeSet, |
|
0 commit comments