commit e4b0e720fab071744d1ac62f438d6e51ea306ac3 Author: eater <=@eater.me> Date: Mon Jan 27 22:09:29 2020 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e59fc6d --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ + +*.iml +*.ipr +*.iws +.gradle +build diff --git a/README.md b/README.md new file mode 100644 index 0000000..9fed555 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# Servitor + +> A workflow engine + +# Components + +This project is split is multiple components which _should_ allow for effective clustering of relevant parts. + +## [Engine](/engine) + +The main component of Servitor, this part executes all logic noted in the workflow files + +## [Relay](/relay) + +The relay component does all the talking to remote endpoints, waits for their answers and puts the result back in the queue for the engine to process + +## [Gateway](/gateway) + +The gateway component observes and records all actions happening on the relay and in the engine. +It also serves the API (and When Time Comes the web frontend). + +## [Monolith](/monolith) + +Monolith is a special version of Servitor, which combines all tech necessary to run Servitor in a single jar. +This will be most likely the first thing you'll want to use if you want to play around with Servitor or want to deploy it in a small setup. + +Monolith will not require -any- outside dependencies, it will use a bundled software for the message queues (ActiveMQ), key value store (???), and relation database (SQLite). + + + + diff --git a/blobs/example-config.toml b/blobs/example-config.toml new file mode 100644 index 0000000..c2b4ea1 --- /dev/null +++ b/blobs/example-config.toml @@ -0,0 +1,10 @@ +[servitor] + +[web] +bind = ":8888" + +[queue] +uri = "tcp://localhost:61616" +username = "artemis" +password = "simetraehcapa" + diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..324d8cb --- /dev/null +++ b/build.gradle @@ -0,0 +1,79 @@ +plugins { + id 'org.jetbrains.kotlin.jvm' version '1.3.61' + id 'java' + id 'idea' +} + + +repositories { + mavenCentral() + jcenter() +} + +subprojects { + // Hack to allow defining global dependencies + apply plugin: 'java' + apply plugin: 'org.jetbrains.kotlin.jvm' + + group = 'wf.servitor' + version = '1.0-SNAPSHOT' + + def versions = project.ext.versions = [ + kotlin : '1.3.61', + jackson : '2.9.10', + joda_time : '2.10.5', + koin : '2.0.1', + kotlinx_coroutines: '1.3.2', + jexl : '3.1', + junit : '5.5.2', + activemq : '2.11.0', + ] + + repositories { + mavenCentral() + jcenter() + } + + dependencies { + // Kotlin + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${versions.kotlin}" + + // Kotlin tests + implementation "org.jetbrains.kotlin:kotlin-test:${versions.kotlin}" + implementation "org.jetbrains.kotlin:kotlin-test-junit5:${versions.kotlin}" + + // Kotlin Coroutines + implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:${versions.kotlinx_coroutines}" + + // Koin + implementation "org.koin:koin-core:${versions.koin}" + + // ActiveMQ + implementation "org.apache.activemq:artemis-core-client:${versions.activemq}" + + // JUnit + testImplementation "org.junit.jupiter:junit-jupiter:${versions.junit}" + + if (project.name != 'common') { + implementation project(':common') + } + } + + compileKotlin { + kotlinOptions.jvmTarget = "12" + } + + compileTestKotlin { + kotlinOptions.jvmTarget = "12" + } + + sourceCompatibility = 12 + targetCompatibility = 12 + + test { + useJUnitPlatform() + testLogging { + events "passed", "skipped", "failed" + } + } +} diff --git a/common/build.gradle b/common/build.gradle new file mode 100644 index 0000000..cfa03df --- /dev/null +++ b/common/build.gradle @@ -0,0 +1,25 @@ +plugins { + id 'org.jetbrains.kotlin.jvm' + id 'java' +} + +def versions = project.ext.versions; + +repositories { + mavenCentral() +} + +dependencies { + // Jackson + implementation "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" + implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}" + implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${versions.jackson}" + implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" + + // Joda Time + implementation "joda-time:joda-time:${versions.joda_time}" + + // Konf + implementation 'com.uchuhimo:konf-core:0.22.1' + implementation 'com.uchuhimo:konf-toml:0.22.1' +} diff --git a/common/src/main/kotlin/wf/servitor/common/Event.kt b/common/src/main/kotlin/wf/servitor/common/Event.kt new file mode 100644 index 0000000..49b2a12 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/Event.kt @@ -0,0 +1,38 @@ +package wf.servitor.common + +import wf.servitor.common.workflow.Workflow +import java.io.Serializable +import java.util.* + +sealed class Event : Serializable { + data class Task( + override val workflow: Workflow, + val context: Map = mapOf(), + override val flow: String = "entry", + override val path: List = listOf(0), + val dispatchedValues: List = listOf(), + override val id: UUID = UUID.randomUUID() + ) : Event(), TaskLike + + + data class Relay( + val service: String, + val method: String, + val arguments: List, + val task: Task, + val id: UUID = UUID.randomUUID() + ) : Event() + + data class TaskUpdate( + val task: TaskLike, + val status: String + ) : Event() + + + interface TaskLike : Serializable { + val id: UUID + val workflow: Workflow + val flow: String + val path: List + } +} diff --git a/common/src/main/kotlin/wf/servitor/common/cli/MainCommand.kt b/common/src/main/kotlin/wf/servitor/common/cli/MainCommand.kt new file mode 100644 index 0000000..c6a2f37 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/cli/MainCommand.kt @@ -0,0 +1,2 @@ +package wf.servitor.common.cli + diff --git a/common/src/main/kotlin/wf/servitor/common/commonModule.kt b/common/src/main/kotlin/wf/servitor/common/commonModule.kt new file mode 100644 index 0000000..21ddea3 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/commonModule.kt @@ -0,0 +1,43 @@ +package wf.servitor.common + +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.koin.core.qualifier.named +import org.koin.core.scope.Scope +import org.koin.dsl.module +import wf.servitor.common.config.Config +import wf.servitor.common.config.createConfig +import wf.servitor.common.event.Session + +fun Scope.config(): Config = get() + +val commonModule = module { + single { createConfig(getOrNull(named("argument.config"))) } + + single { ActiveMQClient.createServerLocator(config().queue.url) } + + single { get().createSessionFactory() } + + single { + val serverLocator: ServerLocator = get() + val config = config().queue + + get().createSession( + config.username, + config.password, + false, + true, + true, + false, + serverLocator.ackBatchSize + ) + } + + single { get().createProducer() } + + single(named("queue.task")) { + Session(get(), config().queue.names.taskQueue) + } +} diff --git a/common/src/main/kotlin/wf/servitor/common/config/Config.kt b/common/src/main/kotlin/wf/servitor/common/config/Config.kt new file mode 100644 index 0000000..0cc5ea5 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/config/Config.kt @@ -0,0 +1,4 @@ +package wf.servitor.common.config + +class Config(val queue: Queue) { +} diff --git a/common/src/main/kotlin/wf/servitor/common/config/Queue.kt b/common/src/main/kotlin/wf/servitor/common/config/Queue.kt new file mode 100644 index 0000000..51cc05c --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/config/Queue.kt @@ -0,0 +1,28 @@ +package wf.servitor.common.config + +class Queue( + val url: String, + val username: String?, + val password: String?, + val names: Names +) { + + class Names( + val prefix: String, + val task: String, + val relay: String, + val observer: String + ) { + val taskAddress + get() = "$prefix.$task" + + val taskQueue + get() = "$prefix.$task.queue" + + val relayAddress + get() = "$prefix.$relay" + + val relayQueue + get() = "$prefix.$relay.queue" + } +} diff --git a/common/src/main/kotlin/wf/servitor/common/config/QueueSpec.kt b/common/src/main/kotlin/wf/servitor/common/config/QueueSpec.kt new file mode 100644 index 0000000..3cc564e --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/config/QueueSpec.kt @@ -0,0 +1,34 @@ +package wf.servitor.common.config + +import com.uchuhimo.konf.ConfigSpec + +object QueueSpec : ConfigSpec() { + val url by optional("tcp://localhost:61616", description = "url of ActiveMQ server") + val username by optional(null, description = "username for ActiveMQ server") + val password by optional(null, description = "password for ActiveMQ server") + + object Names : ConfigSpec() { + val namePrefix by optional( + "servitor", + name = "prefix", + description = "the prefix of all queues and addresses used in ActiveMQ" + ) + + val task by optional( + "task", + description = "the name of the task address, the queue will always appear at [prefix].[task].queue" + ) + + val observation by optional( + "observe", + description = "the name of the observation queue, which will listen to all messages send to [prefix].#" + ) + + val relay by optional( + "relay", + description = "the name of the relay address, the queue will appear at [prefix].[relay].queue" + ) + } +} + + diff --git a/common/src/main/kotlin/wf/servitor/common/config/config.kt b/common/src/main/kotlin/wf/servitor/common/config/config.kt new file mode 100644 index 0000000..26117f4 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/config/config.kt @@ -0,0 +1,39 @@ +package wf.servitor.common.config + +import com.uchuhimo.konf.source.toml +import com.uchuhimo.konf.Config.Companion as KonfConfig + +fun createConfig(file: String? = null): Config { + val config = run { + var config = KonfConfig { + addSpec(QueueSpec) + } + .from.toml.file("/etc/servitor/config.toml", optional = true) + .from.json.file("/etc/servitor/config.json", optional = true) + + if (file !== null) { + config = if (file.endsWith("json")) { + config.from.json.file(file) + } else { + config.from.toml.file(file) + } + } + + config + } + + return Config( + Queue( + config[QueueSpec.url], + config[QueueSpec.username], + config[QueueSpec.password], + Queue.Names( + config[QueueSpec.Names.namePrefix], + config[QueueSpec.Names.task], + config[QueueSpec.Names.relay], + config[QueueSpec.Names.observation] + ) + ) + + ) +} diff --git a/common/src/main/kotlin/wf/servitor/common/event/Session.kt b/common/src/main/kotlin/wf/servitor/common/event/Session.kt new file mode 100644 index 0000000..eb86cf2 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/event/Session.kt @@ -0,0 +1,86 @@ +package wf.servitor.common.event + +import org.apache.activemq.artemis.api.core.Message +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientSession +import wf.servitor.common.Event +import java.io.* +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +class Session(private val session: ClientSession, private val queue: String) { + private val consumer by lazy { + lock.withLock { + session.start() + } + session.createConsumer(queue) + } + + private val lock = ReentrantLock() + + private val producer by lazy { session.createProducer() } + + private fun makeMessage(routingType: RoutingType, obj: Event, group: String? = null): Message { + return session.createMessage(true).also { + it.routingType = routingType + + group?.let(it::setGroupID) + val bbos = ByteArrayOutputStream() + val oos = ObjectOutputStream(bbos) + oos.writeObject(obj) + it.bodyBuffer.writeBytes(bbos.toByteArray()) + } + } + + fun sendUpdate(update: Event.TaskUpdate) = lock.withLock { + println("Sending update") + println(update) + producer.send( + "servitor.observer", + makeMessage(RoutingType.ANYCAST, update, "servitor.task.${update.task.id}") + ) + } + + fun queueTask(task: Event.Task) = lock.withLock { + println("Sending task") + println(task) + + producer.send( + "servitor.task", + makeMessage(RoutingType.ANYCAST, task, "servitor.task.${task.id}") + ) + } + + fun queueRelay(relay: Event.Relay) = lock.withLock { + producer.send( + "servitor.relay", + makeMessage(RoutingType.ANYCAST, relay, "servitor.task.${relay.task.id}") + ) + } + + fun onMessage(block: (ClientMessage) -> Unit) { + lock.withLock { + consumer.setMessageHandler(block) + } + } + + fun close() { + session.commit() + session.close() + } + + inline fun extract(message: ClientMessage): T { + message.acknowledge() + + val byteArray = ByteArray(message.bodySize) + message.bodyBuffer.readBytes(byteArray) + val obj = ObjectInputStream(ByteArrayInputStream(byteArray)).readObject() + + if (obj !is T) { + throw RuntimeException("Queue is FUCKED") + } + + return obj + } +} diff --git a/common/src/main/kotlin/wf/servitor/common/utils/JacksonModule.kt b/common/src/main/kotlin/wf/servitor/common/utils/JacksonModule.kt new file mode 100644 index 0000000..2030292 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/utils/JacksonModule.kt @@ -0,0 +1,31 @@ +package wf.servitor.common.utils + +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.SerializerProvider +import com.fasterxml.jackson.databind.module.SimpleModule +import org.joda.time.DateTime +import wf.servitor.common.workflow.Flow +import wf.servitor.common.workflow.Step + +object JacksonModule : SimpleModule() { + val listStepType = object : TypeReference>() {} + + init { + addDeserializer(Flow::class.java, object : JsonDeserializer() { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): Flow { + return Flow(p.readValueAs>(listStepType).toMutableList()) + } + }) + + addSerializer(DateTime::class.java, object : JsonSerializer() { + override fun serialize(value: DateTime, gen: JsonGenerator, serializers: SerializerProvider) { + gen.writeString(value.toString()) + } + }) + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/Document.kt b/common/src/main/kotlin/wf/servitor/common/workflow/Document.kt new file mode 100644 index 0000000..99247b3 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/Document.kt @@ -0,0 +1,9 @@ +package wf.servitor.common.workflow + +import java.io.Serializable + + +class Document( + val name: String = "", + val workflow: Workflow +) : Serializable \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/Flow.kt b/common/src/main/kotlin/wf/servitor/common/workflow/Flow.kt new file mode 100644 index 0000000..c706943 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/Flow.kt @@ -0,0 +1,17 @@ +package wf.servitor.common.workflow + +import java.io.Serializable + +data class Flow(val steps: List = listOf()) : Serializable { + fun getStep(path: List): Step? { + if (path.isEmpty()) { + return null + } + + return path + .drop(1) + .fold(steps.getOrNull(path.first())) { s, index -> + s?.getChild(index) + } + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/Jackson.kt b/common/src/main/kotlin/wf/servitor/common/workflow/Jackson.kt new file mode 100644 index 0000000..6c13dd0 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/Jackson.kt @@ -0,0 +1,23 @@ +package wf.servitor.common.workflow + +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.PropertyNamingStrategy +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory +import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import wf.servitor.common.utils.JacksonModule + +object Jackson { + val yaml = ObjectMapper(YAMLFactory()).apply { + registerKotlinModule() + registerModule(JacksonModule) + disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + propertyNamingStrategy = PropertyNamingStrategy.LOWER_CAMEL_CASE + } + + val json = ObjectMapper().apply { + registerKotlinModule() + registerModule(JacksonModule) + propertyNamingStrategy = PropertyNamingStrategy.LOWER_CAMEL_CASE + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/Step.kt b/common/src/main/kotlin/wf/servitor/common/workflow/Step.kt new file mode 100644 index 0000000..8d3ca7e --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/Step.kt @@ -0,0 +1,95 @@ +package wf.servitor.common.workflow + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder +import wf.servitor.common.workflow.step.* +import java.io.Serializable + +@Suppress("unused") +@JsonDeserialize(builder = Step.Builder::class) +interface Step : Serializable { + val name: String + + fun getChild(index: Int): Step? { + return null + } + + fun children() = 0 + fun getScript(): String? = null + fun next(result: Any?): StepContinuation = StepContinuation.Continue + + @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with") + class Builder { + var name: String = "" + var `if`: String? = null + var `do`: String? = null + var then: String? = null + var `else`: String? = null + var options: List? = null + var collect: List? = null + + fun withName(value: String) { + this.name = value + } + + fun withIf(value: String) { + this.`if` = value + } + + fun withDo(value: String) { + this.`do` = value + } + + fun withThen(value: String) { + this.then = value + } + + fun withElse(value: String) { + this.`else` = value + } + + fun withOptions(value: List) { + this.options = value + } + + fun withCollect(value: List) { + this.collect = value + } + + fun build(): Step { + val `if` = this.`if` + val `do` = this.`do` + + if (`if` != null) { + if (`do` != null) { + return IfActionStep(`if`, `do`, name) + } + + val then = then + val `else` = `else` + + if (then != null || `else` != null) { + return IfJumpStep(`if`, then, `else`, name) + } + + error("Step missing 'then', 'else' or 'do'") + } + + val options = options + if (options != null) { + return OptionsStep(options, name) + } + + val collect = collect + if (collect != null) { + return CollectStep(collect, name) + } + + if (`do` != null) { + return ActionStep(`do`, name) + } + + error("No matching type of step could be found") + } + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/StepContinuation.kt b/common/src/main/kotlin/wf/servitor/common/workflow/StepContinuation.kt new file mode 100644 index 0000000..e643bb7 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/StepContinuation.kt @@ -0,0 +1,9 @@ +package wf.servitor.common.workflow + +sealed class StepContinuation { + object Continue : StepContinuation() + object End : StepContinuation() + + data class Flow(val name: String) : StepContinuation() + data class Step(val name: String) : StepContinuation() +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/Workflow.kt b/common/src/main/kotlin/wf/servitor/common/workflow/Workflow.kt new file mode 100644 index 0000000..c8a446b --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/Workflow.kt @@ -0,0 +1,11 @@ +package wf.servitor.common.workflow + +import java.io.Serializable + +class Workflow( + val entry: Flow = Flow(), + val flows: MutableMap = mutableMapOf(), + val services: MutableMap> = mutableMapOf() +) : Serializable { + fun getStep(flow: String, path: List): Step? = (if (flow == "entry") entry else flows.get(flow))?.getStep(path) +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/step/ActionStep.kt b/common/src/main/kotlin/wf/servitor/common/workflow/step/ActionStep.kt new file mode 100644 index 0000000..28b61bd --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/step/ActionStep.kt @@ -0,0 +1,7 @@ +package wf.servitor.common.workflow.step + +import wf.servitor.common.workflow.Step + +class ActionStep(val `do`: String, override val name: String) : Step { + override fun getScript() = `do` +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/step/CollectStep.kt b/common/src/main/kotlin/wf/servitor/common/workflow/step/CollectStep.kt new file mode 100644 index 0000000..7a04bfd --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/step/CollectStep.kt @@ -0,0 +1,5 @@ +package wf.servitor.common.workflow.step + +import wf.servitor.common.workflow.Step + +class CollectStep(val collect: List, override val name: String) : Step \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/step/IfActionStep.kt b/common/src/main/kotlin/wf/servitor/common/workflow/step/IfActionStep.kt new file mode 100644 index 0000000..3e86c50 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/step/IfActionStep.kt @@ -0,0 +1,5 @@ +package wf.servitor.common.workflow.step + +import wf.servitor.common.workflow.Step + +class IfActionStep(val `if`: String, val `do`: String, override val name: String = `if`) : Step \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/step/IfJumpStep.kt b/common/src/main/kotlin/wf/servitor/common/workflow/step/IfJumpStep.kt new file mode 100644 index 0000000..fc6248a --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/step/IfJumpStep.kt @@ -0,0 +1,10 @@ +package wf.servitor.common.workflow.step + +import wf.servitor.common.workflow.Step + +class IfJumpStep(val `if`: String, val then: String? = null, val `else`: String? = null, override val name: String = `if`) : + Step { + override fun getScript(): String? { + return `if` + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/wf/servitor/common/workflow/step/OptionsStep.kt b/common/src/main/kotlin/wf/servitor/common/workflow/step/OptionsStep.kt new file mode 100644 index 0000000..cfbcba9 --- /dev/null +++ b/common/src/main/kotlin/wf/servitor/common/workflow/step/OptionsStep.kt @@ -0,0 +1,7 @@ +package wf.servitor.common.workflow.step + +import wf.servitor.common.workflow.Step + +class OptionsStep(val options: List