package wf.servitor.engine

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.selects.whileSelect
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.commons.jexl3.JexlEngine
import org.apache.commons.jexl3.JexlException
import org.koin.core.KoinComponent
import org.koin.core.inject
import org.koin.core.qualifier.named
import wf.servitor.common.Event.*
import wf.servitor.common.event.Session
import wf.servitor.common.workflow.Step
import wf.servitor.common.workflow.StepContinuation
import wf.servitor.engine.dispatcher.NamespaceAwareMapContext
import wf.servitor.engine.exception.UnresolvedRemoteCallException

/**
 * Receives tasks from the injected `queue.task` session and executes them one at a time.
 *
 * For every task the engine resolves the step addressed by the task's flow and path, runs the
 * step's script (if it has one) through JEXL, publishes status updates on the session and
 * queues whatever the step's continuation asks for next.
 *
 * NOTE(review): generic type arguments in this class were reconstructed from usage.
 * `JexlEngine`, `Session`, `ClientMessage`, `Unit` and `List<Int>` follow directly from how the
 * values are used; `Map<String, Any?>` for the task context is an assumption - confirm it
 * against the declaration of `Task.context`.
 */
class Engine : KoinComponent {
    private val jexl by inject<JexlEngine>()
    private val session by inject<Session>(named("queue.task"))

    /** Buffer (capacity 2) between the session's message callback and the processing loop. */
    private val channel = Channel<ClientMessage>(2)

    /** Rendezvous channel used by [stop] to end the loop in [run]. */
    private val cancel = Channel<Unit>()

    /**
     * Registers the message callback and processes incoming tasks until [stop] is called.
     *
     * The session is closed when the loop ends, including when it ends because a task threw
     * or the calling coroutine was cancelled.
     */
    suspend fun run() {
        session.onMessage { channel.sendBlocking(it) }

        try {
            whileSelect {
                channel.onReceive { message ->
                    val task: Task = session.extract(message)
                    println("Got task!")
                    println(task)
                    // TODO check tombstone store
                    executeTask(task)
                    true
                }
                // Returning false ends whileSelect.
                cancel.onReceive { false }
            }
        } finally {
            session.close()
        }
    }

    /**
     * Asks the loop in [run] to finish.
     *
     * [cancel] has no buffer, so this suspends until the loop actually receives the signal.
     */
    suspend fun stop() {
        cancel.send(Unit)
    }

    /**
     * Executes a single task and reports its progress on the session.
     *
     * - No step at the task's flow/path: the task is reported as "ended".
     * - Step without a script: reported as "done", then the continuation is followed.
     * - Step with a script: the script is run; on success the task (with the resulting context)
     *   is reported as "done" and the continuation is followed.
     * - [UnresolvedRemoteCallException]: a [Relay] for the call is queued instead.
     * - Any other throwable: the task is reported as "error".
     */
    private fun executeTask(task: Task) {
        session.sendUpdate(TaskUpdate(task, "running"))

        val step = task.workflow.getStep(task.flow, task.path)
        if (step == null) {
            endTask(task)
            return
        }

        val script = step.getScript()
        if (script == null) {
            session.sendUpdate(TaskUpdate(task, "done"))
            nextTask(step, task)
            return
        }

        try {
            val (res, context) = execute(script, task)
            session.sendUpdate(TaskUpdate(task.copy(context = context), "done"))
            nextTask(step, task, context, res)
        } catch (e: UnresolvedRemoteCallException) {
            session.queueRelay(Relay(e.service, e.method, e.arguments, task))
        } catch (e: Throwable) {
            // Also covers failures raised by nextTask above, in which case the "error"
            // update follows a "done" update for the same task.
            session.sendUpdate(TaskUpdate(task, "error"))
        }
    }

    /**
     * Follows the continuation [step] yields for [result].
     *
     * @param step the step that has just been executed
     * @param task the task that was executed
     * @param context the context to hand on to the follow-up task; defaults to the task's own
     * @param result the script result passed to [Step.next]
     */
    private fun nextTask(
        step: Step,
        task: Task,
        context: Map<String, Any?> = task.context,
        result: Any? = null
    ) {
        when (val continuation = step.next(result)) {
            StepContinuation.End -> endTask(task, context)

            StepContinuation.Continue -> {
                val next = nextPath(task)
                if (next == null) {
                    endTask(task, context)
                } else {
                    session.queueTask(task.copy(context = context, path = next, dispatchedValues = listOf()))
                }
            }

            // Hand on the context produced by this step, as every other branch does.
            is StepContinuation.Flow -> session.queueTask(Task(task.workflow, context, continuation.name))

            is StepContinuation.Step -> {
                val steps = task.workflow.flows[task.flow]?.steps.orEmpty()
                val index = steps.indexOfFirst { it.name == continuation.name }
                if (index >= 0) {
                    session.queueTask(
                        task.copy(
                            context = context,
                            path = listOf(index),
                            dispatchedValues = listOf()
                        )
                    )
                } else {
                    // TODO: Do correct error handling
                    session.sendUpdate(TaskUpdate(task, "error"))
                }
            }
        }
    }

    /**
     * Determines the path of the step to run after the one at `task.path`.
     *
     * Walks prefixes of the current path from longest to shortest, incrementing the last index
     * of each prefix and returning the first prefix accepted by the check below. If none is
     * accepted, falls back to the step after the path's first index at the top level of the flow.
     *
     * @return the path of the next step, or `null` when no further step is found
     */
    private fun nextPath(task: Task): List<Int>? {
        val newList = task.path.toMutableList()
        if (newList.count() > 1) {
            // NOTE(review): the prefixes built here have lengths 1..size-1, so the task's full
            // path is never a candidate, and each prefix is paired with the step two levels
            // shorter than the full path at that index - confirm this is intended.
            val tree = newList.indices.drop(1).map {
                newList.subList(0, it) to task.workflow.getStep(task.flow, newList.subList(0, it - 1))
            }.reversed()

            for ((path, parent) in tree) {
                if (parent == null) {
                    continue
                }
                // `path` is a view onto newList, so this increment is visible through newList too.
                path[path.lastIndex]++
                // NOTE(review): this accepts the index only when it is greater than
                // parent.children() - confirm the direction of the comparison.
                if (parent.children() < path[path.lastIndex] && null != parent.getChild(path[path.lastIndex])) {
                    return path
                }
            }
        }

        val lastNext = listOf(newList.first() + 1)
        return if (task.workflow.getStep(task.flow, lastNext) !== null) lastNext else null
    }

    /**
     * Reports [task] as "ended", carrying [context] and with its dispatched values cleared.
     */
    private fun endTask(
        task: Task,
        context: Map<String, Any?> = task.context
    ) {
        session.sendUpdate(TaskUpdate(task.copy(dispatchedValues = listOf(), context = context), "ended"))
    }

    /**
     * Runs the JEXL script [input] against a mutable copy of the task's context.
     *
     * @return the script result paired with the context copy the script ran against
     * @throws UnresolvedRemoteCallException when that is the cause of the [JexlException]
     * @throws JexlException for every other script failure
     */
    private fun execute(input: String, task: Task): Pair<Any?, Map<String, Any?>> {
        return try {
            val mutableContext = task.context.toMutableMap()
            val jc = NamespaceAwareMapContext(task.workflow, mutableContext, task.dispatchedValues.toMutableList())
            val script = jexl.createScript(input)
            script.execute(jc) to mutableContext
        } catch (j: JexlException) {
            // Unwrap so the caller can relay the remote call instead of reporting an error.
            val cause = j.cause
            if (cause is UnresolvedRemoteCallException) {
                throw cause
            }
            throw j
        }
    }
}