Source code for jupyter_format._reader

"""Functions for reading Jupyter files."""
import json
import re

import nbformat

from . import ParseError
from ._common import RE_JSON


def deserialize(source):
    """Convert ``.jupyter`` string representation to Jupyter notebook.

    Lines have to be terminated with ``'\\n'``
    (a.k.a. :term:`universal newlines` mode).

    If *source* is an iterable, line terminators may be omitted.

    :param source: Content of ``.jupyter`` file.
    :type source: str or iterable of str
    :returns: A notebook node.
    :rtype: nbformat.NotebookNode
    :raises ParseError: If *source* cannot be parsed.  The second
        exception argument is derived from the reader's line counter
        at the time of the failure.

    """
    lines = SourceLines(source)
    try:
        nb = parse(lines)
    except ParseError as e:
        if len(e.args) == 1:
            # Only a message was given: add line number
            e.args += lines.current,
        elif len(e.args) == 2:
            # The second argument is an offset counted backwards from the
            # current position (see parse_json()): apply line number offset
            e.args = e.args[0], lines.current - e.args[1]
        # Bare "raise" keeps the original traceback untouched.
        raise
    except Exception as e:
        # Any other failure is reported as ParseError at the current line;
        # the original exception stays available as __cause__.
        raise ParseError(
            type(e).__name__ + ': ' + str(e), lines.current) from e
    return nb


def parse(lines):
    """Parse a whole notebook from *lines* (a `SourceLines` instance).

    After the header, each unindented line must either start a cell
    (``markdown``, ``code``, ``raw``) or be ``notebook_metadata``, which
    has to be the last section of the file.

    :returns: The notebook created by `header()`, with cells and
        metadata filled in.
    :raises ParseError: On any structural error.

    """
    nb = header(lines)
    for line in lines:
        if word('markdown', line):
            cell = nbformat.v4.new_markdown_cell()
        elif line.startswith('code'):
            cell = nbformat.v4.new_code_cell()
            if line not in ('code', 'code '):
                cell.execution_count = word_plus_integer('code', line)
        elif word('raw', line):
            cell = nbformat.v4.new_raw_cell()
        elif word('notebook_metadata', line):
            nb.metadata = metadata(lines)
            # Any line that is still left over is an error.
            for _ in lines:
                raise ParseError(
                    'All notebook metadata lines must be indented by 4 spaces '
                    'and no subsequent lines are allowed')
            break
        else:
            raise ParseError(
                "Expected (unindented) cell type or 'notebook_metadata', "
                "got {!r}".format(line))

        cell.source = indented_block(lines)

        for line in indented(1, lines):
            if word('cell_metadata', line):
                cell.metadata = metadata(lines)
                # NB: cell metadata must be at the end
                break

            if cell.cell_type in ('markdown', 'raw'):
                # attachments (since v4.1)
                attachment(line, lines, cell)
            elif cell.cell_type == 'code':
                code_output(line, lines, cell)
        nb.cells.append(cell)
    return nb


def header(lines):
    """Consume the two header lines and return a new, empty notebook.

    :raises ParseError: If one of the header lines is missing or
        malformed, or if the major format version is not 4.

    """
    nb = nbformat.v4.new_notebook()
    # The "for ... break ... else" loops consume exactly one line each;
    # the "else" branch only runs if there was no line left.
    for line in lines:
        nb.nbformat = word_plus_integer('nbformat', line)
        if nb.nbformat != 4:
            raise ParseError('Only v4 notebooks are currently supported')
        break
    else:
        raise ParseError('First line must be "nbformat X"')
    for line in lines:
        nb.nbformat_minor = word_plus_integer('nbformat_minor', line)
        break
    else:
        raise ParseError('Second line must be "nbformat_minor Y"')
    return nb


def attachment(line, lines, cell):
    """Parse one ``attachment <name>`` section and store it in *cell*.

    *line* is the already consumed section line, the MIME bundle is
    read from *lines*.

    :raises ParseError: If *line* is not an attachment line or if the
        attachment name was already used in this cell.

    """
    if not line.startswith('attachment'):
        raise ParseError(
            "Only 'attachment' is allowed here, not {!r}".format(line))
    if not hasattr(cell, 'attachments'):
        cell.attachments = {}
    name = word_plus_string('attachment', line)
    if name in cell.attachments:
        raise ParseError(
            'Duplicate attachment name: {!r}'.format(name))
    cell.attachments[name] = mime_bundle(lines)


def code_output(line, lines, cell):
    """Parse one output of a code cell and append it to ``cell.outputs``.

    *line* is the already consumed line naming the output type
    (``stream <name>``, ``display_data``, ``execute_result`` or
    ``error <ename>``); the output's content is read from *lines*.

    :raises ParseError: If the output type is unknown or if unexpected
        data follows the output.

    """
    kwargs = {}
    if line.startswith('stream'):
        output_type = 'stream'
        # NB: "name" is required!
        kwargs['name'] = word_plus_string('stream', line)
        kwargs['text'] = indented_block(lines)
    elif (word('display_data', line) or word('execute_result', line)):
        output_type = line
        if output_type == 'execute_result':
            kwargs['execution_count'] = cell.execution_count
        kwargs['data'] = mime_bundle(lines)
        # At most one (optional) "output_metadata" section may follow.
        for line in indented(2, lines):
            if not word('output_metadata', line):
                raise ParseError(
                    "Only 'output_metadata' is allowed here, not {!r}"
                    .format(line))
            kwargs['metadata'] = metadata(lines)
            break
    elif line.startswith('error'):
        output_type = 'error'
        # NB: All fields are required
        kwargs['ename'] = word_plus_string('error', line)
        # TODO: check for non-empty?
        kwargs['evalue'] = indented_block(lines)
        kwargs['traceback'] = traceback(lines)
    else:
        raise ParseError(
            'Expected output type, got {!r}'.format(line))
    # Whatever is still indented by 2 (or more) spaces doesn't belong here.
    for line in indented(2, lines):
        raise ParseError('Invalid output data: {!r}'.format(line))
    out = nbformat.v4.new_output(output_type, **kwargs)
    cell.outputs.append(out)


def traceback(lines):
    """Parse the ``traceback`` section of an error output.

    :returns: List of traceback frames (one string per frame).  Frames
        are separated by lines containing only ``-``.
    :raises ParseError: If the section is missing or malformed.

    """
    for line in indented(2, lines):
        if not word('traceback', line):
            raise ParseError(
                "Expected 'traceback', got {!r}".format(line))
        frames = []
        while True:
            frames.append(indented_block(lines))
            for line in indented(3, lines):
                if word('-', line):
                    # Separator found, another frame follows.
                    break
                raise ParseError(
                    'Invalid traceback separator: {!r}'.format(line))
            else:
                # No separator, this was the last frame.
                break
        return frames
    raise ParseError("Missing 'traceback'")


def mime_bundle(lines):
    """Parse consecutive MIME type sections into a dictionary.

    Each section consists of a line containing the MIME type, followed
    by the content as indented block.  Content of MIME types matching
    ``RE_JSON`` is parsed as JSON, everything else is stored as string.

    :returns: Dictionary mapping MIME types to their content.
    :raises ParseError: If a MIME type has surrounding whitespace or if
        JSON content is invalid.

    """
    bundle = {}
    for line in indented(3, lines):
        mime_type = line
        # TODO: allow whitespace?
        if mime_type != mime_type.strip():
            # TODO: better error message?
            raise ParseError('Invalid MIME type: {!r}'.format(mime_type))
        # TODO: check for repeated MIME type?
        content = indented_block(lines)
        if content:
            content += '\n'
        if RE_JSON.match(mime_type):
            bundle[mime_type] = parse_json(content)
        else:
            # The newline added above is removed again, except if the
            # content consists of nothing but newlines.
            if content and content.endswith('\n') and content.strip('\n'):
                content = content[:-1]
            bundle[mime_type] = content
    return bundle


def metadata(lines):
    """Parse the following indented block as JSON metadata."""
    return parse_json(indented_block(lines))


def parse_json(text):
    """Parse *text* as JSON; an empty *text* yields an empty dictionary.

    :raises ParseError: If *text* is not valid JSON.  The second
        exception argument is a line offset which `deserialize()`
        subtracts from the reader's current line.

    """
    if not text:
        return {}
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        # Number of lines in *text*; a newline in the very last position
        # doesn't start a new line.  This is the same calculation
        # json.JSONDecodeError uses for its "lineno" attribute.
        total = text.count('\n', 0, -1) + 1
        # Column numbers are shifted by the 4 spaces of indentation.
        # NOTE(review): "total - e.lineno" is the number of block lines
        # after the faulty one; please confirm that the additional "+ 1"
        # matches the line numbering convention of ParseError.
        raise ParseError(
            'JSON error in column {}: {}'.format(e.colno + 4, e.msg),
            total - e.lineno + 1) from e


def indented_block(lines):
    """Consume lines indented by 4 spaces and return them as one string.

    The indentation is removed and the lines are joined with ``'\\n'``
    (without trailing newline).

    """
    return '\n'.join(indented(4, lines))


def word_plus_integer(word, line):
    """Return the integer from a line of the form ``<word> <integer>``.

    The integer must be non-negative and must not have leading zeros.

    :raises ParseError: If *line* doesn't have the expected form.

    """
    # re.escape() makes sure that *word* is taken literally and
    # fullmatch() doesn't tolerate a trailing newline (unlike "$").
    m = re.fullmatch(re.escape(word) + ' ([0-9]|[1-9][0-9]+)', line)
    if not m:
        raise ParseError(
            'Expected {!r} followed by a space and an integer'.format(word))
    return int(m.group(1))


def word_plus_string(word, line):
    """Return the text following ``<word> `` in *line*.

    The caller has to make sure that *line* starts with *word*.

    :raises ParseError: If *word* is not followed by a space and at
        least one further character.

    """
    chars = len(word)
    # TODO: check if line[chars + 1] is a space?
    # TODO: use split() or partition()?
    if len(line) < chars + 2 or line[chars] != ' ':
        raise ParseError('Missing string after {!r}'.format(word))
    return line[chars + 1:]


def word(word, line):
    """Check if *line* consists of *word* and nothing else.

    :returns: ``True`` if *line* is equal to *word*, ``False`` if
        *line* doesn't start with *word*.
    :raises ParseError: If *line* starts with *word* but continues
        with further text.

    """
    if not line.startswith(word):
        return False
    if line != word:
        raise ParseError('No text allowed after {!r}'.format(word))
    return True


class SourceLines:
    """Iterator over source lines.

    Strips trailing newlines, tracks current line number,
    allows peeking.

    ``current`` is the number of lines which have been consumed so far.

    """

    # Marks the end of the input.  A fresh StopIteration is raised on
    # each request instead of re-raising one stored exception instance
    # (whose traceback would grow each time it is raised).
    _EXHAUSTED = object()

    def __init__(self, source):
        if isinstance(source, str):
            # Only '\n' (optionally preceded by '\r') terminates a line.
            # str.splitlines() would also split at form feeds, U+2028
            # and several other characters which may be part of the
            # notebook content.
            source = re.split(r'\r?\n', source)
            if source[-1] == '':
                # The part after the final line terminator is no line.
                del source[-1]
        self._iter = iter(source)
        self.current = -1
        self.advance()

    def peek(self):
        """Return the next line without consuming it.

        :raises StopIteration: If there are no more lines.

        """
        if self._next is self._EXHAUSTED:
            raise StopIteration
        return self._next

    def advance(self):
        """Consume the line returned by `peek()` and load the next one."""
        try:
            line = next(self._iter)
        except StopIteration:
            self._next = self._EXHAUSTED
        else:
            if line.endswith('\n'):
                line = line[:-1]
            self._next = line
        self.current += 1

    def __iter__(self):
        return self

    def __next__(self):
        line = self.peek()
        self.advance()
        return line


def indented(indentation, lines):
    """Iterator adaptor which stops if there is less indentation.

    The given number of leading spaces is removed from each line.
    Blank lines are forwarded as empty lines.
    The line which ends the iteration is not consumed.

    """
    prefix = ' ' * indentation
    while True:
        try:
            line = lines.peek()
        except StopIteration:
            break
        if line.startswith(prefix):
            line = line[indentation:]
        elif not line.strip():
            line = ''  # Blank line
        else:
            break
        lines.advance()
        yield line