Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { ProviderTransform } from "@/provider/transform"
import { defer } from "@/util/defer"

export namespace SessionCompaction {
const log = Log.create({ service: "session.compaction" })
Expand Down Expand Up @@ -106,6 +107,11 @@ export namespace SessionCompaction {
auto: boolean
overflow?: boolean
}) {
const time = Date.now()
await Session.setCompacting({ sessionID: input.sessionID, time })
await using _ = defer(async () => {
await Session.setCompacting({ sessionID: input.sessionID })
})
const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User

let messages = input.messages
Expand Down
21 changes: 21 additions & 0 deletions packages/opencode/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,27 @@ export namespace Session {
},
)

export const setCompacting = fn(
z.object({
sessionID: Identifier.schema("session"),
time: z.number().optional(),
}),
async (input) => {
return Database.use((db) => {
const row = db
.update(SessionTable)
.set({ time_compacting: input.time ?? null })
.where(eq(SessionTable.id, input.sessionID))
.returning()
.get()
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
const info = fromRow(row)
Database.effect(() => Bus.publish(Event.Updated, { info }))
return info
})
},
)

export const setPermission = fn(
z.object({
sessionID: Identifier.schema("session"),
Expand Down
121 changes: 120 additions & 1 deletion packages/opencode/test/session/compaction.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, test } from "bun:test"
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
import path from "path"
import { SessionCompaction } from "../../src/session/compaction"
import { Token } from "../../src/util/token"
Expand All @@ -7,6 +7,11 @@ import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
import { Session } from "../../src/session"
import type { Provider } from "../../src/provider/provider"
import { Agent } from "../../src/agent/agent"
import { Provider as ProviderRegistry } from "../../src/provider/provider"
import { SessionProcessor } from "../../src/session/processor"
import type { MessageV2 } from "../../src/session/message-v2"
import { Identifier } from "../../src/id/id"

Log.init({ print: false })

Expand Down Expand Up @@ -40,6 +45,10 @@ function createModel(opts: {
} as Provider.Model
}

afterEach(() => {
mock.restore()
})

describe("session.compaction.isOverflow", () => {
test("returns true when token count exceeds usable context", async () => {
await using tmp = await tmpdir()
Expand Down Expand Up @@ -243,6 +252,116 @@ describe("util.token.estimate", () => {
})
})

describe("session.compaction.process", () => {
test("sets compacting during processing and clears it after success", async () => {
await using tmp = await tmpdir()

await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const agent = await Agent.get("compaction")
const user = {
id: Identifier.ascending("message"),
role: "user",
sessionID: session.id,
time: { created: Date.now() },
agent: "build",
model: { providerID: "test", modelID: "test-model" },
tools: {},
} as MessageV2.User

await Session.updateMessage(user)

const model = createModel({ context: 100_000, output: 32_000 })
spyOn(Agent, "get").mockResolvedValue(agent)
spyOn(ProviderRegistry, "getModel").mockResolvedValue(model)
spyOn(SessionProcessor, "create").mockImplementation((input) => ({
get message() {
return input.assistantMessage
},
partFromToolCall(_callID: string) {
throw new Error("unused")
},
async process() {
const time = (await Session.get(input.sessionID)).time.compacting
expect(typeof time).toBe("number")
input.assistantMessage.finish = "stop"
return "stop"
},
}))

const result = await SessionCompaction.process({
parentID: user.id,
messages: [{ info: user, parts: [] }],
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
})

expect(result).toBe("continue")
expect((await Session.get(session.id)).time.compacting).toBeUndefined()

await Session.remove(session.id)
},
})
})

test("clears compacting after failure", async () => {
await using tmp = await tmpdir()

await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const agent = await Agent.get("compaction")
const user = {
id: Identifier.ascending("message"),
role: "user",
sessionID: session.id,
time: { created: Date.now() },
agent: "build",
model: { providerID: "test", modelID: "test-model" },
tools: {},
} as MessageV2.User

await Session.updateMessage(user)

const model = createModel({ context: 100_000, output: 32_000 })
spyOn(Agent, "get").mockResolvedValue(agent)
spyOn(ProviderRegistry, "getModel").mockResolvedValue(model)
spyOn(SessionProcessor, "create").mockImplementation((input) => ({
get message() {
return input.assistantMessage
},
partFromToolCall(_callID: string) {
throw new Error("unused")
},
async process() {
const time = (await Session.get(input.sessionID)).time.compacting
expect(typeof time).toBe("number")
throw new Error("boom")
},
}))

await expect(
SessionCompaction.process({
parentID: user.id,
messages: [{ info: user, parts: [] }],
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
}),
).rejects.toThrow("boom")

expect((await Session.get(session.id)).time.compacting).toBeUndefined()

await Session.remove(session.id)
},
})
})
})

describe("session.getUsage", () => {
test("normalizes standard usage to token format", () => {
const model = createModel({ context: 100_000, output: 32_000 })
Expand Down
30 changes: 30 additions & 0 deletions packages/opencode/test/session/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,33 @@ describe("step-finish token propagation via Bus event", () => {
{ timeout: 30000 },
)
})

describe("session compacting state", () => {
test("publishes compacting lifecycle updates", async () => {
await Instance.provide({
directory: projectRoot,
fn: async () => {
const session = await Session.create({})
const seen: Array<number | undefined> = []
const unsub = Bus.subscribe(Session.Event.Updated, (event) => {
if (event.properties.info.id !== session.id) return
seen.push(event.properties.info.time.compacting)
})

const time = Date.now()
await Session.setCompacting({ sessionID: session.id, time })
await Session.setCompacting({ sessionID: session.id })

await new Promise((resolve) => setTimeout(resolve, 100))

unsub()

expect(seen).toContain(time)
expect(seen.at(-1)).toBeUndefined()
expect((await Session.get(session.id)).time.compacting).toBeUndefined()

await Session.remove(session.id)
},
})
})
})
Loading