From 98a90f12f84a82f9efe5e8e726d0c8fdb3faf212 Mon Sep 17 00:00:00 2001 From: kazuki-yane <42165581+kazuki-yane@users.noreply.github.com> Date: Mon, 20 Apr 2020 13:37:00 +0900 Subject: [PATCH 01/39] delete underbar if prefix is empty (#1) * delete underbar if prefix is empty * add test --- src/main/java/org/embulk/input/marketo/MarketoUtils.java | 2 +- src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/input/marketo/MarketoUtils.java b/src/main/java/org/embulk/input/marketo/MarketoUtils.java index dfc24d2..1bbbda0 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoUtils.java +++ b/src/main/java/org/embulk/input/marketo/MarketoUtils.java @@ -83,7 +83,7 @@ public static List getFieldNameFromMarketoFields(List colu public static String buildColumnName(String prefix, String columnName) { - return prefix + "_" + columnName; + return prefix.isEmpty() ? columnName : prefix + "_" + columnName; } public static final List sliceRange(DateTime fromDate, DateTime toDate, int rangeSize) diff --git a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java index cbdccf1..9938f0d 100644 --- a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java +++ b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java @@ -56,6 +56,8 @@ public void buildColumnName() throws Exception { String columnName = MarketoUtils.buildColumnName("prefix", "columnName"); assertEquals("prefix_columnName", columnName); + String noPrefixColumn = MarketoUtils.buildColumnName("", "columnName"); + assertEquals("columnName", noPrefixColumn); } @Test From 73613af5ebdca79051d3665bcca00a72b49b0c39 Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Mon, 11 May 2020 18:23:45 +0900 Subject: [PATCH 02/39] add list and activity type --- .../marketo/MarketoInputPluginDelegate.java | 10 +- .../embulk/input/marketo/MarketoService.java | 2 + .../input/marketo/MarketoServiceImpl.java | 6 ++ .../delegate/ActivityTypeInputPlugin.java | 42 ++++++++ .../marketo/delegate/ListInputPlugin.java | 43 ++++++++ .../delegate/ActivityTypeInputPluginTest.java | 73 ++++++++++++++ .../marketo/delegate/ListInputPluginTest.java | 73 ++++++++++++++ .../fixtures/activity_type_response_full.json | 98 +++++++++++++++++++ 8 files changed, 345 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java create mode 100644 src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java create mode 100644 src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java create mode 100644 src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java create mode 100644 src/test/resources/fixtures/activity_type_response_full.json diff --git a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java index cf19d21..196d730 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java +++ b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java @@ -16,6 +16,8 @@ import org.embulk.input.marketo.delegate.LeadWithListInputPlugin; import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin; import org.embulk.input.marketo.delegate.ProgramInputPlugin; +import org.embulk.input.marketo.delegate.ListInputPlugin; +import org.embulk.input.marketo.delegate.ActivityTypeInputPlugin; import org.embulk.input.marketo.rest.MarketoRestClient; import java.util.Date; @@ -31,7 +33,9 @@ public interface PluginTask CampaignInputPlugin.PluginTask, ProgramInputPlugin.PluginTask, MarketoRestClient.PluginTask, - CustomObjectInputPlugin.PluginTask + CustomObjectInputPlugin.PluginTask, + ListInputPlugin.PluginTask, + ActivityTypeInputPlugin.PluginTask { @Config("target") Target getTarget(); @@ -76,7 +80,9 @@ public enum Target ALL_LEAD_WITH_LIST_ID(new LeadWithListInputPlugin()), ALL_LEAD_WITH_PROGRAM_ID(new LeadWithProgramInputPlugin()), PROGRAM(new ProgramInputPlugin()), - CUSTOM_OBJECT(new CustomObjectInputPlugin()); + CUSTOM_OBJECT(new CustomObjectInputPlugin()), + LIST(new ListInputPlugin()), + ACTIVITY_TYPE(new ActivityTypeInputPlugin()); private RestClientInputPluginDelegate restClientInputPluginDelegate; diff --git a/src/main/java/org/embulk/input/marketo/MarketoService.java b/src/main/java/org/embulk/input/marketo/MarketoService.java index 2a0d7aa..6a37926 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoService.java +++ b/src/main/java/org/embulk/input/marketo/MarketoService.java @@ -35,4 +35,6 @@ public interface MarketoService List describeCustomObject(String customObjectAPIName); Iterable getActivityTypes(); + + Iterable getLists(); } diff --git a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java index 95401b5..540cf66 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java +++ b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java @@ -237,6 +237,12 @@ public Iterable getCustomObject(String customObjectAPIName, String c return marketoRestClient.getCustomObject(customObjectAPIName, customObjectFilterType, customObjectFields, fromValue, toValue); } + @Override + public Iterable getLists() + { + return marketoRestClient.getLists(); + } + @Override public Iterable getActivityTypes() { diff --git a/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java new file mode 100644 index 0000000..1208790 --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java @@ -0,0 +1,42 @@ +package org.embulk.input.marketo.delegate; + +import com.google.common.collect.FluentIterable; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; + +import java.util.Iterator; + +public class ActivityTypeInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + } + + public ActivityTypeInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return FluentIterable.from(marketoService.getActivityTypes()).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("primaryAttribute", Types.JSON) + .add("attributes", Types.JSON) + .add("apiName", Types.STRING); + return builder.build(); + } +} diff --git a/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java new file mode 100644 index 0000000..176a0ac --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java @@ -0,0 +1,43 @@ +package org.embulk.input.marketo.delegate; + +import com.google.common.collect.FluentIterable; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; + +import java.util.Iterator; + +public class ListInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + } + + public ListInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return FluentIterable.from(marketoService.getLists()).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("programName", Types.STRING) + .add("workspaceName", Types.STRING) + .add("createdAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("updatedAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT); + return builder.build(); + } +} diff --git a/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java new file mode 100644 index 0000000..e9d1ecc --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java @@ -0,0 +1,73 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Created by tai.khuu on 10/10/17. + */ +public class ActivityTypeInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private ActivityTypeInputPlugin activityTypeInputPlugin; + + private MarketoRestClient mockMarketoRestClient; + + @Before + public void setUp() throws Exception + { + activityTypeInputPlugin = Mockito.spy(new ActivityTypeInputPlugin()); + ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockMarketoRestClient = Mockito.mock(MarketoRestClient.class); + Mockito.doReturn(mockMarketoRestClient).when(activityTypeInputPlugin).createMarketoRestClient(Mockito.any(ActivityTypeInputPlugin.PluginTask.class)); + } + + @Test + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = Mockito.mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/activity_type_response_full.json"), javaType); + Mockito.when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator()); + Mockito.when(mockMarketoRestClient.getActivityTypes()).thenReturn(mockRecordPagingIterable); + ActivityTypeInputPlugin.PluginTask task = configSource.loadConfig(ActivityTypeInputPlugin.PluginTask.class); + ServiceResponseMapper mapper = activityTypeInputPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = Mockito.mock(PageBuilder.class); + activityTypeInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + Mockito.verify(mockMarketoRestClient, Mockito.times(1)).getActivityTypes(); + Schema embulkSchema = mapper.getEmbulkSchema(); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockPageBuilder, Mockito.times(5)).setLong(Mockito.eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1L, 2L, 3L, 4L, 5L}, allValues.toArray()); + } +} diff --git a/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java new file mode 100644 index 0000000..5c5e561 --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java @@ -0,0 +1,73 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Created by tai.khuu on 10/10/17. + */ +public class ListInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private ListInputPlugin listInputPlugin; + + private MarketoRestClient mockMarketoRestClient; + + @Before + public void setUp() throws Exception + { + listInputPlugin = Mockito.spy(new ListInputPlugin()); + ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockMarketoRestClient = Mockito.mock(MarketoRestClient.class); + Mockito.doReturn(mockMarketoRestClient).when(listInputPlugin).createMarketoRestClient(Mockito.any(ListInputPlugin.PluginTask.class)); + } + + @Test + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = Mockito.mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/list_reponse_full.json"), javaType); + Mockito.when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator()); + Mockito.when(mockMarketoRestClient.getLists()).thenReturn(mockRecordPagingIterable); + ListInputPlugin.PluginTask task = configSource.loadConfig(ListInputPlugin.PluginTask.class); + ServiceResponseMapper mapper = listInputPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = Mockito.mock(PageBuilder.class); + listInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + Mockito.verify(mockMarketoRestClient, Mockito.times(1)).getLists(); + Schema embulkSchema = mapper.getEmbulkSchema(); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockPageBuilder, Mockito.times(24)).setLong(Mockito.eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1007L, 1009L, 1010L, 1011L, 1012L, 1052L, 1063L, 1066L, 1067L, 1072L, 1073L, 1075L, 1076L, 1077L, 1078L, 1079L, 1080L, 1081L, 1082L, 1083L, 1084L, 1085L, 1086L, 1087L}, allValues.toArray()); + } +} diff --git a/src/test/resources/fixtures/activity_type_response_full.json b/src/test/resources/fixtures/activity_type_response_full.json new file mode 100644 index 0000000..4a7ec97 --- /dev/null +++ b/src/test/resources/fixtures/activity_type_response_full.json @@ -0,0 +1,98 @@ +[ + { + "id": 1, + "name": "bb", + "description": "sample", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_1", + "dataType": "string" + }, + { + "name": "sample_2", + "dataType": "boolean" + } + ] + }, + { + "id": 2, + "name": "TD Output Test aa", + "description": "sample_aa", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_aa_1", + "dataType": "string" + }, + { + "name": "sample_aa_2", + "dataType": "boolean" + } + ] + }, + { + "id": 3, + "name": "Bill_progream a", + "description": "sample", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_a_1", + "dataType": "string" + }, + { + "name": "sample_a_1", + "dataType": "boolean" + } + ], + "apiName": "sample_name_c" + }, + { + "id": 4, + "name": "Bill_progream b", + "description": "sample_b", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_b_1", + "dataType": "string" + }, + { + "name": "sample_b_2", + "dataType": "boolean" + } + ] + }, + { + "id": 5, + "name": "Bill_progream c", + "description": "sample_c", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_c_1", + "dataType": "string" + }, + { + "name": "sample_c_2", + "dataType": "boolean" + } + ] + } +] \ No newline at end of file From 5e21ae8891d0a8c423c9c7db1b4bac82ee119af8 Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Tue, 12 May 2020 16:13:17 +0900 Subject: [PATCH 03/39] update README --- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index d3db228..75a01ff 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,8 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt - Lead by program(all_lead_with_program_id) - Campaign(campaign) - Assets Programs (program) +- List (list) +- Activity Type (activity_type) This plugin uses Marketo REST API. @@ -172,6 +174,30 @@ Incremental support: yes (Query by `date_range` only) Range ingestion: yes +### List + +List extract all list data from Marketo + +`target: list` + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no + +### Activity Type + +Activity Type extract all activity type data from Marketo + +`target: activity_type` + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no + ## Example For lead, you have `partial-config.yml` like below: From b68ba66b043485c799571cf300baedaafeed64ac Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Thu, 14 May 2020 13:54:38 +0900 Subject: [PATCH 04/39] delete unnecessary comment --- .../input/marketo/delegate/ActivityTypeInputPluginTest.java | 3 --- .../org/embulk/input/marketo/delegate/ListInputPluginTest.java | 3 --- 2 files changed, 6 deletions(-) diff --git a/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java index e9d1ecc..d163fcf 100644 --- a/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java +++ b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java @@ -24,9 +24,6 @@ import static org.junit.Assert.assertArrayEquals; -/** - * Created by tai.khuu on 10/10/17. - */ public class ActivityTypeInputPluginTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); diff --git a/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java index 5c5e561..f6d4bcc 100644 --- a/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java +++ b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java @@ -24,9 +24,6 @@ import static org.junit.Assert.assertArrayEquals; -/** - * Created by tai.khuu on 10/10/17. - */ public class ListInputPluginTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); From 88ffe639ab1380811b29bd7525d9003c86fd5c15 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 26 May 2020 04:47:34 +0900 Subject: [PATCH 05/39] =?UTF-8?q?=E3=82=A2=E3=82=AF=E3=83=86=E3=82=A3?= =?UTF-8?q?=E3=83=93=E3=83=86=E3=82=A3=E3=81=AEJSON=E3=83=87=E3=83=BC?= =?UTF-8?q?=E3=82=BF=E3=82=92=E8=AA=AD=E3=81=BF=E5=8F=96=E3=82=8A=E3=81=A7?= =?UTF-8?q?=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/embulk/input/marketo/CsvTokenizer.java | 10 +++++++++- .../delegate/MarketoBaseBulkExtractInputPlugin.java | 11 ++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index da5d099..e67b5ce 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -10,6 +10,7 @@ import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.util.LineDecoder; +import org.slf4j.Logger; import java.util.ArrayDeque; import java.util.ArrayList; @@ -21,6 +22,8 @@ */ public class CsvTokenizer { + private static final Logger LOGGER = Exec.getLogger(CsvTokenizer.class); + static enum RecordState { NOT_END, END, @@ -206,6 +209,10 @@ private boolean nextLine(boolean skipEmptyLine) line.isEmpty() || (commentLineMarker != null && line.startsWith(commentLineMarker))); if (!skip) { + line = line.replace("\\\"\"", "\\\"\\\""); + // column = column.replace(":\\\"\\\",", ":\"\","); + line = line.replace("\\\"\\\",\\\"\\\"", "\\\"\\\"\\,\\\"\\\""); + return true; } } @@ -423,7 +430,8 @@ else if (isDelimiterFollowingFrom(linePos)) { else if (isSpace(c)) { // column has trailing spaces and quoted. TODO should this be rejected? } else { - throw new InvalidValueException(String.format("Unexpected extra character '%c' after a value quoted by '%c'", c, quote)); + columnState = ColumnState.QUOTED_VALUE; + // throw new InvalidValueException(String.format("Unexpected extra character '%c' after a value quoted by '%c'", c, quote)); } break; diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index a627339..69538f4 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -34,6 +34,7 @@ import org.embulk.spi.util.LineDecoder; import org.joda.time.DateTime; import org.msgpack.value.Value; +import org.slf4j.Logger; import java.io.InputStream; import java.text.DateFormat; @@ -53,6 +54,8 @@ public abstract class MarketoBaseBulkExtractInputPlugin getNextCSVRecord() try { int i = 0; while (tokenizer.hasNextColumn()) { - kvMap.put(headers.get(i), tokenizer.nextColumnOrNull()); + String column = tokenizer.nextColumnOrNull(); + if(column != null && !":\"\",".equals(column)){ + column = column.replace("\"\"", "\\\"\\\""); + column = column.replace(":\\\"\\\",", ":\"\","); + column = column.replace("\\\"\\\"\\,\\\"\\\"", "\\\"\\\",\\\"\\\""); + } + kvMap.put(headers.get(i), column); i++; } } From 818fc277aee7e2b2498040a51b81d02cb19b6eda Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 26 May 2020 04:48:30 +0900 Subject: [PATCH 06/39] =?UTF-8?q?gitignore=E3=81=AE=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 96bb2a0..c3285c1 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,4 @@ build/ /.metadata/ .classpath .project - +bin/ From 5547b3b9583daae646fdb555bf7d1ed1fd5565c9 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 26 May 2020 04:50:43 +0900 Subject: [PATCH 07/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index c9860c1..581f4be 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.18" +version = "0.6.2" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 6409f8189f44173d3e346433e845642d8e833f85 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 26 May 2020 04:55:37 +0900 Subject: [PATCH 08/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 581f4be..1e6abc0 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.2" +version = "0.6.21" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 80227bf59c32a2c2d6f8c2142c1a65614afdbf1a Mon Sep 17 00:00:00 2001 From: dododo8m Date: Mon, 1 Jun 2020 05:49:55 +0900 Subject: [PATCH 09/39] =?UTF-8?q?=E3=83=90=E3=83=83=E3=82=AF=E3=82=B9?= =?UTF-8?q?=E3=83=A9=E3=83=83=E3=82=B7=E3=83=A5=E3=81=A7json=20parse?= =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC=E3=81=AB=E3=81=AA=E3=82=8B=E3=81=A8?= =?UTF-8?q?=E3=81=8D=E3=81=AE=E5=AF=BE=E5=BF=9C=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../delegate/MarketoBaseBulkExtractInputPlugin.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index 69538f4..e285f13 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -27,6 +27,7 @@ import org.embulk.spi.Exec; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; +import org.embulk.spi.json.JsonParseException; import org.embulk.spi.json.JsonParser; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampParser; @@ -297,7 +298,13 @@ public double doubleValue() @Override public Value jsonValue(JsonParser jsonParser) { - return jsonParser.parse(textValue); + try { + textValue = textValue.replace("\\", "\\\\"); + return jsonParser.parse(textValue); + } catch (Exception e) { + LOGGER.info("skipped to parse JSON: " + textValue); + return jsonParser.parse("{}"); + } } @Override From 802f826fa2d9219e9bdc8d68aa882daa8ae1a743 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Mon, 1 Jun 2020 05:50:34 +0900 Subject: [PATCH 10/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 1e6abc0..f252ef4 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.21" +version = "0.6.22" sourceCompatibility = 1.7 targetCompatibility = 1.7 From b5474ed4acc1eebe367a27aa5f20981d7241ea87 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Thu, 4 Jun 2020 10:50:49 +0900 Subject: [PATCH 11/39] =?UTF-8?q?CSV=E3=83=91=E3=83=BC=E3=82=B5=E3=83=BC?= =?UTF-8?q?=E3=82=92=E4=BD=BF=E7=94=A8=E3=81=99=E3=82=8B=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 3 + .../embulk/input/marketo/CsvTokenizer.java | 93 +++++++++---------- .../embulk/input/marketo/MarketoUtils.java | 30 +++++- .../MarketoBaseBulkExtractInputPlugin.java | 20 ++-- 4 files changed, 86 insertions(+), 60 deletions(-) diff --git a/build.gradle b/build.gradle index f252ef4..0d26d50 100644 --- a/build.gradle +++ b/build.gradle @@ -25,6 +25,9 @@ dependencies { provided "org.embulk:embulk-core:0.8.+" compile "org.embulk.base.restclient:embulk-base-restclient:0.5.3" compile "org.embulk.base.restclient:embulk-util-retryhelper-jetty92:0.5.3" + compile 'org.apache.commons:commons-text:1.2' + // https://mvnrepository.com/artifact/org.apache.commons/commons-csv + compile group: 'org.apache.commons', name: 'commons-csv', version: '1.8' testCompile "junit:junit:4.+" testCompile "org.embulk:embulk-core:0.8.+:tests" testCompile "org.embulk:embulk-test:0.8.+" diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index e67b5ce..b1d3884 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -4,6 +4,10 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Optional; import com.google.common.base.Preconditions; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigException; @@ -12,6 +16,7 @@ import org.embulk.spi.util.LineDecoder; import org.slf4j.Logger; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -20,17 +25,14 @@ /** * Created by tai.khuu on 9/15/17. */ -public class CsvTokenizer -{ +public class CsvTokenizer { private static final Logger LOGGER = Exec.getLogger(CsvTokenizer.class); - static enum RecordState - { + static enum RecordState { NOT_END, END, } - static enum ColumnState - { + static enum ColumnState { BEGIN, VALUE, QUOTED_VALUE, AFTER_QUOTED_VALUE, FIRST_TRIM, LAST_TRIM_OR_VALUE, } @@ -38,8 +40,7 @@ static enum ColumnState static final char NO_QUOTE = '\0'; static final char NO_ESCAPE = '\0'; - public interface PluginTask extends LineDecoder.DecoderTask - { + public interface PluginTask extends LineDecoder.DecoderTask { @Config("delimiter") @ConfigDefault("\",\"") String getDelimiter(); @@ -63,7 +64,7 @@ public interface PluginTask extends LineDecoder.DecoderTask boolean getTrimIfNotQuoted(); @Config("max_quoted_size_limit") - @ConfigDefault("131072") //128kB + @ConfigDefault("131072") // 128kB long getMaxQuotedSizeLimit(); @Config("comment_line_marker") @@ -82,7 +83,8 @@ public interface PluginTask extends LineDecoder.DecoderTask private final LineDecoder input; private final String nullStringOrNull; - private RecordState recordState = RecordState.END; // initial state is end of a record. nextRecord() must be called first + private RecordState recordState = RecordState.END; // initial state is end of a record. nextRecord() must be called + // first private long lineNumber = 0; private String line = null; @@ -91,24 +93,22 @@ public interface PluginTask extends LineDecoder.DecoderTask private List quotedValueLines = new ArrayList<>(); private Deque unreadLines = new ArrayDeque<>(); - public CsvTokenizer(LineDecoder input, PluginTask task) - { + public CsvTokenizer(LineDecoder input, PluginTask task) { this(task.getDelimiter(), task.getQuoteChar().or(QuoteCharacter.noQuote()).getCharacter(), task.getEscapeChar().or(EscapeCharacter.noEscape()).getCharacter(), task.getNewline().getString(), - task.getTrimIfNotQuoted(), task.getMaxQuotedSizeLimit(), task.getCommentLineMarker().orNull(), input, task.getNullString().orNull()); + task.getTrimIfNotQuoted(), task.getMaxQuotedSizeLimit(), task.getCommentLineMarker().orNull(), input, + task.getNullString().orNull()); } - public CsvTokenizer(String delimiter, char quote, char escape, String newline, boolean trimIfNotQuoted, long maxQuotedSizeLimit, String commentLineMarker, LineDecoder input, String nullStringOrNull) - { + public CsvTokenizer(String delimiter, char quote, char escape, String newline, boolean trimIfNotQuoted, + long maxQuotedSizeLimit, String commentLineMarker, LineDecoder input, String nullStringOrNull) { if (delimiter.length() == 0) { throw new ConfigException("Empty delimiter is not allowed"); - } - else { + } else { this.delimiterChar = delimiter.charAt(0); if (delimiter.length() > 1) { delimiterFollowingString = delimiter.substring(1); - } - else { + } else { delimiterFollowingString = null; } } @@ -122,13 +122,11 @@ public CsvTokenizer(String delimiter, char quote, char escape, String newline, b this.nullStringOrNull = nullStringOrNull; } - public long getCurrentLineNumber() - { + public long getCurrentLineNumber() { return lineNumber; } - public boolean skipHeaderLine() - { + public boolean skipHeaderLine() { boolean skipped = input.poll() != null; if (skipped) { lineNumber++; @@ -137,15 +135,13 @@ public boolean skipHeaderLine() } // returns skipped line - public String skipCurrentLine() - { + public String skipCurrentLine() { String skippedLine; if (quotedValueLines.isEmpty()) { skippedLine = line; - } - else { + } else { // recover lines of quoted value - skippedLine = quotedValueLines.remove(0); // TODO optimize performance + skippedLine = quotedValueLines.remove(0); // TODO optimize performance unreadLines.addAll(quotedValueLines); lineNumber -= quotedValueLines.size(); if (line != null) { @@ -158,8 +154,7 @@ public String skipCurrentLine() return skippedLine; } - public boolean nextFile() - { + public boolean nextFile() { boolean next = input.nextFile(); if (next) { lineNumber = 0; @@ -168,13 +163,11 @@ public boolean nextFile() } // used by guess-csv - public boolean nextRecord() - { + public boolean nextRecord() { return nextRecord(true); } - public boolean nextRecord(boolean skipEmptyLine) - { + public boolean nextRecord(boolean skipEmptyLine) { // If at the end of record, read the next line and initialize the state if (recordState != RecordState.END) { throw new TooManyColumnsException("Too many columns"); @@ -184,19 +177,16 @@ public boolean nextRecord(boolean skipEmptyLine) if (hasNext) { recordState = RecordState.NOT_END; return true; - } - else { + } else { return false; } } - private boolean nextLine(boolean skipEmptyLine) - { + private boolean nextLine(boolean skipEmptyLine) { while (true) { if (!unreadLines.isEmpty()) { line = unreadLines.removeFirst(); - } - else { + } else { line = input.poll(); if (line == null) { return false; @@ -205,19 +195,28 @@ private boolean nextLine(boolean skipEmptyLine) linePos = 0; lineNumber++; - boolean skip = skipEmptyLine && ( - line.isEmpty() || - (commentLineMarker != null && line.startsWith(commentLineMarker))); + boolean skip = skipEmptyLine + && (line.isEmpty() || (commentLineMarker != null && line.startsWith(commentLineMarker))); if (!skip) { - line = line.replace("\\\"\"", "\\\"\\\""); - // column = column.replace(":\\\"\\\",", ":\"\","); - line = line.replace("\\\"\\\",\\\"\\\"", "\\\"\\\"\\,\\\"\\\""); - return true; } } } + public CSVRecord csvParse() { + try { + LOGGER.info(line); + CSVParser csvParser = CSVParser.parse(line, CSVFormat.DEFAULT); + CSVRecord record = csvParser.getRecords().get(0); + LOGGER.info(record.toString()); + recordState = RecordState.END; + return record; + } catch (IOException e) { + LOGGER.info(e.getMessage()); + throw new InvalidValueException("死にました"); + } + } + public boolean hasNextColumn() { return recordState == RecordState.NOT_END; diff --git a/src/main/java/org/embulk/input/marketo/MarketoUtils.java b/src/main/java/org/embulk/input/marketo/MarketoUtils.java index 1bbbda0..d9afae3 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoUtils.java +++ b/src/main/java/org/embulk/input/marketo/MarketoUtils.java @@ -1,5 +1,7 @@ package org.embulk.input.marketo; +import com.fasterxml.jackson.core.SerializableString; +import com.fasterxml.jackson.core.io.CharacterEscapes; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -31,7 +33,6 @@ public class MarketoUtils { public static final String MARKETO_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"; public static final String MARKETO_DATE_FORMAT = "%Y-%m-%d"; - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final Function TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION = new Function() { @Nullable @@ -52,6 +53,33 @@ private MarketoUtils() { } + public static ObjectMapper getObjectMapper() + { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.getFactory().setCharacterEscapes(new CharacterEscapes() { + + private final int[] escapeCodes; + + { + escapeCodes = standardAsciiEscapesForJSON(); + escapeCodes['\''] = CharacterEscapes.ESCAPE_STANDARD; + escapeCodes['/'] = CharacterEscapes.ESCAPE_STANDARD; + escapeCodes['\n'] = CharacterEscapes.ESCAPE_STANDARD; + } + + @Override + public int[] getEscapeCodesForAscii() { + return escapeCodes; + } + + @Override + public SerializableString getEscapeSequence(int ch) { + return null; + } + }); + return new ObjectMapper(); + } + public static ServiceResponseMapper buildDynamicResponseMapper(String prefix, List columns) { JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index e285f13..28ef870 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -4,6 +4,10 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Iterators; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.text.StringEscapeUtils; import org.embulk.base.restclient.jackson.JacksonServiceRecord; import org.embulk.base.restclient.jackson.JacksonServiceValue; import org.embulk.base.restclient.record.RecordImporter; @@ -173,7 +177,7 @@ public Iterator> apply(LineDecoder input) int imported = 0; while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); - ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); + ObjectNode objectNode = MarketoUtils.getObjectMapper().valueToTree(csvRecord); recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); imported = imported + 1; } @@ -299,7 +303,6 @@ public double doubleValue() public Value jsonValue(JsonParser jsonParser) { try { - textValue = textValue.replace("\\", "\\\\"); return jsonParser.parse(textValue); } catch (Exception e) { LOGGER.info("skipped to parse JSON: " + textValue); @@ -435,16 +438,9 @@ private Map getNextCSVRecord() } Map kvMap = new HashMap<>(); try { - int i = 0; - while (tokenizer.hasNextColumn()) { - String column = tokenizer.nextColumnOrNull(); - if(column != null && !":\"\",".equals(column)){ - column = column.replace("\"\"", "\\\"\\\""); - column = column.replace(":\\\"\\\",", ":\"\","); - column = column.replace("\\\"\\\"\\,\\\"\\\"", "\\\"\\\",\\\"\\\""); - } - kvMap.put(headers.get(i), column); - i++; + CSVRecord record = tokenizer.csvParse(); + for (int i = 0; i < headers.size(); i++) { + kvMap.put(headers.get(i), record.get(i)); } } catch (CsvTokenizer.InvalidValueException ex) { From c0116e0a6d761f651ac8204b8cde1dd85b405a43 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Thu, 4 Jun 2020 10:52:27 +0900 Subject: [PATCH 12/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- .../delegate/LeadServiceResponseMapperBuilderTest.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index 0d26d50..4b10af3 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.22" +version = "0.6.23" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/test/java/org/embulk/input/marketo/delegate/LeadServiceResponseMapperBuilderTest.java b/src/test/java/org/embulk/input/marketo/delegate/LeadServiceResponseMapperBuilderTest.java index e9dc5e7..402fff6 100644 --- a/src/test/java/org/embulk/input/marketo/delegate/LeadServiceResponseMapperBuilderTest.java +++ b/src/test/java/org/embulk/input/marketo/delegate/LeadServiceResponseMapperBuilderTest.java @@ -64,8 +64,8 @@ public void setUp() throws Exception "target: all_lead_with_list_id\n"); pluginTask = configSource.loadConfig(LeadServiceResponseMapperBuilder.PluginTask.class); marketoService = Mockito.mock(MarketoService.class); - JavaType marketoFieldsType = MarketoUtils.OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, MarketoField.class); - List marketoFields = MarketoUtils.OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/lead_describe_marketo_fields_full.json"), marketoFieldsType); + JavaType marketoFieldsType = MarketoUtils.getObjectMapper().getTypeFactory().constructParametrizedType(List.class, List.class, MarketoField.class); + List marketoFields = MarketoUtils.getObjectMapper().readValue(this.getClass().getResourceAsStream("/fixtures/lead_describe_marketo_fields_full.json"), marketoFieldsType); Mockito.when(marketoService.describeLead()).thenReturn(marketoFields); } @@ -87,7 +87,7 @@ public void buildServiceResponseMapper() throws Exception @Test public void getLeadColumnsIncludedEmpty() throws IOException { - configSource = configSource.set("included_fields", MarketoUtils.OBJECT_MAPPER.readTree("[]")); + configSource = configSource.set("included_fields", MarketoUtils.getObjectMapper().readTree("[]")); pluginTask = configSource.loadConfig(LeadServiceResponseMapperBuilder.PluginTask.class); leadServiceResponseMapperBuilder = new LeadServiceResponseMapperBuilder<>(pluginTask, marketoService); List leadColumns = leadServiceResponseMapperBuilder.getLeadColumns(); @@ -97,7 +97,7 @@ public void getLeadColumnsIncludedEmpty() throws IOException @Test public void getLeadColumnsIncluded1() throws IOException { - configSource = configSource.set("included_fields", MarketoUtils.OBJECT_MAPPER.readTree("[\"company\",\"incorrect_value\"]")); + configSource = configSource.set("included_fields", MarketoUtils.getObjectMapper().readTree("[\"company\",\"incorrect_value\"]")); pluginTask = configSource.loadConfig(LeadServiceResponseMapperBuilder.PluginTask.class); leadServiceResponseMapperBuilder = new LeadServiceResponseMapperBuilder<>(pluginTask, marketoService); List leadColumns = leadServiceResponseMapperBuilder.getLeadColumns(); @@ -108,7 +108,7 @@ public void getLeadColumnsIncluded1() throws IOException @Test public void getLeadColumnsIncluded2() throws IOException { - configSource = configSource.set("included_fields", MarketoUtils.OBJECT_MAPPER.readTree("[\"company\",\"incorrect_value\"]")); + configSource = configSource.set("included_fields", MarketoUtils.getObjectMapper().readTree("[\"company\",\"incorrect_value\"]")); pluginTask = configSource.loadConfig(LeadServiceResponseMapperBuilder.PluginTask.class); marketoService = Mockito.mock(MarketoService.class); leadServiceResponseMapperBuilder = new LeadServiceResponseMapperBuilder<>(pluginTask, marketoService); From 47beabd1c437a430fbf34e694a73247d2da4e73d Mon Sep 17 00:00:00 2001 From: dododo8m Date: Thu, 4 Jun 2020 11:01:00 +0900 Subject: [PATCH 13/39] =?UTF-8?q?=E4=B8=8D=E8=A6=81=E3=81=AA=E3=83=AD?= =?UTF-8?q?=E3=82=B0=E3=82=92=E5=89=8A=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- src/main/java/org/embulk/input/marketo/CsvTokenizer.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 4b10af3..119699d 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.23" +version = "0.6.24" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index b1d3884..686b880 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -205,10 +205,8 @@ private boolean nextLine(boolean skipEmptyLine) { public CSVRecord csvParse() { try { - LOGGER.info(line); CSVParser csvParser = CSVParser.parse(line, CSVFormat.DEFAULT); CSVRecord record = csvParser.getRecords().get(0); - LOGGER.info(record.toString()); recordState = RecordState.END; return record; } catch (IOException e) { From b287bb64199309e88a1efffe7a0a4437a564bb20 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 9 Jun 2020 07:00:42 +0900 Subject: [PATCH 14/39] =?UTF-8?q?CSVparser=E3=82=92=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E3=81=99=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../embulk/input/marketo/CsvTokenizer.java | 63 ++-- .../MarketoBaseBulkExtractInputPlugin.java | 273 +++++++----------- 2 files changed, 140 insertions(+), 196 deletions(-) diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index 686b880..5346c59 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -7,7 +7,6 @@ import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigException; @@ -16,7 +15,9 @@ import org.embulk.spi.util.LineDecoder; import org.slf4j.Logger; +import java.io.BufferedReader; import java.io.IOException; +import java.io.Reader; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -72,16 +73,16 @@ public interface PluginTask extends LineDecoder.DecoderTask { Optional getCommentLineMarker(); } - private final char delimiterChar; - private final String delimiterFollowingString; - private final char quote; - private final char escape; - private final String newline; - private final boolean trimIfNotQuoted; - private final long maxQuotedSizeLimit; - private final String commentLineMarker; - private final LineDecoder input; - private final String nullStringOrNull; + private char delimiterChar; + private String delimiterFollowingString; + private char quote; + private char escape; + private String newline; + private boolean trimIfNotQuoted; + private long maxQuotedSizeLimit; + private String commentLineMarker; + private LineDecoder input; + private String nullStringOrNull; private RecordState recordState = RecordState.END; // initial state is end of a record. nextRecord() must be called // first @@ -122,6 +123,34 @@ public CsvTokenizer(String delimiter, char quote, char escape, String newline, b this.nullStringOrNull = nullStringOrNull; } + private Reader inputStream; + + public CsvTokenizer(Reader inputStream) + { + this.inputStream = inputStream; + } + + public CSVParser csvParse() { + try { + BufferedReader b = new BufferedReader(inputStream); + StringBuilder sb = new StringBuilder(); + String line = b.readLine(); + sb.append(line); + while(line != null){ + sb.append("\r\n"); + sb.append(line); + line = b.readLine(); + } + String csv = sb.toString(); + + CSVParser csvParser = CSVParser.parse(csv, CSVFormat.DEFAULT.withFirstRecordAsHeader()); + inputStream.close(); + return csvParser; + } catch (IOException e) { + throw new InvalidValueException(e.getMessage()); + } + } + public long getCurrentLineNumber() { return lineNumber; } @@ -203,18 +232,6 @@ private boolean nextLine(boolean skipEmptyLine) { } } - public CSVRecord csvParse() { - try { - CSVParser csvParser = CSVParser.parse(line, CSVFormat.DEFAULT); - CSVRecord record = csvParser.getRecords().get(0); - recordState = RecordState.END; - return record; - } catch (IOException e) { - LOGGER.info(e.getMessage()); - throw new InvalidValueException("死にました"); - } - } - public boolean hasNextColumn() { return recordState == RecordState.NOT_END; diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index 28ef870..22693c8 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -1,13 +1,11 @@ package org.embulk.input.marketo.delegate; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.Iterators; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; -import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.embulk.base.restclient.jackson.JacksonServiceRecord; import org.embulk.base.restclient.jackson.JacksonServiceValue; import org.embulk.base.restclient.record.RecordImporter; @@ -27,44 +25,47 @@ import org.embulk.spi.BufferAllocator; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; -import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; -import org.embulk.spi.json.JsonParseException; import org.embulk.spi.json.JsonParser; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampParser; +import org.embulk.spi.util.FileInputInputStream; import org.embulk.spi.util.InputStreamFileInput; import org.embulk.spi.util.LineDecoder; import org.joda.time.DateTime; import org.msgpack.value.Value; import org.slf4j.Logger; +import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; /** * Created by tai.khuu on 9/18/17. */ -public abstract class MarketoBaseBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate -{ +public abstract class MarketoBaseBulkExtractInputPlugin + extends MarketoBaseInputPluginDelegate { private static final String FROM_DATE = "from_date"; private static final Logger LOGGER = Exec.getLogger(MarketoBaseBulkExtractInputPlugin.class); private static final int MARKETO_MAX_RANGE_EXTRACT = 30; - public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask - { + private List records; + + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @Config("from_date") Date getFromDate(); @@ -95,8 +96,9 @@ public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, C @Config("incremental_column") @ConfigDefault("\"createdAt\"") - //Incremental column are only keep here since we don't want to introduce too much change to plugin - //Consider remove it in next release + // Incremental column are only keep here since we don't want to introduce too + // much change to plugin + // Consider remove it in next release Optional getIncrementalColumn(); void setIncrementalColumn(Optional incrementalColumn); @@ -104,12 +106,12 @@ public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, C @Config("uid_column") @ConfigDefault("null") Optional getUidColumn(); + void setUidColumn(Optional uidColumn); } @Override - public void validateInputTask(T task) - { + public void validateInputTask(T task) { super.validateInputTask(task); if (task.getFromDate() == null) { throw new ConfigException("From date is required for Bulk Extract"); @@ -117,31 +119,28 @@ public void validateInputTask(T task) if (task.getFromDate().getTime() >= task.getJobStartTime().getMillis()) { throw new ConfigException("From date can't not be in future"); } - if (task.getIncremental() - && task.getIncrementalColumn().isPresent() + if (task.getIncremental() && task.getIncrementalColumn().isPresent() && task.getIncrementalColumn().get().equals("updatedAt")) { throw new ConfigException("Column 'updatedAt' cannot be incremental imported"); } - //Calculate to date + // Calculate to date DateTime toDate = getToDate(task); task.setToDate(Optional.of(toDate.toDate())); } - public DateTime getToDate(T task) - { + public DateTime getToDate(T task) { Date fromDate = task.getFromDate(); DateTime dateTime = new DateTime(fromDate); DateTime toDate = dateTime.plusDays(task.getFetchDays()); if (toDate.isAfter(task.getJobStartTime())) { - //Lock down to date + // Lock down to date toDate = task.getJobStartTime(); } return toDate; } @Override - public ConfigDiff buildConfigDiff(T task, Schema schema, int taskCount, List taskReports) - { + public ConfigDiff buildConfigDiff(T task, Schema schema, int taskCount, List taskReports) { ConfigDiff configDiff = super.buildConfigDiff(task, schema, taskCount, taskReports); String incrementalColumn = task.getIncrementalColumn().orNull(); if (incrementalColumn != null && task.getIncremental()) { @@ -154,31 +153,28 @@ public ConfigDiff buildConfigDiff(T task, Schema schema, int taskCount, List> csvRecords = Iterators.concat(Iterators.transform(decoderIterator, new Function>>() - { - @Override - public Iterator> apply(LineDecoder input) - { - return new CsvRecordIterator(input, task); - } - })); - //Keep the preview code here when we can enable real preview - if (Exec.isPreview()) { - csvRecords = Iterators.limit(csvRecords, PREVIEW_RECORD_LIMIT); - } int imported = 0; - while (csvRecords.hasNext()) { - Map csvRecord = csvRecords.next(); - ObjectNode objectNode = MarketoUtils.getObjectMapper().valueToTree(csvRecord); - recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + while (decoderIterator.hasNext()) { + try { + Reader inputStream = decoderIterator.next(); + CsvTokenizer csvTokenizer = new CsvTokenizer(inputStream); + CSVParser csvParser = csvTokenizer.csvParse(); + records = csvParser.getRecords(); + records.remove(0); + for (CSVRecord csvRecord : records) { + ObjectNode objectNode = MarketoUtils.getObjectMapper().valueToTree(csvRecord.toMap()); + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + } + } catch (CsvTokenizer.InvalidValueException | IllegalArgumentException | IOException ex) { + LOGGER.warn("skipped csv line: " + ExceptionUtils.getStackTrace(ex)); + } imported = imported + 1; } return taskReport; @@ -188,50 +184,43 @@ public Iterator> apply(LineDecoder input) /** * This method should be removed when we allow skip preview phase + * * @param pageBuilder * @return TaskReport */ - private TaskReport importMockPreviewData(final PageBuilder pageBuilder) - { + private TaskReport importMockPreviewData(final PageBuilder pageBuilder) { final JsonParser jsonParser = new JsonParser(); Schema schema = pageBuilder.getSchema(); for (int i = 1; i <= PREVIEW_RECORD_LIMIT; i++) { final int rowNum = i; - schema.visitColumns(new ColumnVisitor() - { + schema.visitColumns(new ColumnVisitor() { @Override - public void booleanColumn(Column column) - { + public void booleanColumn(Column column) { pageBuilder.setBoolean(column, false); } @Override - public void longColumn(Column column) - { + public void longColumn(Column column) { pageBuilder.setLong(column, 12345L); } @Override - public void doubleColumn(Column column) - { + public void doubleColumn(Column column) { pageBuilder.setDouble(column, 12345.123); } @Override - public void stringColumn(Column column) - { + public void stringColumn(Column column) { pageBuilder.setString(column, column.getName() + "_" + rowNum); } @Override - public void timestampColumn(Column column) - { + public void timestampColumn(Column column) { pageBuilder.setTimestamp(column, Timestamp.ofEpochMilli(System.currentTimeMillis())); } @Override - public void jsonColumn(Column column) - { + public void jsonColumn(Column column) { pageBuilder.setJson(column, jsonParser.parse("{\"mockKey\":\"mockValue\"}")); } }); @@ -240,68 +229,64 @@ public void jsonColumn(Column column) return Exec.newTaskReport(); } - private LineDecoderIterator getLineDecoderIterator(T task) - { - List dateRanges = MarketoUtils.sliceRange(new DateTime(task.getFromDate()), new DateTime(task.getToDate().orNull()), MARKETO_MAX_RANGE_EXTRACT); + private LineDecoderIterator getLineDecoderIterator(T task) { + List dateRanges = MarketoUtils.sliceRange(new DateTime(task.getFromDate()), + new DateTime(task.getToDate().orNull()), MARKETO_MAX_RANGE_EXTRACT); final Iterator iterator = dateRanges.iterator(); return new LineDecoderIterator(iterator, task); } @Override - protected final Iterator getServiceRecords(MarketoService marketoService, T task) - { + protected final Iterator getServiceRecords(MarketoService marketoService, T task) { throw new UnsupportedOperationException(); } - protected abstract InputStream getExtractedStream(MarketoService service, T task, DateTime fromDate, DateTime toDate); + protected abstract InputStream getExtractedStream(MarketoService service, T task, DateTime fromDate, + DateTime toDate); - private static class AllStringJacksonServiceRecord extends JacksonServiceRecord - { - public AllStringJacksonServiceRecord(ObjectNode record) - { + private static class AllStringJacksonServiceRecord extends JacksonServiceRecord { + public AllStringJacksonServiceRecord(ObjectNode record) { super(record); } @Override - public JacksonServiceValue getValue(ValueLocator locator) - { + public JacksonServiceValue getValue(ValueLocator locator) { // We know that this thing only contain text. JacksonServiceValue value = super.getValue(locator); return new StringConverterJacksonServiceRecord(value.stringValue()); } } - private static class StringConverterJacksonServiceRecord extends JacksonServiceValue - { + private static class StringConverterJacksonServiceRecord extends JacksonServiceValue { private String textValue; - public StringConverterJacksonServiceRecord(String textValue) - { + public StringConverterJacksonServiceRecord(String textValue) { super(null); this.textValue = textValue; } @Override - public boolean isNull() - { + public boolean isNull() { return textValue == null || textValue.equals("null"); } @Override - public boolean booleanValue() - { + public boolean booleanValue() { return Boolean.parseBoolean(textValue); } @Override - public double doubleValue() - { - return Double.parseDouble(textValue); + public double doubleValue() { + try { + return Double.parseDouble(textValue); + } catch (Exception e) { + LOGGER.info("skipped to parse Double: " + textValue); + return Double.NaN; + } } @Override - public Value jsonValue(JsonParser jsonParser) - { + public Value jsonValue(JsonParser jsonParser) { try { return jsonParser.parse(textValue); } catch (Exception e) { @@ -311,26 +296,32 @@ public Value jsonValue(JsonParser jsonParser) } @Override - public long longValue() - { - return Long.parseLong(textValue); + public long longValue() { + try { + return Long.parseLong(textValue); + } catch (Exception e) { + LOGGER.info("skipped to parse Long: " + textValue); + return Long.MIN_VALUE; + } } @Override - public String stringValue() - { + public String stringValue() { return textValue; } @Override - public Timestamp timestampValue(TimestampParser timestampParser) - { - return timestampParser.parse(textValue); + public Timestamp timestampValue(TimestampParser timestampParser) { + try { + return timestampParser.parse(textValue); + } catch (Exception e) { + LOGGER.info("skipped to parse Timestamp: " + textValue); + return null; + } } } - private final class LineDecoderIterator implements Iterator, AutoCloseable - { + private final class LineDecoderIterator implements Iterator, AutoCloseable { private LineDecoder currentLineDecoder; private Iterator dateRangeIterator; @@ -339,8 +330,8 @@ private final class LineDecoderIterator implements Iterator, AutoCl private MarketoRestClient marketoRestClient; private T task; - public LineDecoderIterator(Iterator dateRangeIterator, T task) - { + + public LineDecoderIterator(Iterator dateRangeIterator, T task) { marketoRestClient = createMarketoRestClient(task); marketoService = new MarketoServiceImpl(marketoRestClient); this.dateRangeIterator = dateRangeIterator; @@ -348,8 +339,7 @@ public LineDecoderIterator(Iterator dateRangeIterator, T } @Override - public void close() - { + public void close() { if (currentLineDecoder != null) { currentLineDecoder.close(); } @@ -359,69 +349,24 @@ public void close() } @Override - public boolean hasNext() - { + public boolean hasNext() { return dateRangeIterator.hasNext(); } @Override - public LineDecoder next() - { + public Reader next() { if (hasNext()) { MarketoUtils.DateRange next = dateRangeIterator.next(); - InputStream extractedStream = getExtractedStream(marketoService, task, next.fromDate, next.toDate); - currentLineDecoder = new LineDecoder(new InputStreamFileInput(task.getBufferAllocator(), extractedStream), task); - return currentLineDecoder; - } - throw new NoSuchElementException(); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Removed are not supported"); - } - } - - private class CsvRecordIterator implements Iterator> - { - private CsvTokenizer tokenizer; + InputStream inputStream = getExtractedStream(marketoService, task, next.fromDate, next.toDate); + InputStreamFileInput in = new InputStreamFileInput(task.getBufferAllocator(), inputStream); + FileInputInputStream fileInputInputStream = new FileInputInputStream(in); - private List headers; + CharsetDecoder decoder = task.getCharset().newDecoder().onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + BufferedReader b = new BufferedReader(new InputStreamReader(fileInputInputStream, decoder)); + fileInputInputStream.nextFile(); - private Map currentCsvRecord; - public CsvRecordIterator(LineDecoder lineDecoder, T task) - { - tokenizer = new CsvTokenizer(lineDecoder, task); - if (!tokenizer.nextFile()) { - throw new DataException("Can't read extract input stream"); - } - headers = new ArrayList<>(); - tokenizer.nextRecord(); - while (tokenizer.hasNextColumn()) { - headers.add(tokenizer.nextColumn()); - } - } - - @Override - public boolean hasNext() - { - if (currentCsvRecord == null) { - currentCsvRecord = getNextCSVRecord(); - } - return currentCsvRecord != null; - } - - @Override - public Map next() - { - try { - if (hasNext()) { - return currentCsvRecord; - } - } - finally { - currentCsvRecord = null; + return b; } throw new NoSuchElementException(); } @@ -429,25 +374,7 @@ public Map next() @Override public void remove() { - throw new UnsupportedOperationException(); - } - private Map getNextCSVRecord() - { - if (!tokenizer.nextRecord()) { - return null; - } - Map kvMap = new HashMap<>(); - try { - CSVRecord record = tokenizer.csvParse(); - for (int i = 0; i < headers.size(); i++) { - kvMap.put(headers.get(i), record.get(i)); - } - } - catch (CsvTokenizer.InvalidValueException ex) { - throw new DataException("Encounter exception when parse csv file. Please check to see if you are using the correct" + - "quote or escape character.", ex); - } - return kvMap; + throw new UnsupportedOperationException("Removed are not supported"); } } } From 19224e869eb99960ea9b6fd87ddf3bcc035d1fee Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 9 Jun 2020 07:04:39 +0900 Subject: [PATCH 15/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 119699d..006f928 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.24" +version = "0.6.25" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 25598475bf99537bd2299717b771523dbfb32713 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Wed, 24 Jun 2020 07:07:31 +0900 Subject: [PATCH 16/39] =?UTF-8?q?id=E7=B3=BB=E3=81=AE=E3=83=97=E3=83=AC?= =?UTF-8?q?=E3=83=93=E3=83=A5=E3=83=BC=E3=81=AE=E3=83=87=E3=83=BC=E3=82=BF?= =?UTF-8?q?=E3=82=92=E6=95=B0=E5=AD=97=E3=81=AB=E3=81=99=E3=82=8B=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- .../marketo/delegate/MarketoBaseBulkExtractInputPlugin.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 006f928..6a9c914 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.25" +version = "0.6.26" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index 22693c8..dbd62d6 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -211,7 +211,11 @@ public void doubleColumn(Column column) { @Override public void stringColumn(Column column) { - pageBuilder.setString(column, column.getName() + "_" + rowNum); + if(column.getName().endsWith("Id") || column.getName().equals("id")){ + pageBuilder.setString(column, Integer.toString(rowNum)); + }else{ + pageBuilder.setString(column, column.getName() + "_" + rowNum); + } } @Override From cb88d88e35bc7f83c369b7d70ab92a41d29f4d32 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Wed, 1 Jul 2020 08:33:35 +0900 Subject: [PATCH 17/39] =?UTF-8?q?OOM=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- .../embulk/input/marketo/CsvTokenizer.java | 23 +++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index 6a9c914..bc02b3f 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.26" +version = "0.6.27" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index 5346c59..940b11b 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -16,10 +16,16 @@ import org.slf4j.Logger; import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.Reader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Calendar; import java.util.Deque; import java.util.List; @@ -132,19 +138,22 @@ public CsvTokenizer(Reader inputStream) public CSVParser csvParse() { try { + String path = String.format("tmp_%d.csv", Calendar.getInstance().getTimeInMillis()); + File file = new File(path); + FileWriter filewriter = new FileWriter(file); + BufferedReader b = new BufferedReader(inputStream); - StringBuilder sb = new StringBuilder(); String line = b.readLine(); - sb.append(line); + filewriter.write(line); while(line != null){ - sb.append("\r\n"); - sb.append(line); + filewriter.write("\r\n"); + filewriter.write(line); line = b.readLine(); } - String csv = sb.toString(); - - CSVParser csvParser = CSVParser.parse(csv, CSVFormat.DEFAULT.withFirstRecordAsHeader()); + filewriter.close(); inputStream.close(); + + CSVParser csvParser = CSVParser.parse(file, StandardCharsets.UTF_8, CSVFormat.DEFAULT.withFirstRecordAsHeader()); return csvParser; } catch (IOException e) { throw new InvalidValueException(e.getMessage()); From 5772b9fe316f34f2a70da9ba5785c679fe716e67 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 7 Jul 2020 14:27:58 +0900 Subject: [PATCH 18/39] =?UTF-8?q?=E3=83=87=E3=83=BC=E3=82=BF=E3=81=AE?= =?UTF-8?q?=E5=8F=96=E5=BE=97=E6=9C=9F=E9=96=93=E3=82=9210=E6=97=A5?= =?UTF-8?q?=E9=96=93=E3=81=AB=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../marketo/delegate/MarketoBaseBulkExtractInputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index dbd62d6..a0682bf 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -61,7 +61,7 @@ public abstract class MarketoBaseBulkExtractInputPlugin records; From 37665933394ed78c3dd201980f3865f1e5819f3a Mon Sep 17 00:00:00 2001 From: dododo8m Date: Thu, 16 Jul 2020 19:35:51 +0900 Subject: [PATCH 19/39] =?UTF-8?q?CSV=E3=83=87=E3=83=BC=E3=82=BF=E3=81=AE?= =?UTF-8?q?=E5=8F=96=E5=BE=97=E3=83=AD=E3=82=B8=E3=83=83=E3=82=AF=E3=81=AE?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../embulk/input/marketo/CsvTokenizer.java | 8 +++--- .../MarketoBaseBulkExtractInputPlugin.java | 26 ++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index 940b11b..a19a874 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -144,11 +144,13 @@ public CSVParser csvParse() { BufferedReader b = new BufferedReader(inputStream); String line = b.readLine(); - filewriter.write(line); - while(line != null){ - filewriter.write("\r\n"); + while(true){ filewriter.write(line); line = b.readLine(); + if(line == null){ + break; + } + filewriter.write("\r\n"); } filewriter.close(); inputStream.close(); diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index a0682bf..ee82313 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -6,6 +6,7 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.codehaus.plexus.util.CollectionUtils; import org.embulk.base.restclient.jackson.JacksonServiceRecord; import org.embulk.base.restclient.jackson.JacksonServiceValue; import org.embulk.base.restclient.record.RecordImporter; @@ -61,9 +62,7 @@ public abstract class MarketoBaseBulkExtractInputPlugin records; + private static final int MARKETO_MAX_RANGE_EXTRACT = 30; public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @Config("from_date") @@ -163,15 +162,18 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, int imported = 0; while (decoderIterator.hasNext()) { try { - Reader inputStream = decoderIterator.next(); - CsvTokenizer csvTokenizer = new CsvTokenizer(inputStream); - CSVParser csvParser = csvTokenizer.csvParse(); - records = csvParser.getRecords(); - records.remove(0); - for (CSVRecord csvRecord : records) { + CSVParser csvParser = CsvTokenizer(decoderIterator); + System.gc(); + while (csvParser.iterator().hasNext()) { + CSVRecord csvRecord = csvParser.iterator().next(); ObjectNode objectNode = MarketoUtils.getObjectMapper().valueToTree(csvRecord.toMap()); recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + if(csvParser.getRecordNumber() % 10000 == 0) { + LOGGER.info("transfer record count: " + csvParser.getRecordNumber()); + } } + LOGGER.info("transfer record count: " + csvParser.getRecordNumber()); + csvParser.close(); } catch (CsvTokenizer.InvalidValueException | IllegalArgumentException | IOException ex) { LOGGER.warn("skipped csv line: " + ExceptionUtils.getStackTrace(ex)); } @@ -182,6 +184,12 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, } } + private CSVParser CsvTokenizer(LineDecoderIterator decoderIterator) throws IOException { + Reader inputStream = decoderIterator.next(); + CsvTokenizer csvTokenizer = new CsvTokenizer(inputStream); + return csvTokenizer.csvParse(); + } + /** * This method should be removed when we allow skip preview phase * From 2ec9a18608a2706449ab66c456ebd58f0544ddc2 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Thu, 16 Jul 2020 20:37:55 +0900 Subject: [PATCH 20/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index bc02b3f..de24b05 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.27" +version = "0.6.28" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 3034eee482da46a6f9cd027e705cd233dc5ef952 Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Thu, 23 Jul 2020 19:31:16 +0900 Subject: [PATCH 21/39] =?UTF-8?q?=E5=AE=9F=E8=A3=85=E5=AE=8C=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/embulk/input/marketo/MarketoUtils.java | 17 +++++++++++++---- .../input/marketo/rest/MarketoRestClient.java | 8 ++++++-- .../embulk/input/marketo/MarketoUtilsTest.java | 13 +++++++++++-- .../marketo/rest/MarketoRestClientTest.java | 5 ++++- 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/embulk/input/marketo/MarketoUtils.java b/src/main/java/org/embulk/input/marketo/MarketoUtils.java index d9afae3..daee35a 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoUtils.java +++ b/src/main/java/org/embulk/input/marketo/MarketoUtils.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.io.CharacterEscapes; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Optional; import com.google.common.base.Function; import com.google.common.collect.Sets; import org.embulk.base.restclient.ServiceResponseMapper; @@ -129,14 +130,22 @@ public static final List sliceRange(DateTime fromDate, DateTime toDat return ranges; } - public static String getIdentityEndPoint(String accountId) + public static String getIdentityEndPoint(String accountId, Optional endpoint) { - return "https://" + accountId + ".mktorest.com/identity"; + if(endpoint.isPresent()){ + return endpoint.get() + "/identity"; + } else { + return "https://" + accountId + ".mktorest.com/identity"; + } } - public static String getEndPoint(String accountID) + public static String getEndPoint(String accountID, Optional endpoint) { - return "https://" + accountID + ".mktorest.com"; + if(endpoint.isPresent()){ + return endpoint.get(); + } else { + return "https://" + accountID + ".mktorest.com"; + } } public static final class DateRange diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java index 11f980b..e49fe11 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java @@ -100,6 +100,10 @@ public interface PluginTask extends Task @Config("account_id") String getAccountId(); + @Config("endpoint") + @ConfigDefault("null") + Optional getInputEndpoint(); + @Config("client_secret") String getClientSecret(); @@ -143,8 +147,8 @@ public interface PluginTask extends Task public MarketoRestClient(PluginTask task) { - this(MarketoUtils.getEndPoint(task.getAccountId()), - MarketoUtils.getIdentityEndPoint(task.getAccountId()), + this(MarketoUtils.getEndPoint(task.getAccountId(),task.getInputEndpoint()), + MarketoUtils.getIdentityEndPoint(task.getAccountId(),task.getInputEndpoint()), task.getClientId(), task.getClientSecret(), task.getPartnerApiKey(), diff --git a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java index 9938f0d..70c6749 100644 --- a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java +++ b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java @@ -7,6 +7,7 @@ import org.embulk.spi.type.Types; import org.joda.time.DateTime; import org.junit.Test; +import com.google.common.base.Optional; import java.util.ArrayList; import java.util.List; @@ -63,15 +64,23 @@ public void buildColumnName() throws Exception @Test public void getIdentityEndPoint() throws Exception { - String identityEndPoint = MarketoUtils.getIdentityEndPoint("accountId"); + Optional endpoint = Optional.of(""); + String identityEndPoint = MarketoUtils.getIdentityEndPoint("accountId",endpoint); assertEquals("https://accountId.mktorest.com/identity", identityEndPoint); + Optional endpoint2 = Optional.of("endpoint"); + String identityEndPointUsingEndpoint = MarketoUtils.getIdentityEndPoint("accountId",endpoint2); + assertEquals("endpoint/identity", identityEndPointUsingEndpoint); } @Test public void getEndPoint() throws Exception { - String endPoint = MarketoUtils.getEndPoint("accountId"); + Optional endpoint = Optional.of(""); + String endPoint = MarketoUtils.getEndPoint("accountId",endpoint); assertEquals("https://accountId.mktorest.com", endPoint); + Optional endpoint2 = Optional.of("endpoint"); + String endPointUsingEndpoint = MarketoUtils.getEndPoint("accountId",endpoint2); + assertEquals("endpoint", endPointUsingEndpoint); } @Test diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java index 71b0444..a861b09 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Multimap; import com.google.common.io.ByteStreams; +import com.google.common.base.Optional; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.util.FormContentProvider; import org.embulk.EmbulkTestRuntime; @@ -54,7 +55,9 @@ public class MarketoRestClientTest private static final String TEST_CLIENT_ID = "test_client_id"; - private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID); + private static final Optional TEST_ENDPOINT = null; + + private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID,Optional.empty); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); From 239cd8322f49146e4fa79033edcab376fe9cb80a Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Fri, 24 Jul 2020 11:33:37 +0900 Subject: [PATCH 22/39] fix_test --- src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java | 4 ++-- .../org/embulk/input/marketo/rest/MarketoRestClientTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java index 70c6749..9d8ec54 100644 --- a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java +++ b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java @@ -64,7 +64,7 @@ public void buildColumnName() throws Exception @Test public void getIdentityEndPoint() throws Exception { - Optional endpoint = Optional.of(""); + Optional endpoint = Optional.absent(); String identityEndPoint = MarketoUtils.getIdentityEndPoint("accountId",endpoint); assertEquals("https://accountId.mktorest.com/identity", identityEndPoint); Optional endpoint2 = Optional.of("endpoint"); @@ -75,7 +75,7 @@ public void getIdentityEndPoint() throws Exception @Test public void getEndPoint() throws Exception { - Optional endpoint = Optional.of(""); + Optional endpoint = Optional.absent(); String endPoint = MarketoUtils.getEndPoint("accountId",endpoint); assertEquals("https://accountId.mktorest.com", endPoint); Optional endpoint2 = Optional.of("endpoint"); diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java index a861b09..6f745d6 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java @@ -55,9 +55,9 @@ public class MarketoRestClientTest private static final String TEST_CLIENT_ID = "test_client_id"; - private static final Optional TEST_ENDPOINT = null; + private static final Optional TEST_ENDPOINT = Optional.absent(); - private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID,Optional.empty); + private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID,TEST_ENDPOINT); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); From e3bc837e91f5aeaebda8ce5b61dbcfc60420eb1b Mon Sep 17 00:00:00 2001 From: kazuki-yane Date: Fri, 24 Jul 2020 15:55:20 +0900 Subject: [PATCH 23/39] version_up --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index de24b05..94d4373 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.28" +version = "0.6.29" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 6f5255c17166a3d309dcc3bc7af462283c974e3c Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 28 Jul 2020 07:27:57 +0900 Subject: [PATCH 24/39] =?UTF-8?q?tmp=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB?= =?UTF-8?q?=E7=94=9F=E6=88=90=E6=99=82=E3=81=AB=E3=83=AD=E3=82=B0=E3=82=92?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/embulk/input/marketo/CsvTokenizer.java | 7 +++++++ .../delegate/MarketoBaseBulkExtractInputPlugin.java | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index a19a874..8eeb1f1 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -142,8 +142,11 @@ public CSVParser csvParse() { File file = new File(path); FileWriter filewriter = new FileWriter(file); + LOGGER.info("create tmp file: " + path); + BufferedReader b = new BufferedReader(inputStream); String line = b.readLine(); + int count = 0; while(true){ filewriter.write(line); line = b.readLine(); @@ -151,6 +154,10 @@ public CSVParser csvParse() { break; } filewriter.write("\r\n"); + count += 1; + if(count % 1 == 0) { + LOGGER.info("import record count: " + count); + } } filewriter.close(); inputStream.close(); diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index ee82313..00d03c1 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -162,7 +162,7 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, int imported = 0; while (decoderIterator.hasNext()) { try { - CSVParser csvParser = CsvTokenizer(decoderIterator); + CSVParser csvParser = getCsvParser(decoderIterator); System.gc(); while (csvParser.iterator().hasNext()) { CSVRecord csvRecord = csvParser.iterator().next(); @@ -184,7 +184,7 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, } } - private CSVParser CsvTokenizer(LineDecoderIterator decoderIterator) throws IOException { + private CSVParser getCsvParser(LineDecoderIterator decoderIterator) throws IOException { Reader inputStream = decoderIterator.next(); CsvTokenizer csvTokenizer = new CsvTokenizer(inputStream); return csvTokenizer.csvParse(); From fb9859a43f7bbf362b3aec91c9f16cbbcced719c Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 28 Jul 2020 07:28:59 +0900 Subject: [PATCH 25/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 94d4373..3ac315b 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.29" +version = "0.6.30" sourceCompatibility = 1.7 targetCompatibility = 1.7 From 8935e18871dee66c9f6cd40eb36d758c2da8cff4 Mon Sep 17 00:00:00 2001 From: dododo8m Date: Tue, 28 Jul 2020 07:31:42 +0900 Subject: [PATCH 26/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=82=A2=E3=83=83=E3=83=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- src/main/java/org/embulk/input/marketo/CsvTokenizer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 3ac315b..7e81f8f 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ repositories { configurations { provided } -version = "0.6.30" +version = "0.6.31" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index 8eeb1f1..aa8a64e 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -155,7 +155,7 @@ public CSVParser csvParse() { } filewriter.write("\r\n"); count += 1; - if(count % 1 == 0) { + if(count % 10000 == 0) { LOGGER.info("import record count: " + count); } } From 2c039209fbe22176225faadabb0b4bf60ba9208d Mon Sep 17 00:00:00 2001 From: tk3fftk Date: Thu, 6 Jan 2022 09:27:45 +0900 Subject: [PATCH 27/39] add gem-push workflow --- .github/workflow/gem-push.yml | 27 +++++++++++++++++++++++++++ build.gradle | 11 ++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 .github/workflow/gem-push.yml diff --git a/.github/workflow/gem-push.yml b/.github/workflow/gem-push.yml new file mode 100644 index 0000000..8eb09f1 --- /dev/null +++ b/.github/workflow/gem-push.yml @@ -0,0 +1,27 @@ +name: Ruby Gem + +on: + workflow_dispatch: + push: + tags: + - 'v*' + +jobs: + build: + name: Build + Publish + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + steps: + - uses: actions/checkout@v2 + - name: Set up Ruby 2.7 + uses: ruby/setup-ruby@v1 + with: + ruby-version: 2.7 + - name: push gem + uses: trocco-io/push-gem-to-gpr-action@v1 + with: + language: java + gem-path: "./build/gems/*.gem" + github-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/build.gradle b/build.gradle index 7e81f8f..d1752e6 100644 --- a/build.gradle +++ b/build.gradle @@ -4,6 +4,7 @@ plugins { id "java" id "checkstyle" id "jacoco" + id "com.palantir.git-version" version "0.12.3" } import com.github.jrubygradle.JRubyExec repositories { @@ -16,7 +17,15 @@ repositories { configurations { provided } -version = "0.6.31" +version = { + def baseVersion = "0.6.31" + def vd = versionDetails() + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { + baseVersion + } else { + "${baseVersion}.${vd.gitHash}.pre" + } +}() sourceCompatibility = 1.7 targetCompatibility = 1.7 From e91857c839061ff7d7160262b3c0d965c44eae48 Mon Sep 17 00:00:00 2001 From: tk3fftk Date: Fri, 7 Jan 2022 06:23:57 +0900 Subject: [PATCH 28/39] fix dir name for actions --- .github/{workflow => workflows}/gem-push.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/{workflow => workflows}/gem-push.yml (100%) diff --git a/.github/workflow/gem-push.yml b/.github/workflows/gem-push.yml similarity index 100% rename from .github/workflow/gem-push.yml rename to .github/workflows/gem-push.yml From 5e9f60f4c7fd957e1d31002629278867c27ac84e Mon Sep 17 00:00:00 2001 From: k-roof <42165581+kazuki-yane@users.noreply.github.com> Date: Mon, 20 Jun 2022 15:35:52 +0900 Subject: [PATCH 29/39] =?UTF-8?q?=E3=83=AD=E3=82=B0=E5=A4=89=E6=9B=B4=20(#?= =?UTF-8?q?12)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Kenta Suzuki --- .gitignore | 1 + build.gradle | 4 +++- .../embulk/input/marketo/rest/MarketoBaseRestClient.java | 6 +++++- .../org/embulk/input/marketo/rest/MarketoRestClient.java | 4 +++- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index c3285c1..1c3a4c4 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ build/ .classpath .project bin/ +example.yml diff --git a/build.gradle b/build.gradle index d1752e6..f4fe2d0 100644 --- a/build.gradle +++ b/build.gradle @@ -17,8 +17,9 @@ repositories { configurations { provided } + version = { - def baseVersion = "0.6.31" + def baseVersion = "0.6.32" def vd = versionDetails() if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { baseVersion @@ -26,6 +27,7 @@ version = { "${baseVersion}.${vd.gitHash}.pre" } }() + sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java index a123ad6..f151fd7 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java @@ -54,6 +54,8 @@ public class MarketoBaseRestClient implements AutoCloseable private String clientSecret; + private String accountId; + private String accessToken; private int marketoLimitIntervalMillis; @@ -69,6 +71,7 @@ public class MarketoBaseRestClient implements AutoCloseable MarketoBaseRestClient(String identityEndPoint, String clientId, String clientSecret, + String accountId, Optional partnerApiKey, int marketoLimitIntervalMillis, long readTimeoutMillis, @@ -77,6 +80,7 @@ public class MarketoBaseRestClient implements AutoCloseable this.identityEndPoint = identityEndPoint; this.clientId = clientId; this.clientSecret = clientSecret; + this.accountId = accountId; this.readTimeoutMillis = readTimeoutMillis; this.retryHelper = retryHelper; this.marketoLimitIntervalMillis = marketoLimitIntervalMillis; @@ -236,7 +240,7 @@ public void requestOnce(HttpClient client, Response.Listener responseListener) } } } - LOGGER.info("CALLING {} -> {} - params: {}", method, target, params); + LOGGER.info("CALLING Account ID: {} {} -> {} - params: {}", accountId, method, target, params); if (contentProvider != null) { request.content(contentProvider); } diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java index e49fe11..82a84b5 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java @@ -151,6 +151,7 @@ public MarketoRestClient(PluginTask task) MarketoUtils.getIdentityEndPoint(task.getAccountId(),task.getInputEndpoint()), task.getClientId(), task.getClientSecret(), + task.getAccountId(), task.getPartnerApiKey(), task.getBatchSize(), task.getMaxReturn(), @@ -166,6 +167,7 @@ public MarketoRestClient(String endPoint, String identityEndPoint, String clientId, String clientSecret, + String accountId, Optional partnerApiKey, Integer batchSize, Integer maxReturn, @@ -173,7 +175,7 @@ public MarketoRestClient(String endPoint, int marketoLimitIntervalMilis, Jetty92RetryHelper retryHelper) { - super(identityEndPoint, clientId, clientSecret, partnerApiKey, marketoLimitIntervalMilis, readTimeoutMilis, retryHelper); + super(identityEndPoint, clientId, clientSecret, accountId, partnerApiKey, marketoLimitIntervalMilis, readTimeoutMilis, retryHelper); this.endPoint = endPoint; this.batchSize = batchSize; this.maxReturn = maxReturn; From 2126d8cbb9e024c209e99740889e15d2d7179ebe Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Wed, 29 Jun 2022 15:25:11 +0900 Subject: [PATCH 30/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E3=81=AE=E4=BB=98=E3=81=91=E6=96=B9=E3=82=92=E5=A4=89?= =?UTF-8?q?=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 4c73667..aededc7 100644 --- a/build.gradle +++ b/build.gradle @@ -15,10 +15,14 @@ repositories { group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." version = { - def baseVersion = "0.6.32" + def baseVersion = "0.6.23" + def patchVersion = "1" def vd = versionDetails() + if (vd.lastTag != "${baseVersion}.${patchVersion}") { + logger.warn "lastTag '${vd.lastTag}' is not '${baseVersion}.${patchVersion}'" + } if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { - baseVersion + "${vd.lastTag}.trocco" } else { "${baseVersion}.${vd.gitHash}.pre" } From 1361d01801e6537f97ea60d623a9d5a4f2978381 Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Fri, 1 Jul 2022 19:39:03 +0900 Subject: [PATCH 31/39] =?UTF-8?q?=E3=82=BF=E3=82=B0=E5=90=8D=E3=81=AE?= =?UTF-8?q?=E5=85=88=E9=A0=AD=E3=81=AB=20v=20=E3=82=92=E4=BB=98=E3=81=91?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/build.gradle b/build.gradle index 21c8e70..912c4f0 100644 --- a/build.gradle +++ b/build.gradle @@ -18,11 +18,11 @@ version = { def baseVersion = "0.6.24" def patchVersion = "1" def vd = versionDetails() - if (vd.lastTag != "${baseVersion}.${patchVersion}") { - logger.warn "lastTag '${vd.lastTag}' is not '${baseVersion}.${patchVersion}'" + if (vd.lastTag != "v${baseVersion}.${patchVersion}") { + logger.warn "lastTag '${vd.lastTag}' is not 'v${baseVersion}.${patchVersion}'" } - if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { - "${vd.lastTag}.trocco" + if (vd.commitDistance == 0 && vd.lastTag ==~ /^v[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { + "${vd.lastTag.replaceFirst(/^v/, "")}.trocco" } else { "${baseVersion}.${vd.gitHash}.pre" } From 547063e0216bf98e3f8a19ea52bcf2b0c5c5199c Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Wed, 13 Jul 2022 14:38:41 +0900 Subject: [PATCH 32/39] =?UTF-8?q?=E3=83=90=E3=83=BC=E3=82=B8=E3=83=A7?= =?UTF-8?q?=E3=83=8B=E3=83=B3=E3=82=B0=E3=81=AE=E3=83=AB=E3=83=BC=E3=83=AB?= =?UTF-8?q?=E3=82=92=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/gem-push.yml | 2 +- build.gradle | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/gem-push.yml b/.github/workflows/gem-push.yml index 8eb09f1..f7bb584 100644 --- a/.github/workflows/gem-push.yml +++ b/.github/workflows/gem-push.yml @@ -4,7 +4,7 @@ on: workflow_dispatch: push: tags: - - 'v*' + - '*' jobs: build: diff --git a/build.gradle b/build.gradle index 912c4f0..86000a8 100644 --- a/build.gradle +++ b/build.gradle @@ -16,15 +16,16 @@ group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." version = { def baseVersion = "0.6.24" - def patchVersion = "1" + def troccoVersion = "0.0.1" + def tag = "${baseVersion}-trocco-${troccoVersion}" def vd = versionDetails() - if (vd.lastTag != "v${baseVersion}.${patchVersion}") { - logger.warn "lastTag '${vd.lastTag}' is not 'v${baseVersion}.${patchVersion}'" + if (vd.lastTag != "${tag}") { + logger.warn "lastTag '${vd.lastTag}' is not '${tag}'" } - if (vd.commitDistance == 0 && vd.lastTag ==~ /^v[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { - "${vd.lastTag.replaceFirst(/^v/, "")}.trocco" + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\[.-][.a-zA-Z0-9-]+)?/) { + vd.lastTag } else { - "${baseVersion}.${vd.gitHash}.pre" + "0.0.0.${vd.gitHash}" } }() From 07e256ef6b4e5d5d75e9e6ca374c6916db99e497 Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Fri, 8 Jul 2022 20:41:15 +0900 Subject: [PATCH 33/39] Add support for folder --- README.md | 22 +++++ build.gradle | 2 +- .../marketo/MarketoInputPluginDelegate.java | 7 +- .../embulk/input/marketo/MarketoService.java | 4 + .../input/marketo/MarketoServiceImpl.java | 11 +++ .../marketo/delegate/FolderInputPlugin.java | 85 ++++++++++++++++++ .../marketo/rest/MarketoRESTEndpoint.java | 3 +- .../input/marketo/rest/MarketoRestClient.java | 15 ++++ .../delegate/FolderInputPluginTest.java | 85 ++++++++++++++++++ .../marketo/rest/MarketoRestClientTest.java | 26 ++++++ .../resources/fixtures/folder_response.json | 86 +++++++++++++++++++ .../fixtures/folder_response_full.json | 68 +++++++++++++++ 12 files changed, 410 insertions(+), 4 deletions(-) create mode 100644 src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java create mode 100644 src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java create mode 100644 src/test/resources/fixtures/folder_response.json create mode 100644 src/test/resources/fixtures/folder_response_full.json diff --git a/README.md b/README.md index da9f4a3..6af5110 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt - Program Members (program_members) - List (list) - Activity Type (activity_type) +- Assets Folders (folder) This plugin uses Marketo REST API. @@ -225,6 +226,27 @@ Incremental support: no Range ingestion: no +### Assets folders + +Get child folders from within a specified root folder or all folders if no root folder is specified. + +`target: folder` + +Configuration: + +| name | required | default value | description | +|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| +| **root_id** | false | null | Parent folder id | +| **root_type** | false | folder | Parent folder type, supported values `folder`, `program` | +| **max_depth** | false | 2 | Maximum folder depth to traverse | +| **workspace** | false | null | Name of the workspace | + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no + ## Example For lead, you have `partial-config.yml` like below: diff --git a/build.gradle b/build.gradle index 86000a8..31ec02d 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." version = { def baseVersion = "0.6.24" - def troccoVersion = "0.0.1" + def troccoVersion = "0.1.0" def tag = "${baseVersion}-trocco-${troccoVersion}" def vd = versionDetails() if (vd.lastTag != "${tag}") { diff --git a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java index eb67ee4..ae4c8cf 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java +++ b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java @@ -10,6 +10,7 @@ import org.embulk.input.marketo.delegate.ActivityTypeInputPlugin; import org.embulk.input.marketo.delegate.CampaignInputPlugin; import org.embulk.input.marketo.delegate.CustomObjectInputPlugin; +import org.embulk.input.marketo.delegate.FolderInputPlugin; import org.embulk.input.marketo.delegate.LeadBulkExtractInputPlugin; import org.embulk.input.marketo.delegate.LeadWithListInputPlugin; import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin; @@ -37,7 +38,8 @@ public interface PluginTask CustomObjectInputPlugin.PluginTask, ProgramMembersBulkExtractInputPlugin.PluginTask, ListInputPlugin.PluginTask, - ActivityTypeInputPlugin.PluginTask + ActivityTypeInputPlugin.PluginTask, + FolderInputPlugin.PluginTask { @Config("target") Target getTarget(); @@ -85,7 +87,8 @@ public enum Target CUSTOM_OBJECT(new CustomObjectInputPlugin()), PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()), LIST(new ListInputPlugin()), - ACTIVITY_TYPE(new ActivityTypeInputPlugin()); + ACTIVITY_TYPE(new ActivityTypeInputPlugin()), + FOLDER(new FolderInputPlugin()); private final RestClientInputPluginDelegate restClientInputPluginDelegate; diff --git a/src/main/java/org/embulk/input/marketo/MarketoService.java b/src/main/java/org/embulk/input/marketo/MarketoService.java index 1164f9f..5e016d0 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoService.java +++ b/src/main/java/org/embulk/input/marketo/MarketoService.java @@ -1,11 +1,13 @@ package org.embulk.input.marketo; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType; import org.embulk.input.marketo.model.MarketoField; import java.io.File; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -48,4 +50,6 @@ public interface MarketoService ObjectNode describeProgramMembers(); File extractProgramMembers(String exportID); + + Iterable getFolders(Optional rootId, RootType rootType, Integer maxDepth, Optional workspace); } diff --git a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java index 0638d25..1141b57 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java +++ b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java @@ -1,10 +1,12 @@ package org.embulk.input.marketo; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.CaseFormat; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; +import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType; import org.embulk.input.marketo.model.BulkExtractRangeHeader; import org.embulk.input.marketo.model.MarketoField; import org.embulk.input.marketo.rest.MarketoRestClient; @@ -21,6 +23,7 @@ import java.io.OutputStream; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -275,4 +278,12 @@ public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader) } }); } + + @Override + public Iterable getFolders(Optional rootId, RootType rootType, Integer maxDepth, Optional workspace) + { + String type = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rootType.name()); + Optional root = rootId.isPresent() ? Optional.of(String.format("{\"id\": %d, \"type\": \"%s\"}", rootId.get(), type)) : Optional.empty(); + return marketoRestClient.getFolders(root, maxDepth, workspace); + } } diff --git a/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java new file mode 100644 index 0000000..4c30c13 --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java @@ -0,0 +1,85 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; + +import java.util.Iterator; +import java.util.Optional; +import java.util.stream.StreamSupport; + +public class FolderInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + @Config("root_id") + @ConfigDefault("null") + Optional getRootId(); + + @Config("root_type") + @ConfigDefault("\"folder\"") + RootType getRootType(); + + @Config("max_depth") + @ConfigDefault("2") + Integer getMaxDepth(); + + @Config("workspace") + @ConfigDefault("null") + Optional getWorkspace(); + } + + public FolderInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return StreamSupport.stream(marketoService.getFolders( + task.getRootId(), + task.getRootType(), + task.getMaxDepth(), + task.getWorkspace() + ).spliterator(), false).map(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION::apply).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("createdAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("updatedAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("url", Types.STRING) + .add("folderId", Types.JSON) + .add("folderType", Types.STRING) + .add("parent", Types.JSON) + .add("path", Types.STRING) + .add("isArchive", Types.BOOLEAN) + .add("isSystem", Types.BOOLEAN) + .add("accessZoneId", Types.LONG) + .add("workspace", Types.STRING); + return builder.build(); + } + + public enum RootType { + FOLDER, + PROGRAM; + + @JsonCreator + public static RootType of(String value) + { + return RootType.valueOf(value.toUpperCase()); + } + } +} diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java index 02048e4..6cebe20 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java @@ -32,7 +32,8 @@ public enum MarketoRESTEndpoint CREATE_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/create.json"), START_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/${export_id}/enqueue.json"), GET_PROGRAM_MEMBERS_EXPORT_STATUS("/bulk/v1/program/members/export/${export_id}/status.json"), - GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"); + GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"), + GET_FOLDERS("/rest/asset/v1/folders.json"); private final String endpoint; MarketoRESTEndpoint(String endpoint) diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java index 092e6bf..9eefdac 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java @@ -628,4 +628,19 @@ public InputStream getProgramMemberBulkExtractResult(String exportId, BulkExtrac { return getBulkExtractResult(MarketoRESTEndpoint.GET_PROGRAM_MEMBERS_EXPORT_RESULT, exportId, bulkExtractRangeHeader); } + + public RecordPagingIterable getFolders(Optional root, int maxDepth, Optional workspace) + { + ImmutableListMultimap.Builder builder = new ImmutableListMultimap + .Builder() + .put("maxDepth", String.valueOf(maxDepth)) + .put(MAX_RETURN, DEFAULT_MAX_RETURN); + if (root.isPresent()) { + builder.put("root", root.get()); + } + if (workspace.isPresent()) { + builder.put("workSpace", workspace.get()); + } + return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint(), builder.build(), ObjectNode.class); + } } diff --git a/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java new file mode 100644 index 0000000..35060ea --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java @@ -0,0 +1,85 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.delegate.FolderInputPlugin.PluginTask; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class FolderInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private FolderInputPlugin mockPlugin; + + private MarketoRestClient mockRestClient; + + @Before + public void setUp() throws Exception + { + mockPlugin = spy(new FolderInputPlugin()); + ConfigLoader configLoader = runtime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockRestClient = mock(MarketoRestClient.class); + doReturn(mockRestClient).when(mockPlugin).createMarketoRestClient(any(PluginTask.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/folder_response_full.json"), javaType); + when(mockRecordPagingIterable.spliterator()).thenReturn(objectNodeList.spliterator()); + when(mockRestClient.getFolders(Optional.empty(), 2, Optional.empty())).thenReturn(mockRecordPagingIterable); + + PluginTask task = CONFIG_MAPPER.map(configSource, PluginTask.class); + ServiceResponseMapper mapper = mockPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = mock(PageBuilder.class); + mockPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + + verify(mockRestClient, times(1)).getFolders(Optional.empty(), 2, Optional.empty()); + Schema embulkSchema = mapper.getEmbulkSchema(); + assertEquals(embulkSchema.size(), 14); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockPageBuilder, times(3)).setLong(eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1001L, 1002L, 2001L}, allValues.toArray()); + } +} diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java index 1b919c2..b39520e 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java @@ -657,4 +657,30 @@ public void testGetListsByIds() throws IOException FormContentProvider form = formCaptor.getValue(); Assert.assertEquals("nextPageToken=GWP55GLCVCZLPE6SS7OCG5IEXQ%3D%3D%3D%3D%3D%3D&id=123%2C456&batchSize=300", fromContentProviderToString(form)); } + + @Test + public void getFolders() throws Exception + { + ArrayNode listPages = (ArrayNode) OBJECT_MAPPER.readTree(new String(ByteStreams.toByteArray(this.getClass().getResourceAsStream("/fixtures/folder_response.json")))).get("responses"); + MarketoResponse page1 = OBJECT_MAPPER.readValue(listPages.get(0).toString(), RESPONSE_TYPE); + MarketoResponse page2 = OBJECT_MAPPER.readValue(listPages.get(1).toString(), RESPONSE_TYPE); + doReturn(page1).doReturn(page2).when(marketoRestClient).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), any(Multimap.class), any(MarketoResponseJetty92EntityReader.class)); + RecordPagingIterable lists = marketoRestClient.getFolders(Optional.empty(), 2, Optional.empty()); + Iterator iterator = lists.iterator(); + ObjectNode folder1 = iterator.next(); + ObjectNode folder2 = iterator.next(); + ObjectNode folder3 = iterator.next(); + Assert.assertFalse(iterator.hasNext()); + Assert.assertEquals("folder_test_1_name", folder1.get("name").asText()); + Assert.assertEquals("folder_test_2_name", folder2.get("name").asText()); + Assert.assertEquals("program_test_1_name", folder3.get("name").asText()); + ArgumentCaptor immutableListMultimapArgumentCaptor = ArgumentCaptor.forClass(ImmutableListMultimap.class); + verify(marketoRestClient, times(2)).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), immutableListMultimapArgumentCaptor.capture(), any(MarketoResponseJetty92EntityReader.class)); + List params = immutableListMultimapArgumentCaptor.getAllValues(); + ImmutableListMultimap params1 = params.get(0); + Assert.assertEquals("0", params1.get("offset").get(0)); + Assert.assertEquals("2", params1.get("maxReturn").get(0)); + ImmutableListMultimap params2 = params.get(1); + Assert.assertEquals("2", params2.get("offset").get(0)); + } } diff --git a/src/test/resources/fixtures/folder_response.json b/src/test/resources/fixtures/folder_response.json new file mode 100644 index 0000000..5af76b8 --- /dev/null +++ b/src/test/resources/fixtures/folder_response.json @@ -0,0 +1,86 @@ +{ + "responses": [ + { + "success": true, + "errors": [], + "requestId": "15d3a#181dca1287a", + "warnings": [], + "result": [ + { + "name": "folder_test_1_name", + "description": "", + "createdAt": "2022-05-30T10:01:40Z+0000", + "updatedAt": "2022-05-30T10:01:40Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1001A1", + "folderId": { + "id": 1001, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1000, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1001 + }, + { + "name": "folder_test_2_name", + "description": "", + "createdAt": "2022-07-08T06:56:01Z+0000", + "updatedAt": "2022-07-08T06:56:01Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1002A1", + "folderId": { + "id": 1002, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/folder_test_2_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1002 + } + ] + }, + { + "success": true, + "errors": [], + "requestId": "15d3a#181dca1287a", + "warnings": [], + "result": [ + { + "name": "program_test_1_name", + "description": "", + "createdAt": "2022-06-01T01:58:52Z+0000", + "updatedAt": "2022-06-01T01:58:52Z+0000", + "url": "https://app-sjdemo1.marketo.com/#PG2001A1", + "folderId": { + "id": 2001, + "type": "Program" + }, + "folderType": "Marketing Program", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/program_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 2001 + } + ] + } + ] +} diff --git a/src/test/resources/fixtures/folder_response_full.json b/src/test/resources/fixtures/folder_response_full.json new file mode 100644 index 0000000..34a5ac1 --- /dev/null +++ b/src/test/resources/fixtures/folder_response_full.json @@ -0,0 +1,68 @@ +[ + { + "name": "folder_test_1_name", + "description": "", + "createdAt": "2022-05-30T10:01:40Z+0000", + "updatedAt": "2022-05-30T10:01:40Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1001A1", + "folderId": { + "id": 1001, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1000, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1001 + }, + { + "name": "folder_test_2_name", + "description": "", + "createdAt": "2022-07-08T06:56:01Z+0000", + "updatedAt": "2022-07-08T06:56:01Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1002A1", + "folderId": { + "id": 1002, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/folder_test_2_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1002 + }, + { + "name": "program_test_1_name", + "description": "", + "createdAt": "2022-06-01T01:58:52Z+0000", + "updatedAt": "2022-06-01T01:58:52Z+0000", + "url": "https://app-sjdemo1.marketo.com/#PG2001A1", + "folderId": { + "id": 2001, + "type": "Program" + }, + "folderType": "Marketing Program", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/program_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 2001 + } +] From f4e605603ec1896896257e639e98c6b5a5355505 Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Wed, 13 Jul 2022 19:47:54 +0900 Subject: [PATCH 34/39] Fix tag pattern --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 31ec02d..c4d2a6f 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ version = { if (vd.lastTag != "${tag}") { logger.warn "lastTag '${vd.lastTag}' is not '${tag}'" } - if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\[.-][.a-zA-Z0-9-]+)?/) { + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+([.-][.a-zA-Z0-9-]+)?/) { vd.lastTag } else { "0.0.0.${vd.gitHash}" From 09363143ef1f7255726d6cb4ca6452b9f731b739 Mon Sep 17 00:00:00 2001 From: pn-koshikawa Date: Tue, 23 Jan 2024 04:08:40 +0000 Subject: [PATCH 35/39] update compileClasspath --- gradle/dependency-locks/compileClasspath.lockfile | 1 + 1 file changed, 1 insertion(+) diff --git a/gradle/dependency-locks/compileClasspath.lockfile b/gradle/dependency-locks/compileClasspath.lockfile index faf071a..21d3a22 100644 --- a/gradle/dependency-locks/compileClasspath.lockfile +++ b/gradle/dependency-locks/compileClasspath.lockfile @@ -17,6 +17,7 @@ javax.xml.bind:jaxb-api:2.2.11 net.jcip:jcip-annotations:1.0 org.apache.bval:bval-core:0.5 org.apache.bval:bval-jsr303:0.5 +org.apache.commons:commons-csv:1.8 org.apache.commons:commons-lang3:3.12.0 org.eclipse.jetty:jetty-client:9.4.51.v20230217 org.eclipse.jetty:jetty-http:9.4.51.v20230217 From ffb524aa6d67d0df2b0569ca9d2e3ff6bc3349e1 Mon Sep 17 00:00:00 2001 From: pn-koshikawa Date: Tue, 23 Jan 2024 07:24:37 +0000 Subject: [PATCH 36/39] fix build error --- .../org/embulk/input/marketo/rest/MarketoRestClientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java index 4c86d5d..59f6e24 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java @@ -664,7 +664,7 @@ public void getFolders() throws Exception ArrayNode listPages = (ArrayNode) OBJECT_MAPPER.readTree(new String(ByteStreams.toByteArray(this.getClass().getResourceAsStream("/fixtures/folder_response.json")))).get("responses"); MarketoResponse page1 = OBJECT_MAPPER.readValue(listPages.get(0).toString(), RESPONSE_TYPE); MarketoResponse page2 = OBJECT_MAPPER.readValue(listPages.get(1).toString(), RESPONSE_TYPE); - doReturn(page1).doReturn(page2).when(marketoRestClient).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), any(Multimap.class), any(MarketoResponseJetty92EntityReader.class)); + doReturn(page1).doReturn(page2).when(marketoRestClient).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), any(Multimap.class), any(MarketoResponseJettyEntityReader.class)); RecordPagingIterable lists = marketoRestClient.getFolders(Optional.empty(), 2, Optional.empty()); Iterator iterator = lists.iterator(); ObjectNode folder1 = iterator.next(); @@ -675,7 +675,7 @@ public void getFolders() throws Exception Assert.assertEquals("folder_test_2_name", folder2.get("name").asText()); Assert.assertEquals("program_test_1_name", folder3.get("name").asText()); ArgumentCaptor immutableListMultimapArgumentCaptor = ArgumentCaptor.forClass(ImmutableListMultimap.class); - verify(marketoRestClient, times(2)).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), immutableListMultimapArgumentCaptor.capture(), any(MarketoResponseJetty92EntityReader.class)); + verify(marketoRestClient, times(2)).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), immutableListMultimapArgumentCaptor.capture(), any(MarketoResponseJettyEntityReader.class)); List params = immutableListMultimapArgumentCaptor.getAllValues(); ImmutableListMultimap params1 = params.get(0); Assert.assertEquals("0", params1.get("offset").get(0)); From 82909245526f54e08072ed0777d005af56f5bf2f Mon Sep 17 00:00:00 2001 From: pn-koshikawa Date: Tue, 30 Jan 2024 02:50:23 +0000 Subject: [PATCH 37/39] handle-null-pointer-exception-when-multithread --- .../delegate/ProgramMembersBulkExtractInputPlugin.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java index bdac277..c377f6e 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java @@ -55,6 +55,7 @@ public class ProgramMembersBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Object pageBuilderLock = new Object(); public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @@ -214,7 +215,9 @@ private Future createFutureTask(PluginTask task, RecordImporter recordImporte while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); - recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + synchronized (pageBuilderLock) { + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + } imported = imported + 1; } From f1939a8933a80d3f0cf88d9cfc5b1ee4c4f83e6b Mon Sep 17 00:00:00 2001 From: pn-koshikawa Date: Tue, 30 Jan 2024 03:09:16 +0000 Subject: [PATCH 38/39] update trocco version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index f48c412..ea6ee48 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." version = { def baseVersion = "0.6.26" - def troccoVersion = "0.1.0" + def troccoVersion = "0.1.1" def tag = "${baseVersion}-trocco-${troccoVersion}" def vd = versionDetails() if (vd.lastTag != "${tag}") { From ea6311ad9fc31f8b5eeb574de539ae61cd991a9e Mon Sep 17 00:00:00 2001 From: pn-koshikawa Date: Tue, 30 Jan 2024 06:53:22 +0000 Subject: [PATCH 39/39] update comment --- .../marketo/delegate/ProgramMembersBulkExtractInputPlugin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java index c377f6e..cb6f98b 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java @@ -215,6 +215,7 @@ private Future createFutureTask(PluginTask task, RecordImporter recordImporte while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); + // MEMO: pageBuilderがスレッドアンセーフなために排他制御を利用する synchronized (pageBuilderLock) { recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); }