Codebase list python-hl7 / 15a23db
Update upstream source from tag 'upstream/0.4.5' Update to upstream version '0.4.5' with Debian dir f34c6f3fddcf0c9d90bec4d83aab46a0b799ad3b Andreas Tille 1 year, 9 months ago
23 changed file(s) with 823 addition(s) and 591 deletion(s). Raw diff Collapse all Expand all
66
77 on:
88 push:
9 branches: [master]
9 branches: [main]
1010 pull_request:
1111 # The branches below must be a subset of the branches above
12 branches: [master]
12 branches: [main]
1313 schedule:
1414 - cron: '0 16 * * 1'
1515
1010 runs-on: ubuntu-latest
1111 strategy:
1212 matrix:
13 python-version: [3.5, 3.6, 3.7, 3.8, 3.9]
13 python-version: [3.7, 3.8, 3.9, "3.10"]
1414
1515 steps:
1616 - uses: actions/checkout@v2
66 SPHINXBUILD = $(shell pwd)/env/bin/sphinx-build
77
88 env: requirements.txt setup.py
9 test -f $(PYTHON) || virtualenv env
9 test -f $(PYTHON) || python3 -m venv env
1010 $(PIP) install -U -r requirements.txt
1111 $(PYTHON) setup.py develop
1212
5252 BLACK_ARGS=--check
5353 endif
5454 format:
55 $(BIN)/isort -rc $(ISORT_ARGS) hl7 tests
55 $(BIN)/isort $(ISORT_ARGS) hl7 tests
5656 $(BIN)/black $(BLACK_ARGS) hl7 tests
5757 .PHONY: isort
5858
5959 upload:
6060 rm -rf dist
6161 $(PYTHON) setup.py sdist bdist_wheel
62 twine upload dist/*
62 $(BIN)/twine upload dist/*
6363 .PHONY: upload
162162
163163 >>> SEP = '|^~\&'
164164 >>> CR_SEP = '\r'
165 >>> MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ['MSH'])])
166 >>> MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ['MSA'])])
165 >>> MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ['MSH'])])
166 >>> MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ['MSA'])])
167167 >>> response = hl7.Message(CR_SEP, [MSH, MSA])
168168 >>> response['MSH.F1.R1'] = SEP[0]
169169 >>> response['MSH.F2.R1'] = SEP[1:]
254254
255255 **Presentation Characters**
256256
257 HL7 defines a protocol for encoding presentation characters, These include hightlighting,
257 HL7 defines a protocol for encoding presentation characters, These include highlighting,
258258 and rich text functionality. The API does not currently allow for easy access to the
259259 escape/unescape logic. You must overwrite the message class escape and unescape methods,
260260 after parsing the message.
00 Changelog
11 =========
2
3 0.4.5 - March 2022
4 ---------------------
5
6 * Better support for :py:class:`HL7StreamProtocol` in Python 3.7, which lacks
7 `_reject_connection`
8
9 Thanks `Joseph Wortmann <https://github.com/joseph-wortmann>`_!
10
11
12 0.4.3 - March 2022
13 ---------------------
14
15 * Dropped support for Python 3.5 & 3.6. Python 3.7 - 3.10 now supported.
16 * Ensure :py:func:`hl7.parse_hl7` allows legitimate occurrences of "MSH" inside
17 the message contents
18
19 Thanks `Andrew Wason <https://github.com/rectalogic>`_!
20
221
322 0.4.2 - February 2021
423 ---------------------
103103 "github_repo": "python-hl7",
104104 "codecov_button": True,
105105 "github_banner": True,
106 "badge_branch": "main",
106107 # "page_width": "940",
107108 }
108109
246247 # The format is a list of tuples containing the path and title.
247248 # epub_pre_files = []
248249
249 # HTML files shat should be inserted after the pages created by sphinx.
250 # HTML files that should be inserted after the pages created by sphinx.
250251 # The format is a list of tuples containing the path and title.
251252 # epub_post_files = []
252253
88 HL7 is a communication protocol and message format for
99 health care data. It is the de-facto standard for transmitting data
1010 between clinical information systems and between clinical devices.
11 The version 2.x series, which is often is a pipe delimited format
11 The version 2.x series, which is often in a pipe delimited format,
1212 is currently the most widely accepted version of HL7 (there
1313 is an alternative XML-based format).
1414
163163 True
164164
165165 Since many many types of segments only have a single instance in a message
166 (e.g. PID or MSH), :py:meth:`hl7.Message.segment` provides a convienance
166 (e.g. PID or MSH), :py:meth:`hl7.Message.segment` provides a convenience
167167 wrapper around :py:meth:`hl7.Message.segments` that returns the first matching
168168 :py:class:`hl7.Segment`:
169169
232232 Python 2 vs Python 3 and Unicode vs Byte strings
233233 -------------------------------------------------
234234
235 python-hl7 supports Python 3.5+ and primarily deals with the unicode ``str`` type.
235 python-hl7 supports Python 3.7+ and primarily deals with the unicode ``str`` type.
236236
237237 Passing bytes to :py:func:`hl7.parse`, requires setting the
238238 ``encoding`` parameter, if using anything other than UTF-8. :py:func:`hl7.parse`
77 MalformedFileException,
88 MalformedSegmentException,
99 )
10 from .util import generate_message_control_id
10 from .util import escape, generate_message_control_id, unescape
1111
1212 logger = logging.getLogger(__file__)
1313
4848 def __init__(
4949 self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
5050 ):
51 assert separator in separators
5152 # Initialize the list object, optionally passing in the
5253 # sequence. Since list([]) == [], using the default
5354 # parameter will not cause any issues.
5657 self.esc = esc
5758 self.separators = separators
5859 self.factory = factory if factory is not None else Factory
60
61 def create_file(self, seq):
62 """Create a new :py:class:`hl7.File` compatible with this container"""
63 return self.factory.create_file(
64 sequence=seq,
65 esc=self.esc,
66 separators=self.separators,
67 factory=self.factory,
68 )
69
70 def create_batch(self, seq):
71 """Create a new :py:class:`hl7.Batch` compatible with this container"""
72 return self.factory.create_batch(
73 sequence=seq,
74 esc=self.esc,
75 separators=self.separators,
76 factory=self.factory,
77 )
78
79 def create_message(self, seq):
80 """Create a new :py:class:`hl7.Message` compatible with this container"""
81 return self.factory.create_message(
82 sequence=seq,
83 esc=self.esc,
84 separators=self.separators,
85 factory=self.factory,
86 )
87
88 def create_segment(self, seq):
89 """Create a new :py:class:`hl7.Segment` compatible with this container"""
90 return self.factory.create_segment(
91 sequence=seq,
92 esc=self.esc,
93 separators=self.separators,
94 factory=self.factory,
95 )
96
97 def create_field(self, seq):
98 """Create a new :py:class:`hl7.Field` compatible with this container"""
99 return self.factory.create_field(
100 sequence=seq,
101 esc=self.esc,
102 separators=self.separators,
103 factory=self.factory,
104 )
105
106 def create_repetition(self, seq):
107 """Create a new :py:class:`hl7.Repetition` compatible with this container"""
108 return self.factory.create_repetition(
109 sequence=seq,
110 esc=self.esc,
111 separators=self.separators,
112 factory=self.factory,
113 )
114
115 def create_component(self, seq):
116 """Create a new :py:class:`hl7.Component` compatible with this container"""
117 return self.factory.create_component(
118 sequence=seq,
119 esc=self.esc,
120 separators=self.separators,
121 factory=self.factory,
122 )
59123
60124 def __getitem__(self, item):
61125 # Python slice operator was returning a regular list, not a
80144 return self.separator.join((str(x) for x in self))
81145
82146
83 class BuilderMixin(object):
84 """Mixin class that allows for the create functions
85 in the top-level container classes
86 """
87
88 def create_file(self, seq):
89 """Create a new :py:class:`hl7.File` compatible with this container"""
90 return self.factory.create_file(
91 self.separators[0],
92 seq,
93 esc=self.esc,
94 separators=self.separators,
95 factory=self.factory,
96 )
97
98 def create_batch(self, seq):
99 """Create a new :py:class:`hl7.Batch` compatible with this container"""
100 return self.factory.create_batch(
101 self.separators[0],
102 seq,
103 esc=self.esc,
104 separators=self.separators,
105 factory=self.factory,
106 )
107
108 def create_message(self, seq):
109 """Create a new :py:class:`hl7.Message` compatible with this container"""
110 return self.factory.create_message(
111 self.separators[0],
112 seq,
113 esc=self.esc,
114 separators=self.separators,
115 factory=self.factory,
116 )
117
118 def create_segment(self, seq):
119 """Create a new :py:class:`hl7.Segment` compatible with this container"""
120 return self.factory.create_segment(
121 self.separators[1],
122 seq,
123 esc=self.esc,
124 separators=self.separators[1:],
125 factory=self.factory,
126 )
127
128 def create_field(self, seq):
129 """Create a new :py:class:`hl7.Field` compatible with this container"""
130 return self.factory.create_field(
131 self.separators[2],
132 seq,
133 esc=self.esc,
134 separators=self.separators[2:],
135 factory=self.factory,
136 )
137
138 def create_repetition(self, seq):
139 """Create a new :py:class:`hl7.Repetition` compatible with this container"""
140 return self.factory.create_repetition(
141 self.separators[3],
142 seq,
143 esc=self.esc,
144 separators=self.separators[3:],
145 factory=self.factory,
146 )
147
148 def create_component(self, seq):
149 """Create a new :py:class:`hl7.Component` compatible with this container"""
150 return self.factory.create_component(
151 self.separators[4],
152 seq,
153 esc=self.esc,
154 separators=self.separators[4:],
155 factory=self.factory,
156 )
157
158
159 class File(Container, BuilderMixin):
147 class File(Container):
160148 """Representation of an HL7 file from the batch protocol.
161149 It contains a list of :py:class:`hl7.Batch`
162150 instances. It may contain FHS/FTS :py:class:`hl7.Segment` instances.
166154 """
167155
168156 def __init__(
169 self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
170 ):
157 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
158 ):
159 assert not separator or separator == separators[0]
171160 super(File, self).__init__(
172 separator,
161 separator=separators[0],
173162 sequence=sequence,
174163 esc=esc,
175164 separators=separators,
246235 )
247236
248237
249 class Batch(Container, BuilderMixin):
238 class Batch(Container):
250239 """Representation of an HL7 batch from the batch protocol.
251240 It contains a list of :py:class:`hl7.Message` instances.
252241 It may contain BHS/BTS :py:class:`hl7.Segment` instances.
256245 """
257246
258247 def __init__(
259 self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
260 ):
248 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
249 ):
250 assert not separator or separator == separators[0]
261251 super(Batch, self).__init__(
262 separator,
252 separator=separators[0],
263253 sequence=sequence,
264254 esc=esc,
265255 separators=separators,
336326 )
337327
338328
339 class Message(Container, BuilderMixin):
329 class Message(Container):
330 def __init__(
331 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
332 ):
333 assert not separator or separator == separators[0]
334 super(Message, self).__init__(
335 separator=separators[0],
336 sequence=sequence,
337 esc=esc,
338 separators=separators,
339 factory=factory,
340 )
341
340342 """Representation of an HL7 message. It contains a list
341343 of :py:class:`hl7.Segment` instances.
342344 """
378380 If key is an integer, ``__setitem__`` acts list a list, setting
379381 the :py:class:`hl7.Segment` held at that index:
380382
381 >>> h[1] = hl7.Segment("|", [hl7.Field("^", ['PID'], [''])])
383 >>> h[1] = hl7.Segment("|", [hl7.Field("~", ['PID'], [''])])
382384
383385 If the key is a string of length greater than 3,
384386 the key is parsed into an :py:class:`hl7.Accessor` and passed
438440 subcomponent_num=1,
439441 ):
440442 """
441 Extract a field using a future proofed approach, based on rules in:
442 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
443
444 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
445
446 | PID.F3.R1.C2.S2 = 'Sub-Component2'
447 | PID.F4.R2.C1 = 'Repeat1'
448
449 Compatibility Rules:
450
451 If the parse tree is deeper than the specified path continue
452 following the first child branch until a leaf of the tree is
453 encountered and return that value (which could be blank).
454
455 Example:
456
457 | PID.F3.R1.C2 = 'Sub-Component1' (assume .SC1)
458
459 If the parse tree terminates before the full path is satisfied
460 check each of the subsequent paths and if every one is specified
461 at position 1 then the leaf value reached can be returned as the
462 result.
463
464 | PID.F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
465 """
466 # Save original values for error messages
467 accessor = Accessor(
468 segment, segment_num, field_num, repeat_num, component_num, subcomponent_num
469 )
470
471 field_num = field_num or 1
472 repeat_num = repeat_num or 1
473 component_num = component_num or 1
474 subcomponent_num = subcomponent_num or 1
475
476 segment = self.segments(segment)(segment_num)
477 if field_num < len(segment):
478 field = segment(field_num)
479 else:
480 if repeat_num == 1 and component_num == 1 and subcomponent_num == 1:
481 return "" # Assume non-present optional value
482 raise IndexError("Field not present: {0}".format(accessor.key))
483
484 rep = field(repeat_num)
485
486 if not isinstance(rep, Repetition):
487 # leaf
488 if component_num == 1 and subcomponent_num == 1:
489 return (
490 rep
491 if accessor.segment == "MSH" and accessor.field_num in (1, 2)
492 else self.unescape(rep)
493 )
494 raise IndexError(
495 "Field reaches leaf node before completing path: {0}".format(
496 accessor.key
497 )
498 )
499
500 if component_num > len(rep):
501 if subcomponent_num == 1:
502 return "" # Assume non-present optional value
503 raise IndexError("Component not present: {0}".format(accessor.key))
504
505 component = rep(component_num)
506 if not isinstance(component, Component):
507 # leaf
508 if subcomponent_num == 1:
509 return self.unescape(component)
510 raise IndexError(
511 "Field reaches leaf node before completing path: {0}".format(
512 accessor.key
513 )
514 )
515
516 if subcomponent_num <= len(component):
517 subcomponent = component(subcomponent_num)
518 return self.unescape(subcomponent)
519 else:
520 return "" # Assume non-present optional value
443 Extract a field using a future proofed approach, based on rules in:
444 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
445
446 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
447
448 | PID.F3.R1.C2.S2 = 'Sub-Component2'
449 | PID.F4.R2.C1 = 'Repeat1'
450
451 Compatibility Rules:
452
453 If the parse tree is deeper than the specified path continue
454 following the first child branch until a leaf of the tree is
455 encountered and return that value (which could be blank).
456
457 Example:
458
459 | PID.F3.R1.C2 = 'Sub-Component1' (assume .SC1)
460
461 If the parse tree terminates before the full path is satisfied
462 check each of the subsequent paths and if every one is specified
463 at position 1 then the leaf value reached can be returned as the
464 result.
465
466 | PID.F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
467 """
468 return self.segments(segment)(segment_num).extract_field(
469 segment_num, field_num, repeat_num, component_num, subcomponent_num
470 )
521471
522472 def assign_field(
523473 self,
530480 subcomponent_num=None,
531481 ):
532482 """
533 Assign a value into a message using the tree based assignment notation.
534 The segment must exist.
535
536 Extract a field using a future proofed approach, based on rules in:
537 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
538 """
539 segment = self.segments(segment)(segment_num)
540
541 while len(segment) <= field_num:
542 segment.append(self.create_field([]))
543 field = segment(field_num)
483 Assign a value into a message using the tree based assignment notation.
484 The segment must exist.
485
486 Extract a field using a future proofed approach, based on rules in:
487 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
488 """
489 self.segments(segment)(segment_num).assign_field(
490 value, field_num, repeat_num, component_num, subcomponent_num
491 )
492
493 def escape(self, field, app_map=None):
494 """
495 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
496
497 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
498
499 Pass through the message. Replace recognised characters with their escaped
500 version. Return an ascii encoded string.
501
502 Functionality:
503
504 * Replace separator characters (2.10.4)
505 * replace application defined characters (2.10.7)
506 * Replace non-ascii values with hex versions using HL7 conventions.
507
508 Incomplete:
509
510 * replace highlight characters (2.10.3)
511 * How to handle the rich text substitutions.
512 * Merge contiguous hex values
513 """
514 return escape(self, field, app_map)
515
516 def unescape(self, field, app_map=None):
517 """
518 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
519
520 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
521
522 This will convert the identifiable sequences.
523 If the application provides mapping, these are also used.
524 Items which cannot be mapped are removed
525
526 For example, the App Map count provide N, H, Zxxx values
527
528 Chapter 2: Section 2.10
529
530 At the moment, this functionality can:
531
532 * replace the parsing characters (2.10.4)
533 * replace highlight characters (2.10.3)
534 * replace hex characters. (2.10.5)
535 * replace rich text characters (2.10.6)
536 * replace application defined characters (2.10.7)
537
538 It cannot:
539
540 * switch code pages / ISO IR character sets
541 """
542 return unescape(self, field, app_map)
543
544 def create_ack(
545 self, ack_code="AA", message_id=None, application=None, facility=None
546 ):
547 """
548 Create an hl7 ACK response :py:class:`hl7.Message`, per spec 2.9.2, for this message.
549
550 See http://www.hl7standards.com/blog/2007/02/01/ack-message-original-mode-acknowledgement/
551
552 ``ack_code`` options are one of `AA` (Application Accept), `AR` (Application Reject),
553 `AE` (Application Error), `CA` (Commit Accept - Enhanced Mode),
554 `CR` (Commit Reject - Enhanced Mode), or `CE` (Commit Error - Enhanced Mode)
555 (see HL7 Table 0008 - Acknowledgment Code)
556 ``message_id`` control message ID for ACK, defaults to unique generated ID
557 ``application`` name of sending application, defaults to receiving application of message
558 ``facility`` name of sending facility, defaults to receiving facility of message
559 """
560 source_msh = self.segment("MSH")
561 msh = self.create_segment([self.create_field(["MSH"])])
562
563 msh.assign_field(str(source_msh(1)), 1)
564 msh.assign_field(str(source_msh(2)), 2)
565 # Sending application is source receving application
566 msh.assign_field(
567 str(application) if application is not None else str(source_msh(5)), 3
568 )
569 # Sending facility is source receving facility
570 msh.assign_field(
571 str(facility) if facility is not None else str(source_msh(6)), 4
572 )
573 # Receiving application is source sending application
574 msh.assign_field(str(source_msh(3)), 5)
575 # Receiving facility is source sending facility
576 msh.assign_field(str(source_msh(4)), 6)
577 msh.assign_field(str(datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")), 7)
578 # Message type code
579 msh.assign_field("ACK", 9, 1, 1)
580 # Copy trigger event from source
581 msh.assign_field(str(source_msh(9)(1)(2)), 9, 1, 2)
582 msh.assign_field("ACK", 9, 1, 3)
583 msh.assign_field(
584 message_id if message_id is not None else generate_message_control_id(), 10
585 )
586 msh.assign_field(str(source_msh(11)), 11)
587 msh.assign_field(str(source_msh(12)), 12)
588
589 msa = self.create_segment([self.create_field(["MSA"])])
590 msa.assign_field(str(ack_code), 1)
591 msa.assign_field(str(source_msh(10)), 2)
592 ack = self.create_message([msh, msa])
593
594 return ack
595
596 def __str__(self):
597 """Join a the child containers into a single string, separated
598 by the self.separator. This method acts recursively, calling
599 the children's __unicode__ method. Thus ``unicode()`` is the
600 approriate method for turning the python-hl7 representation of
601 HL7 into a standard string.
602
603 >>> str(hl7.parse(message)) == message
604 True
605
606 """
607 # Per spec, Message Construction Rules, Section 2.6 (v2.8), Message ends
608 # with the carriage return
609 return super(Message, self).__str__() + self.separator
610
611
612 class Segment(Container):
613 def __init__(
614 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
615 ):
616 assert not separator or separator == separators[1]
617 super(Segment, self).__init__(
618 separator=separators[1],
619 sequence=sequence,
620 esc=esc,
621 separators=separators,
622 factory=factory,
623 )
624
625 """Second level of an HL7 message, which represents an HL7 Segment.
626 Traditionally this is a line of a message that ends with a carriage
627 return and is separated by pipes. It contains a list of
628 :py:class:`hl7.Field` instances.
629 """
630
631 def extract_field(
632 self,
633 segment_num=1,
634 field_num=1,
635 repeat_num=1,
636 component_num=1,
637 subcomponent_num=1,
638 ):
639 """
640 Extract a field using a future proofed approach, based on rules in:
641 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
642
643 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
644
645 | F3.R1.C2.S2 = 'Sub-Component2'
646 | F4.R2.C1 = 'Repeat1'
647
648 Compatibility Rules:
649
650 If the parse tree is deeper than the specified path continue
651 following the first child branch until a leaf of the tree is
652 encountered and return that value (which could be blank).
653
654 Example:
655
656 | F3.R1.C2 = 'Sub-Component1' (assume .SC1)
657
658 If the parse tree terminates before the full path is satisfied
659 check each of the subsequent paths and if every one is specified
660 at position 1 then the leaf value reached can be returned as the
661 result.
662
663 | F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
664 """
665 # Save original values for error messages
666 accessor = Accessor(
667 self[0][0],
668 segment_num,
669 field_num,
670 repeat_num,
671 component_num,
672 subcomponent_num,
673 )
674
675 field_num = field_num or 1
676 repeat_num = repeat_num or 1
677 component_num = component_num or 1
678 subcomponent_num = subcomponent_num or 1
679
680 if field_num < len(self):
681 field = self(field_num)
682 else:
683 if repeat_num == 1 and component_num == 1 and subcomponent_num == 1:
684 return "" # Assume non-present optional value
685 raise IndexError("Field not present: {0}".format(accessor.key))
686
687 rep = field(repeat_num)
688
689 if not isinstance(rep, Repetition):
690 # leaf
691 if component_num == 1 and subcomponent_num == 1:
692 return (
693 rep
694 if accessor.segment == "MSH" and accessor.field_num in (1, 2)
695 else unescape(self, rep)
696 )
697 raise IndexError(
698 "Field reaches leaf node before completing path: {0}".format(
699 accessor.key
700 )
701 )
702
703 if component_num > len(rep):
704 if subcomponent_num == 1:
705 return "" # Assume non-present optional value
706 raise IndexError("Component not present: {0}".format(accessor.key))
707
708 component = rep(component_num)
709 if not isinstance(component, Component):
710 # leaf
711 if subcomponent_num == 1:
712 return unescape(self, component)
713 raise IndexError(
714 "Field reaches leaf node before completing path: {0}".format(
715 accessor.key
716 )
717 )
718
719 if subcomponent_num <= len(component):
720 subcomponent = component(subcomponent_num)
721 return unescape(self, subcomponent)
722 else:
723 return "" # Assume non-present optional value
724
725 def assign_field(
726 self,
727 value,
728 field_num=None,
729 repeat_num=None,
730 component_num=None,
731 subcomponent_num=None,
732 ):
733 """
734 Assign a value into a message using the tree based assignment notation.
735 The segment must exist.
736
737 Extract a field using a future proofed approach, based on rules in:
738 http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
739 """
740
741 while len(self) <= field_num:
742 self.append(self.create_field([]))
743 field = self(field_num)
544744 if repeat_num is None:
545745 field[:] = [value]
546746 return
560760 component.append("")
561761 component(subcomponent_num, value)
562762
563 def escape(self, field, app_map=None):
564 """
565 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
566
567 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
568
569 Pass through the message. Replace recognised characters with their escaped
570 version. Return an ascii encoded string.
571
572 Functionality:
573
574 * Replace separator characters (2.10.4)
575 * replace application defined characters (2.10.7)
576 * Replace non-ascii values with hex versions using HL7 conventions.
577
578 Incomplete:
579
580 * replace highlight characters (2.10.3)
581 * How to handle the rich text substitutions.
582 * Merge contiguous hex values
583 """
584 if not field:
585 return field
586
587 esc = str(self.esc)
588
589 DEFAULT_MAP = {
590 self.separators[1]: "F", # 2.10.4
591 self.separators[2]: "R",
592 self.separators[3]: "S",
593 self.separators[4]: "T",
594 self.esc: "E",
595 "\r": ".br", # 2.10.6
596 }
597
598 rv = []
599 for offset, c in enumerate(field):
600 if app_map and c in app_map:
601 rv.append(esc + app_map[c] + esc)
602 elif c in DEFAULT_MAP:
603 rv.append(esc + DEFAULT_MAP[c] + esc)
604 elif ord(c) >= 0x20 and ord(c) <= 0x7E:
605 rv.append(c)
606 else:
607 rv.append("%sX%2x%s" % (esc, ord(c), esc))
608
609 return "".join(rv)
610
611 def unescape(self, field, app_map=None): # noqa: C901
612 """
613 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
614
615 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
616
617 This will convert the identifiable sequences.
618 If the application provides mapping, these are also used.
619 Items which cannot be mapped are removed
620
621 For example, the App Map count provide N, H, Zxxx values
622
623 Chapter 2: Section 2.10
624
625 At the moment, this functionality can:
626
627 * replace the parsing characters (2.10.4)
628 * replace highlight characters (2.10.3)
629 * replace hex characters. (2.10.5)
630 * replace rich text characters (2.10.6)
631 * replace application defined characters (2.10.7)
632
633 It cannot:
634
635 * switch code pages / ISO IR character sets
636 """
637 if not field or field.find(self.esc) == -1:
638 return field
639
640 DEFAULT_MAP = {
641 "H": "_", # Override using the APP MAP: 2.10.3
642 "N": "_", # Override using the APP MAP
643 "F": self.separators[1], # 2.10.4
644 "R": self.separators[2],
645 "S": self.separators[3],
646 "T": self.separators[4],
647 "E": self.esc,
648 ".br": "\r", # 2.10.6
649 ".sp": "\r",
650 ".fi": "",
651 ".nf": "",
652 ".in": " ",
653 ".ti": " ",
654 ".sk": " ",
655 ".ce": "\r",
656 }
657
658 rv = []
659 collecting = []
660 in_seq = False
661 for offset, c in enumerate(field):
662 if in_seq:
663 if c == self.esc:
664 in_seq = False
665 value = "".join(collecting)
666 collecting = []
667 if not value:
668 logger.warn(
669 "Error unescaping value [%s], empty sequence found at %d",
670 field,
671 offset,
672 )
673 continue
674 if app_map and value in app_map:
675 rv.append(app_map[value])
676 elif value in DEFAULT_MAP:
677 rv.append(DEFAULT_MAP[value])
678 elif value.startswith(".") and (
679 (app_map and value[:3] in app_map) or value[:3] in DEFAULT_MAP
680 ):
681 # Substitution with a number of repetitions defined (2.10.6)
682 if app_map and value[:3] in app_map:
683 ch = app_map[value[:3]]
684 else:
685 ch = DEFAULT_MAP[value[:3]]
686 count = int(value[3:])
687 rv.append(ch * count)
688
689 elif (
690 value[0] == "C"
691 ): # Convert to new Single Byte character set : 2.10.2
692 # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
693 logger.warn(
694 "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
695 value,
696 field,
697 offset,
698 )
699 elif (
700 value[0] == "M"
701 ): # Switch to new Multi Byte character set : 2.10.2
702 # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
703 logger.warn(
704 "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
705 value,
706 field,
707 offset,
708 )
709 elif value[0] == "X": # Hex encoded Bytes: 2.10.5
710 value = value[1:]
711 try:
712 for off in range(0, len(value), 2):
713 rv.append(chr(int(value[off : off + 2], 16)))
714 except Exception:
715 logger.exception(
716 "Error decoding hex value [%s], field [%s], offset [%s]",
717 value,
718 field,
719 offset,
720 )
721 else:
722 logger.exception(
723 "Error decoding value [%s], field [%s], offset [%s]",
724 value,
725 field,
726 offset,
727 )
728 else:
729 collecting.append(c)
730 elif c == self.esc:
731 in_seq = True
732 else:
733 rv.append(str(c))
734
735 return "".join(rv)
736
737 def create_ack(
738 self, ack_code="AA", message_id=None, application=None, facility=None
739 ):
740 """
741 Create an hl7 ACK response :py:class:`hl7.Message`, per spec 2.9.2, for this message.
742
743 See http://www.hl7standards.com/blog/2007/02/01/ack-message-original-mode-acknowledgement/
744
745 ``ack_code`` options are one of `AA` (Application Accept), `AR` (Application Reject),
746 `AE` (Application Error), `CA` (Commit Accept - Enhanced Mode),
747 `CR` (Commit Reject - Enhanced Mode), or `CE` (Commit Error - Enhanced Mode)
748 (see HL7 Table 0008 - Acknowledgment Code)
749 ``message_id`` control message ID for ACK, defaults to unique generated ID
750 ``application`` name of sending application, defaults to receiving application of message
751 ``facility`` name of sending facility, defaults to receiving facility of message
752 """
753 source_msh = self.segment("MSH")
754 msh = self.create_segment([self.create_field(["MSH"])])
755 msa = self.create_segment([self.create_field(["MSA"])])
756 ack = self.create_message([msh, msa])
757
758 ack.assign_field(str(source_msh(1)), "MSH", 1, 1)
759 ack.assign_field(str(source_msh(2)), "MSH", 1, 2)
760 # Sending application is source receving application
761 ack.assign_field(
762 str(application) if application is not None else str(source_msh(5)),
763 "MSH",
764 1,
765 3,
766 )
767 # Sending facility is source receving facility
768 ack.assign_field(
769 str(facility) if facility is not None else str(source_msh(6)), "MSH", 1, 4
770 )
771 # Receiving application is source sending application
772 ack.assign_field(str(source_msh(3)), "MSH", 1, 5)
773 # Receiving facility is source sending facility
774 ack.assign_field(str(source_msh(4)), "MSH", 1, 6)
775 ack.assign_field(
776 str(datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")), "MSH", 1, 7
777 )
778 # Message type code
779 ack.assign_field("ACK", "MSH", 1, 9, 1, 1)
780 # Copy trigger event from source
781 ack.assign_field(str(source_msh(9)(1)(2)), "MSH", 1, 9, 1, 2)
782 ack.assign_field("ACK", "MSH", 1, 9, 1, 3)
783 ack.assign_field(
784 message_id if message_id is not None else generate_message_control_id(),
785 "MSH",
786 1,
787 10,
788 )
789 ack.assign_field(str(source_msh(11)), "MSH", 1, 11)
790 ack.assign_field(str(source_msh(12)), "MSH", 1, 12)
791
792 ack.assign_field(str(ack_code), "MSA", 1, 1)
793 ack.assign_field(str(source_msh(10)), "MSA", 1, 2)
794
795 return ack
796
797 def __str__(self):
798 """Join a the child containers into a single string, separated
799 by the self.separator. This method acts recursively, calling
800 the children's __unicode__ method. Thus ``unicode()`` is the
801 approriate method for turning the python-hl7 representation of
802 HL7 into a standard string.
803
804 >>> str(hl7.parse(message)) == message
805 True
806
807 """
808 # Per spec, Message Construction Rules, Section 2.6 (v2.8), Message ends
809 # with the carriage return
810 return super(Message, self).__str__() + self.separator
811
812
813 class Segment(Container):
814 """Second level of an HL7 message, which represents an HL7 Segment.
815 Traditionally this is a line of a message that ends with a carriage
816 return and is separated by pipes. It contains a list of
817 :py:class:`hl7.Field` instances.
818 """
819
820763 def _adjust_index(self, index):
821764 # First element is the segment name, so we don't need to adjust to get 1-based
822765 return index
834777
835778
836779 class Field(Container):
780 def __init__(
781 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
782 ):
783 assert not separator or separator == separators[2]
784 super(Field, self).__init__(
785 separator=separators[2],
786 sequence=sequence,
787 esc=esc,
788 separators=separators,
789 factory=factory,
790 )
791
837792 """Third level of an HL7 message, that traditionally is surrounded
838793 by pipes and separated by carets. It contains a list of strings
839794 or :py:class:`hl7.Repetition` instances.
841796
842797
843798 class Repetition(Container):
799 def __init__(
800 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
801 ):
802 assert not separator or separator == separators[3]
803 super(Repetition, self).__init__(
804 separator=separators[3],
805 sequence=sequence,
806 esc=esc,
807 separators=separators,
808 factory=factory,
809 )
810
844811 """Fourth level of an HL7 message. A field can repeat.
845812 It contains a list of strings or :py:class:`hl7.Component` instances.
846813 """
847814
848815
849816 class Component(Container):
817 def __init__(
818 self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
819 ):
820 assert not separator or separator == separators[4]
821 super(Component, self).__init__(
822 separator=separators[4],
823 sequence=sequence,
824 esc=esc,
825 separators=separators,
826 factory=factory,
827 )
828
850829 """Fifth level of an HL7 message. A component is a composite datatypes.
851830 It contains a list of string sub-components.
852831 """
200200 self._encoding_errors = encoding_errors
201201
202202 def connection_made(self, transport):
203 if self._reject_connection:
203 # _reject_connection not added until 3.8
204 if getattr(self, "_reject_connection", False):
204205 context = {
205206 "message": (
206207 "An open stream was garbage collected prior to "
307308 self._encoding_errors = encoding_errors or "strict"
308309
309310 def writemessage(self, message):
310 """Writes an :py:class:`hl7.Message` to the stream.
311 """
311 """Writes an :py:class:`hl7.Message` to the stream."""
312312 self.writeblock(str(message).encode(self.encoding, self.encoding_errors))
9595 strmsg = lines.strip()
9696 # The method for parsing the message
9797 plan = create_parse_plan(strmsg, factory)
98 # Start spliting the methods based upon the ParsePlan
98 # Start splitting the methods based upon the ParsePlan
9999 return _split(strmsg, plan)
100100
101101
102102 def _create_batch(batch, messages, encoding, factory):
103 """Creates a :py:class:`hl7.Batch`
104 """
103 """Creates a :py:class:`hl7.Batch`"""
105104 kwargs = {
106 "separator": "\r",
107105 "sequence": [
108106 parse(message, encoding=encoding, factory=factory) for message in messages
109107 ],
188186
189187 def _create_file(file, batches, encoding, factory):
190188 kwargs = {
191 "separator": "\r",
192189 "sequence": [
193190 _create_batch(batch[0], batch[1], encoding, factory) for batch in batches
194191 ],
249246 batches = []
250247 messages = []
251248 in_batch = False
252 # Split the file into lines, reatining the ends
249 # Split the file into lines, retaining the ends
253250 for line in lines.strip(_HL7_WHITESPACE).splitlines(keepends=True):
254251 # strip out all whitespace MINUS the '\r'
255252 line = line.strip(_HL7_WHITESPACE)
318315 seps = text[4:sep_end_off]
319316 text = text[sep_end_off + 1 :]
320317 data = [
321 plan.factory.create_field("", [seg]),
322 plan.factory.create_field("", [sep0]),
323 plan.factory.create_field(sep0, [seps]),
318 plan.factory.create_field(
319 sequence=[seg], esc=plan.esc, separators=plan.separators
320 ),
321 plan.factory.create_field(
322 sequence=[sep0], esc=plan.esc, separators=plan.separators
323 ),
324 plan.factory.create_field(
325 sequence=[seps], esc=plan.esc, separators=plan.separators
326 ),
324327 ]
325328 else:
326329 data = []
337340 the details stored within the message.
338341 """
339342 # We will always use a carriage return to separate segments
340 separators = ["\r"]
343 separators = "\r"
341344
342345 # Extract the rest of the separators. Defaults used if not present.
343346 if strmsg[:3] not in ("MSH", "FHS", "BHS"):
347350 sep0 = strmsg[3]
348351 seps = list(strmsg[3 : strmsg.find(sep0, 4)])
349352
350 separators.append(seps[0])
353 separators += seps[0]
351354 if len(seps) > 2:
352 separators.append(seps[2]) # repetition separator
355 separators += seps[2] # repetition separator
353356 else:
354 separators.append("~") # repetition separator
357 separators += "~" # repetition separator
355358 if len(seps) > 1:
356 separators.append(seps[1]) # component separator
359 separators += seps[1] # component separator
357360 else:
358 separators.append("^") # component separator
361 separators += "^" # component separator
359362 if len(seps) > 4:
360 separators.append(seps[4]) # sub-component separator
363 separators += seps[4] # sub-component separator
361364 else:
362 separators.append("&") # sub-component separator
365 separators += "&" # sub-component separator
363366 if len(seps) > 3:
364367 esc = seps[3]
365368 else:
373376 factory.create_repetition,
374377 factory.create_component,
375378 ]
376 return _ParsePlan(separators, containers, esc, factory)
379 return _ParsePlan(separators[0], separators, containers, esc, factory)
377380
378381
379382 class _ParsePlan(object):
383386
384387 # field, component, repetition, escape, subcomponent
385388
386 def __init__(self, separators, containers, esc, factory):
389 def __init__(self, seperator, separators, containers, esc, factory):
387390 # TODO test to see performance implications of the assertion
388391 # since we generate the ParsePlan, this should never be in
389392 # invalid state
390 assert len(containers) == len(separators)
393 assert len(containers) == len(separators[separators.find(seperator) :])
394 self.separator = seperator
391395 self.separators = separators
392396 self.containers = containers
393397 self.esc = esc
394398 self.factory = factory
395399
396 @property
397 def separator(self):
398 """Return the current separator to use based on the plan."""
399 return self.separators[0]
400
401400 def container(self, data):
402 """Return an instance of the approriate container for the *data*
401 """Return an instance of the appropriate container for the *data*
403402 as specified by the current plan.
404403 """
405404 return self.containers[0](
406 self.separator, data, self.esc, self.separators, self.factory
405 sequence=data,
406 esc=self.esc,
407 separators=self.separators,
408 factory=self.factory,
407409 )
408410
409411 def next(self):
416418 # the separators and containers lists. Use self.__class__()
417419 # in case :class:`hl7.ParsePlan` is subclassed
418420 return self.__class__(
419 self.separators[1:], self.containers[1:], self.esc, self.factory
421 self.separators[self.separators.find(self.separator) + 1],
422 self.separators,
423 self.containers[1:],
424 self.esc,
425 self.factory,
420426 )
421427 # When we have no separators and containers left, return None,
422428 # which indicates that we have nothing further.
424430
425431 def applies(self, text):
426432 """return True if the separator or those if the children are in the text"""
427 for s in self.separators:
433 for s in self.separators[self.separators.find(self.separator) :]:
428434 if text.find(s) >= 0:
429435 return True
430436 return False
1414 :rtype: bool
1515 """
1616 # Prevent issues if the line is empty
17 return line and line.strip()[:3] == "MSH" and line.count("MSH") == 1
17 if not line:
18 return False
19 msh = line.strip()[:4]
20 if len(msh) != 4:
21 return False
22 return msh[:3] == "MSH" and line.count("\rMSH" + msh[3]) == 0
1823
1924
2025 def isbatch(line):
2126 """
22 Batches are wrapped in BHS / BTS or have more than one
23 message
24 BHS = batch header segment
25 BTS = batch trailer segment
27 Batches are wrapped in BHS / BTS or have more than one
28 message
29 BHS = batch header segment
30 BTS = batch trailer segment
2631 """
2732 return line and (
2833 line.strip()[:3] == "BHS"
3237
3338 def isfile(line):
3439 """
35 Files are wrapped in FHS / FTS, or may be a batch
36 FHS = file header segment
37 FTS = file trailer segment
40 Files are wrapped in FHS / FTS, or may be a batch
41 FHS = file header segment
42 FTS = file trailer segment
3843 """
3944 return line and (line.strip()[:3] == "FHS" or isbatch(line))
4045
4146
4247 def split_file(hl7file):
4348 """
44 Given a file, split out the messages.
45 Does not do any validation on the message.
46 Throws away batch and file segments.
49 Given a file, split out the messages.
50 Does not do any validation on the message.
51 Throws away batch and file segments.
4752 """
4853 rv = []
4954 for line in hl7file.split("\r"):
8085 # Add 4 chars of uniqueness
8186 unique = "".join(random.sample(alphanumerics, 4))
8287 return timestamp + unique
88
89
90 def escape(container, field, app_map=None):
91 """
92 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
93
94 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
95
96 Pass through the message. Replace recognised characters with their escaped
97 version. Return an ascii encoded string.
98
99 Functionality:
100
101 * Replace separator characters (2.10.4)
102 * replace application defined characters (2.10.7)
103 * Replace non-ascii values with hex versions using HL7 conventions.
104
105 Incomplete:
106
107 * replace highlight characters (2.10.3)
108 * How to handle the rich text substitutions.
109 * Merge contiguous hex values
110 """
111 if not field:
112 return field
113
114 esc = str(container.esc)
115
116 DEFAULT_MAP = {
117 container.separators[1]: "F", # 2.10.4
118 container.separators[2]: "R",
119 container.separators[3]: "S",
120 container.separators[4]: "T",
121 container.esc: "E",
122 "\r": ".br", # 2.10.6
123 }
124
125 rv = []
126 for offset, c in enumerate(field):
127 if app_map and c in app_map:
128 rv.append(esc + app_map[c] + esc)
129 elif c in DEFAULT_MAP:
130 rv.append(esc + DEFAULT_MAP[c] + esc)
131 elif ord(c) >= 0x20 and ord(c) <= 0x7E:
132 rv.append(c)
133 else:
134 rv.append("%sX%2x%s" % (esc, ord(c), esc))
135
136 return "".join(rv)
137
138
139 def unescape(container, field, app_map=None): # noqa: C901
140 """
141 See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
142
143 To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
144
145 This will convert the identifiable sequences.
146 If the application provides mapping, these are also used.
147 Items which cannot be mapped are removed
148
149 For example, the App Map count provide N, H, Zxxx values
150
151 Chapter 2: Section 2.10
152
153 At the moment, this functionality can:
154
155 * replace the parsing characters (2.10.4)
156 * replace highlight characters (2.10.3)
157 * replace hex characters. (2.10.5)
158 * replace rich text characters (2.10.6)
159 * replace application defined characters (2.10.7)
160
161 It cannot:
162
163 * switch code pages / ISO IR character sets
164 """
165 if not field or field.find(container.esc) == -1:
166 return field
167
168 DEFAULT_MAP = {
169 "H": "_", # Override using the APP MAP: 2.10.3
170 "N": "_", # Override using the APP MAP
171 "F": container.separators[1], # 2.10.4
172 "R": container.separators[2],
173 "S": container.separators[3],
174 "T": container.separators[4],
175 "E": container.esc,
176 ".br": "\r", # 2.10.6
177 ".sp": "\r",
178 ".fi": "",
179 ".nf": "",
180 ".in": " ",
181 ".ti": " ",
182 ".sk": " ",
183 ".ce": "\r",
184 }
185
186 rv = []
187 collecting = []
188 in_seq = False
189 for offset, c in enumerate(field):
190 if in_seq:
191 if c == container.esc:
192 in_seq = False
193 value = "".join(collecting)
194 collecting = []
195 if not value:
196 logger.warn(
197 "Error unescaping value [%s], empty sequence found at %d",
198 field,
199 offset,
200 )
201 continue
202 if app_map and value in app_map:
203 rv.append(app_map[value])
204 elif value in DEFAULT_MAP:
205 rv.append(DEFAULT_MAP[value])
206 elif value.startswith(".") and (
207 (app_map and value[:3] in app_map) or value[:3] in DEFAULT_MAP
208 ):
209 # Substitution with a number of repetitions defined (2.10.6)
210 if app_map and value[:3] in app_map:
211 ch = app_map[value[:3]]
212 else:
213 ch = DEFAULT_MAP[value[:3]]
214 count = int(value[3:])
215 rv.append(ch * count)
216
217 elif (
218 value[0] == "C"
219 ): # Convert to new Single Byte character set : 2.10.2
220 # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
221 logger.warn(
222 "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
223 value,
224 field,
225 offset,
226 )
227 elif value[0] == "M": # Switch to new Multi Byte character set : 2.10.2
228 # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
229 logger.warn(
230 "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
231 value,
232 field,
233 offset,
234 )
235 elif value[0] == "X": # Hex encoded Bytes: 2.10.5
236 value = value[1:]
237 try:
238 for off in range(0, len(value), 2):
239 rv.append(chr(int(value[off : off + 2], 16)))
240 except Exception:
241 logger.exception(
242 "Error decoding hex value [%s], field [%s], offset [%s]",
243 value,
244 field,
245 offset,
246 )
247 else:
248 logger.exception(
249 "Error decoding value [%s], field [%s], offset [%s]",
250 value,
251 field,
252 offset,
253 )
254 else:
255 collecting.append(c)
256 elif c == container.esc:
257 in_seq = True
258 else:
259 rv.append(str(c))
260
261 return "".join(rv)
55 Forth element can be 'dev' < 'a' < 'b' < 'rc' < 'final'. An empty 4th
66 element is equivalent to 'final'.
77 """
8 VERSION = (0, 4, 2, "final")
8 VERSION = (0, 4, 5, "final")
99
1010
1111 def get_version():
00 # pip Requirements for developing python-hl7 (not required to use as a library)
1 tox==3.14.4
2 flake8==3.8.3
3 Sphinx==2.4.1
4 coverage==5.0.3
5 isort==4.3.21
6 black==19.10b0; python_version > "3.5"
1 tox==3.24.5
2 flake8==4.0.1
3 Sphinx==4.4.0
4 coverage==6.3.2
5 isort==5.10.1
6 black==22.3.0
7 twine==3.8.0
8 wheel==0.37.1
2020 author="John Paulett",
2121 author_email="john@paulett.org",
2222 url="http://python-hl7.readthedocs.org",
23 project_urls={
24 "Source": "https://github.com/johnpaulett/python-hl7",
25 },
2326 license="BSD",
2427 platforms=["POSIX", "Windows"],
2528 keywords=[
4649 install_requires=[],
4750 test_suite="tests",
4851 tests_require=[],
49 entry_points={"console_scripts": ["mllp_send=hl7.client:mllp_send",],},
52 entry_points={
53 "console_scripts": [
54 "mllp_send=hl7.client:mllp_send",
55 ],
56 },
5057 zip_safe=True,
5158 )
6161 # We intentionally don't add inspect.iscoroutinefunction() check
6262 # for func argument because there is no way
6363 # to check for async function reliably:
64 # 1. It can be "async def func()" iself
64 # 1. It can be "async def func()" itself
6565 # 2. Class can implement "async def __call__()" method
6666 # 3. Regular "def func()" that returns awaitable object
6767 self.addCleanup(*(func, *args), **kwargs)
438438
439439 # If a string is longer than _diffThreshold, use normal comparison instead
440440 # of difflib. See #11763.
441 _diffThreshold = 2 ** 16
441 _diffThreshold = 2**16
442442
443443 # Attribute used by TestSuite for classSetUp
444444
448448
449449 def __init__(self, methodName="runTest"):
450450 """Create an instance of the class that will use the named test
451 method when executed. Raises a ValueError if the instance does
452 not have a method with the specified name.
451 method when executed. Raises a ValueError if the instance does
452 not have a method with the specified name.
453453 """
454454 self._testMethodName = methodName
455455 self._outcome = None
828828
829829 def assertRaises(self, expected_exception, *args, **kwargs):
830830 """Fail unless an exception of class expected_exception is raised
831 by the callable when invoked with specified positional and
832 keyword arguments. If a different type of exception is
833 raised, it will not be caught, and the test case will be
834 deemed to have suffered an error, exactly as for an
835 unexpected exception.
836
837 If called with the callable and arguments omitted, will return a
838 context object used like this::
839
840 with self.assertRaises(SomeException):
841 do_something()
842
843 An optional keyword argument 'msg' can be provided when assertRaises
844 is used as a context object.
845
846 The context manager keeps a reference to the exception as
847 the 'exception' attribute. This allows you to inspect the
848 exception after the assertion::
849
850 with self.assertRaises(SomeException) as cm:
851 do_something()
852 the_exception = cm.exception
853 self.assertEqual(the_exception.error_code, 3)
831 by the callable when invoked with specified positional and
832 keyword arguments. If a different type of exception is
833 raised, it will not be caught, and the test case will be
834 deemed to have suffered an error, exactly as for an
835 unexpected exception.
836
837 If called with the callable and arguments omitted, will return a
838 context object used like this::
839
840 with self.assertRaises(SomeException):
841 do_something()
842
843 An optional keyword argument 'msg' can be provided when assertRaises
844 is used as a context object.
845
846 The context manager keeps a reference to the exception as
847 the 'exception' attribute. This allows you to inspect the
848 exception after the assertion::
849
850 with self.assertRaises(SomeException) as cm:
851 do_something()
852 the_exception = cm.exception
853 self.assertEqual(the_exception.error_code, 3)
854854 """
855855 context = _AssertRaisesContext(expected_exception, self)
856856 try:
861861
862862 def assertWarns(self, expected_warning, *args, **kwargs):
863863 """Fail unless a warning of class warnClass is triggered
864 by the callable when invoked with specified positional and
865 keyword arguments. If a different type of warning is
866 triggered, it will not be handled: depending on the other
867 warning filtering rules in effect, it might be silenced, printed
868 out, or raised as an exception.
869
870 If called with the callable and arguments omitted, will return a
871 context object used like this::
872
873 with self.assertWarns(SomeWarning):
874 do_something()
875
876 An optional keyword argument 'msg' can be provided when assertWarns
877 is used as a context object.
878
879 The context manager keeps a reference to the first matching
880 warning as the 'warning' attribute; similarly, the 'filename'
881 and 'lineno' attributes give you information about the line
882 of Python code from which the warning was triggered.
883 This allows you to inspect the warning after the assertion::
884
885 with self.assertWarns(SomeWarning) as cm:
886 do_something()
887 the_warning = cm.warning
888 self.assertEqual(the_warning.some_attribute, 147)
864 by the callable when invoked with specified positional and
865 keyword arguments. If a different type of warning is
866 triggered, it will not be handled: depending on the other
867 warning filtering rules in effect, it might be silenced, printed
868 out, or raised as an exception.
869
870 If called with the callable and arguments omitted, will return a
871 context object used like this::
872
873 with self.assertWarns(SomeWarning):
874 do_something()
875
876 An optional keyword argument 'msg' can be provided when assertWarns
877 is used as a context object.
878
879 The context manager keeps a reference to the first matching
880 warning as the 'warning' attribute; similarly, the 'filename'
881 and 'lineno' attributes give you information about the line
882 of Python code from which the warning was triggered.
883 This allows you to inspect the warning after the assertion::
884
885 with self.assertWarns(SomeWarning) as cm:
886 do_something()
887 the_warning = cm.warning
888 self.assertEqual(the_warning.some_attribute, 147)
889889 """
890890 context = _AssertWarnsContext(expected_warning, self)
891891 return context.handle("assertWarns", args, kwargs)
947947
948948 def assertEqual(self, first, second, msg=None):
949949 """Fail if the two objects are unequal as determined by the '=='
950 operator.
950 operator.
951951 """
952952 assertion_func = self._getAssertEqualityFunc(first, second)
953953 assertion_func(first, second, msg=msg)
954954
955955 def assertNotEqual(self, first, second, msg=None):
956956 """Fail if the two objects are equal as determined by the '!='
957 operator.
957 operator.
958958 """
959959 if not first != second:
960960 msg = self._formatMessage(
964964
965965 def assertAlmostEqual(self, first, second, places=None, msg=None, delta=None):
966966 """Fail if the two objects are unequal as determined by their
967 difference rounded to the given number of decimal places
968 (default 7) and comparing to zero, or by comparing that the
969 difference between the two objects is more than the given
970 delta.
971
972 Note that decimal places (from zero) are usually not the same
973 as significant digits (measured from the most significant digit).
974
975 If the two objects compare equal then they will automatically
976 compare almost equal.
967 difference rounded to the given number of decimal places
968 (default 7) and comparing to zero, or by comparing that the
969 difference between the two objects is more than the given
970 delta.
971
972 Note that decimal places (from zero) are usually not the same
973 as significant digits (measured from the most significant digit).
974
975 If the two objects compare equal then they will automatically
976 compare almost equal.
977977 """
978978 if first == second:
979979 # shortcut
10101010
10111011 def assertNotAlmostEqual(self, first, second, places=None, msg=None, delta=None):
10121012 """Fail if the two objects are equal as determined by their
1013 difference rounded to the given number of decimal places
1014 (default 7) and comparing to zero, or by comparing that the
1015 difference between the two objects is less than the given delta.
1016
1017 Note that decimal places (from zero) are usually not the same
1018 as significant digits (measured from the most significant digit).
1019
1020 Objects that are equal automatically fail.
1013 difference rounded to the given number of decimal places
1014 (default 7) and comparing to zero, or by comparing that the
1015 difference between the two objects is less than the given delta.
1016
1017 Note that decimal places (from zero) are usually not the same
1018 as significant digits (measured from the most significant digit).
1019
1020 Objects that are equal automatically fail.
10211021 """
10221022 if delta is not None and places is not None:
10231023 raise TypeError("specify delta or places not both")
316316 "",
317317 ]
318318 )
319
320 sample_msh = "\r".join(
321 [
322 "MSH|^~\\&|HNAM_PM|HNA500|AIG||20131017140041||ADT^A01|Q150084616T145947960|P|2.3",
323 "PID|1|2148790^^^MSH_MRN^MR|2148790^^^MSH_MRN^MR~162840^^^MSH_EMPI^CM|3722^0^^MSH_DTC^REFE~184737^0^^IID^DONOR ~Q2147670^0^^MSQ_MRN|RUFUSS^MELLODIAL^^^^^CURRENT||19521129|F|RUFUSS^MELLODIAL^^^^^PREVIOUS|OT|221 CANVIEW AVENUE^66-D^BRONX^NY^10454^USA^HOME^^058||3472444150^HOME~(000)000-0000^ALTERNATE||ENGLISH|M|PEN|O75622322^^^MSH_FIN_NBR^FIN NB|125544697|||HIS|||0",
324 'PV1|0001|I|MBN1^MBN1^06|4| 863968||03525^FARP^YONAN|03525^FARP^YONAN|""|NUR|||N|5|| U|03525^FARP^YONAN|I|01|T22~SLF|||||||||||||||||||E||AC|||20140210225300|""',
325 'DG1|0001|I9|440.21^ATHEROSCLEROSIS W/INT CLAUDCTN^I9|ATHEROSCLEROSIS W/INT CLAUDCTN|""|A|||||||.00||9',
326 'IN1|0001|A10A|A10|HIP COMP MCAID|PO BOX 223^""^NEW YORK^NY^10116^US^^^""|HIP ON LINE|""|""|""|||""|""|25892261^""^""|C|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|Y|""||||||Y||""|||||||-JNJ45517',
327 'IN2||062420044|""|||""|||||||||||||||||||60094|""|||||||||||||||||||||||||||||||',
328 'IN1|0002|GMED|""|MEDICAID|""|""|""|""|""|||""|""||X|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|""|""||||||""||||||',
329 'IN2||062420044|""|||""|||||||||||||||||||""|""||||||||||||||||||||||||||||||||""',
330 'IN1|0003|SLFJ|""|SELF-PAY|""|""|""|""|""|||""|""||P|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|""|""||||||""||||||',
331 "",
332 ]
333 )
00 # -*- coding: utf-8 -*-
11 from unittest import TestCase
22
3 from hl7 import Accessor
3 from hl7 import Accessor, Field, Message, Segment
44
55
66 class AccessorTest(TestCase):
1919 def test_equality(self):
2020 self.assertEqual(Accessor("FOO", 1, 3, 4), Accessor("FOO", 1, 3, 4))
2121 self.assertNotEqual(Accessor("FOO", 1), Accessor("FOO", 2))
22
23 def test_string(self):
24 SEP = "|^~\\&"
25 CR_SEP = "\r"
26 MSH = Segment(SEP[0], [Field(SEP[2], ["MSH"])])
27 MSA = Segment(SEP[0], [Field(SEP[2], ["MSA"])])
28 response = Message(CR_SEP, [MSH, MSA])
29 response["MSH.F1.R1"] = SEP[0]
30 response["MSH.F2.R1"] = SEP[1:]
31 self.assertEqual(str(response), "MSH|^~\\&|\rMSA\r")
32
33 response["MSH.F9.R1.C1"] = "ORU"
34 response["MSH.F9.R1.C2"] = "R01"
35 response["MSH.F9.R1.C3"] = ""
36 response["MSH.F12.R1"] = "2.4"
37 response["MSA.F1.R1"] = "AA"
38 response["MSA.F3.R1"] = "Application Message"
39 self.assertEqual(
40 str(response),
41 "MSH|^~\\&|||||||ORU^R01^|||2.4\rMSA|AA||Application Message\r",
42 )
4242 def test_send_message_unicode(self):
4343 self.client.socket.recv.return_value = "thanks"
4444
45 result = self.client.send_message(u"foobar")
45 result = self.client.send_message("foobar")
4646 self.assertEqual(result, "thanks")
4747
4848 self.client.socket.send.assert_called_once_with(b"\x0bfoobar\x1c\x0d")
1111 class ConstructionTest(TestCase):
1212 def test_create_msg(self):
1313 # Create a message
14 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
15 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
14 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
15 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
1616 response = hl7.Message(CR_SEP, [MSH, MSA])
1717 response["MSH.F1.R1"] = SEP[0]
1818 response["MSH.F2.R1"] = SEP[1:]
2020
2121 def test_append(self):
2222 # Append a segment to a message
23 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
23 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
2424 response = hl7.Message(CR_SEP, [MSH])
2525 response["MSH.F1.R1"] = SEP[0]
2626 response["MSH.F2.R1"] = SEP[1:]
27 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
27 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
2828 response.append(MSA)
2929 self.assertEqual(str(response), "MSH|^~\\&|\rMSA\r")
3030
3131 def test_append_from_source(self):
3232 # Copy a segment between messages
33 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
34 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
33 MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
34 MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
3535 response = hl7.Message(CR_SEP, [MSH, MSA])
3636 response["MSH.F1.R1"] = SEP[0]
3737 response["MSH.F2.R1"] = SEP[1:]
412412 def test_create_parse_plan(self):
413413 plan = hl7.parser.create_parse_plan(sample_hl7)
414414
415 self.assertEqual(plan.separators, ["\r", "|", "~", "^", "&"])
415 self.assertEqual(plan.separators, "\r|~^&")
416416 self.assertEqual(
417417 plan.containers, [Message, Segment, Field, Repetition, Component]
418418 )
430430 plan = hl7.parser.create_parse_plan(sample_hl7)
431431
432432 n1 = plan.next()
433 self.assertEqual(n1.separators, ["|", "~", "^", "&"])
433 self.assertEqual(n1.separators, "\r|~^&")
434 self.assertEqual(n1.separator, "|")
434435 self.assertEqual(n1.containers, [Segment, Field, Repetition, Component])
435436
436437 n2 = n1.next()
437 self.assertEqual(n2.separators, ["~", "^", "&"])
438 self.assertEqual(n2.separators, "\r|~^&")
439 self.assertEqual(n2.separator, "~")
438440 self.assertEqual(n2.containers, [Field, Repetition, Component])
439441
440442 n3 = n2.next()
441 self.assertEqual(n3.separators, ["^", "&"])
443 self.assertEqual(n3.separators, "\r|~^&")
444 self.assertEqual(n3.separator, "^")
442445 self.assertEqual(n3.containers, [Repetition, Component])
443446
444447 n4 = n3.next()
445 self.assertEqual(n4.separators, ["&"])
448 self.assertEqual(n4.separators, "\r|~^&")
449 self.assertEqual(n4.separator, "&")
446450 self.assertEqual(n4.containers, [Component])
447451
448452 n5 = n4.next()
1010 sample_file1,
1111 sample_file2,
1212 sample_hl7,
13 sample_msh,
1314 )
1415
1516
2223 self.assertFalse(hl7.ishl7(sample_file))
2324 self.assertFalse(hl7.ishl7(sample_file1))
2425 self.assertFalse(hl7.ishl7(sample_file2))
26 self.assertTrue(hl7.ishl7(sample_msh))
2527
2628 def test_ishl7_empty(self):
2729 self.assertFalse(hl7.ishl7(""))
00 [tox]
11 envlist =
2 py39, py38, py37, py36, py35, docs
2 py310, py39, py38, py37, docs
33
44 [testenv]
55 commands =
66 python -m unittest discover -t . -s tests
7
8 [testenv:py35]
9 basepython = python3.5
10
11 [testenv:py36]
12 basepython = python3.6
137
148 [testenv:py37]
159 basepython = python3.7
2014 [testenv:py39]
2115 basepython = python3.9
2216
17 [testenv:py310]
18 basepython = python3.10
19
2320 [testenv:docs]
2421 whitelist_externals = make
2522 deps =