From fd49fb5609597fbd0dacced5087462c2949433f0 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 13 Jan 2021 22:12:59 +0100 Subject: [PATCH] Working implementation of Zeroconf (#1) * Working implementation of Zeroconf * Prepare 1st version * Fix 2.12 compile issue --- .github/workflows/ci.yml | 2 +- CHANGELOG.md | 7 + README.md | 28 +- build.sbt | 63 +++-- project/Dependencies.scala | 14 +- .../scala/fr/davit/scout/ZeroconfItSpec.scala | 105 +++++++ src/main/scala/fr/davit/scout/Zeroconf.scala | 261 +++++++++++++----- .../scala/fr/davit/scout/ZeroconfSpec.scala | 60 ---- 8 files changed, 370 insertions(+), 170 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 src/it/scala/fr/davit/scout/ZeroconfItSpec.scala delete mode 100644 src/test/scala/fr/davit/scout/ZeroconfSpec.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5058658..444b0b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,4 +55,4 @@ jobs: run: sbt ++${{ matrix.scala }} scalafmtCheckAll headerCheckAll - name: Build project - run: sbt ++${{ matrix.scala }} compile \ No newline at end of file + run: 'sbt ++${{ matrix.scala }} test it:test' \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..00bb8ba --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## Unreleased + +## v0.1.0 (2021-01-13) + +Initial release \ No newline at end of file diff --git a/README.md b/README.md index 02b2b26..bf3059e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,9 @@ Zeroconf for scala (multicast DNS service discovery) ## Versions -Work in progress... +| Version | Release date | cats version | Scala versions | +| ------- | ------------ | ----------- | ------------------- | +| `0.1.0` | ??? | `2.2.0` | `2.13.4`, `2.12.12` | ## Getting scout @@ -18,8 +20,6 @@ libraryDependencies += "fr.davit" %% "scout" % "" ## Zeroconf -Scanning for services - ```scala import cats.effect.{ContextShift, IO, Timer} import fr.davit.scout.Zeroconf @@ -34,10 +34,28 @@ import scala.concurrent.duration._ implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) -val services = Zeroconf - .scan[IO](Zeroconf.Service("googlecast", "tcp")) +// service definition +val service = Zeroconf.Service("ipp", "tcp") + +// Scanning for service instances +val instances = Zeroconf + .scan[IO](service) .interruptAfter(50.seconds) .compile .toList .unsafeRunSync() + + +// instance definition +val instance = Zeroconf.Instance( + service = service, + name = "Ed’s Party Mix", + port = 1010, + target = "eds-musicbox", + information = Map("codec" -> "ogg"), + addresses = Seq(InetAddress.getByName("169.254.150.84")) // use local address when left empty +) + +// Registering an instance +Zeroconf.register[IO](instance) ``` \ No newline at end of file diff --git a/build.sbt b/build.sbt index 96a8a1f..ddbab78 100644 --- a/build.sbt +++ b/build.sbt @@ -13,44 +13,49 @@ lazy val filterScalacOptions = { options: Seq[String] => ThisBuild / crossScalaVersions := Seq("2.13.4", "2.12.12") ThisBuild / githubWorkflowBuild := Seq( WorkflowStep.Sbt(name = Some("Check project"), commands = List("scalafmtCheckAll", "headerCheckAll")), - WorkflowStep.Sbt(name = Some("Build project"), commands = List("compile")) // TODO run tests + WorkflowStep.Sbt(name = Some("Build project"), commands = List("test", "it:test")) ) ThisBuild / githubWorkflowTargetBranches := Seq("master") ThisBuild / githubWorkflowPublishTargetBranches := Seq.empty -lazy val commonSettings = Seq( - organization := "fr.davit", - organizationName := "Michel Davit", - version := "0.1.0-SNAPSHOT", - crossScalaVersions := (ThisBuild / crossScalaVersions).value, - scalaVersion := crossScalaVersions.value.head, - scalacOptions ~= filterScalacOptions, - homepage := Some(url(s"https://github.com/$username/$repo")), - licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt")), - startYear := Some(2020), - scmInfo := Some(ScmInfo(url(s"https://github.com/$username/$repo"), s"git@github.com:$username/$repo.git")), - developers := List( - Developer( - id = s"$username", - name = "Michel Davit", - email = "michel@davit.fr", - url = url(s"https://github.com/$username") - ) - ), - publishMavenStyle := true, - Test / publishArtifact := false, - publishTo := Some(if (isSnapshot.value) Opts.resolver.sonatypeSnapshots else Opts.resolver.sonatypeStaging), - credentials ++= (for { - username <- sys.env.get("SONATYPE_USERNAME") - password <- sys.env.get("SONATYPE_PASSWORD") - } yield Credentials("Sonatype Nexus Repository Manager", "oss.sonatype.org", username, password)).toSeq -) +lazy val commonSettings = Defaults.itSettings ++ + headerSettings(Configurations.IntegrationTest) ++ + Seq( + organization := "fr.davit", + organizationName := "Michel Davit", + version := "0.1.0-SNAPSHOT", + crossScalaVersions := (ThisBuild / crossScalaVersions).value, + scalaVersion := crossScalaVersions.value.head, + scalacOptions ~= filterScalacOptions, + homepage := Some(url(s"https://github.com/$username/$repo")), + licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt")), + startYear := Some(2020), + scmInfo := Some(ScmInfo(url(s"https://github.com/$username/$repo"), s"git@github.com:$username/$repo.git")), + developers := List( + Developer( + id = s"$username", + name = "Michel Davit", + email = "michel@davit.fr", + url = url(s"https://github.com/$username") + ) + ), + publishMavenStyle := true, + Test / publishArtifact := false, + publishTo := Some(if (isSnapshot.value) Opts.resolver.sonatypeSnapshots else Opts.resolver.sonatypeStaging), + credentials ++= (for { + username <- sys.env.get("SONATYPE_USERNAME") + password <- sys.env.get("SONATYPE_PASSWORD") + } yield Credentials("Sonatype Nexus Repository Manager", "oss.sonatype.org", username, password)).toSeq, + testFrameworks += new TestFramework("munit.Framework") + ) lazy val `scout` = (project in file(".")) + .configs(IntegrationTest) .settings(commonSettings: _*) .settings( libraryDependencies ++= Seq( + Dependencies.ScalaCollectionCompat, Dependencies.Taxonomy, - Dependencies.Test.ScalaTest + Dependencies.Test.MUnit ) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fea7422..0900d3d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,16 +3,18 @@ import sbt._ object Dependencies { object Versions { - val Decline = "1.3.0" - val ScalaTest = "3.2.2" - val Taxonomy = "0.2.0" + val Decline = "1.3.0" + val MUnit = "0.7.20" + val ScalaCollectionCompat = "2.3.2" + val Taxonomy = "0.3.0" } - val Decline = "com.monovore" %% "decline-effect" % Versions.Decline - val Taxonomy = "fr.davit" %% "taxonomy-fs2" % Versions.Taxonomy + val Decline = "com.monovore" %% "decline-effect" % Versions.Decline + val ScalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % Versions.ScalaCollectionCompat + val Taxonomy = "fr.davit" %% "taxonomy-fs2" % Versions.Taxonomy object Test { - val ScalaTest = "org.scalatest" %% "scalatest" % Versions.ScalaTest % "test" + val MUnit = "org.scalameta" %% "munit" % Versions.MUnit % "it,test" } } diff --git a/src/it/scala/fr/davit/scout/ZeroconfItSpec.scala b/src/it/scala/fr/davit/scout/ZeroconfItSpec.scala new file mode 100644 index 0000000..0d14775 --- /dev/null +++ b/src/it/scala/fr/davit/scout/ZeroconfItSpec.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Michel Davit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.davit.scout + +import cats.effect.{ContextShift, IO, Timer} +import fr.davit.taxonomy.fs2.Dns +import fr.davit.taxonomy.model.record.{ + DnsAAAARecordData, + DnsARecordData, + DnsPTRRecordData, + DnsRecordClass, + DnsRecordType, + DnsSRVRecordData, + DnsTXTRecordData +} +import fr.davit.taxonomy.model.{DnsMessage, DnsPacket, DnsQuestion, DnsResponseCode, DnsType} +import fr.davit.taxonomy.scodec.DnsCodec +import fs2.Stream +import munit.FunSuite +import scodec.Codec + +import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress} +import scala.concurrent.ExecutionContext + +class ZeroconfItSpec extends FunSuite { + + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val coder: Codec[DnsMessage] = DnsCodec.dnsMessage + + val googleCastService = Zeroconf.Service("googlecast", "tcp") + + val ipv4 = InetAddress.getByName("1.2.3.4").asInstanceOf[Inet4Address] + val ipv6 = InetAddress.getByName("[2001:db8::8a2e:370:7334]").asInstanceOf[Inet6Address] + + val googleCastInstance = Zeroconf.Instance( + googleCastService, + "Scala FTW", + 8009, + "my-awesome-hostname", + Map("key" -> "value"), + List(ipv4, ipv6) + ) + + test("discover services") { + val result = Stream + .resource(Zeroconf.localMulticastSocket[IO]()) + .flatMap(Dns.listen[IO]) + .concurrently(Zeroconf.scan[IO](googleCastService)) + .map(_.message) + .head + .compile + .toList + .unsafeRunSync() + + assert(result.nonEmpty) + assert(result.head.header.`type` == DnsType.Query) + assert(result.head.questions.head.name == "_googlecast._tcp.local") + assert(result.head.questions.head.`type` == DnsRecordType.PTR) + } + + test("register a service instance") { + val question = DnsQuestion("_googlecast._tcp.local", DnsRecordType.PTR, false, DnsRecordClass.Internet) + val message = DnsMessage.query(id = 0, isRecursionDesired = false, questions = Seq(question)) + val packet = DnsPacket(new InetSocketAddress(InetAddress.getByName("224.0.0.251"), 5353), message) + + val result = Stream + .resource(Zeroconf.localMulticastSocket[IO]()) + .flatMap(s => Stream.eval(Dns.resolve(s, packet)).drain ++ Dns.listen(s)) // on multicast, resolve will receive its owns message + .concurrently(Zeroconf.register[IO](googleCastInstance)) + .map(_.message) + .head + .compile + .toList + .unsafeRunSync() + + assert(result.nonEmpty) + assert(result.head.header.`type` == DnsType.Response) + assert(result.head.header.responseCode == DnsResponseCode.Success) + assert(result.head.answers.head.name == "_googlecast._tcp.local") + assert(result.head.answers.head.data == DnsPTRRecordData("Scala FTW._googlecast._tcp.local")) + assert(result.head.additionals(0).name == "Scala FTW._googlecast._tcp.local") + assert(result.head.additionals(0).data == DnsSRVRecordData(0, 0, 8009, "my-awesome-hostname")) + assert(result.head.additionals(1).name == "Scala FTW._googlecast._tcp.local") + assert(result.head.additionals(1).data == DnsTXTRecordData(List("key=value"))) + assert(result.head.additionals(2).name == "my-awesome-hostname") + assert(result.head.additionals(2).data == DnsARecordData(ipv4)) + assert(result.head.additionals(3).name == "my-awesome-hostname") + assert(result.head.additionals(3).data == DnsAAAARecordData(ipv6)) + } +} diff --git a/src/main/scala/fr/davit/scout/Zeroconf.scala b/src/main/scala/fr/davit/scout/Zeroconf.scala index e1d38b1..42101f9 100644 --- a/src/main/scala/fr/davit/scout/Zeroconf.scala +++ b/src/main/scala/fr/davit/scout/Zeroconf.scala @@ -16,6 +16,7 @@ package fr.davit.scout +import cats.Show import cats.effect._ import cats.implicits._ import fr.davit.taxonomy.fs2.Dns @@ -26,35 +27,83 @@ import fs2._ import fs2.io.udp.{Socket, SocketGroup} import scodec.Codec -import java.net.{InetAddress, InetSocketAddress, NetworkInterface, StandardProtocolFamily} +import java.net._ import scala.concurrent.duration._ +import scala.collection.immutable +import scala.jdk.CollectionConverters._ +import scala.collection.compat object Zeroconf { + /** + * Service to be discovered by DNS-SD + * @param application Application type (eg tftp, printer) + * @param transport Transport type (eq tcp, udp) + * @param domain Domain where the service can be found. Defaults to 'local' + */ final case class Service( application: String, transport: String, domain: String = "local" - ) { - override def toString: String = s"_$application._$transport.$domain" - } + ) + /** + * Instance of a service + * @param service [[Service]] definition + * @param name Instance name of the service, intended to be human readable + * @param port Instance port for the service + * @param target Instance host name + * @param information Instance information + * @param addresses Instance ip address + */ final case class Instance( service: Service, name: String, port: Int, target: String, - information: Map[String, String] - ) { - override def toString: String = s"$name.$service" - } + information: Map[String, String] = Map.empty, + addresses: immutable.Seq[InetAddress] = List.empty + ) - private val LocalMulticastAddress = new InetSocketAddress(InetAddress.getByName("224.0.0.251"), 5353) - private implicit val codec: Codec[DnsMessage] = DnsCodec.dnsMessage + private val LocalMulticastAddress = new InetSocketAddress(InetAddress.getByName("224.0.0.251"), 5353) - private def localMulticastSocket[F[_]: Concurrent: ContextShift]: Resource[F, Socket[F]] = { - val networkInterface = NetworkInterface.getByName("wlp0s20f3") + private implicit val codec: Codec[DnsMessage] = DnsCodec.dnsMessage + private implicit val showService: Show[Service] = Show(s => s"_${s.application}._${s.transport}.${s.domain}") + private implicit val showInstance: Show[Instance] = Show(i => s"${i.name}.${i.service.show}") + + /** + * Finds the 'default' network interface on the machine + * Will pick the 1st interface that supports broadcast + * @return Default network interface + */ + private[scout] def defaultNetworkInterface[F[_]: Sync](): Resource[F, NetworkInterface] = { + val interface = Sync[F].delay { + (for { + itf <- NetworkInterface.getNetworkInterfaces.asScala if itf.isUp && !itf.isLoopback + addr <- itf.getInterfaceAddresses.asScala + _ <- Option(addr.getBroadcast) + } yield itf) + .to(compat.immutable.LazyList) + .headOption + .getOrElse(throw new Exception("No network interface wit broadcast support was found")) + } + Resource.liftF(interface) + } + + /** + * Creates the [[java.net.Socket]] resource bound on 224.0.0.251:5353 + * listening for multicast messages + * @param interface Network interface. Will use the [[defaultNetworkInterface]] if not provided + * @return Multicast socket + */ + private[scout] def localMulticastSocket[F[_]: Concurrent: ContextShift]( + interface: Option[NetworkInterface] = None + ): Resource[F, Socket[F]] = { for { + itf <- interface match { + case Some(i) => Resource.pure[F, NetworkInterface](i) + case None => defaultNetworkInterface[F]() + } blocker <- Blocker[F] socketGroup <- SocketGroup[F](blocker) socket <- socketGroup @@ -63,44 +112,62 @@ object Zeroconf { protocolFamily = Some(StandardProtocolFamily.INET), reuseAddress = true, allowBroadcast = true, - multicastInterface = Some(networkInterface), + multicastInterface = Some(itf), multicastTTL = Some(255) ) - .evalTap(_.join(LocalMulticastAddress.getAddress, networkInterface).void) + .evalTap(_.join(LocalMulticastAddress.getAddress, itf).void) } yield socket } - def scan[F[_]: Concurrent: ContextShift: Timer](service: Service): Stream[F, Instance] = { - val question = DnsQuestion(service.toString, DnsRecordType.PTR, unicastResponse = false, DnsRecordClass.Internet) + /** + * Periodically scans for [[Instance]] of the desired [[Service]]. + * @param service [[Service]] definition + * @param interface Network interface. Will use the [[defaultNetworkInterface]] if not provided + * @param nextDelay Applied to the previous delay to compute the next, e.g. to implement exponential backoff + * @return Stream of [[Instance]] + */ + def scan[F[_]: Concurrent: ContextShift: Timer]( + service: Service, + interface: Option[NetworkInterface] = None, + nextDelay: FiniteDuration => FiniteDuration = t => (t * 3).min(1.hour) + ): Stream[F, Instance] = { + val question = DnsQuestion(service.show, DnsRecordType.PTR, unicastResponse = false, DnsRecordClass.Internet) val message = DnsMessage.query(id = 0, isRecursionDesired = false, questions = Seq(question)) val packet = DnsPacket(LocalMulticastAddress, message) - def serviceInstance(dnsMessage: DnsMessage): Option[Instance] = { + def serviceInstance(dnsMessage: DnsMessage): Option[Instance] = for { ptr <- dnsMessage.answers.collectFirst { - case DnsResourceRecord(question.name, _, question.`class`, _, ptr: DnsPTRRecordData) => ptr + case DnsResourceRecord(question.name, _, question.`class`, _, DnsPTRRecordData(ptr)) => ptr } - srv <- dnsMessage.additionals.collectFirst { - case DnsResourceRecord(ptr.ptrdname, _, question.`class`, _, srv: DnsSRVRecordData) => srv + (port, target) <- dnsMessage.additionals.collectFirst { + case DnsResourceRecord(`ptr`, _, question.`class`, _, DnsSRVRecordData(_, _, port, target)) => (port, target) } txt <- dnsMessage.additionals.collectFirst { - case DnsResourceRecord(ptr.ptrdname, _, question.`class`, _, txt: DnsTXTRecordData) => txt + case DnsResourceRecord(`ptr`, _, question.`class`, _, DnsTXTRecordData(txt)) => txt } - information = txt.txt + } yield { + val information = txt .map(_.split('=')) .collect { case Array(key) => key -> "" case Array(key, value) => key -> value } .toMap - } yield Instance(service, ptr.ptrdname, srv.port, srv.target, information) - } + + // address record is not required + val addresses = dnsMessage.additionals.collect { + case DnsResourceRecord(`target`, _, question.`class`, _, DnsARecordData(ipv4)) => ipv4 + case DnsResourceRecord(`target`, _, question.`class`, _, DnsAAAARecordData(ipv6)) => ipv6 + } + Instance(service, ptr, port, target, information, addresses) + } for { socket <- Stream - .resource(localMulticastSocket[F]) + .resource(localMulticastSocket[F](interface)) exponentialDelay = Stream.emit(()) ++ Stream - .iterate(1.second)(t => (t * 3).min(1.hour)) + .iterate(1.second)(nextDelay) .flatMap(t => Stream.sleep_(t)) requester = Stream .emit(packet) @@ -111,56 +178,112 @@ object Zeroconf { .listen(socket) .concurrently(requester) .map(_.message) - .evalTap(m => Sync[F].delay(println(m))) .map(serviceInstance) .flattenOption } yield service } - def register[F[_]: Concurrent: ContextShift: Timer](instance: Instance): Stream[F, Unit] = { - val header = DnsHeader( - id = 0, - `type` = DnsType.Response, - opCode = DnsOpCode.StandardQuery, - isAuthoritativeAnswer = false, - isTruncated = false, - isRecursionDesired = false, - isRecursionAvailable = false, - responseCode = DnsResponseCode.Success, - countQuestions = 0, - countAnswerRecords = 1, - countAuthorityRecords = 0, - countAdditionalRecords = 2 - ) - val ptr = DnsResourceRecord( - name = instance.service.toString, - cacheFlush = false, - `class` = DnsRecordClass.Internet, - ttl = 1.minute, - data = DnsPTRRecordData(instance.name) - ) - val srv = DnsResourceRecord( - name = instance.toString, - cacheFlush = false, - `class` = DnsRecordClass.Internet, - ttl = 1.minute, - data = DnsSRVRecordData(0, 0, instance.port, instance.target) - ) - val txt = DnsResourceRecord( - name = instance.toString, - cacheFlush = false, - `class` = DnsRecordClass.Internet, - ttl = 1.minute, - data = DnsTXTRecordData(instance.information.map { case (k, v) => if (v.isEmpty) k else s"$k=$v" }.toList) - ) - val message = DnsMessage(header, List.empty, List(ptr), List.empty, List(srv, txt)) - val packet = DnsPacket(LocalMulticastAddress, message) + /** + * Register a [[Service]] [[Instance]] to be discovered with DNS-SD + * @param instance [[Instance]] to be discovered + * @param interface Network interface. Will use the [[defaultNetworkInterface]] if not provided + * @param ttl Time to live of the DNS records + */ + def register[F[_]: Concurrent: ContextShift: Timer]( + instance: Instance, + interface: Option[NetworkInterface] = None, + ttl: FiniteDuration = 2.minutes + ): Stream[F, Unit] = { + + def isServiceRequest(message: DnsMessage): Boolean = { + val isQuery = message.header.`type` == DnsType.Query + val isServiceQuestion = message.questions.exists { q => + q.name == instance.service.show && + q.`type` == DnsRecordType.PTR && + q.`class` == DnsRecordClass.Internet + } + isQuery && isServiceQuestion + } + + def knowsInstance(message: DnsMessage): Boolean = { + message.answers.exists { + case DnsResourceRecord(name, _, _, _, DnsPTRRecordData(ptr)) => + name == instance.service.show && ptr == instance.show + case _ => false + } + } + + def serviceResponse(addresses: Seq[InetAddress]): DnsPacket = { + val ptr = DnsResourceRecord( + name = instance.service.show, + cacheFlush = false, + `class` = DnsRecordClass.Internet, + ttl = ttl, + data = DnsPTRRecordData(instance.show) + ) + + val srv = DnsResourceRecord( + name = instance.show, + cacheFlush = true, + `class` = DnsRecordClass.Internet, + ttl = ttl, + data = DnsSRVRecordData(0, 0, instance.port, instance.target) + ) + + val txt = DnsResourceRecord( + name = instance.show, + cacheFlush = true, + `class` = DnsRecordClass.Internet, + ttl = ttl, + data = DnsTXTRecordData(instance.information.map { case (k, v) => if (v.isEmpty) k else s"$k=$v" }.toList) + ) + + val as = addresses + .map { + case ipv4: Inet4Address => DnsARecordData(ipv4) + case ipv6: Inet6Address => DnsAAAARecordData(ipv6) + } + .map { data => + DnsResourceRecord( + name = instance.target, + cacheFlush = true, + `class` = DnsRecordClass.Internet, + ttl = ttl, + data = data + ) + } + + val answers = List(ptr) + val additionals = List(srv, txt) ++ as + val header = DnsHeader( + id = 0, + `type` = DnsType.Response, + opCode = DnsOpCode.StandardQuery, + isAuthoritativeAnswer = true, + isTruncated = false, + isRecursionDesired = false, + isRecursionAvailable = false, + responseCode = DnsResponseCode.Success + ) + val message = DnsMessage(header, List.empty, answers, List.empty, additionals) + DnsPacket(LocalMulticastAddress, message) + } + for { socket <- Stream - .resource(localMulticastSocket[F]) - _ <- Stream - .fixedDelay(30.seconds) - .map(_ => packet) + .resource(localMulticastSocket[F](interface)) + addrs <- if (instance.addresses.isEmpty) { + Stream.eval(socket.localAddress).map(a => List(a.getAddress)) + } else { + Stream(instance.addresses) + } + response = serviceResponse(addrs) + _ <- Dns + .listen(socket) + .map(_.message) + .filter(isServiceRequest) + .filterNot(knowsInstance) + .map(_ => response) .through(Dns.stream(socket)) } yield () } diff --git a/src/test/scala/fr/davit/scout/ZeroconfSpec.scala b/src/test/scala/fr/davit/scout/ZeroconfSpec.scala deleted file mode 100644 index 1718d3a..0000000 --- a/src/test/scala/fr/davit/scout/ZeroconfSpec.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2020 Michel Davit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fr.davit.scout - -import cats.effect.{ContextShift, IO, Timer} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import java.net.InetAddress -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - -class ZeroconfSpec extends AnyFlatSpec with Matchers { - - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - - val googleCastService = Zeroconf.Service("googlecast", "tcp") - - val googleCastInstance = Zeroconf.Instance( - googleCastService, - "scala-cast-42", - 8009, - InetAddress.getLocalHost.toString, - Map.empty - ) - - "Zeroconf" should "get list of registered services" in { - val services = Zeroconf - .scan[IO](googleCastService) - .interruptAfter(10.seconds) - .compile - .toList - .unsafeRunSync() - println(services) - } - - it should "registered a new service" in { - Zeroconf - .register[IO](googleCastInstance) - .interruptAfter(50.seconds) - .compile - .drain - .unsafeRunSync() - } -}