import { Logger } from "@deltagreen/logger"
import { createExpirableObject, ExpirableObjectRT, sleep } from "@deltagreen/utils"
import dayjs from "dayjs"
import EventEmitter from "events"
import type { MqttClient } from "mqtt"
import TypedEmitter, { EventMap } from "typed-emitter"

import { debugFactory } from "./debugFactory"
import { Device, GenericDevicesRegister, GenericPartialResponse } from "./devices"
import {
  ModBusFunctionCode,
  ReadRegisterFunctionCode,
  readRegisterRequest,
  writeRegisterRequest,
} from "./modbus/modbusProtocol"
import { DEVICE_STATE_EXPIRATION, mapCalculatedRegisters, mapResponseToRegisters, ModBusReply } from "./modbus/utils"
import { EmodbusErrorMessage } from "./proto/deltalink"
import { SolarmanV5 } from "./SolarmanV5"

// Debug output function for this module, created by debugFactory with the label "command-queue".
const debug = debugFactory("command-queue")

/**
 * Queued request to write data to a device register.
 * The literal `type` field distinguishes it from a read command inside QueueCommand.
 */
interface WriteRegistryCommand {
  /** Written as an unsigned 16-bit big-endian prefix in front of the published request. */
  serial: number
  type: "WRITE"
  functionCode: ModBusFunctionCode // probably anything can end up here; not sure it can be split into read and write codes
  /** Passed to writeRegisterRequest as the device address. */
  deviceAddress: number
  /** Passed to writeRegisterRequest as the register address; also remembered per transaction id. */
  registerAddress: number
  /** Bytes handed to writeRegisterRequest as the value to write. */
  data: Buffer
  /** Key under which the write is tracked as pending; the same key is given to waitForWriteResponse. */
  queueId: string
}

/**
 * Queued request to read registers from a device.
 * The literal `type` field distinguishes it from a write command inside QueueCommand.
 */
interface ReadRegistriesCommand {
  /** Written as an unsigned 16-bit big-endian prefix in front of the published request. */
  serial: number
  type: "READ"
  functionCode: ReadRegisterFunctionCode // narrower than ModBusFunctionCode: read function codes only
  /** Passed to readRegisterRequest as the device address. */
  deviceAddress: number
  /** First register to read; remembered per transaction id so the reply can be mapped back to registers. */
  registerAddress: number
  /** Passed to readRegisterRequest as the last argument — presumably the number of registers to read (TODO confirm). */
  length: number
}

/** Any command accepted by the queue; narrow on the `type` field ("WRITE" | "READ"). */
type QueueCommand = WriteRegistryCommand | ReadRegistriesCommand

/** Options accepted by the CommandQueue constructor. */
interface CommandQueueOpts {
  /** MQTT topic that requests are published to. */
  commandTopic: string
  /**
   * With "tcp" (and no solarmanSerialNumber) the trailing two CRC bytes of each request are replaced by a
   * six-byte header carrying the transaction id; with "udp" the request frame is published as built.
   */
  protocol: "udp" | "tcp"
  /** MQTT topic the queue subscribes to; messages arriving on any other topic are ignored. */
  replyTopic: string
  /** Optional logger used for warn/debug records about received replies. */
  logger?: Logger
  /**
   * When set, every request is wrapped with SolarmanV5 (this takes precedence over `protocol`) and the value
   * is also passed to the device's preprocessResponse.
   */
  solarmanSerialNumber?: string
}

/** Payload of the "discover" event, cut out of fixed byte ranges of a discovery reply. */
interface DiscoverResult {
  /** Serial number text decoded from the reply. */
  serial: string
  /** Model text decoded from the reply. */
  model: string
  /** Version text decoded from the reply. */
  version: string
  /** Reply bytes from offset 7 onwards (the part the text fields above are cut from). */
  raw: Buffer
}

/**
 * Typed event map of CommandQueue, parameterised by the register definition type `R` of the device.
 */
export type MessageEvents<R extends GenericDevicesRegister> = {
  /** Declared for listeners; not emitted anywhere in this file. */
  timeout: () => void
  /** Decoded registers of a read reply merged with the calculated registers. */
  data: (data: GenericPartialResponse<R>) => void
  /** Anything thrown while an incoming MQTT message was being handled. */
  error: (error: unknown) => void
  /** Declared for listeners; not emitted anywhere in this file. */
  modbusError: (error: EmodbusErrorMessage) => void
  /** A discovery reply was recognised and decoded. */
  discover: (payload: DiscoverResult) => void
}

/**
 * Node's EventEmitter constructor viewed as a generic, typed constructor.
 * The cast only changes the static type; the runtime value is still EventEmitter.
 */
const TypedEventEmitterCtor = EventEmitter as new <TEvents>() => TypedEmitter<TEvents & EventMap>

/**
 * Event emitter base class whose `on`/`emit`/... signatures are checked against the event map `T`.
 * At runtime it is a plain subclass of Node's EventEmitter with no members of its own.
 */
export class BaseEmitter<T extends EventMap> extends TypedEventEmitterCtor<T> {}

/**
 * Sends Modbus read/write commands for one device over MQTT and decodes the replies.
 *
 * Commands added with `enqueue` are sent one at a time by a worker that re-arms itself every 100 ms.
 * Each request is prefixed with the command's 16-bit serial and, depending on the options, wrapped in a
 * Solarman V5 frame or a Modbus TCP header before it is published to `commandTopic`. Replies arriving on
 * `replyTopic` are matched to their request through the transaction id.
 *
 * Emitted events:
 * - `data`: registers decoded from a read reply, merged with the calculated registers
 * - `discover`: a GoodWe discovery reply was recognised
 * - `error`: anything thrown while handling a reply or while sending a command
 * - `rawdata` (untyped): every reply on `replyTopic`, serial prefix stripped
 * - `dataRaw` (untyped): the data bytes of a read reply
 */
export class CommandQueue<TDevice extends Device<GenericDevicesRegister>> extends BaseEmitter<
  MessageEvents<TDevice["registers"][number]>
> {
  /** Not read anywhere in this class; kept because it is part of the public surface. */
  timeout = 2000
  /** Transaction id used for the next request. */
  private txId = 1
  /** Transaction ids wrap around to 0 when they reach this value. */
  private readonly MAX_TX_ID = 256

  /**
   * Writes that have not been acknowledged yet, keyed by queueId.
   * The value is `undefined` while the command is still queued and is filled in once it has been sent.
   */
  private pendingWrites = new Map<string, { txId: number; sentAt: Date } | undefined>()
  /** Register address requested under each transaction id; needed to map read replies to registers. */
  private lastAddresses = new Map<number, number>()
  private mqttClient: MqttClient
  private commandTopic: string
  private protocol: "tcp" | "udp"
  private replyTopic: string
  private device: TDevice
  private logger?: Logger
  /** Set by `close`; new commands are rejected and the worker stops once the queue is drained. */
  private isClosing = false
  private solarmanSerialNumber: string | undefined

  private queue: QueueCommand[] = []

  /** Most recent register values; entries expire after DEVICE_STATE_EXPIRATION. */
  private deviceState: ExpirableObjectRT<GenericPartialResponse<TDevice["registers"][number]>> =
    createExpirableObject(DEVICE_STATE_EXPIRATION)

  /**
   * The one listener instance registered on the MQTT client.
   * It is stored so that `close` can remove exactly the function that `subscribe` added.
   */
  private readonly messageListener = (topic: string, payload: Buffer): void => this.onMessage(topic, payload)

  /**
   * Subscribes to the reply topic and starts the worker loop immediately.
   *
   * @param device device definition providing the register map and reply preprocessing
   * @param mqttClient connected MQTT client used for publishing and receiving
   * @param opts topics, transport protocol and optional logger / Solarman serial number
   */
  constructor(device: TDevice, mqttClient: MqttClient, opts: CommandQueueOpts) {
    super()
    this.mqttClient = mqttClient
    this.replyTopic = opts.replyTopic
    this.commandTopic = opts.commandTopic
    this.device = device
    this.logger = opts.logger
    this.protocol = opts.protocol
    this.solarmanSerialNumber = opts.solarmanSerialNumber
    this.subscribe()
    this.doWork()
  }

  /** Publishes `buffer` unchanged to the command topic. */
  public publish(buffer: Buffer) {
    this.mqttClient.publish(this.commandTopic, buffer)
  }

  /**
   * Handles one MQTT message: ignores foreign topics, strips the serial prefix, recognises GoodWe
   * discovery replies and hands everything else to the device for decoding.
   * Never throws; failures are emitted as `error`.
   */
  private onMessage(topic: string, payload: Buffer) {
    try {
      if (topic !== this.replyTopic) {
        return
      }

      // The first two bytes are the big-endian serial; the rest is the actual reply.
      const serial = payload.subarray(0, 2).readUInt16BE(0)
      const data = payload.subarray(2)
      this.emit("rawdata", data)

      debug("<<< RESPONSE:", topic, serial, data, "\n", data.toString(), data.length)

      // GoodWe discovery reply detection.
      const hasDiscoverLength = data.length === 86 || data.length === 85
      if (hasDiscoverLength && data.toString("hex").startsWith("aa557fc0")) {
        // The text fields start after a 7-byte header.
        // See https://yamasun.com.tw/upload/F_20170313191367UrC8jo.PDF
        const headerLength = 7
        this.emit("discover", {
          serial: data.subarray(31 + headerLength, 47 + headerLength).toString(),
          model: data.subarray(5 + headerLength, 15 + headerLength).toString(),
          version: data.subarray(0 + headerLength, 5 + headerLength).toString(),
          raw: data.subarray(headerLength),
        })
        return
      }

      const message = this.device.preprocessResponse(data, { solarmanSerialNumber: this.solarmanSerialNumber })
      this.processResponse(message)
    } catch (error: unknown) {
      this.emit("error", error)
    }
  }

  /** Registers the message listener on the MQTT client and subscribes to the reply topic. */
  private subscribe() {
    debug(`Adding onMessage event handler`)
    this.mqttClient.on("message", this.messageListener)

    debug(`Subscribing to mqtt topic ${this.replyTopic}`)
    this.mqttClient.subscribe(this.replyTopic)
  }

  /**
   * One tick of the worker: sends at most one queued command and re-arms itself in 100 ms.
   * The loop ends once the queue is closing and empty.
   */
  private doWork() {
    const command = this.queue.shift()

    if (command) {
      try {
        this.sendCommand(command)
      } catch (error: unknown) {
        // A command that cannot be built or published must not stop the worker for the commands behind it.
        this.emit("error", error)
      }
    }

    if (this.isClosing && this.queue.length === 0) {
      return
    }

    setTimeout(() => this.doWork(), 100)
  }

  /**
   * Builds the request for `command`, assigns it the next transaction id and publishes it.
   *
   * @throws Error when the command type is neither "READ" nor "WRITE", or when building/publishing fails
   */
  private sendCommand(command: QueueCommand) {
    const serial = Buffer.allocUnsafe(2)
    serial.writeUInt16BE(command.serial)

    const transactionId = this.txId
    this.txId = (transactionId + 1) % this.MAX_TX_ID
    this.lastAddresses.set(transactionId, command.registerAddress)

    let request: Buffer
    if (command.type === "READ") {
      debug("!posilam", command.registerAddress, command.length)
      request = readRegisterRequest(
        command.functionCode,
        command.deviceAddress,
        command.registerAddress,
        command.length,
      )
    } else if (command.type === "WRITE") {
      debug("!zapisuju", command.registerAddress, command.data)

      // From now on the reply can be matched to this write through its transaction id.
      this.pendingWrites.set(command.queueId, { txId: transactionId, sentAt: new Date() })

      request = writeRegisterRequest(
        command.functionCode,
        command.deviceAddress,
        command.registerAddress,
        command.data,
      )
    } else {
      throw new Error("not implemented")
    }

    const payload = this.wrapForTransport(request, transactionId)
    debug(">>> RAW REQUEST:", payload.toString("hex"))
    this.publish(Buffer.concat([serial, payload]))
  }

  /**
   * Wraps a request frame for the configured transport: Solarman V5 when a Solarman serial number is set,
   * otherwise a Modbus TCP header for "tcp", otherwise the frame is returned as is.
   */
  private wrapForTransport(frame: Buffer, transactionId: number): Buffer {
    if (this.solarmanSerialNumber) {
      const solarman = new SolarmanV5(this.solarmanSerialNumber)
      return Buffer.from(solarman.wrapModbusFrame(frame, transactionId))
    }

    if (this.protocol === "tcp") {
      return this.wrapModbusTcp(frame, transactionId)
    }

    return frame
  }

  /** Replaces the trailing two CRC bytes of `frame` with a leading six-byte Modbus TCP header. */
  private wrapModbusTcp(frame: Buffer, transactionId: number): Buffer {
    // Strip the CRC because Modbus TCP does not use it.
    const body = frame.subarray(0, frame.length - 2)
    const payload = Buffer.alloc(6 + body.length)

    payload.writeUInt16BE(transactionId, 0) // transaction id
    payload.writeUInt16BE(0x0000, 2) // protocol identifier (0 = Modbus)
    payload.writeUInt16BE(body.length, 4) // number of bytes that follow
    body.copy(payload, 6)

    return payload
  }

  /** Publishes the GoodWe discovery probe with serial 0; a reply is reported through the `discover` event. */
  public goodWeDiscover() {
    const serial = Buffer.allocUnsafe(2)
    serial.writeUInt16BE(0)
    const writeout = Buffer.concat([serial, Buffer.from("AA55C07F0102000241", "hex")])
    // This alternative works on DT:
    // const writeout = Buffer.concat([serial, Buffer.from("7F03753100280409", "hex")])

    // Log of probe commands that were tried:
    //     Trying command: AA55C07F0102000241
    // No response to AA55C07F0102000241 command
    // Trying command: 7F03753100280409
    // No response to 7F03753100280409 command
    // Trying command: 197d0001000dff045e50303036564657f6e60d
    // No response to 197d0001000dff045e50303036564657f6e60d command
    // Trying command: 680241b132313758303230380100b116
    // No response to 680241b132313758303230380100b116 command
    // Trying command: 680241b138303230583731320100b116
    // No response to 680241b138303230583731320100b116 command
    this.publish(writeout)
  }

  /**
   * Dispatches a decoded reply: read replies are mapped to registers and emitted as `data`, write replies
   * clear the matching pending write. Replies with an unknown transaction id are logged and dropped.
   *
   * @throws Error for any function code that is neither a read nor a write reply
   *   (the caller, `onMessage`, turns this into an `error` event)
   */
  private processResponse(message: ModBusReply) {
    const { address, command, data, txId } = message

    const lastAddress = this.lastAddresses.get(txId)
    if (lastAddress === undefined) {
      this.logger?.warn(
        { address, data: data.toString("hex"), command, txId },
        `Received response with unknown txId: ${txId}.`,
      )
      return
    }

    const isReadReply =
      command === ModBusFunctionCode.ReadMultipleHoldingRegisters ||
      command === ModBusFunctionCode.ReadInputRegisters
    // Both sides must be compared against `command`; a bare enum member would make this always true.
    const isWriteReply =
      command === ModBusFunctionCode.WriteSingleHoldingRegister || command === ModBusFunctionCode.WriteRegisters

    if (isReadReply) {
      this.logger?.debug({ address, data: data.toString("hex"), command, txId }, `Received read response.`)
      const responseArray = mapResponseToRegisters(command, this.device.registers, lastAddress, data)
      this.emit("dataRaw", data)

      type Entry = [string, (typeof responseArray)[number]]
      const entries = responseArray.map((o) => [o.key, o] as Entry)
      const partialData = Object.fromEntries(entries) as unknown as GenericPartialResponse<TDevice["registers"][number]>

      // Calculated registers are derived from the accumulated state, not only from this reply.
      this.deviceState.upsert(partialData)
      const calculatedData = mapCalculatedRegisters(this.deviceState.get(), this.device.registers)

      this.emit("data", { ...partialData, ...calculatedData })
    } else if (isWriteReply) {
      debug("write response", message)

      for (const [queueId, pendingWrite] of this.pendingWrites.entries()) {
        // Skip writes that are still queued (undefined) or belong to another transaction.
        if (pendingWrite?.txId !== txId) {
          continue
        }

        // mark write as not pending anymore
        this.pendingWrites.delete(queueId)
      }

      this.logger?.debug({ address, data: data.toString("hex"), command, txId }, `Received write response.`)
    } else {
      throw new Error("Nejakej bordel modbus command " + command)
    }
  }

  /**
   * Appends a command to the queue. Write commands are registered as pending right away so that
   * `waitForWriteResponse` can be called before the command is actually sent.
   * Commands are silently dropped once `close` has been called.
   */
  enqueue(qc: QueueCommand) {
    // reject new messages if closing
    if (this.isClosing) {
      return
    }

    if (qc.type === "WRITE") {
      this.pendingWrites.set(qc.queueId, undefined)
    }

    this.queue.push(qc)
  }

  /**
   * Stops accepting commands, waits (polling every 250 ms) until the queue is empty or the timeout has
   * passed, then removes the message listener and unsubscribes from the reply topic.
   *
   * @param timeoutSeconds maximum time to wait for the queue to drain
   */
  async close(timeoutSeconds = 60) {
    this.isClosing = true
    const startedAt = dayjs()

    while (this.queue.length > 0) {
      const runningForSeconds = Math.abs(dayjs().diff(startedAt, "seconds", true))
      if (runningForSeconds > timeoutSeconds) {
        debug(`Waited for ${runningForSeconds}s which is more than timeout ${timeoutSeconds}. Aborting waiting.`)
        break
      }

      debug(`Waiting for queue to be empty (${runningForSeconds}s). Queue length: ${this.queue.length}.`)

      // wait for queue to empty
      await sleep(250)
    }

    debug(`Removing onMessage event handler`)
    // Must be the same function object that subscribe() registered, otherwise nothing is removed.
    this.mqttClient.off("message", this.messageListener)

    debug(`Unsubscribing from mqtt topic ${this.replyTopic}`)
    await this.mqttClient.unsubscribeAsync(this.replyTopic)
  }

  /**
   * Resolves once the write identified by `queueId` is no longer pending, polling every 250 ms.
   * A queueId that was never registered as pending resolves immediately.
   *
   * @param queueId the queueId of the enqueued write command
   * @param timeoutSeconds maximum time to wait for the acknowledgement
   * @throws Error when no acknowledgement arrived within `timeoutSeconds`
   */
  async waitForWriteResponse(queueId: string, timeoutSeconds = 5) {
    const startedAt = dayjs()

    while (true) {
      const runningForSeconds = Math.abs(dayjs().diff(startedAt, "seconds", true))
      if (runningForSeconds > timeoutSeconds) {
        debug(
          `Waited for write response with queueId ${queueId} for more than timeout ${timeoutSeconds}. Aborting waiting.`,
        )
        throw new Error(`Waiting for write response with queueId ${queueId} timeouted after ${runningForSeconds}s.`)
      }

      if (!this.pendingWrites.has(queueId)) {
        debug(`Write response with queueId ${queueId} received after ${runningForSeconds}s.`)
        return
      }

      debug(`Waiting for write response with queueId ${queueId} (${runningForSeconds}s).`)
      await sleep(250)
    }
  }
}
