Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Zutatensuppe
9a13838d33 replay with ws 2021-06-05 15:14:36 +02:00
5 changed files with 247 additions and 38 deletions

View file

@ -43,6 +43,11 @@ const EV_SERVER_INIT = 4
const EV_CLIENT_EVENT = 2 const EV_CLIENT_EVENT = 2
const EV_CLIENT_INIT = 3 const EV_CLIENT_INIT = 3
const EV_CLIENT_INIT_REPLAY = 5
const EV_SERVER_INIT_REPLAY = 6
const EV_CLIENT_REPLAY_EVENT = 7
const EV_SERVER_REPLAY_EVENT = 8
const LOG_HEADER = 1 const LOG_HEADER = 1
const LOG_ADD_PLAYER = 2 const LOG_ADD_PLAYER = 2
const LOG_UPDATE_PLAYER = 4 const LOG_UPDATE_PLAYER = 4
@ -70,6 +75,11 @@ export default {
EV_CLIENT_EVENT, EV_CLIENT_EVENT,
EV_CLIENT_INIT, EV_CLIENT_INIT,
EV_CLIENT_INIT_REPLAY,
EV_SERVER_INIT_REPLAY,
EV_CLIENT_REPLAY_EVENT,
EV_SERVER_REPLAY_EVENT,
LOG_HEADER, LOG_HEADER,
LOG_ADD_PLAYER, LOG_ADD_PLAYER,
LOG_UPDATE_PLAYER, LOG_UPDATE_PLAYER,

View file

@ -112,6 +112,54 @@ function connect(
}) })
} }
let onReplayDataCallback = (logEntry: any[]) => {}
const onReplayData = (fn: any) => {
onReplayDataCallback = fn
}
function connectReplay(
address: string,
gameId: string,
clientId: string
): Promise<EncodedGame> {
clientSeq = 0
events = {}
setConnectionState(CONN_STATE_CONNECTING)
return new Promise(resolve => {
ws = new WebSocket(address, clientId + '|' + gameId)
ws.onopen = () => {
setConnectionState(CONN_STATE_CONNECTED)
send([Protocol.EV_CLIENT_INIT_REPLAY])
}
ws.onmessage = (e: MessageEvent) => {
const msg: ServerEvent = JSON.parse(e.data)
const msgType = msg[0]
if (msgType === Protocol.EV_SERVER_INIT_REPLAY) {
const game = msg[1]
resolve(game)
} else if (msgType === Protocol.EV_SERVER_REPLAY_EVENT) {
onReplayDataCallback(msg[1])
} else {
throw `[ 2021-06-05 invalid connectReplay msgType ${msgType} ]`
}
}
ws.onerror = () => {
setConnectionState(CONN_STATE_DISCONNECTED)
throw `[ 2021-05-15 onerror ]`
}
ws.onclose = (e: CloseEvent) => {
if (e.code === CODE_CUSTOM_DISCONNECT || e.code === CODE_GOING_AWAY) {
setConnectionState(CONN_STATE_CLOSED)
} else {
setConnectionState(CONN_STATE_DISCONNECTED)
}
}
})
}
async function requestReplayData( async function requestReplayData(
gameId: string, gameId: string,
offset: number, offset: number,
@ -139,7 +187,17 @@ function sendClientEvent(evt: GameEvent): void {
send([Protocol.EV_CLIENT_EVENT, clientSeq, events[clientSeq]]) send([Protocol.EV_CLIENT_EVENT, clientSeq, events[clientSeq]])
} }
function requestMoreReplayData(): void {
send([Protocol.EV_CLIENT_REPLAY_EVENT])
}
export default { export default {
connectReplay,
requestMoreReplayData,
onReplayData,
connect, connect,
requestReplayData, requestReplayData,
disconnect, disconnect,

View file

@ -310,28 +310,6 @@ export async function main(
HUD.setConnectionState(state) HUD.setConnectionState(state)
}) })
const queryNextReplayBatch = async (
gameId: string
): Promise<ReplayData> => {
const offset = REPLAY.dataOffset
REPLAY.dataOffset += REPLAY.dataSize
const replay: ReplayData = await Communication.requestReplayData(
gameId,
offset,
REPLAY.dataSize
)
// cut log that was already handled
REPLAY.log = REPLAY.log.slice(REPLAY.logPointer)
REPLAY.logPointer = 0
REPLAY.log.push(...replay.log)
if (replay.log.length < REPLAY.dataSize) {
REPLAY.final = true
}
return replay
}
let TIME: () => number = () => 0 let TIME: () => number = () => 0
const connect = async () => { const connect = async () => {
if (MODE === MODE_PLAY) { if (MODE === MODE_PLAY) {
@ -344,15 +322,31 @@ export async function main(
REPLAY.dataSize = 10000 REPLAY.dataSize = 10000
REPLAY.speeds = [0.5, 1, 2, 5, 10, 20, 50, 100, 250, 500] REPLAY.speeds = [0.5, 1, 2, 5, 10, 20, 50, 100, 250, 500]
REPLAY.speedIdx = 1 REPLAY.speedIdx = 1
const replay: ReplayData = await queryNextReplayBatch(gameId)
if (!replay.game) { Communication.onReplayData((logLines: any[][]) => {
if (logLines.length === 0) {
console.log('MUHHHHH FINAL!!!!')
REPLAY.final = true
} else {
if (REPLAY.logPointer === 0) {
REPLAY.log.push(...logLines)
} else {
REPLAY.log = REPLAY.log.slice(REPLAY.logPointer - 1)
REPLAY.logPointer = 1
REPLAY.log.push(...logLines)
}
}
})
const game: any = await Communication.connectReplay(wsAddress, gameId, clientId)
if (!game) {
throw '[ 2021-05-29 no game received ]' throw '[ 2021-05-29 no game received ]'
} }
const gameObject: GameType = Util.decodeGame(replay.game) const gameObject: GameType = Util.decodeGame(game)
Game.setGame(gameObject.id, gameObject) Game.setGame(gameObject.id, gameObject)
REPLAY.lastRealTs = Time.timestamp() REPLAY.lastRealTs = Time.timestamp()
REPLAY.gameStartTs = parseInt(replay.log[0][4], 10) REPLAY.gameStartTs = Game.getStartTs(gameId)
REPLAY.lastGameTs = REPLAY.gameStartTs REPLAY.lastGameTs = REPLAY.gameStartTs
REPLAY.paused = false REPLAY.paused = false
REPLAY.skipNonActionPhases = false REPLAY.skipNonActionPhases = false
@ -521,7 +515,7 @@ export async function main(
} }
// // TODO: remove (make changable via interface) // // TODO: remove (make changable via interface)
// REPLAY.skipNonActionPhases = true REPLAY.skipNonActionPhases = true
if (MODE === MODE_PLAY) { if (MODE === MODE_PLAY) {
Communication.onServerChange((msg: ServerEvent) => { Communication.onServerChange((msg: ServerEvent) => {
@ -580,8 +574,10 @@ export async function main(
} }
const next = async () => { const next = async () => {
if (REPLAY.logPointer + 1 >= REPLAY.log.length) { if (REPLAY.logPointer >= REPLAY.log.length) {
await queryNextReplayBatch(gameId) Communication.requestMoreReplayData()
to = setTimeout(next, 50)
return
} }
const realTs = Time.timestamp() const realTs = Time.timestamp()
@ -597,19 +593,18 @@ export async function main(
if (REPLAY.paused) { if (REPLAY.paused) {
break break
} }
const nextIdx = REPLAY.logPointer + 1 if (REPLAY.logPointer >= REPLAY.log.length) {
if (nextIdx >= REPLAY.log.length) {
break break
} }
const currLogEntry = REPLAY.log[REPLAY.logPointer] const lastLogEntry = REPLAY.logPointer > 0 ? REPLAY.log[REPLAY.logPointer - 1] : null
const currTs: Timestamp = REPLAY.gameStartTs + currLogEntry[currLogEntry.length - 1] const lastTs: Timestamp = REPLAY.gameStartTs + (lastLogEntry ? lastLogEntry[lastLogEntry.length - 1] : 0)
const nextLogEntry = REPLAY.log[nextIdx] const nextLogEntry = REPLAY.log[REPLAY.logPointer]
const nextTs: Timestamp = REPLAY.gameStartTs + nextLogEntry[nextLogEntry.length - 1] const nextTs: Timestamp = REPLAY.gameStartTs + nextLogEntry[nextLogEntry.length - 1]
if (nextTs > maxGameTs) { if (nextTs > maxGameTs) {
// next log entry is too far into the future // next log entry is too far into the future
if (REPLAY.skipNonActionPhases && (maxGameTs + 50 < nextTs)) { if (REPLAY.skipNonActionPhases && (maxGameTs + 50 < nextTs)) {
const skipInterval = nextTs - currTs const skipInterval = nextTs - lastTs
// lets skip to the next log entry // lets skip to the next log entry
// log.info('skipping non-action, from', maxGameTs, skipInterval) // log.info('skipping non-action, from', maxGameTs, skipInterval)
maxGameTs += skipInterval maxGameTs += skipInterval
@ -620,13 +615,15 @@ export async function main(
if (handleLogEntry(nextLogEntry, nextTs)) { if (handleLogEntry(nextLogEntry, nextTs)) {
RERENDER = true RERENDER = true
} }
REPLAY.logPointer = nextIdx REPLAY.logPointer++
} while (true) } while (true)
REPLAY.lastRealTs = realTs REPLAY.lastRealTs = realTs
REPLAY.lastGameTs = maxGameTs REPLAY.lastGameTs = maxGameTs
updateTimerElements() updateTimerElements()
if (!REPLAY.final) { if (REPLAY.final && REPLAY.logPointer + 1 >= REPLAY.log.length) {
// done
} else {
to = setTimeout(next, 50) to = setTimeout(next, 50)
} }
} }

View file

@ -1,10 +1,12 @@
import WebSocket from 'ws'
import fs from 'fs' import fs from 'fs'
import readline from 'readline' import readline from 'readline'
import stream from 'stream' import stream from 'stream'
import Time from '../common/Time' import Time from '../common/Time'
import { Timestamp } from '../common/Types' import { Game as GameType, ScoreMode, ShapeMode, SnapMode, Timestamp } from '../common/Types'
import { logger } from './../common/Util' import { logger } from './../common/Util'
import { DATA_DIR } from './../server/Dirs' import { DATA_DIR } from './../server/Dirs'
import Game from './Game'
const log = logger('GameLog.js') const log = logger('GameLog.js')
@ -81,7 +83,118 @@ const get = async (
}) })
} }
interface LineReader {
readLine: () => Promise<string>,
}
const createLineReader = async (
gameId: string
): Promise<LineReader|null> => {
const stream: fs.ReadStream = await new Promise(resolve => {
const file = filename(gameId)
if (!fs.existsSync(file)) {
return null
}
const instream = fs.createReadStream(file)
instream.on('readable', () => {
resolve(instream)
})
})
let line = ''
const readLine = async (): Promise<string> => {
return new Promise(resolve => {
let chunk
let resolved = false
while (null !== (chunk = stream.read(1))) {
if (chunk.toString() === "\n") {
resolve(line)
line = ''
resolved = true
break
}
line += chunk
}
if (!resolved) {
resolve('')
}
})
}
return {
readLine,
}
}
interface SocketGameLog {
socket: WebSocket
rl: LineReader
}
const connected: SocketGameLog[] = []
const getSocketGameLog = (
socket: WebSocket,
): SocketGameLog|null => {
for (const entry of connected) {
if (entry.socket === socket) {
return entry
}
}
return null
}
const open = async (socket: WebSocket, gameId: string): Promise<GameType|null> => {
const rl = await createLineReader(gameId)
if (!rl) {
return null
}
const socketGameLog = {
socket: socket,
rl: rl,
}
const line = await rl.readLine()
const log = JSON.parse(line)
connected.push(socketGameLog)
const g: GameType = await Game.createGameObject(
gameId,
log[2],
log[3],
log[4],
log[5] || ScoreMode.FINAL,
log[6] || ShapeMode.NORMAL,
log[7] || SnapMode.NORMAL,
)
return g
}
const getNextBySocket = async (socket: WebSocket): Promise<any[]> => {
const socketGameLog = getSocketGameLog(socket)
if (!socketGameLog) {
return []
}
const lines = []
for (let i = 0; i < 1000; i++) {
const line = await socketGameLog.rl.readLine()
if (line) {
try {
lines.push(JSON.parse(line))
} catch (e) {
log.error(e)
log.error(line)
}
} else {
break
}
}
return lines
}
export default { export default {
open,
getNextBySocket,
shouldLog, shouldLog,
create, create,
exists, exists,

View file

@ -247,6 +247,37 @@ wss.on('message', async (
const msg = JSON.parse(data as string) const msg = JSON.parse(data as string)
const msgType = msg[0] const msgType = msg[0]
switch (msgType) { switch (msgType) {
case Protocol.EV_CLIENT_INIT_REPLAY: {
if (!GameLog.exists(gameId)) {
throw `[gamelog ${gameId} does not exist... ]`
}
// should connect the socket with game log
// pseudo code
const game = await GameLog.open(socket, gameId)
if (!game) {
throw `[game not created :/ ]`
}
notify(
[Protocol.EV_SERVER_INIT_REPLAY, Util.encodeGame(game)],
[socket]
)
} break
case Protocol.EV_CLIENT_REPLAY_EVENT: {
if (!GameLog.exists(gameId)) {
throw `[gamelog ${gameId} does not exist... ]`
}
// should read next some lines from game log, using the game
// log connected with this socket
// pseudo code
const logEntries = await GameLog.getNextBySocket(socket)
notify(
[Protocol.EV_SERVER_REPLAY_EVENT, logEntries],
[socket]
)
} break
case Protocol.EV_CLIENT_INIT: { case Protocol.EV_CLIENT_INIT: {
if (!GameCommon.exists(gameId)) { if (!GameCommon.exists(gameId)) {
throw `[game ${gameId} does not exist... ]` throw `[game ${gameId} does not exist... ]`