bead.corpus

Streaming corpus ingestion and structural sampling. Turns raw external text (JSON Lines, optionally Zstandard-compressed; CSV/TSV; or language-model completions) into structurally filtered experimental Items: stream CorpusRecords from a CorpusSource, dependency-parse them, and keep only those whose parse satisfies a structural DSL constraint.

The whole pipeline is lazy, so a structural query can run over a multi-gigabyte corpus without loading it into memory.

Records

records

Streamed corpus records with provenance.

A CorpusRecord is the raw ingress of the corpus pipeline: one text unit drawn from an external source (a JSONL/CSV file or a language model) together with the provenance needed to trace it. Its provenance combines keys that mirror the layers AnnotationMetadata shape (source_name, tool, model, created_at, confidence, formalism) with any raw source fields, so corpus-derived items carry layers-ready provenance from ingestion onward.

CorpusRecord

Bases: BeadBaseModel

A single streamed text record with provenance.

Attributes:

  • text (str): The text of the record.
  • source_name (str): Identifier of the source the record was drawn from (e.g. a file basename, a corpus name, or a model name).
  • record_index (int): 0-based position of the record within its source stream.
  • provenance (dict[str, ProvenanceValue]): Flat scalar provenance. Conventionally includes layers-aligned keys (source_name, tool, model, created_at, confidence, formalism) plus any raw source fields.
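A minimal sketch of the record shape, assuming a frozen standard-library dataclass in place of BeadBaseModel and a flat-scalar union for ProvenanceValue. Both are stand-ins rather than bead's definitions; CorpusRecordSketch and the provenance values shown are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Union

# Assumption: provenance values are flat scalars (bead's ProvenanceValue may differ).
ProvenanceValue = Union[str, int, float, bool, None]


@dataclass(frozen=True)
class CorpusRecordSketch:
    """Hypothetical stand-in for CorpusRecord; not bead's class."""

    text: str
    source_name: str
    record_index: int
    provenance: dict[str, ProvenanceValue] = field(default_factory=dict)


record = CorpusRecordSketch(
    text="The cat chased the mouse.",
    source_name="toy.jsonl",
    record_index=0,
    provenance={
        # Layers-aligned keys...
        "source_name": "toy.jsonl",
        "tool": "jsonl-reader",  # hypothetical tool label
        "created_at": "2024-01-01T00:00:00Z",
        # ...alongside a raw source field.
        "id": "abc123",
    },
)
```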

Source Protocol

base

Corpus source protocol.

A CorpusSource is anything that streams CorpusRecords. It is modeled as a runtime-checkable Protocol (behavior, not data) rather than a didactic model, mirroring the transform protocols elsewhere in bead.

CorpusSource

Bases: Protocol

A streaming source of corpus records.

Attributes:

  • source_name (str): Identifier stamped onto every record's source_name.

__iter__() -> Iterator[CorpusRecord]

Iterate the records of the source.
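Because CorpusSource is a runtime-checkable Protocol, any object with a source_name attribute and an __iter__ method satisfies it without inheriting from anything. The sketch below re-declares a mirror of the protocol (CorpusSourceLike) together with a hypothetical Record stand-in and an in-memory ListSource, so it runs without bead installed. Note that a runtime isinstance check only verifies that the members exist, not their types or signatures.

```python
from dataclasses import dataclass
from typing import Iterator, Protocol, runtime_checkable


@dataclass(frozen=True)
class Record:
    """Minimal stand-in for CorpusRecord (hypothetical)."""

    text: str
    source_name: str
    record_index: int


@runtime_checkable
class CorpusSourceLike(Protocol):
    """Mirror of the documented CorpusSource protocol."""

    source_name: str

    def __iter__(self) -> Iterator[Record]: ...


class ListSource:
    """Hypothetical in-memory source; it does not subclass the protocol."""

    def __init__(self, texts: list[str], source_name: str = "in-memory") -> None:
        self.source_name = source_name
        self._texts = texts

    def __iter__(self) -> Iterator[Record]:
        # A generator method, so records are produced on demand.
        for i, text in enumerate(self._texts):
            yield Record(text=text, source_name=self.source_name, record_index=i)


source = ListSource(["First text.", "Second text."])
print(isinstance(source, CorpusSourceLike))  # True: matched structurally
print([r.record_index for r in source])      # [0, 1]
```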

Sources

sources

Concrete corpus sources.

Streaming sources that turn external text into CorpusRecords:

  • JsonlCorpusSource streams JSON Lines, transparently decompressing .zst (Zstandard) files.
  • CsvCorpusSource streams rows of a CSV/TSV file.
  • CompletionCorpusSource yields language-model completions as records.

The file readers are lazy: records are produced one at a time, so multi-gigabyte corpora never load into memory.

JsonlCorpusSource

Stream JSON Lines (optionally Zstandard-compressed) as corpus records.

Parameters:

  • path (str | Path, required): Path to the .jsonl or .jsonl.zst file.
  • source_name (str | None, default None): Source identifier; when None, the file name is used.
  • text_field (str, default 'text'): JSON field holding the record text.
  • provenance_fields (tuple[str, ...] | None, default None): JSON fields to copy into each record's provenance. None retains every field except text_field, so no source information (e.g. Reddit id/parent_id/link_id) is dropped; pass an explicit tuple to keep only a subset.
  • compression (str, default 'auto'): "auto" (detect .zst by suffix), "zst", or "none".

__iter__() -> Iterator[CorpusRecord]

Yield one CorpusRecord per non-empty JSON line.
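A rough sketch of this streaming behaviour, written as a plain generator function (iter_jsonl_records, hypothetical) rather than bead's class. Assumptions: record_index counts emitted records, blank lines are skipped, non-scalar JSON values are left out of provenance, and .zst input is read through the third-party zstandard package's ZstdDecompressor().stream_reader, which is imported only when needed.

```python
import io
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, Optional, Union


@dataclass(frozen=True)
class Record:
    """Minimal stand-in for CorpusRecord (hypothetical)."""

    text: str
    source_name: str
    record_index: int
    provenance: dict = field(default_factory=dict)


# Assumption: provenance keeps only flat scalar values.
_SCALARS = (str, int, float, bool, type(None))


def iter_jsonl_records(
    path: Union[str, Path],
    *,
    source_name: Optional[str] = None,
    text_field: str = "text",
    provenance_fields: Optional[tuple[str, ...]] = None,
    compression: str = "auto",
) -> Iterator[Record]:
    path = Path(path)
    name = source_name or path.name
    use_zst = compression == "zst" or (compression == "auto" and path.suffix == ".zst")
    with open(path, "rb") as raw:
        if use_zst:
            import zstandard  # third-party; imported only for .zst input

            binary = zstandard.ZstdDecompressor().stream_reader(raw)
        else:
            binary = raw
        index = 0
        # TextIOWrapper decodes incrementally, so only one line is held at a time.
        for line in io.TextIOWrapper(binary, encoding="utf-8"):
            if not line.strip():
                continue  # skip blank lines
            obj = json.loads(line)
            if provenance_fields is None:
                keys = [k for k in obj if k != text_field]  # everything but the text
            else:
                keys = list(provenance_fields)
            provenance = {k: obj[k] for k in keys if k in obj and isinstance(obj[k], _SCALARS)}
            yield Record(text=obj[text_field], source_name=name, record_index=index, provenance=provenance)
            index += 1
```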

CompletionCorpusSource

Generate text from a language model as a corpus source.

Wraps any TextGenerator (e.g. an OpenAI or Anthropic adapter) and yields one CorpusRecord per generated completion, with the model and prompt recorded as layers-aligned provenance.

Parameters:

  • generator (TextGenerator, required): The model used to generate completions.
  • prompts (Sequence[str], required): Prompts to complete.
  • source_name (str | None, default None): Source identifier; when None, the generator's model_name is used.
  • completions_per_prompt (int, default 1): Number of completions to draw per prompt.
  • max_tokens (int, default 256): Maximum tokens per completion.
  • temperature (float, default 1.0): Sampling temperature.

__iter__() -> Iterator[CorpusRecord]

Yield one CorpusRecord per generated completion.
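The sketch below assumes a TextGenerator exposes model_name and a generate(prompt, *, max_tokens, temperature) method; that method name and signature are hypothetical, as are the provenance keys prompt and completion_index and the prompt-major ordering. EchoGenerator is an offline stub so the example runs without any API access.

```python
from dataclasses import dataclass, field
from typing import Iterator, Optional, Protocol, Sequence


@dataclass(frozen=True)
class Record:
    """Minimal stand-in for CorpusRecord (hypothetical)."""

    text: str
    source_name: str
    record_index: int
    provenance: dict = field(default_factory=dict)


class TextGeneratorLike(Protocol):
    """Assumed generator interface; bead's TextGenerator may differ."""

    model_name: str

    def generate(self, prompt: str, *, max_tokens: int, temperature: float) -> str: ...


def iter_completions(
    generator: TextGeneratorLike,
    prompts: Sequence[str],
    *,
    source_name: Optional[str] = None,
    completions_per_prompt: int = 1,
    max_tokens: int = 256,
    temperature: float = 1.0,
) -> Iterator[Record]:
    name = source_name or generator.model_name
    index = 0
    for prompt in prompts:
        for k in range(completions_per_prompt):
            # One model call per yielded record; nothing is generated ahead of time.
            text = generator.generate(prompt, max_tokens=max_tokens, temperature=temperature)
            provenance = {
                "source_name": name,
                "model": generator.model_name,  # layers-aligned key
                "prompt": prompt,               # assumed key name
                "completion_index": k,          # hypothetical key
            }
            yield Record(text=text, source_name=name, record_index=index, provenance=provenance)
            index += 1


class EchoGenerator:
    """Offline stub that satisfies TextGeneratorLike."""

    model_name = "echo-1"

    def generate(self, prompt: str, *, max_tokens: int, temperature: float) -> str:
        return f"{prompt} (completed)"


records = list(iter_completions(EchoGenerator(), ["A cat", "A dog"], completions_per_prompt=2))
```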

CsvCorpusSource

Stream rows of a CSV/TSV file as corpus records.

Parameters:

  • path (str | Path, required): Path to the CSV/TSV file.
  • text_column (str, required): Column holding the record text.
  • source_name (str | None, default None): Source identifier; when None, the file name is used.
  • provenance_columns (tuple[str, ...] | None, default None): Columns to copy into each record's provenance. None retains every column except text_column, so no source information is dropped; pass an explicit tuple to keep only a subset.
  • sep (str, default ','): Field separator ("," for CSV, "\t" for TSV).

__iter__() -> Iterator[CorpusRecord]

Yield one CorpusRecord per CSV row with a non-empty text cell.
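A sketch of the CSV/TSV behaviour using the standard library's csv.DictReader, which reads one row at a time. iter_csv_records and the Record stand-in are hypothetical, and counting record_index over emitted rows only is an assumption.

```python
import csv
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, Optional, Union


@dataclass(frozen=True)
class Record:
    """Minimal stand-in for CorpusRecord (hypothetical)."""

    text: str
    source_name: str
    record_index: int
    provenance: dict = field(default_factory=dict)


def iter_csv_records(
    path: Union[str, Path],
    text_column: str,
    *,
    source_name: Optional[str] = None,
    provenance_columns: Optional[tuple[str, ...]] = None,
    sep: str = ",",
) -> Iterator[Record]:
    path = Path(path)
    name = source_name or path.name
    index = 0
    # newline="" is what the csv module expects for files it reads.
    with open(path, newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh, delimiter=sep):
            text = row.get(text_column) or ""
            if not text.strip():
                continue  # skip rows whose text cell is empty
            if provenance_columns is None:
                keys = [c for c in row if c != text_column]
            else:
                keys = list(provenance_columns)
            provenance = {c: row[c] for c in keys if c in row}
            yield Record(text=text, source_name=name, record_index=index, provenance=provenance)
            index += 1
```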

Pipeline

pipeline

Streaming corpus pipeline: parse, structurally filter, build items.

Composable lazy generators that turn a CorpusSource into structurally filtered Items:

parse_records -> filter_by_structure -> Items.

The whole chain is lazy, so a structural query (a DSL constraint over the dependency parse, e.g. a transitive-verb pattern) can be run over a multi-gigabyte corpus without loading it into memory.

record_to_item(record: CorpusRecord, parsed: ParsedSentence, *, item_template_id: UUID, tool: str, element_name: str = 'text', formalism: str = UNIVERSAL_DEPENDENCIES) -> Item

Build an Item from a corpus record and its parse.

The parse is projected onto spans and relations via parse_to_spans; the record's provenance plus the layers-aligned layer discriminators are stored on item_metadata.

Parameters:

  • record (CorpusRecord, required): The source record (supplies text and provenance).
  • parsed (ParsedSentence, required): The dependency parse of record.text (or of one of its sentences).
  • item_template_id (UUID, required): Template the resulting item is associated with.
  • tool (str, required): Parser identifier, recorded as provenance.
  • element_name (str, default 'text'): Rendered-element name for the parsed text.
  • formalism (str, default UNIVERSAL_DEPENDENCIES): Dependency formalism slug.

Returns:

  • Item: The constructed item with spans, relations, and provenance.

parse_records(source: Iterable[CorpusRecord], parser: DependencyParser, *, split_sentences: bool = True) -> Iterator[tuple[CorpusRecord, ParsedSentence]]

Parse each record, yielding (record, sentence) pairs.

Parameters:

  • source (Iterable[CorpusRecord], required): The records to parse.
  • parser (DependencyParser, required): The dependency parser to apply.
  • split_sentences (bool, default True): When True, multi-sentence records fan out to one pair per sentence. When False, only records that parse to exactly one sentence are emitted; multi-sentence records are skipped.

Yields:

  • tuple[CorpusRecord, ParsedSentence]: A record paired with one of its parsed sentences.
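The split_sentences switch can be illustrated with a toy "parser" that only splits on sentence-final punctuation. toy_parse stands in for a DependencyParser and plain strings stand in for ParsedSentence; both are hypothetical, and this parse_records takes a callable rather than bead's parser object.

```python
import re
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass(frozen=True)
class Record:
    """Minimal stand-in for CorpusRecord (hypothetical)."""

    text: str
    record_index: int


def toy_parse(text: str) -> list[str]:
    """Hypothetical parser stand-in: one 'sentence' per chunk ending in . ! or ?"""
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]


def parse_records(
    source: Iterable[Record],
    parse: Callable[[str], list[str]] = toy_parse,
    *,
    split_sentences: bool = True,
) -> Iterator[tuple[Record, str]]:
    for record in source:
        sentences = parse(record.text)
        if split_sentences:
            # Fan out: one (record, sentence) pair per sentence.
            for sentence in sentences:
                yield record, sentence
        elif len(sentences) == 1:
            # Only single-sentence records survive; multi-sentence ones are skipped.
            yield record, sentences[0]


records = [Record("One. Two.", 0), Record("Just one.", 1)]
```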

filter_by_structure(parsed: Iterable[tuple[CorpusRecord, ParsedSentence]], constraint: str, *, item_template_id: UUID, tool: str, element_name: str = 'text', formalism: str = UNIVERSAL_DEPENDENCIES, evaluator: DSLEvaluator | None = None) -> Iterator[Item]

Yield items whose parse satisfies a structural DSL constraint.

Parameters:

  • parsed (Iterable[tuple[CorpusRecord, ParsedSentence]], required): (record, sentence) pairs, e.g. from parse_records.
  • constraint (str, required): A DSL expression evaluated with the item bound to both self and item (e.g. 'upos(self, root(self)) == "VERB"').
  • item_template_id (UUID, required): Template the resulting items are associated with.
  • tool (str, required): Parser identifier, recorded as provenance.
  • element_name (str, default 'text'): Rendered-element name for the parsed text.
  • formalism (str, default UNIVERSAL_DEPENDENCIES): Dependency formalism slug.
  • evaluator (DSLEvaluator | None, default None): Evaluator to reuse across items; a new one is created when None.

Yields:

  • Item: Items whose parse satisfies constraint.
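The DSL itself is not reproduced here. As an illustration under stated assumptions, the sketch models a parse as CoNLL-U-style tokens (head 0 marks the root) and writes the example constraint 'upos(self, root(self)) == "VERB"' as a plain Python predicate. Token, root, root_is_verb, and this filter_by_structure signature are hypothetical; unlike the documented function, the sketch passes (record, sentence) pairs through instead of building Items.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, TypeVar

R = TypeVar("R")


@dataclass(frozen=True)
class Token:
    form: str
    upos: str
    head: int  # 1-based index of the head token; 0 marks the root (CoNLL-U style)


Sentence = tuple[Token, ...]


def root(sentence: Sentence) -> Token:
    """Return the token whose head is 0."""
    return next(tok for tok in sentence if tok.head == 0)


def root_is_verb(sentence: Sentence) -> bool:
    # Plain-Python analogue of the DSL constraint 'upos(self, root(self)) == "VERB"'.
    return root(sentence).upos == "VERB"


def filter_by_structure(
    parsed: Iterable[tuple[R, Sentence]],
    predicate: Callable[[Sentence], bool],
) -> Iterator[tuple[R, Sentence]]:
    for record, sentence in parsed:
        # The documented function evaluates the constraint on a built Item;
        # here the predicate runs directly on the sentence.
        if predicate(sentence):
            yield record, sentence


clause = (Token("Dogs", "NOUN", 2), Token("bark", "VERB", 0))
fragment = (Token("The", "DET", 2), Token("dog", "NOUN", 0))
kept = list(filter_by_structure([("r0", clause), ("r1", fragment)], root_is_verb))
print([record for record, _ in kept])  # ['r0']
```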

sample_corpus(source: Iterable[CorpusRecord], parser: DependencyParser, constraint: str, *, item_template_id: UUID, element_name: str = 'text', formalism: str = UNIVERSAL_DEPENDENCIES, split_sentences: bool = True, limit: int | None = None, evaluator: DSLEvaluator | None = None) -> Iterator[Item]

Stream, parse, and structurally filter a corpus into items.

Convenience composition of parse_records and filter_by_structure, optionally capped at limit items.

Parameters:

  • source (Iterable[CorpusRecord], required): The corpus source.
  • parser (DependencyParser, required): The dependency parser to apply; its tool identifier is recorded as provenance.
  • constraint (str, required): Structural DSL constraint each item must satisfy.
  • item_template_id (UUID, required): Template the resulting items are associated with.
  • element_name (str, default 'text'): Rendered-element name for the parsed text.
  • formalism (str, default UNIVERSAL_DEPENDENCIES): Dependency formalism slug.
  • split_sentences (bool, default True): Whether multi-sentence records fan out (see parse_records).
  • limit (int | None, default None): Maximum number of items to yield; None means no cap.
  • evaluator (DSLEvaluator | None, default None): Evaluator to reuse across items; a new one is created when None.

Yields:

  • Item: Matching items, at most limit of them.
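Composing generators means limit bounds the work done, not just the output. This sketch (sample_corpus_sketch, hypothetical, with plain callables in place of the parser and DSL evaluator) chains parse, filter, and itertools.islice over an unbounded source and counts how many records were actually read.

```python
from itertools import count, islice
from typing import Callable, Iterable, Iterator, Optional, TypeVar

R = TypeVar("R")
S = TypeVar("S")


def sample_corpus_sketch(
    source: Iterable[R],
    parse: Callable[[R], Iterable[S]],
    predicate: Callable[[S], bool],
    *,
    limit: Optional[int] = None,
) -> Iterator[tuple[R, S]]:
    """Lazy parse -> filter -> cap; nothing runs until the result is consumed."""
    parsed = ((rec, sent) for rec in source for sent in parse(rec))
    matches = ((rec, sent) for rec, sent in parsed if predicate(sent))
    return matches if limit is None else islice(matches, limit)


pulled = 0  # how many records the source has produced so far


def endless_source() -> Iterator[int]:
    global pulled
    for i in count():
        pulled += 1
        yield i


items = list(
    sample_corpus_sketch(
        endless_source(),
        parse=lambda rec: [f"sentence {rec}"],
        predicate=lambda sent: int(sent.split()[1]) % 2 == 0,
        limit=3,
    )
)
print(items)   # [(0, 'sentence 0'), (2, 'sentence 2'), (4, 'sentence 4')]
print(pulled)  # 5
```

Because islice stops before requesting another element once limit is reached, only the five records needed to find three matches are ever read, even though the source is infinite.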