Skip to content

Commit

Permalink
Naming and code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
augustjune committed Dec 1, 2019
1 parent debb696 commit ada26a8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ rewrite.sortModifiers.order = [
]
docstrings = ScalaDoc

rewrite.neverInfix.excludeFilters = [andThen, or]
rewrite.neverInfix.excludeFilters = [andThen, or, ne]
51 changes: 25 additions & 26 deletions core/shared/src/main/scala/canoe/api/Bot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import canoe.models.messages.TelegramMessage
import canoe.models.{InputFile, Update}
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Concurrent, ConcurrentEffect, Resource, Timer}
import cats.instances.option._
import cats.syntax.all._
import fs2.concurrent.Topic
import fs2.{Pipe, Stream}
Expand All @@ -16,7 +17,6 @@ import scala.concurrent.duration.FiniteDuration
* interact with other Telegram users in a certain predefined way
*/
class Bot[F[_]: Concurrent] private[api] (source: UpdateSource[F]) {

/**
* Stream of all updates which your bot receives from Telegram service
*/
Expand Down Expand Up @@ -59,36 +59,36 @@ class Bot[F[_]: Concurrent] private[api] (source: UpdateSource[F]) {
* @return Stream of all updates which your bot receives from Telegram service
*/
def follow(scenarios: Scenario[F, Unit]*): Stream[F, Update] = {
def runScenarios(updates: Topic[F, Update]): Stream[F, Nothing] =
updates
.subscribe(1)
.through(pipes.messages)
.map(m => Stream.emits(scenarios).map(sc => fork(updates, m).through(sc.pipe)).parJoinUnbounded.drain)
.parJoinUnbounded

def fork(updates: Topic[F, Update], m: TelegramMessage): Stream[F, TelegramMessage] =
updates
.subscribe(1)
.through(filterMessages(m.chat.id))
.through(debounce)
.cons1(m)

def filterMessages(id: Long): Pipe[F, Update, TelegramMessage] =
_.through(pipes.messages).filter(_.chat.id == id)

def debounce[F[_]: Concurrent, A](in: Stream[F, A]): Stream[F, A] =
Stream.eval(Ref[F].of[Option[Deferred[F, A]]](None)).flatMap { ref =>
val hook = Stream
.repeatEval(Deferred[F, A])
.evalMap(df => ref.set(Some(df)) *> df.get)
def debounce[F[_]: Concurrent, A]: Pipe[F, A, A] =
input =>
Stream.eval(Ref[F].of[Option[Deferred[F, A]]](None)).flatMap { ref =>
val hook = Stream
.repeatEval(Deferred[F, A])
.evalMap(df => ref.set(Some(df)) *> df.get)

val update = in.evalMap(
a =>
ref.getAndSet(None).flatMap {
case Some(df) => df.complete(a)
case None => Concurrent[F].unit
}
)
val update = input.evalMap { a =>
ref.getAndSet(None).flatMap(_.traverse_(_.complete(a)))
}

hook.concurrently(update)
}

def track(updates: Topic[F, Update], m: TelegramMessage): Stream[F, TelegramMessage] =
debounce(updates.subscribe(1).through(filterMessages(m.chat.id))).cons1(m)

def runScenarios(updates: Topic[F, Update]): Stream[F, Nothing] =
updates
.subscribe(1)
.through(pipes.messages)
.map(m => Stream.emits(scenarios).map(sc => track(updates, m).through(sc.pipe).take(1)).parJoinUnbounded.drain)
.parJoinUnbounded
hook.concurrently(update)
}

Stream.eval(Broadcast[F, Update]).flatMap { topic =>
val pop = updates.evalTap(topic.publish1)
Expand All @@ -99,7 +99,6 @@ class Bot[F[_]: Concurrent] private[api] (source: UpdateSource[F]) {
}

object Bot {

/**
* Creates a bot which receives incoming updates using long polling mechanism.
*
Expand Down
12 changes: 6 additions & 6 deletions core/shared/src/main/scala/canoe/api/Broadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ import fs2.concurrent.{Queue, Topic}
* Each subscriber has own queue where each incoming elements is stored.
*
* Unlike stanard Topic impemenatation, doesn't contain initial value,
* thus each subscriber sees only elements that were published after
* the subscription was done.
* thus each subscriber sees only elements that were published after
* the subscription was done.
*/
private[api] class Broadcast[F[_], A](subs: Ref[F, List[Queue[F, A]]])(implicit C: Concurrent[F]) extends Topic[F, A] {
def publish: Pipe[F, A, Unit] =
_.evalMap(publish1)

def publish1(a: A): F[Unit] =
subs.get.flatMap(_.traverse(_.enqueue1(a)).void)
subs.get.flatMap(_.traverse_(_.enqueue1(a)))

def subscribe(maxQueued: Int): Stream[F, A] =
emptyQueue(maxQueued).evalTap(q => subs.update(q :: _)).flatMap(_.dequeue)
subscription(maxQueued).evalTap(q => subs.update(q :: _)).flatMap(_.dequeue)

private def emptyQueue(maxQueued: Int): Stream[F, Queue[F, A]] =
private def subscription(maxQueued: Int): Stream[F, Queue[F, A]] =
Stream.bracket(Queue.bounded[F, A](maxQueued))(
queue => subs.update(_.filter(_.ne(queue)))
queue => subs.update(_.filter(_ ne queue))
)

def subscribeSize(maxQueued: Int): Stream[F, (A, Int)] =
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/test/scala/canoe/api/BotSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class BotSpec extends AnyFunSuite {
.emits(messages.zipWithIndex.map {
case ((m, id), i) => MessageReceived(i, TextMessage(i, PrivateChat(id, None, None, None), -1, m))
})
.metered[IO](0.1.second)
.metered[IO](0.2.second)
}

test("updates returns updates from the source") {
Expand Down

0 comments on commit ada26a8

Please sign in to comment.