Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,40 @@
*/
package com.snowplowanalytics.snowplow.micro

import cats.Id
import java.io.File
import java.net.URI
import java.nio.file.{Path, Paths}
import java.security.{KeyStore, SecureRandom}
import java.util.concurrent.TimeUnit
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

import pureconfig.generic.auto._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource}

import scala.io.Source

import com.typesafe.config.{Config, ConfigFactory}

import cats.effect.Clock
import cats.implicits._

import io.circe.Json
import io.circe.parser.parse
import io.circe.syntax._

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.Registry

import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, SinkConfig}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils
import com.typesafe.config.{Config, ConfigFactory}
import io.circe.Json
import io.circe.parser.parse
import io.circe.syntax._
import pureconfig.generic.auto._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource}

import java.io.File
import java.net.URI
import java.nio.file.{Path, Paths}
import java.security.{KeyStore, SecureRandom}
import java.util.concurrent.TimeUnit
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.io.Source

/** Contain functions to parse the command line arguments,
* to parse the configuration for the collector, Akka HTTP and Iglu
Expand All @@ -56,21 +63,12 @@ private[micro] object ConfigHelper {

implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled")

// Copied from Enrich - necessary for parsing enrichment configs
implicit val clockProvider: Clock[Id] = new Clock[Id] {
final def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)

final def monotonic(unit: TimeUnit): Id[Long] =
unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
}

type EitherS[A] = Either[String, A]

case class MicroConfig(
collectorConfig: CollectorConfig,
igluResolver: Resolver[Id],
igluClient: IgluCirceClient[Id],
igluResolver: Resolver[IO],
igluClient: IgluCirceClient[IO],
enrichmentConfigs: List[EnrichmentConf],
akkaConfig: Config,
sslContext: Option[SSLContext],
Expand Down Expand Up @@ -208,16 +206,16 @@ private[micro] object ConfigHelper {
}

/** Instantiate an Iglu client from its configuration file. */
def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[Id], IgluCirceClient[Id])] =
def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[IO], IgluCirceClient[IO])] =
for {
text <- Either.catchNonFatal(igluConfigSource.mkString).leftMap(_.getMessage)
json <- parse(text).leftMap(_.show)
config <- Resolver.parseConfig(json).leftMap(_.show)
resolver <- Resolver.fromConfig[Id](config).leftMap(_.show).value
resolver <- Resolver.fromConfig[IO](config).leftMap(_.show).value
completeResolver = resolver.copy(repos = resolver.repos ++ extraRegistry)
} yield (completeResolver, IgluCirceClient.fromResolver[Id](completeResolver, config.cacheSize))
} yield (completeResolver, IgluCirceClient.fromResolver[IO](completeResolver, config.cacheSize))

def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[Id]) = {
def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[IO]) = {
val schemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"enrichments",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ package com.snowplowanalytics.snowplow.micro
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.StatusCodes.NotFound
import cats.Id

import cats.effect.IO

import io.circe.generic.auto._

import com.snowplowanalytics.iglu.client.resolver.Resolver

import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}

import IdImplicits._
import CirceSupport._

class IgluService(resolver: Resolver[Id]) {
class IgluService(resolver: Resolver[IO]) {

def get(vendor: String, name: String, versionStr: String): Route =
SchemaVer.parseFull(versionStr) match {
Expand Down
107 changes: 62 additions & 45 deletions src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,77 @@
*/
package com.snowplowanalytics.snowplow.micro

import java.io.File

import scala.sys.process._

import org.slf4j.LoggerFactory

import akka.actor.ActorSystem
import akka.http.scaladsl.{ConnectionContext, Http}
import cats.Id

import cats.implicits._

import cats.effect.{Blocker, ContextShift, ExitCode, IO, IOApp, Sync}

import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorSinks

import com.snowplowanalytics.forex.ZonedClock

import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig
import org.slf4j.LoggerFactory

import java.io.File
import scala.sys.process._
import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig

/** Read the configuration and instantiate Snowplow Micro,
* which acts as a `Collector` and has an in-memory sink
* holding the valid and invalid events.
* It offers an HTTP endpoint to query this sink.
*/
object Main {
object Main extends IOApp {
lazy val logger = LoggerFactory.getLogger(getClass())

def main(args: Array[String]): Unit = {
val config = ConfigHelper.parseConfig(args)
def run(args: List[String]): IO[ExitCode] = {
val config = ConfigHelper.parseConfig(args.toArray)
run(config)
}

def setupEnrichments(configs: List[EnrichmentConf]): EnrichmentRegistry[Id] = {
configs.flatMap(_.filesToCache).foreach { case (uri, location) =>
logger.info(s"Downloading ${uri}...")
uri.toURL #> new File(location) !!
}

val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop).value match {
case Right(ok) => ok
case Left(e) =>
throw new IllegalArgumentException(s"Error while enabling enrichments: $e.")
}

val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect {
case Some(e: Enrichment) => e.getClass.getSimpleName
}
if (loadedEnrichments.nonEmpty) {
logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}")
} else {
logger.info(s"No enrichments enabled.")
}

enrichmentRegistry
def setupEnrichments(
blocker: Blocker,
configs: List[EnrichmentConf]
): IO[EnrichmentRegistry[IO]] = {
val maybeRegistry = for {
registry <- EnrichmentRegistry.build[IO](configs, BlockerF.ofBlocker[IO](blocker))
_ <- configs.flatMap(_.filesToCache).traverse_ { case (uri, location) =>
IO(logger.info(s"Downloading ${uri}...")) >>
blocker.blockOn(IO(uri.toURL #> new File(location) !!))
}
enabledEnrichments = registry.productIterator.toList.collect {
case Some(e: Enrichment) => e.getClass.getSimpleName
}
///_ <- log
} yield registry


//if (loadedEnrichments.nonEmpty) {
// logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}")
//} else {
// logger.info(s"No enrichments enabled.")
//}

maybeRegistry
}

/** Create the in-memory sink,
* get the endpoints for both the collector and to query Snowplow Micro,
* and start the HTTP server.
*/
def run(config: MicroConfig): Unit = {
def run(config: MicroConfig): IO[ExitCode] = Blocker[IO].use { blocker =>
implicit val system = ActorSystem.create("snowplow-micro", config.akkaConfig)
implicit val executionContext = system.dispatcher

val enrichmentRegistry = setupEnrichments(config.enrichmentConfigs)
val enrichmentRegistry = setupEnrichments[IO](blocker, config.enrichmentConfigs)
val sinks = CollectorSinks(
MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv),
MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv)
Expand All @@ -79,21 +91,26 @@ object Main {

val routes = Routing.getMicroRoutes(config.collectorConfig, sinks, igluService)

Http()
.newServerAt(config.collectorConfig.interface, config.collectorConfig.port)
.bind(routes)
.foreach { binding =>
logger.info(s"REST interface bound to ${binding.localAddress}")
}

config.sslContext.foreach { sslContext =>
val http = IO(
Http()
.newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port)
.enableHttps(ConnectionContext.httpsServer(sslContext))
.newServerAt(config.collectorConfig.interface, config.collectorConfig.port)
.bind(routes)
.foreach { binding =>
logger.info(s"HTTPS REST interface bound to ${binding.localAddress}")
logger.info(s"REST interface bound to ${binding.localAddress}")
}
}
}
)

val https = IO(
config.sslContext.foreach { sslContext =>
Http()
.newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port)
.enableHttps(ConnectionContext.httpsServer(sslContext))
.bind(routes)
.foreach { binding =>
logger.info(s"HTTPS REST interface bound to ${binding.localAddress}")
}
}
)

}.as(ExitCode.Success)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,45 @@
*/
package com.snowplowanalytics.snowplow.micro

import org.joda.time.DateTime

import org.slf4j.LoggerFactory

import cats.implicits._
import cats.Id
import cats.data.Validated

import io.circe.syntax._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

import cats.effect.IO

import com.snowplowanalytics.iglu.client.IgluCirceClient

import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, EventConverter}

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor}

import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink

import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
import IdImplicits._
import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations}
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline

import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations}

/** Sink of the collector that Snowplow Micro is.
* Contains the functions that are called for each tracking event sent
* to the collector endpoint.
* The events are received as `CollectorPayload`s serialized with Thrift.
* For each event it tries to validate it using Common Enrich,
* and then stores the results in-memory in [[ValidationCache]].
*/
private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enrichmentRegistry: EnrichmentRegistry[Id], outputEnrichedTsv: Boolean) extends Sink {
private[micro] final case class MemorySink(
igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
outputEnrichedTsv: Boolean
) extends Sink {
val MaxBytes = Int.MaxValue
private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version)
private lazy val logger = LoggerFactory.getLogger("EventLog")
Expand Down Expand Up @@ -69,8 +82,8 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri
*/
private[micro] def processThriftBytes(
thriftBytes: Array[Byte],
igluClient: IgluCirceClient[Id],
enrichmentRegistry: EnrichmentRegistry[Id],
igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
processor: Processor
): Unit =
ThriftLoader.toCollectorPayload(thriftBytes, processor) match {
Expand Down Expand Up @@ -126,11 +139,19 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri
*/
private[micro] def validateEvent(
rawEvent: RawEvent,
igluClient: IgluCirceClient[Id],
enrichmentRegistry: EnrichmentRegistry[Id],
igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
processor: Processor
): Either[(List[String], BadRow), GoodEvent] =
EnrichmentManager.enrichEvent[Id](enrichmentRegistry, igluClient, processor, DateTime.now(), rawEvent, EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), ())
EnrichmentManager.enrichEvent[IO](
enrichmentRegistry,
igluClient,
processor,
DateTime.now(),
rawEvent,
EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
IO.unit
)
.subflatMap { enriched =>
EventConverter.fromEnriched(enriched)
.leftMap { failure =>
Expand Down