-
Couldn't load subscription status.
- Fork 501
[SystemDS-#3524] Multi-threading of transformdecode/[SystemDS-#3521] Improved Feature Transformations #2275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 23 commits
365d2b8
d5dc7c1
3712b9c
ca836a4
f43a853
84077b0
8fcbbcc
c02fa76
3c66fcc
8da2e0d
19a4437
34a4341
0c5acdb
98865e7
2886537
b75b7d1
61f6d39
839b050
595f4b7
bfcb133
0e67c0a
91fbc54
ec7fe78
6dfa925
b8c4f88
d62f358
56e7cd9
d7a3228
0a35c28
81c405f
91cc343
8448990
dd14314
41cb69b
ba3dc04
bb92e76
abafe03
9b67321
64651ec
808cba2
0156c58
337334e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.sysds.runtime.transform.decode; | ||
|
|
||
| import org.apache.commons.logging.Log; | ||
| import org.apache.commons.logging.LogFactory; | ||
| import org.apache.sysds.common.Types.ValueType; | ||
| import org.apache.sysds.runtime.DMLRuntimeException; | ||
| import org.apache.sysds.runtime.frame.data.FrameBlock; | ||
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; | ||
|
|
||
|
|
||
| import java.io.Externalizable; | ||
| import java.io.IOException; | ||
| import java.io.ObjectInput; | ||
| import java.io.ObjectOutput; | ||
|
|
||
| public abstract class ColumnDecoder implements Externalizable { | ||
| protected static final Log LOG = LogFactory.getLog(Decoder.class.getName()); | ||
| private static final long serialVersionUID = -1732411001366177787L; | ||
|
|
||
| protected ValueType[] _schema; | ||
| protected int[] _colList; | ||
| protected String[] _colnames = null; | ||
| protected ColumnDecoder(ValueType[] schema, int[] colList) { | ||
| _schema = schema; | ||
| _colList = colList; | ||
| } | ||
|
|
||
| public ValueType[] getSchema() { | ||
| return _schema; | ||
| } | ||
|
|
||
| public void setColnames(String[] colnames) { | ||
| _colnames = colnames; | ||
| } | ||
|
|
||
| public String[] getColnames() { | ||
| return _colnames; | ||
| } | ||
|
|
||
| public int[] getColList() {return _colList;} | ||
| /** | ||
| * Block decode API converting a matrix block into a frame block. | ||
| * | ||
| * @param in Input matrix block | ||
| * @param out Output frame block | ||
| * @return returns given output frame block for convenience | ||
| */ | ||
| public abstract FrameBlock columnDecode(MatrixBlock in, FrameBlock out); | ||
|
|
||
| /** | ||
| * Block decode API converting a matrix block into a frame block in parallel. | ||
| * | ||
| * @param in Input matrix block | ||
| * @param out Output frame block | ||
| * @param k Parallelization degree | ||
| * @return returns the given output frame block for convenience | ||
| */ | ||
| public FrameBlock columnDecode(MatrixBlock in, FrameBlock out, int k) { | ||
| return columnDecode(in, out); | ||
| } | ||
|
|
||
| /** | ||
| * Block decode row block | ||
| * | ||
| * @param in input Matrix Block | ||
| * @param out output FrameBlock | ||
| * @param rl row start to decode | ||
| * @param ru row end to decode (not inclusive) | ||
| */ | ||
| public abstract void columnDecode(MatrixBlock in, FrameBlock out, int rl, int ru); | ||
|
|
||
| /** | ||
| * Returns a new Decoder that only handles a sub range of columns. The sub-range refers to the columns after | ||
| * decoding. | ||
| * | ||
| * @param colStart the start index of the sub-range (1-based, inclusive) | ||
| * @param colEnd the end index of the sub-range (1-based, exclusive) | ||
| * @param dummycodedOffset the offset of dummycoded segments before colStart | ||
| * @return a decoder of the same type, just for the sub-range | ||
| */ | ||
| public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) { | ||
| throw new DMLRuntimeException( | ||
| getClass().getSimpleName() + " does not support the creation of a sub-range decoder"); | ||
| } | ||
|
|
||
| /** | ||
| * Update index-ranges to after decoding. Note that only Dummycoding changes the ranges. | ||
| * | ||
| * @param beginDims the begin indexes before encoding | ||
| * @param endDims the end indexes before encoding | ||
| */ | ||
| public void updateIndexRanges(long[] beginDims, long[] endDims) { | ||
| // do nothing - default | ||
| } | ||
|
|
||
| public abstract void initMetaData(FrameBlock meta); | ||
|
|
||
| /** | ||
| * Redirects the default java serialization via externalizable to our default | ||
| * hadoop writable serialization for efficient broadcast/rdd serialization. | ||
| * | ||
| * @param os object output | ||
| * @throws IOException if IOException occurs | ||
| */ | ||
| @Override | ||
| public void writeExternal(ObjectOutput os) | ||
| throws IOException | ||
| { | ||
| int size1 = (_colList == null) ? 0 : _colList.length; | ||
| os.writeInt(size1); | ||
| for(int i = 0; i < size1; i++) | ||
| os.writeInt(_colList[i]); | ||
|
|
||
| int size2 = (_colnames == null) ? 0 : _colnames.length; | ||
| os.writeInt(size2); | ||
| for(int j = 0; j < size2; j++) | ||
| os.writeUTF(_colnames[j]); | ||
|
|
||
| int size3 = (_schema == null) ? 0 : _schema.length; | ||
| os.writeInt(size3); | ||
| for(int j = 0; j < size3; j++) | ||
| os.writeByte(_schema[j].ordinal()); | ||
| } | ||
|
|
||
| /** | ||
| * Redirects the default java serialization via externalizable to our default | ||
| * hadoop writable serialization for efficient broadcast/rdd deserialization. | ||
| * | ||
| * @param in object input | ||
| * @throws IOException if IOException occur | ||
| */ | ||
| @Override | ||
| public void readExternal(ObjectInput in) | ||
| throws IOException | ||
| { | ||
| int size1 = in.readInt(); | ||
| _colList = (size1 == 0) ? null : new int[size1]; | ||
| for(int i = 0; i < size1; i++) | ||
| _colList[i] = in.readInt(); | ||
|
|
||
| int size2 = in.readInt(); | ||
| _colnames = (size2 == 0) ? null : new String[size2]; | ||
| for(int j = 0; j < size2; j++) { | ||
| _colnames[j] = in.readUTF(); | ||
| } | ||
|
|
||
| int size3 = in.readInt(); | ||
| _schema = (size3 == 0) ? null : new ValueType[size3]; | ||
| for(int j = 0; j < size3; j++) { | ||
| _schema[j] = ValueType.values()[in.readByte()]; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,214 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.sysds.runtime.transform.decode; | ||
|
|
||
| import org.apache.commons.lang3.NotImplementedException; | ||
| import org.apache.sysds.common.Types.ValueType; | ||
| import org.apache.sysds.runtime.DMLRuntimeException; | ||
| import org.apache.sysds.runtime.frame.data.FrameBlock; | ||
| import org.apache.sysds.runtime.frame.data.columns.Array; | ||
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; | ||
| import org.apache.sysds.runtime.util.UtilFunctions; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.ObjectInput; | ||
| import java.io.ObjectOutput; | ||
|
|
||
| public class ColumnDecoderBin extends ColumnDecoder { | ||
| private static final long serialVersionUID = -3784249774608228805L; | ||
|
|
||
| private int[] _numBins; | ||
| private double[][] _binMins = null; | ||
| private double[][] _binMaxs = null; | ||
|
|
||
| public ColumnDecoderBin() { | ||
| super(null, null); | ||
| } | ||
|
|
||
| protected ColumnDecoderBin(ValueType[] schema, int[] binCols) { | ||
| super(schema, binCols); | ||
| } | ||
|
|
||
|
|
||
| //@Override | ||
| //public FrameBlock columnDecode(MatrixBlock in, FrameBlock out) { | ||
| // | ||
| // long b1 = System.nanoTime(); | ||
| // out.ensureAllocatedColumns(in.getNumRows()); | ||
| // for (int i = 0; i < in.getNumRows(); i++) { | ||
| // for (int j = 0; j < _colList.length; j++) { | ||
| // double val = in.get(i, j); | ||
| // if (!Double.isNaN(val)) { | ||
| // int key = (int) Math.round(val); | ||
| // double bmin = _binMins[j][key - 1]; | ||
| // double bmax = _binMaxs[j][key - 1]; | ||
| // double oval = bmin + (bmax - bmin) / 2 + (val - key) * (bmax - bmin); | ||
| // out.getColumn(_colList[j] - 1).set(i, oval); | ||
| // } else { | ||
| // out.getColumn(_colList[j] - 1).set(i, val); | ||
| // } | ||
| // } | ||
| // } | ||
| // //columnDecode(in, out, 0, in.getNumRows()); | ||
| // long b2 = System.nanoTime(); | ||
| // System.out.println(this.getClass() + "time: " + (b2 - b1) / 1e6 + " ms"); | ||
| // return out; | ||
| //} | ||
|
|
||
| @Override | ||
| public FrameBlock columnDecode(MatrixBlock in, FrameBlock out) { | ||
| long b1 = System.nanoTime(); | ||
| out.ensureAllocatedColumns(in.getNumRows()); | ||
|
|
||
| final int outColIndex = _colList[0] - 1; | ||
|
||
| final double[] binMins = _binMins[0]; | ||
| final double[] binMaxs = _binMaxs[0]; | ||
| final int nRows = in.getNumRows(); | ||
| Array<?> a = out.getColumn(0); | ||
| for (int i = 0; i < nRows; i++) { | ||
| double val = in.get(i, 0); | ||
| double decoded; | ||
| if (!Double.isNaN(val)) { | ||
| int key = (int) Math.round(val); | ||
| double bmin = binMins[key - 1]; | ||
| double bmax = binMaxs[key - 1]; | ||
| decoded = bmin + (bmax - bmin) / 2 | ||
| + (val - key) * (bmax - bmin); | ||
| a.set(i, decoded); | ||
| } else { | ||
| a.set(i, val); | ||
| } | ||
| } | ||
| long b2 = System.nanoTime(); | ||
| System.out.println(this.getClass() +": "+ (b2 - b1) / 1e6 + " ms"); | ||
| return out; | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| public void columnDecode(MatrixBlock in, FrameBlock out, int rl, int ru) { | ||
| for (int i = rl; i < ru; i++) { | ||
| for (int j = 0; j < _colList.length; j++) { | ||
| double val = in.get(i, j); | ||
| if (!Double.isNaN(val)) { | ||
| int key = (int) Math.round(val); | ||
| double bmin = _binMins[j][key - 1]; | ||
| double bmax = _binMaxs[j][key - 1]; | ||
| double oval = bmin + (bmax - bmin) / 2 + (val - key) * (bmax - bmin); | ||
| out.getColumn(_colList[j] - 1).set(i, oval); | ||
| } else { | ||
| out.getColumn(_colList[j] - 1).set(i, val); | ||
| } | ||
| } | ||
|
||
| } | ||
| } | ||
|
|
||
| //@Override | ||
| //public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) { | ||
| // if (colEnd - colStart != 1) | ||
| // throw new NotImplementedException(); | ||
| // | ||
| // for (int i = 0; i < _colList.length; i++) { | ||
| // if (_colList[i] == colStart) { | ||
| // ValueType[] schema = (_schema != null) ? new ValueType[]{_schema[colStart - 1]} : null; | ||
| // ColumnDecoderBin sub = new ColumnDecoderBin(schema, new int[]{colStart}); | ||
| // sub._numBins = new int[]{_numBins[i]}; | ||
| // sub._binMins = new double[][]{_binMins[i]}; | ||
| // sub._binMaxs = new double[][]{_binMaxs[i]}; | ||
| // return sub; | ||
| // } | ||
| // } | ||
| // return null; | ||
| //} | ||
|
|
||
| @Override | ||
| public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) { | ||
|
|
||
| for (int i = 0; i < _colList.length; i++) { | ||
| long b1 = System.nanoTime(); | ||
| ValueType[] schema = (_schema != null) ? new ValueType[]{_schema[colStart - 1]} : null; | ||
| if (_colList[i] == colStart) { | ||
| ColumnDecoderBin sub = new ColumnDecoderBin(schema, new int[]{colStart}); | ||
| sub._numBins = new int[]{_numBins[i]}; | ||
| sub._binMins = new double[][]{_binMins[i]}; | ||
| sub._binMaxs = new double[][]{_binMaxs[i]}; | ||
| return sub; | ||
| } | ||
| long b2 = System.nanoTime(); | ||
| System.out.println("time: " + (b2 - b1) / 1e6 + " ms"); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void initMetaData(FrameBlock meta) { | ||
| //initialize bin boundaries | ||
| _numBins = new int[_colList.length]; | ||
| _binMins = new double[_colList.length][]; | ||
| _binMaxs = new double[_colList.length][]; | ||
|
|
||
| //parse and insert bin boundaries | ||
| for( int j=0; j<_colList.length; j++ ) { | ||
| int numBins = (int)meta.getColumnMetadata(_colList[j]-1).getNumDistinct(); | ||
| _binMins[j] = new double[numBins]; | ||
| _binMaxs[j] = new double[numBins]; | ||
| for( int i=0; i<meta.getNumRows() & i<numBins; i++ ) { | ||
| if( meta.get(i, _colList[j]-1)==null ) { | ||
| if( i+1 < numBins ) | ||
| throw new DMLRuntimeException("Did not reach number of bins: "+(i+1)+"/"+numBins); | ||
| break; //reached end of bins | ||
| } | ||
| String[] parts = UtilFunctions.splitRecodeEntry( | ||
| meta.get(i, _colList[j]-1).toString()); | ||
| _binMins[j][i] = Double.parseDouble(parts[0]); | ||
| _binMaxs[j][i] = Double.parseDouble(parts[1]); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void writeExternal(ObjectOutput out) throws IOException { | ||
| super.writeExternal(out); | ||
| for( int i=0; i<_colList.length; i++ ) { | ||
| int len = _numBins[i]; | ||
| out.writeInt(len); | ||
| for(int j=0; j<len; j++) { | ||
| out.writeDouble(_binMins[i][j]); | ||
| out.writeDouble(_binMaxs[i][j]); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void readExternal(ObjectInput in) throws IOException { | ||
| super.readExternal(in); | ||
| _numBins = new int[_colList.length]; | ||
| _binMins = new double[_colList.length][]; | ||
| _binMaxs = new double[_colList.length][]; | ||
| for( int i=0; i<_colList.length; i++ ) { | ||
| int len = in.readInt(); | ||
| _numBins[i] = len; | ||
| for(int j=0; j<len; j++) { | ||
| _binMins[i][j] = in.readDouble(); | ||
| _binMaxs[i][j] = in.readDouble(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why a column list? A column encoder should work on a single column.