Skip to content

Retrieve

Size a search cheaply first, then execute a plan as a resumable, checkpointed harvest, and pull fuller records.

scopus_count

scopus_count(query, years=None, field=None, view='STANDARD', **kwargs)

Return how many records the (optionally year-filtered) query matches.

A single cheap request that does not download the records, so it is the right way to size a search before committing quota to a harvest.

Source code in src/scopusflow/count.py (lines 25–39):
def scopus_count(query: str, years: Optional[Sequence[int]] = None,
                 field: Optional[str] = None, view: str = "STANDARD",
                 **kwargs) -> int:
    """Report how many Scopus records match ``query``.

    Only the result size is requested (``download=False``), so no records are
    transferred. This makes it the way to gauge a search before spending quota
    on a full harvest. ``years`` and ``field`` are handed to the query builder
    together with the stripped query. ``view`` and any extra keyword arguments
    go straight to ``ScopusSearch``.

    Raises:
        ValueError: if ``query`` is empty, falsy, or only whitespace.
    """
    cleaned = str(query).strip() if query else ""
    if not cleaned:
        raise ValueError("query must be a non-empty string.")
    search_query = _count_query(cleaned, years, field)

    # Deferred import: pybliometrics needs a configured API key, so it is only
    # pulled in once the arguments have been validated.
    from pybliometrics.scopus import ScopusSearch

    search = ScopusSearch(search_query, view=view, download=False, **kwargs)
    return int(search.get_results_size())

fetch_plan

fetch_plan(plan, cache_dir=None, resume=True, format='parquet', should_stop=None, **kwargs)

Run every cell of plan and return one normalised DataFrame.

With cache_dir set, each cell is written to disk as it completes, so an interrupted or quota-limited run resumes without re-fetching finished cells. format selects the checkpoint format ("parquet" or "csv"); parquet silently falls back to CSV when no parquet engine is installed. Pass a zero-argument should_stop callable to allow co-operative cancellation: it is checked before each cell and the harvest stops (returning what it has) when it returns True. Per-cell progress is emitted on the "scopusflow" logger.

Source code in src/scopusflow/fetch.py (lines 75–134):
def fetch_plan(
    plan: SearchPlan,
    cache_dir: str | None = None,
    resume: bool = True,
    format: str = "parquet",
    should_stop=None,
    **kwargs,
) -> pd.DataFrame:
    """Harvest every cell of ``plan`` and return the combined, normalised records.

    Cells are processed in the order returned by ``plan.cells()``. When
    ``cache_dir`` is given, each finished cell is checkpointed there straight
    away. A run that is interrupted or hits its quota can then be restarted,
    and it reloads finished cells instead of re-fetching them (checkpoints are
    only read when ``resume`` is true).

    ``format`` picks the checkpoint format ("parquet" or "csv"). Parquet
    silently falls back to CSV when no parquet engine is installed.

    ``should_stop``, if given, is a zero-argument callable polled before each
    cell. Once it returns ``True`` the harvest ends early and the records
    gathered so far are returned.

    Per-cell progress is logged on the ``"scopusflow"`` logger. Extra keyword
    arguments are forwarded to ``ScopusSearch``.

    Returns:
        The concatenated records with a freshly numbered, 1-based
        ``entry_number`` column. If no cell was collected, an empty frame with
        ``RECORD_COLUMNS``.

    Raises:
        ValueError: if ``plan`` is not a ``SearchPlan`` or ``format`` is not an
            accepted checkpoint format.
    """
    if not isinstance(plan, SearchPlan):
        raise ValueError("plan must be a SearchPlan.")
    if format not in _FORMATS:
        raise ValueError("format must be 'parquet' or 'csv'.")

    # Deferred import: pybliometrics needs a configured API key.
    from pybliometrics.scopus import ScopusSearch

    checkpoint_dir = None
    if cache_dir:
        checkpoint_dir = Path(cache_dir)
        checkpoint_dir.mkdir(parents=True, exist_ok=True)
    reuse_checkpoints = checkpoint_dir is not None and bool(resume)

    work = plan.cells()
    n_cells = len(work)
    collected: list[pd.DataFrame] = []
    for item in work:
        # Co-operative cancellation: checked before any work on this cell.
        if should_stop is not None and should_stop():
            logger.info("Stopped before cell %d/%d.", item.cell, n_cells)
            break

        if reuse_checkpoints:
            saved = _find_checkpoint(checkpoint_dir, item.cell)
            if saved is not None:
                logger.info("Cell %d/%d: loaded from cache.", item.cell, n_cells)
                collected.append(_read_checkpoint(saved))
                continue

        cell_query = _cell_query(item.query, item.year, item.date)
        logger.info("Cell %d/%d: fetching %s", item.cell, n_cells, cell_query)
        search = ScopusSearch(cell_query, view=item.view, cursor=True, **kwargs)
        frame = to_records(search.results, query=cell_query)

        # Checkpoint immediately so a later failure does not lose this cell.
        if checkpoint_dir is not None:
            _write_checkpoint(frame, checkpoint_dir, item.cell, format)
        collected.append(frame)

    if not collected:
        return pd.DataFrame(columns=RECORD_COLUMNS)
    combined = pd.concat(collected, ignore_index=True)
    # Renumber across all cells, cached and freshly fetched alike.
    combined["entry_number"] = range(1, len(combined) + 1)
    logger.info("Retrieved %d records.", len(combined))
    return combined

scopus_abstract

scopus_abstract(ids, by='doi', view='META', **kwargs)

Retrieve abstracts for one or many ids, resilient per id.

by selects the lookup type ("doi", "eid" or "scopus_id"). A warning is issued for any id that fails, and that id still yields an all-NA row recording the id.

Source code in src/scopusflow/abstract.py (lines 47–83):
def scopus_abstract(
    ids,
    by: str = "doi",
    view: str = "META",
    **kwargs,
) -> pd.DataFrame:
    """Retrieve abstracts for one or many ids, resilient per id.

    ``ids`` is either a single id or an iterable of ids. A single id may be a
    string, or an integer Scopus ID. ``by`` selects the lookup type ("doi",
    "eid" or "scopus_id"). ``view`` and any extra keyword arguments are passed
    to ``AbstractRetrieval``.

    Any id that fails is warned about, and the warning names the underlying
    exception. That id yields an all-NA row that still records the id.

    Returns:
        A DataFrame with columns ``ABSTRACT_COLUMNS``, one row per input id,
        in input order.

    Raises:
        ValueError: if ``by`` is not a supported lookup type.
        ImportError: if pybliometrics is not installed.
    """
    if by not in _ID_TYPES:
        raise ValueError("by must be one of 'doi', 'eid', 'scopus_id'.")
    id_type = _ID_TYPES[by]
    # Failed rows keep their id: DOIs go in ``doi``; EIDs and Scopus IDs both
    # go in ``scopus_id``.
    id_column = "doi" if by == "doi" else "scopus_id"

    # Wrap a lone id so a string is not iterated character by character and
    # an integer Scopus ID is not rejected as non-iterable.
    if isinstance(ids, (str, int)):
        ids = [ids]

    # Resolve the dependency once: a missing pybliometrics is a setup error and
    # must surface clearly, not masquerade as every id failing to retrieve.
    from pybliometrics.scopus import AbstractRetrieval  # lazy; needs a key

    rows = []
    for ident in ids:
        try:
            ab = AbstractRetrieval(ident, id_type=id_type, view=view, **kwargs)
            rows.append(_abstract_row(ab))
        except Exception as exc:  # one bad id must not sink the batch
            # Include the cause so callers can tell *why* an id failed rather
            # than only *that* it failed; the message prefix is unchanged.
            warnings.warn(
                f"Could not retrieve abstract for {ident!r}; recording NA row. "
                f"Cause: {type(exc).__name__}: {exc}",
                stacklevel=2,
            )
            row = dict.fromkeys(ABSTRACT_COLUMNS, pd.NA)
            row[id_column] = ident
            rows.append(row)
    return pd.DataFrame(rows, columns=ABSTRACT_COLUMNS)