Skip to content

Conversation

@colmsnowplow
Copy link
Contributor

No description provided.

@colmsnowplow colmsnowplow changed the base branch from develop to kinesis-sink-tests November 10, 2023 19:42
client: KinesisAsyncClient,
streamName: String,
shardCount: Int
) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we have an explicit return type on this function? It helps elderly developers who don't know how to configure intellisense.


/** Resources which are shared across tests */
override val resource: Resource[IO, (Region, LocalStackContainer, Sink[IO])] =
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig)] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe simpler to make the return type Resource[IO, (Region, LocalStackContainer)]. And then the spec can call getKinesisSinkConfig when needed.

.region(region)
.build()
)
def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that KinesisAsyncClient has a close() method, and we never call it.

I wonder if strictly speaking neither implementation here is correct. If the close() method is important (I don't know if it is!) then we should manage the client as a resource:

def getKinesisClient(endpoint: URI, region: Region): Resource[IO, KinesisAsyncClient] =
  ???

val testStream1Name = "test-source-stream-1"

val kinesisClient = getKinesisClient(localstack.getEndpoint, region)
createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you do something blocking on a thread does not expect to be blocked. To be honest I have no idea how that affects these tests, whether that's a problem or not.

You could avoid the question by dropping this down into the for block like this:

for {
  _ <- IO.blocking{ createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1) }
  refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
  // etc

@colmsnowplow colmsnowplow changed the base branch from kinesis-sink-tests to develop November 23, 2023 17:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants