Skip to content

Commit 2232579

Browse files
committed
fix(plugins): fix task resources should be closed correctly (#224)
Resolves: #224
1 parent fbe8b6f commit 2232579

File tree

1 file changed

+20
-3
lines changed

1 file changed

+20
-3
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.ConcurrentLinkedQueue;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicLong;
4142
import java.util.stream.Collectors;
4243

4344
/**
@@ -77,6 +78,8 @@ public class FilePulseSourceTask extends SourceTask {
7778

7879
private final Map<String, Schema> valueSchemas = new HashMap<>();
7980

81+
private final AtomicLong taskThreadId = new AtomicLong(0);
82+
8083
/**
8184
* {@inheritDoc}
8285
*/
@@ -120,9 +123,10 @@ public void onCompleted(final FileContext context) {
120123
fileURIProvider = taskConfig.getFileURIProvider();
121124

122125
running.set(true);
126+
taskThreadId.set(Thread.currentThread().getId());
123127
LOG.info("Started FilePulse source task");
124128
} catch (final Throwable t) {
125-
// This task has failed, so close any resources (may be reopened if needed) before throwing
129+
// This task has failed, so close any resources (maybe reopened if needed) before throwing
126130
closeResources();
127131
throw t;
128132
}
@@ -302,9 +306,22 @@ private SourceRecord buildSourceRecord(final FileContext context,
302306
@Override
303307
public void stop() {
304308
LOG.info("Stopping FilePulse source task");
309+
310+
// In earlier versions of Kafka Connect, 'SourceTask::stop()' was not called from the task thread.
311+
// In this case, resources should be closed at the end of 'SourceTask::poll()'
312+
// when no longer running or if there is an error.
305313
running.set(false);
306-
synchronized (this) {
307-
notify();
314+
315+
// Since https://issues.apache.org/jira/browse/KAFKA-10792 the SourceTask::stop()
316+
// is called from the source task's dedicated thread
317+
if (taskThreadId.longValue() == Thread.currentThread().getId()) {
318+
closeResources();
319+
LOG.info("Stopped FilePulse source task.");
320+
} else {
321+
// For backward-compatibility with earlier versions of Kafka Connect.
322+
synchronized (this) {
323+
notify();
324+
}
308325
}
309326
}
310327

0 commit comments

Comments
 (0)